Kafka consumer application 실습

2020-08-27

.

Data_Engineering_TIL(20200826)

학습한 프로그램 : T아카데미 Apache Kafka 입문과 활용

URL : https://tacademy.skplanet.com/live/player/onlineLectureDetail.action?seq=183

이전내용 참고자료 : https://minman2115.github.io/DE_TIL113

# consumer 란

  • 데이터를 가져가는(polling) 주체

브로커가 보내는게 아니라 컨슈머가 달라고 할때만 그때 polling해서 가져간다.

따라서 카프카 컨슈머가 데이터 처리하는 속도가 느리다면 느리게 폴링할 것이고, 빠르다면 빠르게 폴링해서 데이터를 가져갈 것이다.

  • commit을 통해 읽은 consumer offset을 카프카에 기록

카프카 컨슈머 그룹이 내가 어디까지 읽었는지 consumer offset을 카프카에 기록한다는 것이다.

  • Java Kafka-client 제공

그 외 3rd party language의 경우 아래 링크 참고

https://cwiki.apache.org/confluence/display/KAFKA/Clients

  • 어디에 데이터를 저장하나요?

FileSystem(.csv .log .tsv)

Object Storage(S3, Minio)

Hadoop(Hdfs, Hive)

RDBMS(Oracle, Mysql)

NoSql(MongoDB, CouchDB)

기타 다양한 저장소들(Elasticsearch, influxDB)

# 주의사항

컨슈머 코드에도 클라이언트가 들어가기 때문에 버전 호환성을 반드시 체크해줘야 한다.

image

[실습내용]

실습 1. 기본적인 consumer application 실습

  • client ec2에 접속해서 아래와 같이 명령어들을 실행해본다.
# 먼저 Client ec2에서 아래와 같이 console producer를 띄우고 데이터를 넣어준다.
# ./kafka-console-producer.sh --bootstrap-server {broker ec2 public ip}:9092 --topic test
[ec2-user@ip-10-1-10-115 bin]$ ./kafka-console-producer.sh --bootstrap-server 13.209.84.144:9092 --topic test
>aaa
>bbb
>ccc
>ddd
>eee
>fff
>

# 그 다음에 다른 클라이언트 ec2에서 터미널을 하나 더 열고 거기에서 아래와 같이 머캔드를 실행해준다.
[ec2-user@ip-10-1-10-115 tacademy-kafka]$ ls
image                        kafka-producer-exact-partition          simple-kafka-producer
kafka-consumer-auto-commit   kafka-producer-key-value                telegraf.conf
kafka-consumer-multi-thread  아파치 카프카 입문과 활용 강의자료.pdf  실습 커맨드 리스트.txt
kafka-consumer-save-metric   README.md
kafka-consumer-sync-commit   simple-kafka-consumer

[ec2-user@ip-10-1-10-115 tacademy-kafka]$ cd simple-kafka-consumer

[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

# 아래와 같이 소스를 열어서 broker ec2 public ip 부분을 브로커 퍼블릭 아이피로 치환해준다.
[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ sudo vim /home/ec2-user/tacademy-kafka/simple-kafka-consumer/src/main/java/com/tacademy/SimpleConsumer.java
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{broker ec2 public ip}:9092";

    public static void main(String[] args) {
        # 아래와 같이 properties를 설정한다.
        Properties configs = new Properties();
        # 부트스트랩, 그룹 아이디, key 디시리얼라이저, value 디시리얼라이저가 들어가줘야 한다.
        # producer로 직렬화 한것을 역직렬화 해줘야 하기 때문이다.
        # 콘솔 프로듀서도 기본적으로 직렬화해서 데이터가 들어간다.
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        # 카프카 컨슈머 인스턴스를 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
'
        # 특정 토픽을 구독하겠다고 선언
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            # consumer.poll 메서드를 통해서 1초동안 기다리면서 데이터를 가져오게 된다.
            # 데이터를 배치형태로 가져오기 때문에 데이터를 여러개를 가져온다. 
            # 그리고 for 문을 통해서 가져오는 데이터를 출력해준다.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

# sudo vim을 이용하여 build.gradle을 아래와 같이 수정해준다.
[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ sudo vim build.gradle
apply plugin: 'application'

mainClassName = 'com.tacademy.SimpleConsumer'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ gradle wrapper

BUILD SUCCESSFUL in 0s
1 actionable task: 1 executed
    
[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ ./gradlew build

BUILD SUCCESSFUL in 1s
5 actionable tasks: 5 executed

# 빌드하고 run을 하면 아까 프로듀서로 넣었던 데이터를 정상적으로 가져오는 것을 확인할 수 있다.
[ec2-user@ip-10-1-10-115 simple-kafka-consumer]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
aaa
bbb
eee
fff
ccc
ddd
<=========----> 75% EXECUTING [16s]
> :run
    
# 계속 컨슈머 어플리케이션을 켠 상태로 다시 콘솔 프로듀서를 띄운 터미널로 넘어가서 데이터를 넣으면
# 넣는 순간 컨슈머 어플리케이션에서 캐치해가는 것을 확인할 수 있을 것이다.


# 그리고 컨트롤+c로 컨슈머 어플리케이션을 종료해준다.

실습 2. consumer commit 동작원리 이해를 위한 어플리케이션 실습

kafka-consumer-auto-commit 라는 어플리케이션을 통해 commit의 동작원리를 이해하고 데이터가 중복처리 되는 케이스를 경험해본다.

  • 마찬가지로 client ec2에 접속해서 아래와 같이 명령어들을 실행해본다.
[ec2-user@ip-10-1-10-115 tacademy-kafka]$ ls
image                        kafka-consumer-sync-commit              README.md              실습 커맨드 리스트.txt
kafka-consumer-auto-commit   kafka-producer-exact-partition          simple-kafka-consumer
kafka-consumer-multi-thread  kafka-producer-key-value                simple-kafka-producer
kafka-consumer-save-metric   아파치 카프카 입문과 활용 강의자료.pdf  telegraf.conf

[ec2-user@ip-10-1-10-115 tacademy-kafka]$ cd kafka-consumer-auto-commit/

[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

# 마찬가지로 아래 브로커 ec2 아이피에 브로커 아이피를 넣어준다.
[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ sudo vim /home/ec2-user/tacademy-kafka/kafka-consumer-auto-commit/src/main/java/com/tacademy/ConsumerWithAutoCommit.java
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithAutoCommit {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{broker ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        # 위의 config들은 아까 했던 simple kafka consumer 어플리케이션과 동일하지만
        # 아래에 보면 commit을 오토로 할것인가에 대해 true,false로 지정해줄 수 있다.
        # 즉 commit이 자동으로 된다는 것이다. 아까는 명시적으로 커밋해주는 부분이 없었는데 이 말은 polling할때
        # 내부적으로 commit을 한다는 것이다. 60초 간격으로 컨슈머가 어디까지 읽었는지 브로커에게 알려준다는 것이다.
        # 또한 auto commit interval을 몇초 간격으로 할것인가에 대한 옵션도 지정해준다.
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

# 마찬가지로 아래와 같이 build.gradle을 수정해준다.
[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ sudo vim build.gradle
apply plugin: 'application'
mainClassName = 'com.tacademy.ConsumerWithAutoCommit'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ gradle wrapper

BUILD SUCCESSFUL in 0s
1 actionable task: 1 executed
    
[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ ./gradlew build

BUILD SUCCESSFUL in 1s
5 actionable tasks: 5 executed
    
# 마찬가지로 어플리케이션을 run하고 콘솔 프로듀서에서 123 456 등 데이터를 넣으면 아래와 같이 전시될 것이다.
[ec2-user@ip-10-1-10-115 kafka-consumer-auto-commit]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
123
456
789
10101010
11111111
<=========----> 75% EXECUTING [29s]
> :run


# 소스에서 testgroup이라는 그룹아이디를 갖은게 현재 입력한 123 456 등 데이터만 전시하고 이전 데이터에 대해서는 전시를 하지 않는다.

# 그리고 컨트롤+c로 컨슈머 어플리케이션을 종료해준다.

# 그리고 다시 run을 하면 입력했던
123
456
789
10101010
11111111
# 가 다시 전시가 된다. 

# 이게 무슨말이냐면 데이터가 중복처리 되었다는 것이다. 어플리케이션을 실행하고 60초 전에 kill 시켰기 때문에 인터벌 60초 전에 꺼진것이고
# 따라서 commit이 안되었기 때문이다. 데이터가 중복처리되면 이슈가 발생할 수 있다.
# 그래서 autocommit은 인터벌 간격에 따라 데이터가 유실되거나 중복처리될 수도 있다.


# 그리고 컨트롤+c로 컨슈머 어플리케이션을 종료해준다.

# consumer commit

  • enable.auto.commit=true

일정 간격(auto.commit.interval.ms), poll() 메서드 호출시 자동 commit

commit 관련 코드를 작성할 필요없음. 편리함.

명시적으로 commit 하는거 대비해서 속도가 매우 빠름

중복 또는 유실이 발생할 수 있음

–> 중복/유실을 허용하지 않는 곳(은행, 카드 등)에서는 사용하면 안됨!!

–> 일부 데이터가 중복/유실되도 상관 없는 곳(센서, GPS 등)에서 사용

configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());

image

# 데이터 중복처리? 무슨 문제지?

  • 커머스의 장바구니 시스템에서 중복 이슈 발생

나는 장바구니에 1개 상품을 담았는데 2개 상품이 담김

  • 카드사의 결제 시스템에서 중복 이슈 발생

편의점에서 아이스크림 결제를 했는데 2번 결재됨

  • 택배사 SMS 발송 시스템에서 중복 이슈 발생

집에 도착 했다는 SMS가 2번 발송

# 데이터 중복을 막을 수 있는 방법

1) 오토 커밋을 사용하되, 컨슈머가 죽지 않도록 잘 돌봐준다

불가능. 서버/애플리케이션은 언젠가 죽을 수 있다. ex. 배포

2) 오토 커밋을 사용하지 않는다

Kafka consumer의 commitSync(), commitAsync() 사용

# enable.auto.commit=false

1) commitSync() : 동기 커밋

  • ConsumerRecord 처리 순서를 보장함

  • 가장 느림(커밋이 완료될 때 까지 block하기 때문임)

  • poll() 메서드로 반환된 ConsumerRecord의 마지막 offset을 커밋

그런데 만약에 컨슈머 레코드를 예를들어 50개를 받게 되면 중간중간마다 계속 커밋을 할 수 있는데 이때 Map 자료구조를 사용해서 토픽파티션과 offsetandmetadata를 이용해서 한개가 처리될때마다 한번씩 offset을 커밋할 수 있다.

즉 50개 데이터가 들어왔을때 polling 처리가 완료된 이후에 commit sync를 명시적으로 적어서 50개마다 50개가 데이터가 잘 처리되었다고 구현할 수도 있고, 혹은 offset 1개 처리될때마다 커밋 한번을 처리할 수도 있다.

  • Map<TopicPartition, OffsetAndMetadata> 을 통해 오프셋 지정 커밋 가능

실제 사용 예시

  • commitSync() : 동기 커밋
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        System.err.println("commit failed");
    }
}

# consumer.commitSync()할때 CommitFailedException는 반드시 받아줘야 한다. 왜냐하면 네트워크 장애 등 상황에서 대처를 할수 있기 때문이다.
# 커밋을 재시도를 하던지 멈추던지 대처를 해줄때 꼭 필요하다
  • commitSync(Map<TopicPartition, OffsetAndMetadata>) : offset 지정 커밋
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
# record.topic() 은 내 토픽, record.partition()은 파티션을 몇번했는지 말하는 것이다.
offset.put(new TopicPartition(record.topic(), record.partition()), null);

# 그다음에 아래와 같이 오프셋 번호를 지정하면 몇번 오프셋까지 처리를 했는지 commit을 할 수 있다.
try {
    consumer.commitSync(offset);
} catch (CommitFailedException e) {
    System.err.println("commit failed");
}

2) commitAsync() : 비동기 커밋

커밋이 될때까지 기다리는 메서드는 아님

커밋을 요청하는 시간동안 폴링을 기다리지 않고 바로 진행해버림

그래서 일시적인 통신문제로 비동기 커밋이 커밋이 안되었을때 당연히 카프카 브로커는 처리가 안된것으로 인지하여 데이터를 다시 보낼수도 있다. 그래서 중복처리에 대한 여지가 있는 메서드다.

  • 동기 커밋보다 빠름

  • 중복이 발생할 수 있음

일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋 될 때

  • ConsumerRecord 처리 순서를 보장하지 못함

처리 순서가 중요한 서비스(주문, 재고관리 등)에서는 사용 제한

결론적으로는 commitSync()가 가장 안전하다.

실제 사용 예시

  • commitAsync() : 비동기 커밋
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    consumer.commitASync();
}
  • commitAsync() + commitSync() : 비동기, 동기 커밋 같이 쓰는 경우도 있음
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitAsync();
    }
} catch (CommitFailedException e) {
    System.err.println("commit failed");
}finally {
    consumer.commitSync();
}

실습 3. kafka-consumer-sync-commit

  • 마찬가지로 client ec2에 접속해서 아래와 같이 명령어들을 실행해본다.
[ec2-user@ip-10-1-10-115 tacademy-kafka]$ ls
image                        kafka-consumer-sync-commit              README.md              실습 커맨드 리스트.txt
kafka-consumer-auto-commit   kafka-producer-exact-partition          simple-kafka-consumer
kafka-consumer-multi-thread  kafka-producer-key-value                simple-kafka-producer
kafka-consumer-save-metric   아파치 카프카 입문과 활용 강의자료.pdf  telegraf.conf

[ec2-user@ip-10-1-10-115 tacademy-kafka]$ cd kafka-consumer-sync-commit/

[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ sudo vim /home/ec2-user/tacademy-kafka/kafka-consumer-sync-commit/src/main/java/com/tacademy/ConsumerWithSyncCommit.java
package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithSyncCommit {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{broker ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        # 아래와 같이 auto commit 이 false로 설정이 되어있다.
        # 즉 오토커밋을 하지 않겠다. sync 커밋이나 async 커밋으로 하겠다는 것이다.
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
                # 아래와 같이 consumer.commitSync(); 라고 명시적으로 sync 커밋을 쓰겠다는 것이다.
                consumer.commitSync();
                record.offset();
            }
        }
    }
}

# 마찬가지로 build.gradle을 아래와 같이 수정
[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ sudo vim build.gradle
apply plugin: 'application'
mainClassName = 'com.tacademy.ConsumerWithSyncCommit'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

# 아래 명령어과 같이 어플리케이션 빌드 및 실행
[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ gradle wrapper
Starting a Gradle Daemon (subsequent builds will be faster)

BUILD SUCCESSFUL in 5s
1 actionable task: 1 executed

[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ ./gradlew build

BUILD SUCCESSFUL in 2s
5 actionable tasks: 5 executed

# 컨슈머 어플리케이션 실행하기 전에 콘솔 프로듀서로 kkk aaa bbb 를 넣어준다.
# 그리고 실행하면 아래와 같이 전시가 될 것이다.
[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
kkk
aaa
bbb
<=========----> 75% EXECUTING [47s]
> :run
    
# 다시 콘솔 프로듀서가 띄워진 터미널로 넘어가서 1 2 3 4 5 6 을 추가적으로 넣어준다
# 그런다음에 다시 컨슈머 어플리케이션이 띄워진 터미널로 넘어가면 아래와 같이 전시가 될 것이다.
# 1부터 6까지 폴링해서 가져간 것을 알 수 있다. 그러면서 데이터 하나씩 처리할때마다 commit을 했다.
[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
kkk
aaa
bbb
1
2
3
4
5
6
<=========----> 75% EXECUTING [4m 58s]
> :run
    
# 그러면 여기서 컨슈머 어플리케이션을 kill 시킨 다음에 콘솔 프로듀서로 korea japan china usa 데이터를 넣어본다.
# 이전까지의 데이터를 폴링하고 commit을 했기때문에 이제는 다시 컨슈머 어플리케이션을 실행한다면 korea 부터 데이터를 받아와야 할 것이다.
# 그러면 다시 컨슈머 어플리케이션을 아래와 같이 run 하면 6 데이터 이후부터 잘 받아온것을 확인할 수 있다.
# 아까처럼 중복처리 되지않고 정상적으로 후속데이터를 처리한 것을 알 수 있다.
[ec2-user@ip-10-1-10-115 kafka-consumer-sync-commit]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
japan
usa
china
korea
<=========----> 75% EXECUTING [21s]
> :run
    
# 그리고 컨트롤+c로 컨슈머 어플리케이션을 종료해준다.
# 컨슈머는 데이터를 폴링해서 가져가기 때문에 사용하지 않으면 반드시 잘 꺼줘야 한다.

# Consumer rebalance

리밸런스 - 컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상

컨슈머가 장애가 나면 리밸런스라는게 발생할 수도 있다.

컨슈머 그룹의 파티션 소유권 즉, 컨슈머와 파티션 할당이 변경될때 일어나는 현상

  • 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 없음

  • 리밸런스 발생시 데이터 유실/중복 발생 가능성 있음

왜냐면 데이터를 처리할때 synccommit을 하지 않았는데 다른걸로 할당해버리면 어디까지 commit했는지 명시적으로 확인할 수 없다. 그래서 commitsync를 꼭 리밸런스 리스터를 통해서 선언해서 사용하면 좋다.

또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지 할 수 있다.

  • 언제 리밸런스 발생?

consumer.close() 호출시 또는 consumer의 세션이 끊어졌을 때, 장애가 발생했을때

예를 들어서 컨슈머 세션이 끊겼을때의 상황을 가정해보자.

컨슈머 세션이 끊기면 카프카 클라이언스 –> 카프카 브로커로 아래와 같이 동작할 것이다.

1) (디폴트 옵션으로) 3초마다 크룹 코디네이터(브로커 중 1대)에게 하트비트 전송

2) 10초의 세션 타임아웃 시간안에 하트비트가 왔는지 확인

3) 만약 하트비트가 없으면 코디네이터가 해당 컨슈머는 죽은것으로 마킹해버림

4) 세션이 끊어졌다고 판단했으니 리밸런스가 시작됨

아래와 같이 세션시간에 대해서 하트비트 시간 x 3을 보통 하나의 세션시간으로 설정하는 편이다.

configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);

# Consumer rebalance listener

  • 코드 예시
...

# 토픽에 뉴리밸런스리스너 메소드를 넣어서 정의가 가능하다.
consumer.subscribe(Arrays.asList("test"), new RebalanceListener());
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}
}
# 위에서 정의한 뉴리밸런스 리스너를 implement해서 
# 리보크 혹은 어싸인 할 수 있다. 그래서 파티션이 끊어졌거나 새로 할당되었을때
# 각각의 경우에 대해 처리를 할 수 있다.
static class RebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions.");
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned partitions.");
    }
}
  • 리벨런스 리스너는 언제 사용할까?

리밸런스 발생에 따른 offset commit을 하거나

또는 컨슈머가 많을때 할당되는 시간이 오래걸릴 수 있기 때문에 운영상 참고를 위해 리밸런스 시간 측정을 통한 컨슈머 모니터링을 하기도 한다.

시간측정을 위해 로그를 남겨주거나 할 수 있다.

# Consumer wakeup

컨슈머를 정상적으로 종료를 해야하는데 그럴때는 wakeup이라는 메서드를 사용한다.

이걸 사용하면 wakeup session이 발생한다.

이 세션을 통해 안전하게 빠져나올 수 있다.

  • 정상동작 예시
while (true) {
    # poll() 호출하고, records 100개 반환 : offset 101번 ~ 200번
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    # for 구문으로 100개의 records loop 구문 수행
    for (ConsumerRecord<String, String> record : records) {
        # record.value() system print
        System.out.println(record.value());
    }
    # offset 200 커밋
    consumer.commitSync();
}
# while문이므로 반복
  • SIGKILL로 인한 중복처리 발생 예시

SIGKILL : 강제로 어플리케이션을 죽인것

1) poll() 호출

마지막 커밋된 오프셋이 100

records 100개 반환 : 오프셋 101 ~ 200

image

2) records loop 구문 수행

3) record.value() system 150번 오프셋 print 중, SIGKILL 호출

101번~150번 오프셋 처리완료, 151번 오프셋~200번 오프셋 미처리

image

4) offset 200 커밋 불가

브로커에는 100번 오프셋이 마지막 커밋

컨슈머 재시작, 다시 오프셋 101부터 처리 시작, 101번~150번 중복처리

image

위와 같은 문제를 막고 안전하게 exception 처리해서 나오기 위해 wakeup을 발생하면 된다.

아래코드와 같이 자바에서는 셧다운 훅을 받을 수 있다.

sigterm을 통해서 셧다운 시그널을 자바 어플리케이션에 날릴 수 있다. 이거를 통해서 데이터 commit이 완료할 수 있도록 서포트해준다.

# java code
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        consumer.wakeup();
    }
});

위의 메서드를 응용해서 아래와 같이 컨슈머 어플리케이션을 짤 수 있다.

폴링구문에서 트라이, 캐치 부분에서 웨이크업 exception을 받을 수 있다.

다시말해서 어플리케이션을 바로 끄는게 아니라 위에 코드와 같이 셧다운 훅을 받아서 웨이크업 메소드를 호출하거나 아니면 아래 코드와 같이 wakeupexception을 받아서 WakeupException이 발생했다는 것을 인지하여 finally에 commitsync하고 consumer close를 통해서 안전하게 종료하게 된다.

이를 통해서 리밸런싱 하는 것을 명시적으로 브로커에게 알려줄 수 있다.

아까처럼 그냥 강제로 죽이면 하트비트가 사라진 상태에서 브로커는 그냥 사라졌다고 인식해서 리밸런싱이 발생하기 때문에 안정적으로 상황이 처리되었다고 할 수 없다.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    System.out.println("poll() method trigger WakeupException");
}finally {
    consumer.commitSync();
    consumer.close();
}

wakeup()을 통한 graceful shutdown 필수!

SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요

SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 –> 중복/유실 발생

# Consumer thread 전략

컨슈머의 쓰레드를 어떻게 가져갈것이냐에 관한 것이다.

1안) 1 프로세스 + 1 스레드(컨슈머)

간략한 코드

프로세스 단위 실행/종료

다수의 컨슈머 실행 필요시 다수의 프로세스 실행 필요

어떤 어플리케이션을 자르파일로 압축해서 실행한다고 하면 아래와 같이 실행할 수 있다.

토픽이나 그룹아이디를 받아서 config를 집어넣고 자르파일을 돌릴 수 있고, 이 어플리케이션을 하나만 실행할 수도 있고 여러개를 실행할 수도 있다.

$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers"}

$ java -jar one-process-one-consumer.jar --path consumer.conf

image

2안) 1 프로세스 + n 스레드(동일 컨슈머 그룹)

복잡한 코드

스레드 단위 실행/종료

스레드간 간섭 주의(세마포어, 데드락 등)

다수 컨슈머 실행시 다수 스레드 실행 가능

예를 들어서 아래와 같이 "consumer.no":20 를 받아서 20개의 컨슈머를 만들어 달라고 설정할 수도 있다.

그래서 이런 옵션을 줘서 자르파일이 실행될때 config로 부여하여 20개의 쓰레드가 생성되도록 할 수 있다.

$ cat consumer.conf
{"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20}

$ java -jar one-process-multiple-consumer.jar --path consumer.conf

1 프로세스 (20 스레드) ——-> Kafka

3안) 1 프로세스 + n 스레드(다수 컨슈머 그룹)

복잡한 코드

컨슈머 그룹별 스레드 개수 조절 주의

$ cat consumer.conf
[
    {"topic":"click_log", "group.id":"hadoop-consumers", "consumer.no":20},
    {"topic":"click_log", "group.id":"elasticsearch-consumers", "consumer.no":1},
    {"topic":"application_log", "group.id":"hadoop-consumers", "consumer.no":5}
]

$ java -jar one-process-multiple-consumer-multiple-group.jar --path consumer.conf

1 프로세스 (26 스레드) ——-> Kafka

위와 같이 총 26개가 띄워지는 어플리케이션도 만들 수 있다.

단, 쓰레드 갯수 조절이나 디버깅 중에 이슈가 발생할 수도 있다.

# PRACTICE - Consumer multiple thread pool

한개의 프로세스에 멀티플 쓰레드를 가져가는 실습을 해보자

[ec2-user@ip-10-1-10-115 tacademy-kafka]$ ls
image                        kafka-consumer-sync-commit              README.md              실습 커맨드 리스트.txt
kafka-consumer-auto-commit   kafka-producer-exact-partition          simple-kafka-consumer
kafka-consumer-multi-thread  kafka-producer-key-value                simple-kafka-producer
kafka-consumer-save-metric   아파치 카프카 입문과 활용 강의자료.pdf  telegraf.conf

[ec2-user@ip-10-1-10-115 tacademy-kafka]$ cd kafka-consumer-multi-thread/

[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ls
build.gradle  gradle  gradlew  gradlew.bat  settings.gradle  src

# 위에 코드와는 다르게 메인쓰레드와 워크쓰레드 두개의 소스가 따로 있다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ls /home/ec2-user/tacademy-kafka/kafka-consumer-multi-thread/src/main/java/com/tacademy
ConsumerWithMultiThread.java  ConsumerWorker.java

# 아래 브로커 ec2 아이피 부분을 브로커의 아이피 주소로 바꿔준다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ sudo vim /home/ec2-user/tacademy-kafka/kafka-consumer-multi-thread/src/main/java/com/tacademy/ConsumerWithMultiThread.java
# ConsumerWorker.java를 실행하는 메인 쓰레드이다.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiThread {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{broker ec2 public ip}:9092";
    private static int CONSUMER_COUNT = 3;
    # 워커쓰레드라는 것을 미리 리스트로 지정하여 추후 사용할 수 있도록 정의함
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        # 아래에 ShutdownThread을 지정하여 shutdown hook을 받았다는 것을 명시적으로 할 수 있다.
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        # 오토커밋 false
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        # newCachedThreadPool는 쓰레드가 완료되면 쓰레드가 죽는 구조이다.
        # 그래서 아래 ConsumerWorker.java에서 while true를 통해서 계속 폴링하다가 안전하게 close되면, 쓰레드가 죽도록 하였다.
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            # 컨슈머 워커를 지정하고
            # workerThreads에 저장을 한다.
            # 왜냐하면 workerThreads를 기준으로 각각 shutdown 시켜서 종료할 것이기 때문이다.
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            # 각각 생성된 worker를 executorService 쓰레드 풀을 통해 쓰레드를 실행시킨다.
            # 이렇게 하면 쓰레드 3개가 안전하게 실행될 것이다.
            executorService.execute(worker);
        }
    }

    # 아래 ConsumerWorker.java에서 shutdown 메서드를 호출
    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}

[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ cat /home/ec2-user/tacademy-kafka/kafka-consumer-multi-thread/src/main/java/com/tacademy/ConsumerWorker.java
# 카프카 컨슈머가 실행되고, subscribe해서 polling을 계속하다가 wakeup이 발생하면 안전하게 어플리케이션을 종료하는 로직이다.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

# 컨슈머 워커 쓰레드는 runnable로 implement 되어 있다.
# 쓰레드 풀에서 쓰레드로 실행시킬때 사용된다. 
public class ConsumerWorker implements Runnable {
    # 카프카 컨슈머가 한개의 쓰레드를 실행시키도록 할떄는 properties도 필요하고, 토픽이나 쓰레드 네임이 필요하기 때문에
    # 아래와 같이 private를 지정했다.
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    # 그리고 생성될때 properties와 토픽 가져오도록 하였고,
    # 쓰레드 네임을 통해서 이 쓰레드가 어떤 쓰레드인지, 어떤 데이터를 가져왔는지 확인할 수 있도록 하였다.
    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }
    
    # 아래에 run도 override 되어 있는 것도 확인할 수 있다.
    @Override
    public void run() {
        # 쓰레드가 run될때 KafkaConsumer가 instance를 새로 생성하고
        # 그리고 topic을 구독하므로써 토픽을 가져오게 된다.
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            # 그리고 while true문을 통해서 poll이 지속적으로 진행되도록 했다.
            # 출력하고 출력이 되면 컨슈머가 commitsync까지 하는 걸로 했다.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    # 컨슈머 워커의 셧다운 메서드를 호출하게 되면 consumer.wakeup을 호출하게 된다. 
    # 이 호출을 통해서 poll을 실행할때 wakeupexception이 발생하게 된다.
    # 그리고 바로 위에 코드처럼 wakeupexcption이 발생된 걸 확인하고
    # commitSync하고 close까지 하므로써 안전하게 어플리케이션을 종료하게 된다.
    public void shutdown() {
        consumer.wakeup();
    }
}

# 마찬가지로 아래와 같이 build.gradle를 수정해준다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ sudo vim build.gradle
apply plugin: 'application'
mainClassName = 'com.tacademy.ConsumerWithMultiThread'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ gradle wrapper

BUILD SUCCESSFUL in 0s
1 actionable task: 1 executed
    
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ./gradlew build

BUILD SUCCESSFUL in 1s
5 actionable tasks: 5 executed

# 아래와 같이 어플리케이션을 띄우면 아무것도 안나올것이다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
<=========----> 75% EXECUTING [1m 6s]
> :run
    
# 콘솔 프로듀서가 띄워진 터미널로 이동해서 아래와 같이 데이터를 임의로 막 입력해본다.
[ec2-user@ip-10-1-10-115 bin]$ ./kafka-console-producer.sh --bootstrap-server 13.209.84.144:9092 --topic test
>a
>b
>c
>d
>e
>f
>aaa
>bbb
>cc
>ddddd
>1
>2
>3
>4
>5
>

# 그런 다음에 다시 어플리케이션이 띄워진 터미널로 돌아오면 아래와 같이 전시된 것을 확인할 수 있다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
consumer-thread-1 >> a
consumer-thread-1 >> b
consumer-thread-1 >> c
consumer-thread-1 >> d
consumer-thread-2 >> e
consumer-thread-2 >> f
consumer-thread-1 >> aaa
consumer-thread-1 >> bbb
consumer-thread-0 >> cc
consumer-thread-1 >> ddddd
consumer-thread-1 >> 1
consumer-thread-1 >> 2
consumer-thread-0 >> 3
consumer-thread-0 >> 4
consumer-thread-0 >> 5
<=========----> 75% EXECUTING [3m 26s]
> :run
    
# 워커 어플리케이션 소스를 보면 쓰레드 네임과 벨류를 출력하게 되어 있다. 
# 그래서 위와 같이 쓰레드 3개가 폴링해서 데이터를 처리하고 있는 것을 확인할 수 있다.
# 각각의 쓰레드 0,1,2가 파티션 0,1,2에 각각 할당되어 폴링하고 있는 것이다.

# 어플리케이션을 안전하게 종료하고 싶을때는 어떻게 해야하나
# 먼저 클라이언트 ec2 터미널을 하나 새로 띄운다
# 그리고 아래와 같은 명령어를 실행시킨다.

# ConsumerWithMultiThread가 돌고 있는 것을 확인할 수 있다.
[ec2-user@ip-10-1-10-115 ~]$ jps
16930 GradleDaemon
27762 ConsumerWithMultiThread
27720 GradleWrapperMain
14058 ConsoleProducer
28267 Jps

[ec2-user@ip-10-1-10-115 ~]$ kill -term 27762

# 그런다음에 다시 컨슈머 어플리케이션이 띄워진 터미널로 이동해보면
# bye가 나오고 각각의 쓰레드가 trigger가 걸리면서 wakeupexception이 발동한 것을 확인할 수 있다.
# 이러면 브로커도 안전하게 종료가 되었다는것을 인지하게 된다.
# 상당한 시간이 걸리는 데이터 처리작업이 있을수록 이렇게 안전하게 종료하는게 중요하다
# 이런식으로 wakeupexception, shutdown, close까지 완벽하게 해줘야 한다.
[ec2-user@ip-10-1-10-115 kafka-consumer-multi-thread]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
consumer-thread-1 >> a
consumer-thread-1 >> b
consumer-thread-1 >> c
consumer-thread-1 >> d
consumer-thread-2 >> e
consumer-thread-2 >> f
consumer-thread-1 >> aaa
consumer-thread-1 >> bbb
consumer-thread-0 >> cc
consumer-thread-1 >> ddddd
consumer-thread-1 >> 1
consumer-thread-1 >> 2
consumer-thread-0 >> 3
consumer-thread-0 >> 4
consumer-thread-0 >> 5
Bye
consumer-thread-0 trigger WakeupException
consumer-thread-1 trigger WakeupException
consumer-thread-2 trigger WakeupException

> Task :run FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':run'.
> Process 'command '/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.amzn2.0.1.x86_64/bin/java'' finished with non-zero exit value 143

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 10m 13s
2 actionable tasks: 1 executed, 1 up-to-date

# Consumer lag

컨슈머의 마지막 커밋 offset과 토픽의 마지막 offset을 말한다. 즉, 아래 그림과 같이 토픽의 마지막 offset이 123이고, 컨슈머가 처리한 offset이 117이면 그 둘의 차이인 6이 컨슈머 랙이다.

만약에 producer가 보내는 데이터가 컨슈머가 처리하는 양보다 많다면 lag이 늘어날 것이다. 그러면 producer는 주구장창 계속 데이터를 보내기만 하는데 컨슈머가 못따라가면 lag이 늘어날 것이다. 지연을 어느정도 허용하는 비지니스 환경이라면 상관없지만 예를 들어 이벤트, 선착순 이런 비지니스 환경에 적용해야 할때는 적합하지 않다.

따라서 컨슈머 랙을 모니터링 하는 것은 정말 중요하다.

image

  • 컨슈머 랙은 컨슈머의 상태를 나타내는 지표.

  • 컨슈머 랙의 최대값은 컨슈머 인스턴스를 통해 직접 확인할 수 있음

직접 확인할 수 있다는 것은 코드에서 확인할 수 있다는 것이다.

consumer.metrics()를 통해 확인할 수 있는 지표가 있는데 그 중에 하나가 records-lag-max라는 것이다. 이는 토픽의 파티션 중 최대 랙을 의미한다. 다시말해서 파티션이 3개가 있든 20개가 있든 그 중에서 최대 랙을 확인할 수 있다. 이외에도 fetch-size-avg(1번 polling하여 가져올 때 레코드 byte평균), fetch-rate(1초 동안 레코드 가져오는 회수) 등을 확인할 수 있다.

## 아래와 같이 consumer.metrics() 메서드를 통해서 메트릭 네임을 통해 
## 각각의 벨류를 확인할 수 있다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    for (MetricName metric : metrics.keySet()) {
        System.out.println(metric.name() + " is " + metrics.get(metric).metricValue());
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

그러나 이렇게 인스턴스를 통해 확인하는 것은 문제가 있다.

왜냐하면 파티션은 여러개고 렉이 다양하게 발생할 것이기 때문에

인스턴스에 장애가 발생하면 지표수집자체가 불가능하다.

왜냐하면 지표를 수집하는 어플리케이션 자체가 죽어버리면 컨슈머 렉이 어디까지 잡혀있는지 알 수 없기 때문이다. 수집도 안되고, 폴링도 안되고, 메트릭에 대한 로그도 안남을거기 때문이다.

  • 컨슈머 컨슈머 인스턴스를 통한 컨슈머 랙 수집의 문제점

  • 컨슈머 인스턴스 장애가 발생하면 지표 수집 불가능

  • 구현하는 컨슈머마다 지표를 수집하는 로직 개발 필요

구현하는 컨슈머마다 렉을 수집해야 한다. 예들들어서 내가 하둡에 저장하는 컨슈머를 만들었는데 이 컨슈머 렉을 수집하는 혹은 기록하는 어플을 따로 만들어줘야 하고, 일라스틱 서치에 저장하는 컨슈머를 만들면 이 컨슈머 렉을 수집하는 어플을 따로 만들어줘야 한다.

  • 컨슈머 랙 최대값(records-lag-max) 만 알 수 있음

토픽에 파티션은 n개가 있을 수 있음.

최대값을 제외한 나머지 파티션의 컨슈머 랙은 알 수 없음

실제로는 파티션별로 렉이 얼만큼 되는지 알고 싶은 경우가 많을 것이다. 왜냐하면 프로듀서가 특정파티션에만 데이터를 넣을수도 있기 때문이다. 이 경우에 파티션별로 컨슈머 렉을 알고 싶으니까 이를 해결하기 위해서 나온게 외부의 모니터링 어플리케이션이 있다.

image

# 컨슈머 랙 모니터링

  • 외부 모니터링 애플리케이션을 사용권장

대표주자

Confluent Platform, Datadog, Kafka Burrow(Open source)

  • 카프카 버로우

sk 플레닛에서 활용하고 있는 툴

https://github.com/linkedin/Burrow

인스턴스 통해서 각각의 렉을 수집을 안해도 되고, 카프카 브로커와 붙어서 전체렉을 수집할 수 있다.

Linkedin에서 오픈소스로 제공하는 컨슈머 랙 체크 툴

버로우 실행 시 Kafka, Zookeeper정보를 통해 랙 정보 자체 수집

슬라이딩 윈도우를 통한 컨슈머 상태 정의

1) OK : 정상

2) ERROR : 컨슈머가 polling을 멈춤

3) WARNING : 컨슈머가 polling을 수행하지만 lag이 계속 증가

위와 같은 지표를 통해서 컨슈머의 상태를 파악할 수 있다.

  • 버로우 설치 및 운영방법

https://blog.voidmainvoid.net/279

# 기타 주의사항

토픽의 파티션은 늘리는 건 가능하지만, 줄이는 건 불가능하다.

무작정 파티션을 늘리는 것도 바람직하지는 않다. 왜냐하면 파티션이 컨슈머랑 할당되는 리벨런싱 과정에서 파티션이 늘어나면 늘어날수록 시간이 더 많이 걸린다.

관련해서 컨플루언트에서 article을 많이 발표하고 있으니까 적절한 파티션 갯수, 적절한 컨슈머 갯수를 지정하는 것이 중요하다.