Kafka 설치 및 기본 CLI 활용

2020-07-20

.

Data_Engineering_TIL(20200720)

  • 학습한 프로그램 : T아카데미 Apache Kafka 입문과 활용

  • URL : https://tacademy.skplanet.com/live/player/onlineLectureDetail.action?seq=183

  • 이전내용 참고자료 : https://minman2115.github.io/DE_TIL101

[Question]

1) 고가용성을 위한 옵션으로 브로커 갯수와 레플리케이션 펙터는 수를 동일하게 줘야하는 건지?

일단 브로커 갯수 이상으로는 replication-fcator를 지정할 수 없다. 예를들어서 브로커가 1대면 replication-factor는 여러개를 할 수 없을 것이다. 왜냐하면 브로커가 1대이기 때문이다. 또한 replication-factor가 많은게 무조건 좋은 것은 아니다. ISR 팔로우 파티션이 리더 파티션으로부터 offset을 맞추려고 데이터를 가져오게 되는데 이때 가져오는 과정에서 네트워크를 타게 된다. 그런과정에서 ISR이 안맞게 되는 경우도 있기 때문에 일반적인 경우는 replication-factor를 2나 3으로 설정한다.

** sk 플래넷에서는 replciation-factor를 3으로 설정해서 운영중에 있다.

2) Kafka Mirror maker 복제시 partition offset은 첫번째부터 복제하나요?

첫번째 offset부터 할수도 있고, 가장최신의 offset부터 할 수도 있다.

3) AWS kinesis와 Kafka는 어떤 차이점이 있나요?

kinesis는 컨슈머 그룹으로 가져가는 것이 아니고, 온프라미스에서도 사용이 불가한점이 있다. 처리량에서도 차이가 있다고 한다.

[EC2 한대에 카프카 브로커 1대를 구성(설치 및 실행)하는 실습]

STEP 1) AWS EC2 생성

  • Amazon linux AMI 2

  • t3.small

  • 일반적인 퍼블릭 서브넷

  • 해당 EC2의 22번, 9092포트 개방

STEP 2) 생성한 EC2에 22번포트 SSH 접속

STEP 3) 아래와 같이 명령어를 실행하여 자바 jdk 설치

sudo yum update -y
sudo yum install -y java-1.8.0-openjdk-devel.x86_64

STEP 4) 아래와 같이 명령어를 실행하여 자바 바이너리 파일 다운로드

wget http://mirror.navercorp.com/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
    
# 그 다음에 ls 명령어로 kafka_2.12-2.5.0.tgz 가 정상적으로 다운로드 받아졌는지 확인

# 그 다음에 타르파일 압축해제
tar xvf kafka_2.12-2.5.0.tgz

압축해제후 아래와 같이 파일이 잘 생성되었는지 확인

[ec2-user@ip-10-1-10-111 ~]$ ls
kafka_2.12-2.5.0  kafka_2.12-2.5.0.tgz

[ec2-user@ip-10-1-10-111 ~]$ cd kafka_2.12-2.5.0/

[ec2-user@ip-10-1-10-111 kafka_2.12-2.5.0]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs

STEP 5) kafka heap size 조정

카프카 2.5.0 버전은 디폴트로 1기가의 힙메모리가 설정되어 있다. 그러나 우리가 생성한 t3.small 사양을 고려해서 1기가에서 400메가로 약간 낮춰주도록 환경변수를 바꿔주는 것이다. 변경없이 그냥 실행하게 되면 메모리가 부족하다는 메세지가 뜰 수도 있다. 그래서 사양이 좋은 ec2를 쓸 경우 굳이 안바꿔줘도 된다.

export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"

STEP 6) kafka의 server.properties 파일내용 일부 수정

# 먼저 kafka_2.12-2.5.0/config 디렉토리로 이동

sudo vim server.properties

내용중에 broker.id=0 으로 되어있는데 브로커 아이디는 항상 고유해야 한다는 점을 유의해야 한다. 예를 들어서 카프카 클러스터 내에서 broker.id가 0인 것은 고유하게 하나여만 한다. 따라서 브로커를 여러개를 띄울 경우 아이디를 어떻게 부여할지도 고민해야 한다. 일단은 테스트로 1개만 띄울거기 때문에 그냥 둔다.

listeners=PLAINTEXT://:9092 맨앞에 #을 지워서 포트를 열어주도록 각주를 제거해준다.

advertised.listeners=PLAINTEXT://your.host.name:9092 이것도 맨앞에 #를 지워서 각주를 제거해준다.

그 다음에 advertised.listeners=PLAINTEXT://your.host.name:9092의 your.host.name에 ec2의 public ip를 치환해준다.

ex) advertised.listeners=PLAINTEXT://13.125.218.26:9092

그런 다음에 저장하고 vim을 빠져나온다

** config/server.properties 설정파일 내 config 참고사항

  • broker.id : 정수로 된 브로커 번호. 클러스터 내 고유번호로 지정

  • listeners : kafka 통신에 사용되는 host:port

  • advertised.listeners : Kafka client가 접속할 host:port

  • log.dirs : 메세지를 저장할 디스크 디렉토리. 세그먼트가 저장됨

  • log.segment.bytes : 메세지가 저장되는 파일의 크기 단위

  • log.retention.ms : 메세지를 얼마나 보존할지 지정. 닫힌 세그먼트를 처리. sk플레닛에서는 3일로 지정해서 쓰고 있음, 3일 이상인 경우 삭제하도록 처리함

  • zookeeper.connect : 브로커의 메타데이터를 저장하는 주키퍼의 위치

  • auto.create.topics.enable : 자동으로 토픽 생성여부

  • num.partitions : 자동생성된 토픽의 default partition 갯수

  • message.max.bytes : kafka broker에 쓰려는 메세지 최대 크기

STEP 7) 카프카 구동을 위한 주키퍼 실행

일반적으로는 주키퍼가 3대 이상의 앙상블로 이루어져야 하지만 1대로도 실행이 가능하도록 할 수 있다.

아래와 같이 명령어를 실행해준다.

# 먼저 kafka_2.12-2.5.0 디렉토리로 이동

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

아래와 같이 jps로 자바 프로세스를 확인했을때 QuorumPeerMain가 구동중에 있으면 정상이다.

[ec2-user@ip-10-1-10-111 kafka_2.12-2.5.0]$ jps
21657 Jps
21631 QuorumPeerMain

STEP 8) 아래와 같은 명령어로 카프카 실행

아래 명령어를 실행하면 백그라운드로 카프카가 실행될 것이다.

bin/kafka-server-start.sh -daemon config/server.properties

마찬가지로 jps 명령어를 실행했을때 아래와 같이 kafka가 돌고 있는 것을 확인할 수 있다.

[ec2-user@ip-10-1-10-111 kafka_2.12-2.5.0]$ jps
22022 Kafka
22092 Jps
21631 QuorumPeerMain

또한 혹시 오류가 나는건 아닌지 아래와 같은 명령어로 로그파일을 확인해보자.

tail -f logs/*

[다른 EC2를 띄워서 위에서 구동한 브로커와 연동하여 토픽도 생성해보고, 데이터도 넣어보자]

STEP 1) AWS EC2 생성

  • Amazon linux AMI 2

  • t3.small

  • 일반적인 퍼블릭 서브넷

  • 해당 EC2의 22번, 9092포트 개방

STEP 2) 생성한 EC2에 22번포트 SSH 접속

STEP 3) 아래와 같이 명령어를 실행하여 자바 jdk 설치

sudo yum update -y
sudo yum install -y java-1.8.0-openjdk-devel.x86_64

STEP 4) 아래와 같이 명령어를 실행하여 자바 바이너리 파일 다운로드

wget http://mirror.navercorp.com/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
    
# 그 다음에 ls 명령어로 kafka_2.12-2.5.0.tgz 가 정상적으로 다운로드 받아졌는지 확인

# 그 다음에 타르파일 압축해제
tar xvf kafka_2.12-2.5.0.tgz

STEP 5) 아래와 같은 명렁어를 실행하여 토픽을 생성해본다.

# 먼저 kafka_2.13-2.5.0의 bin 디렉토리로 이동한다.

./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test
# ex) ./kafka-topics.sh --create --bootstrap-server 13.125.215.168:9092 --replication-factor 1 --partitions 3 --topic test

실행하면 아래와 같이 create topic test 라는 메세지가 뜰 것이다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-topics.sh --create --bootstrap-server 13.125.215.168:9092 --replication-factor 1 --partitions 3 --topic test
Created topic test.

STEP 6) 아래와 같은 명렁어를 실행하여 토픽에다가 데이터를 주입해보자.

콘솔 프로듀서로 데이터를 집어넣는 것이다.

./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
    
# ex) ./kafka-console-producer.sh --bootstrap-server 13.125.215.168:9092 --topic test

그러면 아래와 같이 콘솔 프로듀서로 토픽을 날릴 수 있도록 꺽쇠가 뜨는데 원하는 내용을 막 날려본다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-console-producer.sh --bootstrap-server 13.125.215.168:9092 --topic test
>hello minman
>minman zzang
>go home
>vation
>vacation
>

STEP 7) 위의 명령어를 날렸던 EC2의 터미널을 하나 더 띄워서 test 토픽에 있는 데이터를 한번 확인해본다.

# 먼저 kafka_2.13-2.5.0의 bin 디렉토리로 이동한다.

./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning
    
# ex) ./kafka-console-consumer.sh --bootstrap-server 13.125.215.168:9092 --topic test --from-beginning

위에 명령어에서 –from-beginning 옵션은 가장 최초의 offset부터 데이터를 가져와 달라는 옵션이다.

실행하면 아래와 같이 데이터 내용을 확인할 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-console-consumer.sh --bootstrap-server 13.125.215.168:9092 --topic test --from-beginning
hello minman
vacation
go home
minman zzang
vation

위에 명령어는 콘솔컨슈머를 띄워서 프로듀서에서 생성한 데이터를 브로커에 쏘면 컨슈머가 데이터를 폴링해서 가져오는 것이다.

컨트롤+c를 눌러서 위의 콘솔 컨슈머를 빠져나온다.

STEP 8) group에 testgroup이라고 지정해서 콘솔 컨슈머로 데이터를 폴링해오는 실습을 해보자

아래와 같이 명령어를 실행해보자.

./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test -group testgroup --from-beginning
    
# ex) ./kafka-console-consumer.sh --bootstrap-server 13.125.215.168:9092 --topic test -group testgroup --from-beginning

실행하면 아래와 같이 결과를 얻을 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-console-consumer.sh --bootstrap-server 13.125.215.168:9092 --topic test -group testgroup --from-beginning
hello minman
vacation
5
go home
minman zzang
vation
1
2
3
4

위의 실행결과에서 흥미로운점은 컨슈머가 데이터를 폴링해 왔는데 offset 순서가 뒤죽박죽 되었다.

왜 그러냐면 파티션 3개에 데이터가 들어가는데 데이터를 가져가는 것은 순서가 없다. 왜냐하면 파티션 3개를 만들고 데이터를 넣었는데 컨슈머가 파티션 3개에 할당이 된다는것 뿐이지 가져가는 것은 순서를 따지지 않기 때문에 실제로 데이터가 들어간 순서와 다르다.

그러면 들어온 순서를 맞춰주고 싶다면 파티션을 하나만 하면 된다. 그러면 자료구조 큐와 유사한 형태기 때문에 들어온 순서대로 가져갈 수 있다.

그러면 컨트롤 + c를 눌러서 컨슈머에서 빠져나온다.

그 다음에 프로듀서를 띄워놓은 터미널로 이동해서 아래와 같이 a b c d e 를 차례대로 입력해본다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-console-producer.sh --bootstrap-server 13.125.215.168:9092 --topic test
>hello minman
>minman zzang
>go home
>vation
>vacation
>1
>2
>3
>4
>5
>a
>b
>c
>d
>e
>

아까 컨슈머를 띄웠던 콘솔로 이동해서 다시 아래와 같은 명령어를 실행하여 데이터를 컨슈밍하면 5까지 데이터가 컨슈밍 된 것이 기록이 되어 있기 때문에 새로입력한 a b c d e 알파벳들을 가져올 것이다. 그리고 역시 데이터가 순서가 뒤죽박죽인 것을 알 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-console-consumer.sh --bootstrap-server 13.125.215.168:9092 --topic test -group testgroup --from-beginning
e
c
d
a
b

그래서 위에서 testgroup과 같이 테스트 그룹을 지정해주면 그 테스트그룹이 컨슈밍한 기록에 따라 처리했던거 이후로 데이터를 가져오게 될 것이다.

그리고 역시 컨트롤 + c로 컨슈머를 빠져나온다.

그리고 아래와 같은 명령어를 실행하면 컨슈머 그룹 리스트를 확인할 수 있는데 testgroup이 존재하는 것을 알 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --list
testgroup

그러면 아래와 같은 명령어를 이용하여 테스트그룹의 상태를 한번 체크해보자.

--describe 옵션이 이 테스트그룹이 어떤 상태인지 확인해달라는 옵션이다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            1          4               4               0               -               -               -
testgroup       test            0          3               3               0               -               -               -
testgroup       test            2          8               8               0               -               -               -

Consumer group ‘testgroup’ has no active members. 는 현재 그룹이 active 멤버인지 여부

PARTITION 컬럼의 내용은 파티션 넘버를 말하는 것이다.

CURRENT-OFFSET은 컬럼은 이 파티션의 가장 최신의 offset이 뭔지의 정보이다.

LOG-END-OFFSET는 컨슈머 offset으로 컨슈머가 몇번 offset까지 가져갔는지의 정보이다.

LAG은 컨슈머 렉을 말하는건데 현재 컨슈머 offset과 마지막 offset의 차이를 뜻한다. 즉 이 렉이 많다는 것은 컨슈머가 들어있는 데이터보다 느리게 처리하고 있다는 의미다.

그러면 프로듀서 터미널로 이동해서 100 200 300을 순서대로 입력해본다.

그런 다음에 다시 컨슈머 콘솔로 이동해서 ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe을 날려보면 아래와 같다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            1          4               5               1               -               -               -
testgroup       test            0          3               3               0               -               -               -
testgroup       test            2          8               10              2               -               -               -

3개의 데이터를 프로듀싱해서 보냈는데 컨슈머가 아직 컨슈밍하지 않은 상태이기 때문에 렉이 총 3만큼 발생한 것을 확인할 수 있다. 그래서 카프카를 관리하는 엔지니어는 이 LAG 상태를 확인해서 프로듀싱하고 들어오는 데이터대로 잘 컨슈밍하는지 이 지표를 통해 확인하기도 한다.

그런데 만약에 이 testgroup으로 컨슈밍하다가 문제가 생겨서 offset을 거꾸로 되돌릴수 없는지에 대한 고민이 있을 수 있다.

아래와 같은 명령어로 실행하면 offset을 reset할 수 있다.

--reset-offsets --to-earliest 옵션은 가장 낮은숫자 가장 earlist한 offset으로 리셋해달라는 옵션이다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      test                           0          0
testgroup                      test                           1          0
testgroup                      test                           2          0

그리고 다시 ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe 명령을 실행해보면 current-offset이 0으로 리셋된 것을 확인할 수 있다. LAG도 가장 큰숫자로 늘어난 것을 확인 할 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            1          0               5               5               -               -               -
testgroup       test            0          0               3               3               -               -               -
testgroup       test            2          0               10              10              -               -               -

파티션을 동시에 가져가는 것이 아니라 특정 파티션만 조정하고 싶다면 아래와 같이 명령어를 사용해서 조절이 가능하다.

1번 파티션을 리셋을 하는데 2번 offset으로 한다는 의미이다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 2 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      test                           1          2

그리고 다시 ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe 명령을 실행해보면 1번파티션만 current offset이 2번으로 설정된 것을 확인할 수 있다.

[ec2-user@ip-10-1-10-228 bin]$ ./kafka-consumer-groups.sh --bootstrap-server 13.125.215.168:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            1          2               5               3               -               -               -
testgroup       test            0          0               3               3               -               -               -
testgroup       test            2          0               10              10              -               -               -