kafka 기초개념 - youtube 최범균님 자료

2021-01-31

.

Data_Engineering_TIL(20210130)

[학습자료]

  • youtube ‘최범균’님 채널 “kafka 조금 아는 척하기 (개발자용)” 영상을 공부하고 정리한 내용입니다.

URL : https://www.youtube.com/watch?v=0Ssx7jJJADI

[개요 및 동작원리]

  • kafka 홈페이지에 접속하면 카프라를 이렇게 소개하고 있다.

“분산 이벤트 스트리밍 플랫폼”

스트리밍 데이터를 처리하기 위한 플랫폼으로 소개하고 있는데 주목할만한 점은 하이 퍼포먼스 즉 고성능이라는 것을 강조하고 있다.

  • 기본구조

1

카프카를 사용하려면 크게 4가지 구성요소가 필요하다.

구성요소 1. 카프카 클러스터

메세지를 저장하는 저장소이다. 하나의 카프카 클러스터는 여러개의 브로커(서버)로 구성된다. 이 브로커들이 메세지를 분산해서 저장하고, 이중화 처리도 하고, 장애가 나면 대체를 하는 등의 역할을 수행한다.

구성요소 2. 주키퍼(클러스터)

카프카 클러스터를 관리. 주키퍼에 카프카 클러스터와 관련된 정보가 기록이 되고 관리된다.

구성요소 3. 프로듀서

카프카 클러스터에 메세지를 보내는 주체. 메세지를 카프카에 넣는 역할

구성요소 4. 컨슈머

메세지를 카프카에서 읽어오는 역할.

프로듀서가 카프카에 메세지를 넣고, 컨슈머는 카프카에 넣은 메세지를 읽어와서 필요한 처리를 하는 주체다.

그래서 카프카 클러스터는 데이터를 이동하는데 필요한 핵심 역할을 수행한다.

  • 토픽과 파티션

2

카프카에서 메세지를 저장하는 단위가 토픽이다. 토픽은 메세지를 구분하는 용도로 사용된다. 여러종류의 메세지가 있을때 이 메세지가 어떤 종류의 메세지인지를 구분할 필요가 있는데 이를 위해 사용하는 것이 토픽이다. 예를 들어서 뉴스용 토픽, 주문용 토픽 이런식으로 각각의 메세지를 알맞게 구분하기 위한 목적으로 사용된다. 그래서 토픽은 파일시스템의 폴더개념과 유사하다고 볼 수 있다.

토픽은 메세지를 구분하는 단위 : 파일시스템의 폴더개념과 유사

한개의 토픽은 한개 이상의 파티션으로 구성. 파티션은 메세지를 저장하는 물리적인 파일

그래서 프로듀서는 카프카에 저장할때 어떤 토픽에 저장해달라고 요청을 하게 되고, 컨슈머는 어떤 토픽에서 메세지를 읽어올래라고 요청을 해서 메세지를 가져오게 된다. 이렇게 프로듀서와 컨슈머가 토픽을 기준으로 메세지를 주고받게 된다.

  • 파티션과 오프셋, 메세지 순서

파티션은 추가만 가능한(append-only) file 개념임.(물론 카프카가 일부를 삭제할 수도 있고, 축약도 하기는 하지만 일반적으로는 그렇다는 거임)

각 메세지 저장위치를 오프셋(offset)이라고 함

프로듀서가 넣은 메세지는 파티션의 맨 뒤에 추가

컨슈머는 오프셋 기준으로 메세지를 순서대로 읽음

(예를 들어서 컨슈머가 특정 파티션에 3번 offset 부터 메세지를 읽으라고 하면 3번 offset부터 메세지를 순차적으로 읽게 된다)

메세지는 삭제되지 않음(컨슈머가 메세지를 읽어갔는지 여부에 상관없이 그대로 메세지가 유지된다는 것이다. 설정에 따라 일정시간이 지난 뒤에 삭제되는 개념)

3

  • 여러 파티션과 프로듀서

토픽은 여러개의 파티션으로 구성될 수 있다.그러면 여러개의 파티션에 메세지를 어떻게 저장하느냐.

프로듀서는 라운드로빈 알고리즘으로 각각의 파티션에 메세지를 저장하거나 또는 키를 이용해서 파티션별로 데이터를 저장하게 된다.

프로듀서가 카프카에 메세지를 전송할때 토픽의 이름뿐만 아니라 키도 지정할 수 있다. 키가 있는 경우에는 키의 해시값을 이용해서 저장할 토픽을 선택하게 된다. 그래서 같은 키를 갖는 메세지는 같은 파티션에 저장이 되고 이는 같은 키에 해당하는 메세지는 순서를 보장할 수 있다는 말이다.

4

  • 여러 파티션과 컨슈머

컨슈머는 컨슈머 그룹이라는 개념에 속하게 된다. 컨슈머가 카프카 브로커에 연결할때 나는 어떤 컨슈머 그룹인지 지정하게 되어 있다.

그리고 중요한게 한개의 파티션은 컨슈머그룹의 한개의 컨슈머에만 연결이 가능하다. 그룹에 속해있는 컨슈머들이 특정한 파티션 하나를 공유할 수 없다. 예를 들어서 그룹 A에 있는 컨슈머 1, 2는 파티션 0이나 파티션 1에만 연결할 수 있고 컨슈머 1과 컨슈머2가 파티션 0을 함께 공유한다거나 파티션 1을 함께 공유할수는 없다.

그래서 어떤게 가능해지냐. 한개의 컨슈머만 한개의 파티션에 연결할 수 있기 때문에 컨슈머 그룹 기준으로 파티션의 메세지가 순서대로 처리될 수 있는 것을 보장할 수 있다. 한개의 파티션이 한개의 컨슈머에만 연결할 수 있다는 제한사항은 컨슈머그룹 내에서만 적용되기 때문에 한개의 파티션을 서로다른 그룹의 컨슈머는 공유할 수 있다.

5

  • 카프카가 성능이 좋아서 개발자들 사이에서 부상하게 되었는데 왜 그러면 성능이 좋은지 알아보자.

이유 1) 카프카는 파티션 파일에 대해서 OS가 제공하는 페이지캐시 사용

그래서 파일 IO가 실제로는 메모리에서 처리되기 때문에 IO 속도가 빠를 수 밖에 없다.

파티션에 대한 파일 IO를 메모리에서 처리

서버에서 페이지캐시를 카프카만 사용해야 성능에 유리

이유 2) Zero Copy라는 개념을 사용한다.

Zero Copy를 사용하면 디스크에서 데이터를 읽어서 네트워크로 보내는 속도가 빨라지게 된다.

디스크 버퍼에서 네트워크 버퍼로 직접 데이터 복사

이유 3) 브로커가 컨슈머에 대해서 별로 하는 일이 없다.

컨슈머 추적을 위해 브로커가 하는 일이 비교적 단순

메세지 필터, 메세지 재전송과 같은 일은 브로커가 하지 않음

이런 것은 프로듀서, 컨슈머가 직접 해야함

브로커는 컨슈머와 파티션 간 매핑관리

이유 4) 묶어서 보내고, 묶어서 받을 수 있다. (배치처리가 가능)

프로듀서 같은 경우는 일정 크기만큼 메세지를 모아서 한번에 전송할 수 있고, 컨슈머는 일정크기 이상의 메세지를 모아서 데이터를 읽어올 수 있다.

그래서 낱개로 건건히 네트워크로 데이터를 보내고 네트워크로 데이터를 받는 것보다 처리량과 데이터 처리 효율성을 높일 수 있다.

이유 5) 카프카는 처리량(throughput)을 확장하기 쉽다.

1개의 장비에서 용량 한계가 오게되면 브로커와 파티션을 추가해주면 된다. 또는 컨슈머가 느리면 컨슈머와 파티션을 추가시키면 된다. 이런식으로 수평확장이 용이하다.

6

  • 카프카는 장애가 났을때 이를 대처하기 위해서 리플리카(복제)라는 개념을 사용한다.

리플리카 : 파티션의 복제본

복제수(replication factor) 만큼 파티션의 복제본이 각 브로커에 생김

예를 들어서 토픽을 생성할때 복제수를 2로 지정하면 아래 그림과 같이 동일한 데이터를 갖고 있는 파티션이 서로다른 브로커에 두개가 생긴다.

여러 파티션 중에서 하나가 리더가 되고 나머지는 팔로워가 된다.

프로듀서와 컨슈머는 리더를 통해서만 메세지를 처리

팔로워는 리더로부터 데이터를 읽어와서 저장하는 즉 복제하는 역할을 수행한다.

리더가 속한 브로커에 장애가 발생하면 다른 팔로워 중에서 하나가 리더가 된다.

그러면 프로듀서와 컨슈머는 새로운 리더를 통해서 다시 메세지를 처리할 수 있게 된다.

7

[프로듀서 기초개념]

  • 토픽에 메세지 전송하는 예시 코드
Properties prop = new Properties();
prop.put("bootstrap.servers","kafka01:9092,kafka01:9092,kafka01:9092");
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop);

producer.send(new ProducerRecord<>("topicname","key","value"));
producer.send(new ProducerRecord<>("topicname","value"));

producer.close();

먼저 properties를 이용해서 프로듀서가 사용할 속성(설정정보)을 지정함

이 설정정보에는 브로커 목록이나 키와 벨류를 직렬화 할때 사용할 시리얼라이저, 그외에 ack, 배치사이즈 이런 설정을 properties를 이용해서 지정을 하게 되고, 이 properties를 이용해서 카프카 프로듀서 객체를 생성한다.

카프카 프로듀서 객체는 send 메소드를 제공한다. 이 send 메소드에 producer record를 전달읋 하고, 바로 이 프로듀서 레코드가 카프카 브로커에 전송할 메세지가 된다.

프로듀서 레코드는 크게 두가지 방법으로 생성할 수 있는데 하나는 토픽이름과 키벨류를 사용해서 생성하는 방법이고, 또하나는 토픽이름과 벨류만 사용해서 생성하는 방법이다.

그리고 프로듀서를 다 사용했다면 close 메소드를 이용하여 닫아주면 된다.

  • 프로듀서의 기본 동작원리

8

send 메소드를 이용해서 레코드를 전송하면 먼저 시리얼라이저를 이용해서 바이트 배열로 변환을 하고, 파티셔너를 이용해서 그 메세지를 어느토픽의 파티션으로 보낼지 결정을 한다. 그리고 변환된 바이트 배열 메세지를 버퍼에 저장한다. 이때 버퍼에 바로 저장하는게 아니고 배치로 묶어서 메세지를 저장하게 된다. 카프카의 성능이 좋은 이유가 여기에서 나온다. 메세지를 여러개를 묶어서 보내기 때문에 전송 효율성이 올라가게 된다. 그리고 sender라는 주체가 메세지를 가져와서 카프카 브로커로 전송하게 된다.

9

sender는 별도의 쓰레드로 동작한다. sender는 배치를 차례대로 꺼내서 브로커로 보내는 역할을 하는데 배치가 찼는지 여부와 관계없이 읽어서 보낸다. sender는 배치를 브로커에 보내는 동안에 send 메소드를 통해 들어온 레코드는 계속 배치에 누적해서 쌓이게 된다. 따라서 sender는 sender 대로 배치를 꺼내서 브로커에 보내고, send 메소드는 send 메소드 대로 계속해서 메세지를 배치에 누적하게 된다. 그래서 이 둘은 서로 다른 쓰레드로 동작하기 때문에 메세지를 보내는 동안 배치가 쌓이지 않는다거나 또는 배치가 쌓이는 동안 sender가 메세지를 브로커로 보내지 않는 현상은 거의 일어나지 않을것이다.

그리고 sender는 배치가 다 차지 않아도 그냥 보내버린다. 이말은 무슨말이냐면 배치에 메세지가 1개가 있던 여러개가 있던지 sender가 배치를 보낼수만 있으면 그냥 보내버린다.

10

그래서 배치와 sender와 관련된 설정이 데이터 처리량에 영향을 주게 된다. 먼저 배치 사이즈 설정이 있는데 이거는 배치의 최대 크기를 지정한다. 그리고 이 크기만큼 메세지가 차면 배치를 바로 전송하게 된다. 그래서 배치사이즈가 너무 작으면 한번에 보낼 수 있는 메세지의 갯수가 줄고, 그렇게 되면 전송 횟수가 늘어나기 때문에 처리량이 떨어지게 된다.

linger.ms는 sender가 배치를 전송하는 대기시간을 지정하는 것이다. 그러면 이게 처리량과 어떤 관계가 있냐. 대기시간을 주면 그 시간만큼 기다리기 때문에 그 대기시간 만큼 배치에 다른 메세지가 쌓일 것이므로 한번에 더 많은 메세지를 보낼 수 있는 여지가 생긴다. 즉 linger.ms를 0이 아니라 30 또는 100 등 약간의 지연시간을 주게되면 그만큼 한번에 보낼 수 있는 메세지가 많아지니까 전반적인 처리량이 높아지는 효과를 볼 수 있게 된다.

아래 코드와 같이 producer.send 메소드로 레코드를 전송한 다음에 특별히 아무것도 하지 않으면 전송이 성공했는지 또는 실패했는지 알수없다. 전송실패에 대한 별도 처리가 필요없는 경우에는 아래 코드와 같이 그냥 send 메소드로 데이터를 보내면 끝나는 것이다.

producer.send(new ProducerRecord<>("topicname","value"));

반면에 메세지 전송성공 여부를 경우에 따라서는 반드시 확인해야할 경우도 있는데 이 경우에는 send 메소드가 리턴하는 Future를 사용하면 된다. Future의 get 메소드를 이용해서 성공결과를 알수 있다. 이 방법의 문제가 있는데 Future의 get 메소드를 사용하면 그 시점에서 블로킹이 된다. 즉 루프를 돌면서 아래의 코드를 실행한다면 하나의 메세지를 보내고 블로킹되고, 하나를 보내고 블로킹되고 이런 코드가 된다. 그래서 배치에 쌓이지 않는다. 배치에 메세지가 한개씩만 들어가는 것이다. 그래서 배치효과가 떨어지기 때문에 데이터 처리량도 떨어지게 된다. 대신에 메세지 건별로 확실하게 데이터 전송성공 여부를 확인할 수 있는 것이다. 비지니스 로직상 처리량이 낮아도 되는 경우에만 사용하는 것이 좋다.

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topicname","value"));
try{
    RecordMetadata meta = f.get(); //블로킹
} catch (ExecutionException ex){
}

메세지 전송성공 여부를 확인할 수 있는 또 다른 방법은 send 메소드에 callback 객체를 전달하는 것이다. 이 콜백객체는 전송이 완료되면 결과를 onCompletion 메소드로 받게된다. 이때 Exception 객체를 받게되면 전송에 실패한 것이다. 그래서 성공이냐 실패냐에 따라서 알맞은 후처리를 할 수 있다. 이 방식은 블로킹 하는 방식이 아니기 때문에 배치가 쌓이지 않는 단점이 없어서 처리량 저하가 없다고 할 수 있다.

producer.send(new ProducerRecord<>("topicname","value"),
   new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception ex){}
   }
   );
  • 프로듀서는 전송보장을 위해서 ack라는 설정을 사용한다.

11

1) ack = 0

서버응답을 기다리지 않음, 처리량은 다소 높아지겠지만 메세지 전송성공에 대한 보장도 없음

2) ack = 1

파티션의 리더에 저장되면 응답을 받음

리더 장애시 메세지 유실이 가능한 구조

예를 들어서 리더에 성공적으로 저장이 되어서 ack를 받았는데 그 상태에서 팔로워에 메세지가 복제되지는 않았다. 근데 마침 딱 이시점에 리더가 장애가 나면 리더에 저장된 메세지가 아직 팔로워에 복제되지 않은 상태에서 팔로워중에 하나가 리더가 될 것이다. 그러면 기존 리더에 저장이 되었던 메세지를 잃게 되는 것이다.

3) ack = all (또는 -1)

그래서 엄격하게 메세지 전송성공 여부에 대한 보장을 하고 싶다면 ack = all로 저장하면 되는데 이경우에는 모든 리플리카에 저장이 되었을때 비로소 성공응답을 받게된다. 물론 모든리플리카가 어떤거냐 라고 정의하는 것은 브로커 min.insync.replicas 설정에 따라 달라지게 된다.

  • ack + min.insync.replicas

12

min.insync.replicas는 브로커 옵션이다. 프로듀서의 ack 옵션이 all일때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카의 최소갯수를 지정할 수 있다.

13

예3 과 같은 설정을 할 경우는 조심해야 하는게 팔로워 하나라도 장애가 발생하면 항상 메세지 저장에 실패가 난다는 것이다. 그래서 일반적으로는 min.insync.replicas는 리플리카 갯수와 동일하게 지정하지 않는다. 팔로워중 하나라도 장애가 나면 ack = all일때 데이터를 저장할 수 없게 된다.

  • 프로듀서의 Error 유형

유형 1. 전송과정에서 실패

전송 타임아웃 (일시적인 네트워크 오류 등)

리더 다운에 의한 새 리더 선출 진행 중

브로커 설정 메세지 크기 한도 초가 등등

유형 2. 전송전에 실패

직렬화 실패, 프로듀서 자체 요청크기 제한 초과

프로듀서 버퍼가 차서 기다린 시간이 최대 대기시간 초과 등등

  • Error시 났을때 대응방법 : 전송을 재시도

브로커 응답이 타임아웃 났거나 리더가 일시적으로 없는 경우에는 기다렸다 메세지를 재전송하면 성공할수도 있다. 프로듀서는 기본적으로 재시도를 하는 옵션설정을 갖고 있다. 브로커에 전송하는 과정에서 Error가 날때 재시도가 가능한 Error에 대해서는 재전송을 시도한다. 그리고 send 메소드에서 exception이 발생하면 exception 타입에 따라서 직접 send 메소드를 재호출해도 되고, 콜백 메소드에서 exception이 있으면 exception 타입에 따라서 재시도가 가능한 exception인지 확인해서 send 메소드를 재호출하면 된다. 그런데 주의할 점은 무한정 재시도를 하면 안된다는 것이다. 재시도를 계속 시도한다는 것은 다음 보낼 메세지가 밀린다는 얘기이므로 재시도를 일정시간이나 횟수로 제한을 해서 전체적인 메세지가 밀리지 않도록 주의해야 한다.

  • Error시 났을때 대응방법 : 기록

추후에 별도로 다시 처리하기 위해서 디비등 어딘가에 남겨두는 것이다. 실패한 메세지를 별도 파일이나 디비에 남겨두고 추후에 수동이나 자동으로 보정작업을 진행하면 된다.

send() 메서드에서 익셉션이 발생하거나, 콜백에서 익셉션을 받는경우, 또는 future의 get() 메서드에서 익셉션 발생시 일반적으로 가능하다.

  • 재시도와 메세지 중복전송 가능성

14

메세지를 재전송할때 주의할점이 있다. 메세지가 중독 전송될 수 있다. 위에 그림과 같이 프로듀서가 메세지를 전송해서 브로커에 성공적으로 저장이 되었는데 ack 답신이 늦어져서 타임아웃으로 실패가 났을 수도 있다. 그런데 이때 프로듀서는 자기가 보낸 메세지가 실패인줄 알고 메세지를 다시보내게 되면 데이터 저장이 중복으로 발생하는 것이다. 따라서 재전송을 할때는 이런 가능성에 대한 것을 주의해야 한다.

참고로 enable.idempotence라는 속성을 지정하면 중복전송될 가능성을 줄일 수 있다고 한다.

  • 재시도와 순서

15

재시도는 전송순서를 바꾸기도 한다. 프로듀서 속성중에 max.in.flight.requests.per.connection이라는게 있는데 이 속성은 하나의 컨넥션에서 전송할 수 있는 최대 전송중 요청갯수를 지정하는 것이다.

예를 들어서 전송중인 요청갯수가 세개라고 해보면 배치1을 꺼내서 전송을 하는데 실패했다. 그리고 배치 2와 3을 보냈을때는 성공했다. 그리고 배치 1을 일정시간 있다가 재전송을 해서 성공했다. 이렇게 되면 원래 순서는 배치 1,2,3이 순차적으로 가야하는데 이 경우는 배치 2,3,1 순으로 가게 된 것이다. 따라서 재전송을 하게되면 이와 같이 데이터 저장 순서를 바꾸기도 한다. 정말로 전송순서가 중요한 경우에는 max.in.flight.requests.per.connection를 1로 설정해야 한다.

[컨슈머 기초개념]

  • 특정 토픽의 파티션에서 레코드를 조회하는 컨슈머 어플리케이션 구현 예시
Properties prop = new Properties();
prop.put("bootstrap.servers","localhost:9092");
prop.put("group.id","group1");
prop.put("key.serializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.serializer","org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
consumer.subscribe(Collections.singleton("simple")); //토픽구독
while(조건){
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecords<String,String> record:records){
        System.out.println(record.value()+":"+record.topic()+":"+record.partition()+":"+record.offset());
    }
}

consumer.close();

서버지정하고, 그룹 아이디 지정한 다음 메세지를 읽어와 역직렬화를 위한 시리얼라이저를 설정한다.

그런 다음에 카프카 컨슈머 객체를 생성한다 그리고 컨슈머의 subscribe 메소드를 호출한다. 이 메소드를 호출할때 내가 구독할 토픽목록을 전달한다. 특정 조건을 충족하는 동안 루프를 돌면서 컨슈머의 poll 메소드를 호출하고, 이 poll 메소드는 일정시간 동안 대기하다가 브로커로부터 컨슈머 레코드 목록을 읽어온다. 이렇게 읽어온 컨슈머 레코드를 루프를 돌면서 필요한 처리를 한다. 다 사용한 다음에는 close 메소드를 이용해서 종료처리를 한다.

  • 토픽의 파티션은 그룹단위로 할당된다.

이때 그룹은 앞선 코드에서 그룹 아이디라는 설정으로 지정한 값이 그룹이 된다. 파티션 갯수와 컨슈머 갯수는 밀접하게 관련되어 있다. 파티션 갯수보다 컨슈머 그룹이 많아지게 되면 컨슈머는 놀게 된다.

예를 들어 파티션이 두개있는 그런 토픽을 생각해보자. 파티션이 두개이고 컨슈머가 한개면(아래에 좌측그림) 이럴경우 컨슈머 한개가 두개의 파티션으로부터 데이터를 읽어오게 된다. 이 상황에서 컨슈머를 하나 더 추가하게 되면 아래에 가운데 그림처럼 각 컨슈머가 각 파티션에 연결이 된다. 즉 1대 1 매칭이 된다. 그런데 파티션 갯수보다 더 많은 컨슈머 갯수가 생기게 되면 이후로 생긴 컨슈머는 놀게 된다. (아래에 우측그림)

그래서 중요한점은 컨슈머 갯수가 파티션 갯수보다 커지면 안된다. 만약에 처리량이 떨어져서 컨슈머를 늘려야한다면 파티션 갯수도 함께 늘려야 한다.

16

  • 커밋과 오프셋

카프카 컨슈머를 잘 사용하려면 커밋과 오프셋에 대한 이해가 필요하다. 컨슈머의 poll 메소드는 이전에 commit한 offset이 있으면 그 offset 이후에 레코드를 읽어온다. 그리고 읽어온 다음에 마지막에 읽어온 레코드의 offset을 commit을 하고, 다음 poll 메소드를 실행하면 앞서 commit한 offset 이후로 레코드를 읽어온다. 또 읽어온 레코드 offset을 commit을 한다. 이런 과정을 계속 반복한다.

17

  • 커밋된 오프셋이 없는 경우

poll 메소드로 레코드를 읽어오려고 하는데 처음 접근해서 poll을 하거나 그럴때 커밋된 오프셋이 없을수도 있는데 이 경우에는 auto.offset.reset 이라는 설정을 사용한다. 이 설정은 세가지 값을 지정할 수 있다. earliest라는 옵션을 사용하면 맨처음 offset 부터 읽어오기 시작한다. latest를 사용하면 가장 마지막 오프셋을 사용한다(default 값). 그리고 none을 지정하면 exception이 난다. 그래서 일반적으로는 none을 사용하지 않고, earliest나 latest를 사용한다.

18

  • 컨슈머가 조회하는데 영향을 주는 주요설정

1) fetch.min.bytes : 브로커가 전송할 최소 데이터 크기를 지정하는 설정

즉 poll 메소드로 브로커에 요청을 하면 브로커가 이 설정값 이상의 데이터가 쌓일때까지 기다렸다 보낸다.

그래서 이 값이 크면 대기 시간은 늘지만 처리량이 증가하는 효과를 볼 수 있다.

기본값 : 1

2) fetch.max.wait.ms : 데이터가 최소 크기가 될때까지 기다릴 시간

데이터가 최소크기를 충족하지 않는다고 무한정 기다릴수는 없으니까 기다리는 최대의 시간을 지정하는 것이다.

기본값 : 500 ms(500 밀리 세컨드 즉, 0.5초)

그런데 주의할점은 fetch.max.wait.ms는 브로커가 최소 데이터를 모으기까지 대기하는 시간이라는 것이다. 브로커가 리턴할때까지 대기하는 시간으로 poll() 메서드의 대기시간과 다름

3) max.partition.fetch.bytes : 파티션당 브로커가 리턴할 수 있는 최대 크기

이 최대 크기가 넘어가면 바로 리턴을 하게 된다.

기본값 : 10485576 (1MB)

  • 자동 커밋/수동 커밋

컨슈머가 레코드를 읽어온 다음에 마지막 레코드의 오프셋을 커밋한다고 했는데 이 커밋에는 크게 두가지 종류가 있다.

자동으로 하는 커밋과 수동으로 하는 커밋이 있는데 이 둘중에 뭘 사용할건지는 enable.auto commit 설정으로 지정할 수 있다.

1) enable.auto commit 설정

true : 일정주기로 컨슈머가 읽은 오프셋을 커밋 (기본값)

여기서 이 일정주기는 아래에 auto.commit.interval.ms 로 설정할 수 있다.

false : 수동으로 커밋 실행

2) auto.commit.interval.ms : 자동 커밋 주기

기본값 : 5000 (5초)

3) 그러면 언제 자동 커밋이 이루어지냐

poll(), close() 메서드 호출 시 자동커밋 실행

4) 수동커밋 예시 : 동기/비동기 커밋

동기커밋 예시

commitSync 메소드를 이용한 동기 커밋이 있다. 커밋에 성공하면 exception이 발생하지 않고, 실패하면 exception이 발생한다. 커밋에 실패하면 exception을 catch해서 알맞게 처리를 해줘야 한다.

ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecords<String,String> record:records) {
    ... 처리
}
try {
    consumer.commitSync();
} catch(Exception ex) {
    // 커밋실패시 애러발생
}

비동기 커밋 예시

commitAsync 메소드를 이용해서 비동기 처리도 가능하다. 비동기이므로 코드 자체에서 바로 실패여부를 알 수 없다. 만약에 성공실패를 알고 싶다면 callback을 받아서 처리해야 한다.

ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecords<String,String> record:records) {
    ... 처리
}
consumer.commitAsync(); //commitAsync(OffsetCommitCallback callback)
  • 컨슈머 구현시 주의할 점 : 재처리와 순서

카프카를 사용할때 주의해야 할 점은 컨슈머가 동일한 메세지를 중복해서 읽어올 수 있다는 것이다. 일시적으로 commit에 실패했다거나 새로운 컨슈머가 추가되어서 혹은 컨슈머가 빠져서 리벨런스가 발생하는 경우에 발생한다.

그래서 컨슈머는 멱등성을 고려해서 구현해야한다. 예를 들어서 ‘조회수 1증가’ 메세지, ‘좋아요 1증가’ 메세지, ‘조회수 1증가’ 메세지를 순서대로 읽어왔는데 동일한 순서대로 다시 메세지가 조회될수 있다는 것이다. 이 경우에 멱등성을 고려하지 않고 처리하게 되면 조회수가 2가 아니라 4가 된다.

그래서 컨슈머는 데이터 특성에 따라서 타임스탬프나 일련번호등을 활용해서 데이터를 중복해서 두번이상 처리해도 문제가 없도록 구현해줘야 한다.

  • 카프카는 컨슈머 그룹을 알맞게 유지하기 위해서 몇가지 설정을 사용한다.

그 중에 대표적인게 하트비트와 세션 타임아웃이라는 개념이다.

컨슈머는 하트비트를 계속해서 브로커에 전송하고 이를 통해서 연결을 유지한다. 브로커는 일정시간동안 컨슈머로부터 하트비트가 없으면 해당 컨슈머를 그룹에서 빼버린다. 그리고 리벨런스를 진행한다.

하트비트 연결과 관련된 설정은 두가지가 있다.

session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)

지정한 시간동안 하트비트가 없으면 컨슈머가 올바르지 않다고 판단하고 해당 컨슈머를 빼버리게 된다.

heartbeat.interval.ms : 하트비트 전송 주기 (기본값 3초)

하트비트를 어느정도의 주기로 보낼거냐를 설정하는 것이다. 카프카 도큐먼트에 따르면 일반적으로는 session.timeout.ms의 1/3 이하로 할 것을 추천한다.

또 다른 설정으로는 max.poll.interval.ms라는 섲정이 있는데 poll 메소드의 최대 호출간격을 지정한다. 이 시간이 지나도록 poll()을 하지 않으면 컨슈머를 그룹에서 빼고 리벨런스를 진행한다.

  • 컨슈머를 다 사용하고 나면 close 메소드를 이용해서 종료처리를 해야한다.

일반적으로는 아래 코드와 같이 무한루프를 돌면서 poll 메소드로 레코드를 읽어오는 형태인데 그러면 이 무한루프를 어떻게 벗어날거냐.

무한루프를 벗어날때 사용할 수 있는것이 wakeup 메소드이다. 주의할점은 현재 쓰레드가 아니라 다른 쓰레드에서 wakeup 메소드를 호출하면 poll 메소드는 WakeupException을 발생시킨다. 이 Exception을 while 루프 밖에서 catch를 하고 그런 다음에 컨슈머의 close 메소드를 호출하는 방식으로 종료처리를 하게 된다.

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
consumer.subscribe(Collections.singleton("simple"));
try{
  while (true) {
      ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1)); //wakeup() 호출시 익셉션 발생
           ... record 처리
      try{
          consumer.commitAsync();
      } catch(Exception e) {
          e.printStackTrace();
      }
  } 
} catch(Exception e) {
  ...
} finally{
  consumer.close();
}
  • 컨슈머를 사용할때 주의해야 할점이 있는데 카프카 컨슈머는 쓰레드에 안전하지 않다는 것이다.

그래서 여러 쓰레드에서 카프카 컨슈머 객체를 동시에 사용하면 안된다. 딱하나 예외가 있는데 앞에서 언급한 wakeup 메소드를 사용할때는 괜찮다. wakeup 메소드를 제외한 나머지 메소드들은 여러 쓰레드에서 호출하면 안된다.