Kafka 기본개념 및 생태계

2020-07-17

.

Data_Engineering_TIL(20200717)

  • 학습한 프로그램 : T아카데미 Apache Kafka 입문과 활용

  • URL : https://tacademy.skplanet.com/live/player/onlineLectureDetail.action?seq=183

1. Kafka 기본개념 및 생태계

image

카프카가 등장하기 전에는 앤드투앤드 연결방식의 아키텍처를 많이 사용했다. 그렇기 때문에 데이터 연동의 복잡성이 컸고 운영요소가 많아졌다. 예를 들어서 위와같이 Mysql에서 하둡으로 보내고 nosql에서 하둡으로 보내는 등의 각기다른 파이프라인을 따로따로 만들어줘야 했다. 각각의 운영체제가 다르고 하드웨어가 다르고 그래서 장애가 일어나면 너무 많은 관리요소가 발생했다. 그래서 코드의 복잡성도 더 높아졌다.

링크드인에서 이런문제를 겪고나서 모든 시스템으로 데이터 전송을 실시간 처리도 가능하면서 데이터가 많아져도 확장이 용이한 시스템이 필요해서 카프카를 개발하게 되었다.

image

카프카가 개발되고 나서는 카프카가 마치 중추신경처럼 한군대로 데이터가 모였다가 원하는 destination으로 보내주게 되는데 이를 구현하기 위해서 프로듀서와 컨슈머를 분리하였다.

그리고 메세지 데이터를 여러 컨슈머에 허용하므로써 데이터가 카프카에 들어간 이후에는 여러번 사용할 수 있도록 하였다. 그리고 높은 처리량을 위해서 메세지를 최적화 하였고, 스케일 아웃을 가능하게 하였다. 스케일 아웃이 가능하다는 의미는 카프카 클러스터를 만들고 데이터가 많아지면 스케일 아웃을 하는건데 무중단으로 스케일 아웃이 가능하다. 또한 관련 생태계도 다양하게 제공하기 때문에 많은 기업에서 사용중에 있다.

image

sk 플레닛에서는 위와 같이 카프카가 활용되고 있다. log definition tool이라는 걸 통해 최종 데이터베이스인 하둡에 어떻게 저장될지 정의를 해주고 app이나 db 또는 was를 통해서 수집한 데이터를 카프카를 통해서 저장하고, 카프카에서 하둡으로 컨슈밍하는 과정을 거쳐서 스트리밍 데이터를 수집&저장하게 된다. 그런 다음에 EDA 툴인 하이브를 통해 데이터를 검색하거나 쿼리를 날리거나 이런식으로 db 데이터 등을 확인하고 있다. 스케쥴러나 메타 딕셔너리로 데이터 활용성을 높이고 있다.

2. Kafka broker

image

브로커는 실행된 카프카 어플리케이션 서버 중 1대를 말한다. 서버 한대에 어플리케이션이 구동되고 있다는 말이다. 물론 카프카 어플리케이션이 서버에 두대이상 떠있을수도 있다. 왜냐하면 카프카 어플리케이션은 JVM으로 올라가기 때문에 당연히 두개이상 뜰 수 있는데 실제로 이런식으로 운영하는 경우는 거의없다.

일반적으로는 3대 이상의 브로커로 클러스터를 구성한다. 반드시 홀수대를 맞출필요없고 3대 이상으로 운영이 가능하다. 또한 주키퍼와의 연동이 필요하다. 추후에는 카프카에서 주키퍼를 뺄것이라고 한다. 주키퍼와 연동할때 운영상 이슈도 있고 또한 같이 운영해야하다 보니까 카프카 브로커 클러스터 자체로써 다양한 정보들을 저장하려고 한다. 현재 카프카에서 주키퍼의 역할은 메타데이터를 저장하는 역할이다. 브로커 아이디나 컨트롤러 아이디 이런것들을 추후에는 카프카 클러스터의 브로커 안에 저장한다고 한다. 어쨌든 여전히 카프카는 주키퍼와의 연동이 필요하다.

이 여러대의 브로커 중에 한대는 컨트롤러 역할을 수행한다. 컨트롤러는 각 브로커에게 담당 파티션 할당을 수행하고, 브로커가 정상적으로 동작하는지 모니터링 하는 역할이다. 누가 컨트롤러인지는 주키퍼에 해당정보가 저장이 된다.

3. Record

image

위에 그림과 같이 ProducerRecord가 토픽, 키, 메세지를 지정해서 위와같은 형태로 보내면, ConsumerRecord를 통해서 토픽의 데이터를 다시 레코드로 받아온다. 레코드도 키와 벨류로 된 string으로 받아오게 된다.

** topic : 데이터가 보내지는 테이블 같은 저장소라고 보면된다.

왜 이런형식으로 하냐면 객체를 프로듀서에서 컨슈머로 전달하기에 카프카 내부에서 바이트 형태로 저장을 하기 때문이다. 그래서 직렬화와 역직렬화가 필요하다. 카프카에서 기본제공하는 것으로 StringSerializer나 shortserializer를 제공한다.

sk플래닛 데이터 플랫폼에서는 Key는 Null로 사용하고, value는 자체형식으로 된 제이슨을 사용한다. string으로 벨류값이 들어오면 바이트형태로 직렬화와 역직렬화가 이루어진다.

레코드에서 주의해야 할 점은 producer할때와 consumer할때 동일하게 직렬화와 역직렬화를 맞춰줘야 한다.

4. Topic과 partition

image

위에 그림을보면 파티션이 3개가 있는데 토픽의 파티션은 한개이상이 반드시 존재하여야 한다. 파티션 옆에 partition 0은 0부터 12까지, partition 1은 0부터 9까지, partition 2는 0부터 12까지 있는 것을 볼 수 있는데 이거는 각 파티션마다 offset이라고 하는 번호가 붙는다. 0번이 가장 오래된 번호이다. 그리고 12번이 가장 최신번호라고 할 수 있다. 메세지 처리순서는 파티션별로 유지 관리된다는 말은 파티션이 결국에는 큐이기 때문에 먼저들어온놈이 먼저나가는 구조이다. 파티션이 여러개인 경우는 메세지 처리속도가 완벽하게 들어간 순서대로 같을수가 없다.

5. Producer와 Consumer

image

프로듀서는 파티션의 offset이 지정된 레코드들을 보낸다. 각각의 다른 기능을 가진 컨슈머는 동일한 데이터를 여러번 가져갈 수 있다. 위에 그림을 예로들면 컨슈머 B가 11을 가져갔다는 얘기는 이미 0부터 10을 가져갔다는 의미기도 하다. 프로듀서는 레코드를 생성하여 브로커로 전송할때 offset이 지정된 레코드로 저장을 해서 보낸다. 예를들어서 offset이 12번이 지정이되면 컨슈머는 브로커로 레코드를 요청한다. 레코드가 필요하다고 하면 poling 기준으로 가져간다. 절대로 브로커가 컨슈머로 보내는 개념이 아니다.

6. Kafka log and segment

image

이렇게 보낸 레코드는 실제로는 파일시스템 단위로 저장이 된다. 메세지가 저장될때는 segment 파일이라는 걸로 저장되는데 이 파일은 시간 또는 크기 기준으로 닫치게 된다. 닫힌 이후에는 브로커에 설정된 또는 토픽에 설정된 일정시간 혹은 용량에 따라 삭제 또는 압축이 된다. 즉 세크먼트로 적재된 레코드들이 일정 시간이나 용량을 기준으로 삭제가 되면 더이상 그 해당 레코드를 사용할 수 없다. 그래서 카프카에 들어간 데이터는 영구적으로 쓸수도 있지만 일반적으로는 기간이나 용량에 대한 제한 옵션을 주게 된다.

세그먼트 파일은 예를들어서 0000.index 또는 0000.timeindex 이런식으로 저장이 된다.

7. 파티션 3개인 토픽과 컨슈머 1대

image

토픽이나 파티션에 데이터가 들어가고 프로듀서와 컨슈머가 어떻게 동작하는지에 대한 예제 케이스이다. 위의 경우는 토픽에 파티션이 3개 있는 경우이다. 프로듀서 1개가 파티션에 데이터를 보내고 있는 형태이다. 그리고 컨슈머 1개가 파티션 3개에 할당된 것을 알 수 있다. 컨슈머는 파티션으로부터 파티션 0번,1번, 2번에 할당이 되어서 파티션 3개의 데이터를 계속해서 폴링해간다.

8. 파티션 3개인 토픽과 컨슈머 3대

image

토픽이 3개이고 컨슈머가 3개이면 각각의 파티션이 컨슈머가 할당이 되어서 1대1 매칭이 된다. 그래서 컨슈머는 각 파티션의 데이터를 가져가게 된다. 그래서 토픽에 있는 모든 파티션이 할당이 되고 컨슈머는 같이 일을 하게 된다. 컨슈머는 파일을 저장한다던가 s3에 저장한다던가 처리를 하게 될텐데 컨슈머 1대랑 그 컨슈머가 처리를 하는 프로세스 시간은 한정적일 것이다. 그래서 컨슈머를 여러대로 해서 병렬처리를 한다면 각 파티션의 데이터를 각각의 쓰레드 혹은 프로세스가 실행하면 더욱 빠른 속도로 데이터를 처리할 수 있을 것이다.

9. 파티션 3개인 토픽과 컨슈머 4대

image

컨슈머가 4대가 되면 파티션은 더이상 컨슈머에 할당이 되지 못하고 컨슈머 1대가 놀게될 것이다. 그래서 컨슈머는 반드시 파티션 갯수보다 같거나 혹은 작게 만들어야 한다. 컨슈머 갯수가 많다면 나머지 컨슈머는 놀게 된다. 그래서 이런구조는 좋지 못하다. 각각의 4개의 컨슈머가 같은 컨슈머 그룹 안에 들어 있을때를 가정한 것임을 유의해야 한다.

10. 컨슈머 3대 중 1대 장애 발생

image

컨슈머 2번 1대가 장애가 났다고 가정하자. 그러면 컨슈머 1번이 2개의 파티션이 할당이 된다. 그래서 모든 파티션이 끝가지 데이터를 처리할 수 있도록 재할당되는 과정이다. 이걸 리벨런스라고 한다. 리벨런스가 되고 나면 파티션이 새로운 컨슈머로 할당이 되는데 그러면 끊김없이 데이터를 처리하게 된다. 참고로 리벨런스가 발생하게 되면 할당하는 과정이 중단이 된다. 어떤 파티션은 어떤 컨슈머에 할당이 될지 모르는 상태이기 때문에 할당하는 과정에서 중단이 일어나니까 리벨런스 리스닝과 같이 이런 것들을 기록을해서 운영을 할때 리벨런스가 발생할때 얼마나 얼마나 못듣는지 이런거를 참고하기도 한다.

11. 두개 이상의 컨슈머 그룹

image

이 경우 컨슈머 a그룹은 컨슈머를 한개만 갖고 있고, b도 마찬가지로 1개를 가지고 있다. 이렇게되면 토픽안에있는 파티션 데이터를 컨슈머가 각각 따로따로 목적에 따라 컨슈머 그룹을 분리해서 처리할 수 있다. 컨슈머 a의 1번이 파티션 0번의 12번을 처리해도 컨슈머 b의 1은 컨슈머 a의 1번과 상관없이 컨슈머 0번의 12번을 처리할 수 있다. 그래서 목적에 따라 이런식으로 컨슈머 그룹을 분리할 수 있다.

sk 플레닛 같은 경우에는 장애에 대응하기 위해 컨슈머 그룹을 분리했다. 재입수(또는 재처리) 목적으로 임시 신규 컨슈머 그룹을 생성하여 사용하고 있다.

12. 어플리케이션 로그 적재용 컨슈머 그룹 2개

image

어플리케이션 로그를 적재하는 시나리오를 가정해보자 목적에 따라 elasticsearch 와 Hadoop에 동시에 적재하는 형태인데 이렇게 목적에 따라 분리하면 운영을 조금 더 원활하게 할 수 있다.

13. 컨슈머 그룹 장애에 격리되는 다른 컨슈머 그룹

image

만약에 하둡 적재 컨슈머가 장애가 나서 적재지연 현상이 발생했다고 가정하자. 하둡에 적재를 못하는데 elasticsearh도 장애가 나면 안될것이다. 하둡 컨슈머에서 장애가 나서 먹통이 나더라도 elasticsearch consumer는 독립적인 컨슈머이기 때문에 elasticsearch에는 지장이 없을 것이다. 하둡이 복구가 되면 중단된 시점부터 다시 컨슈밍을 할 것이다. 그러면 언젠가는 두개의 컨슈머 모두 다시 최신의 offset을 컨슈밍 하고 있을 것이다.

14. Broker partition replication

image

위에 그림에서 명령어는 토픽을 생성하는 명령어이다. bin/kafka-topics.sh를 통해 토픽을 생성하거나 리스트를 보거나 수정을 할 수 있다. --bootstrap-server localhost:9092는 내 로컬컴퓨터에 띄워져 있는 카프카 브로커에 명령을 내린다는 의미인데 --create는 생성하겠다는 의미이고, topic name은 topic_name이라는 것이다. partition 3은 파티션 수를 3으로 지정하겠다는 의미이다.

브로커가 3개일 경우는 파티션이 위에 그림과 같이 균등하게 1개씩 만들어질것이다.

image

그런데 만약에 브로커 1번에 장애가 생기면 파티션 1번은 사용할 수 없을 것이다. 그렇다면 이런 kafka broker 이슈에 대응하기 위해 사용할 수 있는 방법은 뭐가 있을까

대응방안은 partition을 다른 브로커에 복제하여 이슈에 대응하는 방법이다. 1번 브로커에 이슈가 생기면 다른 브로커에 복제된 데이터를 사용하면 된다.

image

위에 그림과 같이 브로커를 생성할때 위와 같이 레플리케이션 갯수를 지정할 수 있다.

image

그러면 위에 그림과 같이 고가용성을 유지할 수 있다. sk 플레닛 역시 카프카 클러스터를 운영할때 replication을 3으로 주고 운영하고 있다.

15. 리더 파티션, 팔로워 파티션

image

원본 파티션을 그래서 리더 파티션이라고 부르고, 복제된 파티션을 팔로워 파티션이라고 부른다. 리더 파티션은 카프카 컨슈머나 프로듀서와 직접 클라이언트와 데이터를 직접 주고 받는 역할을 한다. 리더 파티션에 데이터가 지속적으로 쌓이면 팔로우 파티션도 이것을 캐치하고 리버파티션을 보고 offset을 복제를 하게 된다.

만약에 리더 파티션이 동작이 불가능 할 경우에는 팔로우 중에 하나가 리더로 선출이 된다.

16. ISR, 리더와 팔로워의 싱크

image

위의 예시는 팔로워 3개와 리플리케이션 3개로 이루어진 형태이다. 균등하게 분배되어 있는 형태인데 여기에 offset까지 모두 복제가 되어 있는 상태라면 이 상태를 ISR이라고 부른다.

만약에 팔로우 파티션에 offset이 0 ~ 90까지만 복제되어 있고, 리더 파티션이 0 ~ 100까지 복제되어 있는 상태라면 이때 리더 파티션이 장애가 발생하면 91 ~ 100 offset은 팔로우 파티션에 복제가 안되어 있는 상태라는 것이다. 이때 복제할때까지 기다리거나 복제 안하고 그냥 넘어갑시다 이렇게 할 수도 있다. 이런걸 처리하는게 unclean.leader.election.enable이다. 이 옵션은 기본으로 false로 되어 있다. 이게 false라는 의미는 ISR이 아닌 예를 들어 팔로우 파티션 3번이 리더 파티션으로부터 전부 복제가 안된 상태라면 리더파티션이 복구될때까지 기다리라는 의미이다. 이거를 만약에 True로 하면 어떻게 될까. 이거는 그냥 91번부터 100번 offset은 그냥 포기하고 계속 하던일을 처리하자는 의미다.

image

그래서 위와 같이 브로커 1번에 장애가 발생하면 파티션 1의 리더가 브로커 2 또는 3중에 하나의 파티션을 새로 리더로 할당할 것이다.

17. Kafka rack-awareness

image

하나의 파워로 여러개의 서버가 물려있는 단위를 rack이라고 한다. 그래서 하나의 렉에 파워가 나가버리면 여러대의 서버도 같이 셧다운 되는 것이다.

image

이런 형태로 만약에 카프카가 구성이 되어 있다면 1개의 Rack에 1개의 브로커를 넣는 옵션을 주게되면 위에 그림과 같이 다수의 Rack에 분산하여 브로커를 배치할 수 있다. 파티션 할당시나 레플리케이션 동작시에 특정 브로커에 몰리는 현상을 방지할 수 있다.

18. 왜 카프카 클러스터는 서버장애에 대응하는 로직이 다양할까?

image

19. Kafka의 핵심요소

  • Broker : 카프카 어플리케이션 서버 단위

  • Topic : 데이터 분리 단위. 다수 파티션 보유

  • Partition : 레코드를 담고 있음. 컨슈머 요청시 레코드 전달

  • Offset : 각 레코드당 파티션에 할당된 고유번호

  • Consumer : 레코드를 polling하는 어플리케이션

1) Consumer group : 다수 컨슈머를 묶음

2) Consumer offset : 특정 컨슈머가 가져간 레코드 번호

  • Producer : 레코드를 브로커로 전송하는 어플리케이션

  • Replication : 파티션 복제 기능

  • ISR : 리터 + 팔로우 파티션의 싱크가 된 묶음

  • Rack-awareness : server rack 이슈에 대응

20. Kafka Client

image

위에서 언급했던 producer나 consumer는 카프카 클라이언트라고 보면 된다. 이것들은 기본적으로 자바 라이브러리로 제공이 된다. 따라서 메이븐 같은걸로 임포트해서 사용할 수 있다. 자바가 아닌 서드 파티 라이브러리도 제공한다.

21. Kafka Streams

image

Exactly-once 처리 : 장애가 나더라도 각각의 offset record를 단 한번만 처리하는 것 –> 고가용성

22. Kafka connect

image

카프카 클라이언트에 자바로 프로듀서와 컨슈머로 직접 전부 구현해도 무방하다. 카프카 컨넥트는 미리제공된 임포트 또는 익스포트하는 어플리케이션이라고 보면된다. 이거를 통해서 소스시스템으로부터 타겟시스템으로 데이터를 보낼 수 있다. 코드없이 configuration으로 데이터를 이동시키는 것을 목적으로 하고 있다.

23. Kafka Mirror maker

image

카프카 소스 클러스터에서 타겟 클러스터에 토픽의 데이터를 전송하는 목적이 있다. 클러스터간에 토픽에 대한 모든 것을 복제하는 것을 목적으로 하고 있다.

24. 그 외에 Kafka 생태계를 지탱하는 서드파트 어플리케이션들

  • confluent/ksqlDB : sql 구문을 통한 스트림 데이터 프로세싱 지원

  • confluent/Schema Registry : avro 기반의 스키마 저장소

  • confluent/REST Proxy : REST api를 통한 컨슈머/프로듀서

  • linkedin/Kafka burrow : consumer lag 수집 및 분석

  • yahoo/CMAK : 카프카 클러스터 매니저

  • uber/uReplicator : 카프카 클러스터 간 토픽 복제 (전달)

  • Spark stream : 다양한 소스(카프카 포함)로 부터 실시간 데이터 처리

  • 등등등

–> 아파치 라이센스가 아닌 일부 오픈소스의 경우 라이센스 이슈가 있을 수 있기 때문에 라이센스를 잘 확인해야 한다.

25. SK 플레닛의 카프카 사용 현황

image