Kafka producer application 실습

2020-08-04

.

Data_Engineering_TIL(20200804)

  • 학습한 프로그램 : T아카데미 Apache Kafka 입문과 활용

  • URL : https://tacademy.skplanet.com/live/player/onlineLectureDetail.action?seq=183

  • 이전내용 참고자료 : https://minman2115.github.io/DE_TIL102

실습전 준비사항

1) 카프카 클라이언트에 아래와 같은 명령어로 test라는 토픽을 만들고 컨슈머를 띄워놓는다.

[ec2-user@ip-10-1-10-86 bin]$ ./kafka-topics.sh --create --bootstrap-server {브로커 아이피}:9092 --replication-factor 1 --partitions 3 --topic test
Created topic test.

[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server {브로커 아이피}:9092 --topic test --from-beginning

2) 클라이언트에서 다른 터미널을 하나 띄워서 아래의 명령어와 같이 gradle을 설치한다.

# gradle.zip 다운로드
[ec2-user@ip-10-1-10-86 ~]$ wget https://services.gradle.org/distributions/gradle-5.2.1-bin.zip
--2020-08-04 01:53:45--  https://services.gradle.org/distributions/gradle-5.2.1-bin.zip
Resolving services.gradle.org (services.gradle.org)... 104.18.190.9, 104.18.191.9, 2606:4700::6812:bf09, ..                                                    .
Connecting to services.gradle.org (services.gradle.org)|104.18.190.9|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://downloads.gradle-dn.com/distributions/gradle-5.2.1-bin.zip [following]
--2020-08-04 01:53:45--  https://downloads.gradle-dn.com/distributions/gradle-5.2.1-bin.zip
Resolving downloads.gradle-dn.com (downloads.gradle-dn.com)... 104.17.159.20, 104.17.160.20, 2606:4700::681                                                    1:a014, ...
Connecting to downloads.gradle-dn.com (downloads.gradle-dn.com)|104.17.159.20|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 87430521 (83M) [application/zip]
Saving to: gradle-5.2.1-bin.zip

100%[=================================================================>] 87,430,521   320MB/s   in 0.3s

2020-08-04 01:53:46 (320 MB/s) - gradle-5.2.1-bin.zip saved [87430521/87430521]

[ec2-user@ip-10-1-10-86 ~]$ ls
gradle-5.2.1-bin.zip  kafka_2.12-2.5.0  kafka_2.12-2.5.0.tgz  tacademy-kafka

# gradle 설치 디렉토리 생성
[ec2-user@ip-10-1-10-86 ~]$ sudo mkdir /opt/gradle

# 다운받은 gradle.zip 압축해제
[ec2-user@ip-10-1-10-86 ~]$ sudo unzip -d /opt/gradle gradle-5.2.1-bin.zip
Archive:  gradle-5.2.1-bin.zip
   creating: /opt/gradle/gradle-5.2.1/
...

# 임시 환경변수 설정
[ec2-user@ip-10-1-10-86 ~]$ export PATH=$PATH:/opt/gradle/gradle-5.2.1/bin
    
# gradle 설치확인
[ec2-user@ip-10-1-10-86 ~]$ gradle -version

Welcome to Gradle 5.2.1!

Here are the highlights of this release:
 - Define sets of dependencies that work together with Java Platform plugin
 - New C++ plugins with dependency management built-in
 - New C++ project types for gradle init
 - Service injection into plugins and project extensions

For more details see https://docs.gradle.org/5.2.1/release-notes.html


------------------------------------------------------------
Gradle 5.2.1
------------------------------------------------------------

Build time:   2019-02-08 19:00:10 UTC
Revision:     f02764e074c32ee8851a4e1877dd1fea8ffb7183

Kotlin DSL:   1.1.3
Kotlin:       1.3.20
Groovy:       2.5.4
Ant:          Apache Ant(TM) version 1.9.13 compiled on July 10 2018
JVM:          1.8.0_252 (Oracle Corporation 25.252-b09)
OS:           Linux 4.14.186-146.268.amzn2.x86_64 amd64

** 참고사항

[Kafka Producer란]

  • 카프카로 데이터(key,value) 전송

  • ProducerRecord 객체를 반드시 생성해서 보내야함

  • Java Kafka-client 제공

build.gradle처럼 디펜던시를 추가할 수도 있고, 그 외 써드파티를 참고할수도 있는데 써드파티 language의 경우 아래 링크를 참고해서 import해서 사용할 수 있음

https://cwiki.apache.org/confluence/display/KAFKA/Clients

  • 어떤 데이터를 보내나요?

1) Web, Application 클릭로그

2) 공유 자전거/자동차의 위치(GPS) 정보

3) 스마트 팩토리의 머신 센서정보

4) 상호 통신을 위한 application

5) 등등 무긍무진함

실습 1: simple-kafka-producer 어플리케이션 실습

STEP 1) simple-kafka-producer 다운로드 및 프로젝트 내용 확인

클라이언트에 git clone https://github.com/AndersonChoi/tacademy-kafka.git 으로 실습코드를 다운받는다. 그리고 다운로드받은 zip파일을 압축풀면 각 디렉토리들이 각각의 프로젝트이다. 이 프로젝트들(소스코드들)을 import해서 실습을 하려고 한다.

이 프로젝트 코드에서 가장 중요한 코드는 build.gradle 파일이고, 다른 하나는 src 폴더안에 SimpleProducer.java 이다.

[ec2-user@ip-10-1-10-86 ~]$ ls
kafka_2.12-2.5.0  kafka_2.12-2.5.0.tgz  tacademy-kafka

[ec2-user@ip-10-1-10-86 ~]$ cd tacademy-kafka/

[ec2-user@ip-10-1-10-86 tacademy-kafka]$ ls
image                        kafka-consumer-save-metric      kafka-producer-key-value                simple-kafka-consumer  실습 커맨드 리스트.txt
kafka-consumer-auto-commit   kafka-consumer-sync-commit      아파치 카프카 입문과 활용 강의자료.pdf  simple-kafka-producer
kafka-consumer-multi-thread  kafka-producer-exact-partition  README.md                               telegraf.conf

[ec2-user@ip-10-1-10-86 tacademy-kafka]$ cd simple-kafka-producer/

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ cat build.gradle
plugins {
    id 'java'
}

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

위에 명령어에서 볼 수 있듯이 build.gradle에서 디펜던시를 보면 kafka를 미리 임포트 시켜놓았다. 이 디펜턴시를 통해서 카프카 클라이언트 관련 컨슈머나 프로듀서 , 인스턴스를 생성하는 클래스들이 미리 정의되어 있다. 반드시 카프카를 미리 임포트 시켜놓아야 한다. 디펜던시에서 카프카를 임포트할때 주의해야할 점은 producer와 consumer의 kafka 버전이 동일한게 좋다.

STEP 2) SimpleProducer.java 메인코드를 이용해서 카프카에 데이터를 넣어보자

  • src 폴더에 SimpleProducer.java 를 열면 내용이 다음과 같다.

여기서 {aws ec2 public ip} 부분을 Broker로 사용하는 EC2의 퍼블릭 아이피로 치환해준다.

내용을 보면 부트스트랩 서버를 설정하고, 키와 벨류를 직렬화하였다. 그리고 우리가 보내고자 하는 데이터의 형태가 string이기 때문에 StringSerializer를 이용해서 string 설정을 하였다.

실제 데이터는 This is record 0부터 This is record 9까지의 value를 for문으로 보내게 된다.

package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    # 데이터를 보낼 토픽
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        # 키를 직렬화
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        # 벨류를 직렬화
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        # 카프카 producer를 위의 properites 내용을 반영하여 최초로 생성하고, 
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        # 브로커로 보내는 데이터는 아래와 같이 0부터 9까지 for문으로 돌게된다.
        for (int index = 0; index < 10; index++) {
            # this is record 에 인덱스를 붙여서 날라가게 될 것이다.
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                # 하나보내고 1초 쉰다.
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

STEP 3) 그런 다음에 아래 코드와 같이 build.gradle을 수정하고, 프로젝트를 실행한다.

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ sudo vim build.gradle
apply plugin: 'application'

mainClassName = 'com.tacademy.SimpleProducer'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ gradle wrapper

BUILD SUCCESSFUL in 2s
1 actionable task: 1 executed

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ./gradlew build
Downloading https://services.gradle.org/distributions/gradle-5.2.1-all.zip
..............................................................................................................................

BUILD SUCCESSFUL in 14s
5 actionable tasks: 5 executed

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Send to test | data : This is record 0
Send to test | data : This is record 1
Send to test | data : This is record 2
Send to test | data : This is record 3
Send to test | data : This is record 4
Send to test | data : This is record 5
Send to test | data : This is record 6
Send to test | data : This is record 7
Send to test | data : This is record 8
Send to test | data : This is record 9

BUILD SUCCESSFUL in 11s
2 actionable tasks: 1 executed, 1 up-to-date

그런 다음에 클라이언트의 컨슈머가 띄워진 터미널로 가보면 아래와 같이 어플리케이션이 잘 실행된 것을 확인할 수 있다.

[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server 15.165.76.100:9092 --topic test --from-beginning
This is record 0
This is record 1
This is record 2
This is record 3
This is record 4
This is record 5
This is record 6
This is record 7
This is record 8
This is record 9

실습 2: kafka-producer-key-value 어플리케이션 실습

STEP 1) 클라이언트에서 아래와 같이 명령어를 실행하여 자바 프로젝트에서 몇가지 셋팅을 해준다.

[ec2-user@ip-10-1-10-86 tacademy-kafka]$ ls
image                        kafka-consumer-save-metric      kafka-producer-key-value                simple-kafka-consumer  실습 커맨드 리스트.txt
kafka-consumer-auto-commit   kafka-consumer-sync-commit      아파치 카프카 입문과 활용 강의자료.pdf  simple-kafka-producer
kafka-consumer-multi-thread  kafka-producer-exact-partition  README.md                               telegraf.conf

[ec2-user@ip-10-1-10-86 tacademy-kafka]$ cd kafka-producer-key-value

[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ cat build.gradle
plugins {
    id 'java'
}

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

# 아래와 같이 build.gradle 수정
[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ sudo vim build.gradle
apply plugin: 'application'

mainClassName = 'com.tacademy.ProducerWithKeyValue'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

# 자바 소스코드로 이동
[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ cd /home/ec2-user/tacademy-kafka/kafka-producer-key-value/src/main/java/com/tacademy

[ec2-user@ip-10-1-10-86 tacademy]$ ls
ProducerWithKeyValue.java

# 아래와 같이 ec2 퍼블릭 아이피 주소란에 아이피를 입력하고 저장해준다.
[ec2-user@ip-10-1-10-86 tacademy]$ sudo vim ProducerWithKeyValue.java
package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithKeyValue {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            # 위에 simpleproducer와 다르게 프로듀서레코드 객체생성할때 중간에 Integer.toString(index) 이게 더 추가되었다.
            # Integer.toString(index)가 key이고, data가 value다.
            # ProducerRecord(String topic, key, value)
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

STEP 2) 클라이언트에서 컨슈머가 띄워진 터미널로 이동해서 아래와 같이 명령어를 입력한다.

# 현재 띄워져 있는 컨슈머를 컨트롤+c를 눌러 종료시킨다.
[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server 15.165.76.100:9092 --topic test --from-beginning
1
2
3
a
b
c
This is record 0
This is record 1
This is record 2
This is record 3
This is record 4
This is record 5
This is record 6
This is record 7
This is record 8
This is record 9
^CProcessed a total of 16 messages

# 다음 명령어와 같이 컨슈머를 띄워준다.
# 컨슈머에 프로퍼티를 몇개 추가하였다. key 출력하는거를 true로 해주고, key.seperator를 -로 구분하여 value로 출력해달라는 것이다.
[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"

STEP 3) 다시 다른 클라이언트의 터미널로 넘어와서 아래와 같은 명령어로 어플리케이션을 실행한다.

[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ cd /home/ec2-user/tacademy-kafka/kafka-producer-key-value

[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ gradle wrapper

BUILD SUCCESSFUL in 0s
1 actionable task: 1 executed
    
[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ ./gradlew build
Downloading https://services.gradle.org/distributions/gradle-5.2.1-bin.zip
...................................................................................

BUILD SUCCESSFUL in 3s
5 actionable tasks: 5 executed
[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Send to test | data : This is record 0
Send to test | data : This is record 1
Send to test | data : This is record 2
Send to test | data : This is record 3
Send to test | data : This is record 4
Send to test | data : This is record 5
Send to test | data : This is record 6
Send to test | data : This is record 7
Send to test | data : This is record 8
Send to test | data : This is record 9

BUILD SUCCESSFUL in 11s
2 actionable tasks: 1 executed, 1 up-to-date

클라이언트에서 컨슈머가 띄워진 터미널로 이동하면 아래와 같이 키벨류 형태로 데이터가 들어온 것을 확인할 수 있다.

[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server 15.165.76.100:9092 --topic test --property print.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
6-This is record 6
7-This is record 7
8-This is record 8
9-This is record 9

STEP 4) simple-kafka-producer를 다시 실행시켜서 kafka-producer-key-value 어플리케이션과의 차이점을 확인해보자.

클라이언트 터미널에서 아래와 같이 명령어를 실행하여 simple-kafka-producer 어플리케이션을 실행한다.

[ec2-user@ip-10-1-10-86 kafka-producer-key-value]$ cd /home/ec2-user/tacademy-kafka/simple-kafka-producer

[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ gradle wrapper

BUILD SUCCESSFUL in 0s
1 actionable task: 1 executed
[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ./gradlew build

BUILD SUCCESSFUL in 0s
5 actionable tasks: 5 up-to-date
<-------------> 0% WAITING
[ec2-user@ip-10-1-10-86 simple-kafka-producer]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Send to test | data : This is record 0
Send to test | data : This is record 1
Send to test | data : This is record 2
Send to test | data : This is record 3
Send to test | data : This is record 4
Send to test | data : This is record 5
Send to test | data : This is record 6
Send to test | data : This is record 7
Send to test | data : This is record 8
Send to test | data : This is record 9

BUILD SUCCESSFUL in 11s
2 actionable tasks: 1 executed, 1 up-to-date

클라이언트에서 컨슈머가 띄워진 터미널로 이동하면 아래와 같이 키벨류 형태로 데이터가 들어온 것을 확인할 수 있다.

[ec2-user@ip-10-1-10-86 bin]$ ./kafka-console-consumer.sh --bootstrap-server 15.165.76.100:9092 --topic test --property print.key=true --property key.separator="-"
0-This is record 0
1-This is record 1
2-This is record 2
3-This is record 3
4-This is record 4
5-This is record 5
6-This is record 6
7-This is record 7
8-This is record 8
9-This is record 9
null-This is record 0
null-This is record 1
null-This is record 2
null-This is record 3
null-This is record 4
null-This is record 5
null-This is record 6
null-This is record 7
null-This is record 8
null-This is record 9

** 참고사항

image

동일키는 일반적으로 동일 파티션에 적재되고, default partitioner는 key를 해시값으로해서 동일한 파티션에 들어가게 해준다. 동일한 키같은 경우에는 순서를 보장하기 때문에 상태머신으로 사용이 가능하다.

또한 역할에 따라서 컨슈머 할당 적용도 가능하다. 예를 들어서 키가 주문인 레코드와, 키가 결제인 레코드라는게 있다라고 가정하면 각각의 키별로 다른 파티션에 들어가기 때문에 주문 데이터를 처리하는 컨슈머 어플리케이션과 결제 데이터를 처리하는 컨슈머 어플리케이션으로 각각 할당하여 처리할 수 있다.

키에 레코드값, 해쉬값을 넣어서 중복처리도 방지할 수 있다.

[Record value]

  • 레코드 값

  • 약할 : 실질적으로 전달하려는 데이터

  • 어떤 type을 보낼 수 있나요?

string, bytearray, int 등 사실상 제한이 없고 직렬화 역직렬화만 잘 해주면 된다.

  • 어떤 데이터 포맷이 좋나요?

csv, tsv, json, object 등 서비스의 특징에 맞게 사용하는 것을 권장

json 사용 시 key,value 형태로서 확장성이 뛰어나다. 컬럼 정보(key) 포함. 아무래도 string 형태로 되어있으니까 kafka console consumer를 통해서 데이터를 눈으로 확인해서 디버그 할 수 있다는 점이 운영상의 이점이다.

csv 사용 시 콤마(,) 기준으로 데이터 구분하기 때문에 용량에 있어서 이득을 볼 수 있다.

  • 포맷을 관리하는 다른방법?

예를 들어 회사에서 쓰는 스키마를 등록해서 버전관리를 할 수 있다.

컨플루언스 스키마 레지스트리 –> confluentinc/schema-registry

실습 3: kafka-producer-exact-partition 어플리케이션 실습

파티션을 직접 지정해서 데이터를 넣는 어플리케이션을 실습해보자.

[ec2-user@ip-10-1-10-86 kafka-producer-exact-partition]$ cd /home/ec2-user/tacademy-kafka/kafka-producer-exact-partition

[ec2-user@ip-10-1-10-86 kafka-producer-exact-partition]$ sudo vim /home/ec2-user/tacademy-kafka/kafka-producer-exact-partition/src/main/java/com/tacademy/ProducerExactParition.java
package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerExactParition {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
    # 아래와 같이 파티션 넘버를 지정해서 데이터를 보낼 수도 있다.
    # 파티션 넘버를 
    private static int PARTITION_NUMBER = 1;

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            # 프로듀서 레코드를 만들때 아래와 같이 파티션 넘버 변수가 들어가는 파라미터가 하나가 있다.
            # ProducerRecord(String topic,Integer partition, key, value)
            # 파티션 1번에는 우리가 지정한 데이터만 들어갈 것이기 때문에 그 안에서는 반드시 순서를 보장하게 되어 있는점이 특징임
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

# 아래와 같이 build.gradle 수정
[ec2-user@ip-10-1-10-86 kafka-producer-exact-partition]$ sudo vim build.gradle
apply plugin: 'application'

mainClassName = 'com.tacademy.ProducerExactParition'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

** 참고사항

[prodecer acks]

프로듀서에서 카프카로 데이터를 보낼때 유실여부 데이터 전송 속도를 조절할 수 있는 중요한 옵션이다.

  • acks = 0

가장 속도가 빠름, 유실 가능성이 높음

예를 들어서 센서데이터

  • acks = 1(default)

속도 보통, 유실 가능성이 있음

  • akcs = all 또는 -1

속도 가장 느림, 메시지 전달 손실가능성 없음

image

[Producer options]

  • 필수옵션 : 반드시 사용자가 설정을 해줘야 하는 부분

bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록

key.serializer : 메세지 키 직렬화에 사용되는 클래스

value.serializer : 메세지 값을 직렬화 하는데 사용되는 클래스

  • 선택옵션 : default 값 존재, 따로 변경하지 않으면 default 값으로 적용

acks : 레코드 전송 신뢰도 조절(리플리카)

compression.type : snappy, gzip, lz4 중 하나로 압축하여 전송

retries : 클러스터 장애에 대응하여 메세지 전송을 재시도하는 횟수

buffer.memory : 브로커에 전송될 메세지의 버퍼로 사용될 메모리 양

batch.size : 여러 데이터를 함께 보내기 위한 레코드 크기

linger.ms : 현재의 배치를 전송하기 전까지 기다리는 시간

client.id : 어떤 클라이언트인지 구분하는 식별자