Kafka 활용 실습

2020-08-27

.

Data_Engineering_TIL(20200827)

학습한 프로그램 : T아카데미 Apache Kafka 입문과 활용

URL : https://tacademy.skplanet.com/live/player/onlineLectureDetail.action?seq=183

이전내용 참고자료 : https://minman2115.github.io/DE_TIL117

[실습목표]

그러면 지금까지 배운 카프카를 어떻게 활용할 것인가에 대해 실습하면서 감을 잡아보자.

[실습전 개념정리]

  • 텔레그래프

Telegraf is an agent for collecting, processing, aggregating, and writing metrics.

https://github.com/influxdata/telegraf

다양한 플러그인을 제공한다.

ex) 컨슈머 렉 수집할때 사용하는 버로우 등을 제공

텔레그래프가 수집한 데이터를 어디로 보낼거냐. kinesis로 보내거나, http로 전송, 또는 파일로 저장하는 등 다양하게 처리할 수 있다.

# 텔레그래프 활용사레 : 서버 메트릭 수집 및 적재 파이프라인 구현

아래 그림과 같이 컴퓨터 마다 텔레그레프 에이전트를 각각 설치해서 카프카를 중간 큐 또는 버퍼 용도로 사용하고 엘라스틱 써치에 저장하는 컨슈머가 있으면 이거를 그라파나로 시각화 할 수 있다.

image

# 오늘의 실습

my computer라고 되어 있는건 로컬피시가 아니라 하나의 ec2를 띄워서 해보자

ec2의 cpu,ram 메트릭을 실습용 카프카에 넣어보고, 텔레그래프로 수집해서 csv로 적재하는 컨슈머 어플리케이션을 구현해보자

# Metric collect pipeline

요구 사항 : CPU, RAM 정보 수집

how to 정보수집 :

Telegraf agent 사용

https://github.com/influxdata/telegraf

Telegraf : influxdata에서 opensource로 제공하는 plugin 기반의 metric수집 server agent.

간단한 configuration으로 다양한 plugin을 붙일 수 있음

## 에이전트가 10초마다 data를 polling해서 input을 가지고 output으로 처리하라는 config
## my-computer-metric이라는 토픽에 수집한 데이터가 들어갈 것이다.
## 어떤 데이터가 들어가냐면 아래와 같은 cpu정보가 들어갈 것이다.

[agent]
  interval = "10s"

[[outputs.kafka]]
  brokers = ["{broker ec2 public ip}:9092"]
  ## Kafka topic for producer messages
  topic = "my-computer-metric"

[[inputs.cpu]]
  percpu = true
  totalcpu = true
  fielddrop = ["time_*"]

[[inputs.mem]]
  • 파일단위로 저장

write와 함께 flush

파일 포맷은 CSV(Comma Separater values)

$ cat 2020-06-19.csv
mem,host=SKP1003855MA0001.local used=19689496576i,inactive=14526488576i, ...

토픽에도 위와같이 csv 형태로 데이터가 들어갈 것이다.

  • 카프카 관련 정보

1) 토픽명 : my-computer-metric

2) 파티션 개수 : 5개

3) 컨슈머 개수 : 5개(multiple thread)

4) 키 활용 여부 : 사용하지 않음

  • 실습하기전 참고사항

전 강의에서 사용하던 자원을 그대로 활용할 것이다.

kafka broker ec2는 그대로 kafka broker ec2로 활용할 것이고, kafka client ec2는 my computer라고 가정하자.

# 실습 내용

Topic을 먼저 생성해준다.

my computer에서 아래 명령어를 이용해서 토픽을 생성해준다.

토픽명 : my-computer-metric

파티션 개수 : 5개

레플리케이션 : 1개(브로커가 1대이므로)

[ec2-user@ip-10-1-10-164 bin]$ ./kafka-topics.sh --create --bootstrap-server {broker ec2 public ip}:9092 --replication-factor 1 --partitions 5 --topic my-computer-metric
Created topic my-computer-metric.

그 담에 Telegraf 설치해준다.

[ec2-user@ip-10-1-10-164 telegraf]$ wget https://dl.influxdata.com/telegraf/releases/telegraf-1.15.2-1.x86_64.rpm

[ec2-user@ip-10-1-10-164 telegraf]$ sudo yum localinstall -y telegraf-1.15.2-1.x86_64.rpm

[ec2-user@ip-10-1-10-164 telegraf]$ cd /etc/telegraf

[ec2-user@ip-10-1-10-164 telegraf]$ ls
telegraf.conf  telegraf.d

[ec2-user@ip-10-1-10-164 telegraf]$ sudo mv telegraf.conf telegraf_backup.conf

[ec2-user@ip-10-1-10-164 telegraf]$ ls
telegraf_backup.conf  telegraf.d

# conf 작성할때 들여쓰기 유의해야한다.
[ec2-user@ip-10-1-10-164 telegraf]$ sudo vim telegraf.conf
[agent]
  interval = "10s"

[[outputs.kafka]]
  brokers = ["{kafka broker ec2 public ip}:9092"]
  ## Kafka topic for producer messages
  topic = "my-computer-metric"

[[inputs.cpu]]
  percpu = true
  totalcpu = true
  fielddrop = ["time_*"]

[[inputs.mem]]

# 텔레그래프 에이전트 실행
[ec2-user@ip-10-1-10-164 telegraf]$ telegraf --config telegraf.conf
2020-08-27T07:08:09Z I! Starting Telegraf 1.15.2
2020-08-27T07:08:09Z I! Loaded inputs: cpu mem
2020-08-27T07:08:09Z I! Loaded aggregators:
2020-08-27T07:08:09Z I! Loaded processors:
2020-08-27T07:08:09Z I! Loaded outputs: kafka
2020-08-27T07:08:09Z I! Tags enabled: host=ip-10-1-10-164.ap-northeast-2.compute.internal
2020-08-27T07:08:09Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"ip-10-1-10-164.ap-northeast-2.compute.internal", Flush Interval:10s

카프카 콘솔컨슈머로 my-computer-metric 토픽의 데이터가 정상적으로 들어가는지 확인해봐야 한다.

mycomputer의 터미널을 하나 더 열어준다음 거기로 이동해서 아래와 같이 명령어를 실행해보자.

# ./kafka-console-consumer.sh --bootstrap-server {broker ec2 public ip}:9092 --topic my-computer-metric --from-beginning

[ec2-user@ip-10-1-10-164 bin]$ pwd
/home/ec2-user/kafka_2.12-2.5.0/bin

[ec2-user@ip-10-1-10-164 bin]$ ./kafka-console-consumer.sh --bootstrap-server 13.125.84.223:9092 --topic my-computer-metric --from-beginning
cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0,usage_idle=100,usage_softirq=0,usage_steal=0,usage_guest=0,usage_guest_nice=0,usage_user=0,usage_nice=0,usage_iowait=0,usage_irq=0 1598512120000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_nice=0,usage_steal=0,usage_guest=0,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_guest_nice=0,usage_user=0,usage_system=0,usage_idle=100 1598512130000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0,usage_irq=0,usage_softirq=0,usage_steal=0,usage_guest=0,usage_user=0,usage_idle=100,usage_nice=0,usage_iowait=0,usage_guest_nice=0 1598512150000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_nice=0,usage_iowait=0,usage_steal=0,usage_guest=0,usage_user=0,usage_system=0,usage_idle=100,usage_irq=0,usage_softirq=0,usage_guest_nice=0 1598512160000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0,usage_iowait=0,usage_guest=0,usage_guest_nice=0,usage_steal=0.05000000000001691,usage_system=0,usage_idle=99.95000000000799,usage_nice=0,usage_irq=0,usage_softirq=0 1598512190000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_idle=100,usage_nice=0,usage_iowait=0,usage_softirq=0,usage_steal=0,usage_user=0,usage_system=0,usage_irq=0,usage_guest=0,usage_guest_nice=0 1598512200000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_iowait=0,usage_softirq=0,usage_steal=0.8000000000000007,usage_guest_nice=0,usage_user=0.1999999999999602,usage_system=0,usage_irq=0,usage_guest=0,usage_idle=98.99999999999636,usage_nice=0 1598512210000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal swap_free=0i,available_percent=85.02809168358294,page_tables=5582848i,low_total=0i,sunreclaim=36818944i,swap_cached=0i,vmalloc_total=35184372087808i,used=130400256i,committed_as=501043200i,free=655454208i,high_free=0i,available=1743130624i,commit_limit=1025032192i,low_free=0i,write_back=0i,dirty=139264i,huge_pages_free=0i,buffered=2768896i,high_total=0i,huge_pages_total=0i,inactive=860282880i,swap_total=0i,vmalloc_used=0i,total=2050064384i,used_percent=6.360788325368029,shared=475136i,slab=120012800i,sreclaimable=83193856i,active=358207488i,huge_page_size=2097152i,vmalloc_chunk=0i,write_back_tmp=0i,cached=1261441024i,mapped=81317888i 1598512230000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0,usage_idle=100,usage_irq=0,usage_steal=0,usage_guest=0,usage_guest_nice=0,usage_user=0,usage_nice=0,usage_iowait=0,usage_softirq=0 1598512230000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal active=358346752i,buffered=2768896i,cached=1261441024i,sunreclaim=36818944i,write_back=0i,commit_limit=1025032192i,huge_pages_total=0i,mapped=81317888i,sreclaimable=83193856i,huge_page_size=2097152i,slab=120012800i,swap_free=0i,vmalloc_used=0i,available=1743134720i,inactive=860286976i,low_total=0i,vmalloc_chunk=0i,swap_cached=0i,write_back_tmp=0i,used=130400256i,used_percent=6.360788325368029,committed_as=501043200i,shared=475136i,vmalloc_total=35184372087808i,total=2050064384i,available_percent=85.02829148218596,page_tables=5582848i,swap_total=0i,dirty=0i,low_free=0i,free=655454208i,high_free=0i,high_total=0i,huge_pages_free=0i 1598512260000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0,usage_idle=100,usage_iowait=0,usage_irq=0,usage_guest_nice=0,usage_user=0,usage_nice=0,usage_softirq=0,usage_steal=0,usage_guest=0 1598512260000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal mapped=81317888i,slab=120012800i,write_back_tmp=0i,used_percent=6.360788325368029,inactive=860286976i,vmalloc_total=35184372087808i,buffered=2768896i,high_total=0i,huge_pages_free=0i,sunreclaim=36818944i,swap_total=0i,huge_pages_total=0i,vmalloc_chunk=0i,used=130400256i,available_percent=85.02829148218596,commit_limit=1025032192i,committed_as=501043200i,huge_page_size=2097152i,write_back=0i,active=358346752i,cached=1261441024i,high_free=0i,sreclaimable=83193856i,swap_free=0i,total=2050064384i,dirty=0i,free=655454208i,low_free=0i,swap_cached=0i,available=1743134720i,low_total=0i,page_tables=5582848i,shared=475136i,vmalloc_used=0i 1598512270000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest=0,usage_guest_nice=0,usage_user=0,usage_system=0,usage_softirq=0,usage_steal=0.10010010010008193,usage_idle=99.89989989989773,usage_nice=0,usage_iowait=0,usage_irq=0 1598512270000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest=0,usage_guest_nice=0,usage_system=0,usage_irq=0,usage_softirq=0,usage_steal=0.1000500250124961,usage_user=0,usage_idle=99.89994997498532,usage_nice=0,usage_iowait=0 1598512270000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0,usage_nice=0,usage_iowait=0,usage_guest=0,usage_system=0,usage_idle=100,usage_irq=0,usage_softirq=0,usage_steal=0,usage_guest_nice=0 1598512280000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0,usage_nice=0,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_steal=0.05002501250624805,usage_guest=0,usage_user=0,usage_idle=99.94997498748356,usage_guest_nice=0 1598512290000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_idle=99.90009990009771,usage_nice=0,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_steal=0,usage_user=0,usage_system=0.09990009990007785,usage_guest_nice=0,usage_guest=0 1598512300000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.049999999999954525,usage_system=0,usage_idle=99.89999999999782,usage_nice=0,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_steal=0.049999999999998934,usage_guest=0,usage_guest_nice=0 1598512320000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal mapped=81317888i,huge_pages_free=0i,cached=1261441024i,dirty=0i,free=655056896i,inactive=860286976i,vmalloc_chunk=0i,total=2050064384i,available_percent=85.00891101769416,sreclaimable=83193856i,swap_cached=0i,swap_free=0i,swap_total=0i,available=1742737408i,vmalloc_used=0i,commit_limit=1025032192i,committed_as=503447552i,huge_page_size=2097152i,write_back_tmp=0i,buffered=2768896i,high_total=0i,page_tables=5582848i,shared=475136i,slab=120086528i,vmalloc_total=35184372087808i,high_free=0i,used_percent=6.380168789859821,active=358375424i,huge_pages_total=0i,low_free=0i,low_total=0i,used=130797568i,write_back=0i,sunreclaim=36892672i 1598512330000000000

...

위와 같이 콘솔 컨슈머로 데이터가 들어오는 것을 확인했고, 컨슈머 어플리케이션을 통해 들어오는 데이터를 처리해보자.

mycomputer의 터미널을 새로열고 그쪽으로 이동해서 아래와 같이 명령어를 실행해보자.

[ec2-user@ip-10-1-10-164 ~]$ cd tacademy-kafka/

[ec2-user@ip-10-1-10-164 tacademy-kafka]$ ls
image                                   README.md
kafka-consumer-auto-commit              simple-kafka-consumer
kafka-consumer-multi-thread             simple-kafka-producer
kafka-consumer-save-metric              telegraf-1.15.2-1.x86_64.rpm
kafka-consumer-sync-commit              telegraf.conf
kafka-producer-exact-partition          실습 커맨드 리스트.txt
kafka-producer-key-value                zmon-installer
아파치 카프카 입문과 활용 강의자료.pdf

[ec2-user@ip-10-1-10-164 tacademy-kafka]$ cd kafka-consumer-save-metric/

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ ls
build.gradle           consumer-thread-1.csv  gradle   gradlew.bat      src
consumer-thread-0.csv  consumer-thread-2.csv  gradlew  settings.gradle

# 소스 파일에서 브로커 아이피 부분 수정필요
[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ sudo vim /home/ec2-user/tacademy-kafka/kafka-consumer-save-metric/src/main/java/com/tacademy/ConsumerSaveMetric.java
## 멀티쓰레드 컨슈머 개념의 어플리케이션이다. 

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerSaveMetric {
    private static String TOPIC_NAME = "my-computer-metric";
    private static String GROUP_ID = "metric-consumers";
    private static String BOOTSTRAP_SERVERS = "{broker ec2 public ip}:9092";
    # 3에서 5로 바꿔줌
    private static int CONSUMER_COUNT = 5;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ cat /home/ec2-user/tacademy-kafka/kafka-consumer-save-metric/src/main/java/com/tacademy/ConsumerWorker.java
# 폴링한 데이터를 가져와서 파일을 새로 생성해서 계속해서 저장할 것이다.
# fileWriterBuffer를 통해서 이 레코드 벨류를 계속해서 추가할 것이다.
# 추가한 뒤에 filewriter를 close하여 
# 데이터를 폴링할때망 파일을 지속적으로 열고 쓰고 닫고 하는 과정이 실행된다.
# 마지막에 wakeupexception이 shutdown hook에 의해서 발생하면 
# wakeupexception을 통해 안전하게 종료된다.

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        File file = new File(threadName + ".csv");

        try {
            while (true) {
                FileWriter fw = new FileWriter(file, true);
                StringBuilder fileWriteBuffer = new StringBuilder();
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    fileWriteBuffer.append(record.value()).append("\n");
                }
                fw.write(fileWriteBuffer.toString());
                consumer.commitSync();
                fw.close();
            }
        } catch (IOException e) {
            System.err.println(threadName + " IOException"+e);
        } catch (WakeupException e) {
            System.out.println(threadName + " WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ ls
build.gradle           consumer-thread-2.csv  gradlew.bat
consumer-thread-0.csv  gradle                 settings.gradle
consumer-thread-1.csv  gradlew                src

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ sudo vim build.gradle
apply plugin: 'application'
mainClassName = 'com.tacademy.ConsumerSaveMetric'

group 'com.tacademy'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.5.0'
}

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ gradle wrapper    
Starting a Gradle Daemon (subsequent builds will be faster)

BUILD SUCCESSFUL in 5s
1 actionable task: 1 executed
    
[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ ./gradlew build
Downloading https://services.gradle.org/distributions/gradle-5.2.1-bin.zip
...................................................................................

BUILD SUCCESSFUL in 11s
5 actionable tasks: 5 executed
    
[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ ./gradlew run

> Task :run
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
<=========----> 75% EXECUTING [42s]
> :run

mycomputer 터미널을 하나 더 열어주고 거기로 이동해서 아래와 같이 실행한 어플리케이션으로 부터 생성된 csv 파일이 잘 생성되고 있는지 확인해보자.

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ ls
build                  consumer-thread-2.csv  gradle-5.2.1-bin.zip  src
build.gradle           consumer-thread-3.csv  gradlew
consumer-thread-0.csv  consumer-thread-4.csv  gradlew.bat
consumer-thread-1.csv  gradle                 settings.gradle

[ec2-user@ip-10-1-10-164 kafka-consumer-save-metric]$ tail -f *.csv
==> consumer-thread-0.csv <==
cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0.7944389275073104,usage_irq=0,usage_softirq=0.099304865938416,usage_steal=2.482621648460398,usage_guest=0,usage_guest_nice=0,usage_user=3.3267130089369417,usage_nice=0,usage_iowait=0.14895729890762952,usage_idle=93.14796425024424 1598513950000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.7007007007005913,usage_nice=0,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_steal=0.40040040040034547,usage_guest=0,usage_system=0,usage_idle=98.89889889889325,usage_guest_nice=0 1598513970000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal mapped=112635904i,vmalloc_used=0i,vmalloc_total=35184372087808i,committed_as=1294147584i,low_free=0i,sreclaimable=84611072i,write_back=0i,total=2050064384i,used_percent=39.34753768201653,swap_cached=0i,huge_pages_free=0i,swap_free=0i,vmalloc_chunk=0i,used=806649856i,dirty=81920i,high_total=0i,huge_pages_total=0i,shared=606208i,active=1066508288i,high_free=0i,huge_page_size=2097152i,slab=124436480i,buffered=2179072i,cached=1168691200i,free=72544256i,sunreclaim=39825408i,commit_limit=1025032192i,inactive=721911808i,page_tables=11071488i,swap_total=0i,write_back_tmp=0i,available=1066016768i,available_percent=51.99918482169973,low_total=0i 1598513990000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0.2002002002002003,usage_idle=98.99899899899533,usage_nice=0,usage_irq=0,usage_steal=0.30030030030031823,usage_user=0.500500500500483,usage_iowait=0,usage_softirq=0,usage_guest=0,usage_guest_nice=0 1598514000000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal dirty=372736i,huge_page_size=2097152i,mapped=112635904i,buffered=2179072i,high_total=0i,huge_pages_total=0i,sreclaimable=84627456i,total=2050064384i,cached=1168760832i,committed_as=1294372864i,high_free=0i,inactive=721850368i,slab=124461056i,swap_free=0i,vmalloc_total=35184372087808i,available_percent=51.9642200661733,commit_limit=1025032192i,low_free=0i,shared=606208i,vmalloc_chunk=0i,used=807358464i,low_total=0i,sunreclaim=39833600i,write_back=0i,huge_pages_free=0i,used_percent=39.38210284033694,page_tables=11071488i,write_back_tmp=0i,available=1065299968i,free=71766016i,swap_cached=0i,vmalloc_used=0i,active=1066717184i,swap_total=0i 1598514010000000000


==> consumer-thread-1.csv <==
cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0.09980039920162617,usage_idle=94.21157684630731,usage_nice=0,usage_irq=0,usage_guest_nice=0,usage_user=3.493013972056384,usage_iowait=0,usage_softirq=0,usage_steal=2.1956087824354213,usage_guest=0 1598513960000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_steal=2.1999999999999886,usage_guest=0,usage_system=0.15000000000000568,usage_nice=0,usage_softirq=0,usage_irq=0,usage_guest_nice=0,usage_user=4.499999999999957,usage_idle=93.05000000000291,usage_iowait=0.09999999999999787 1598513960000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_nice=0,usage_softirq=0,usage_steal=0.501504513540662,usage_guest=0,usage_idle=98.59578736209201,usage_system=0.10030090270814664,usage_iowait=0,usage_irq=0,usage_guest_nice=0,usage_user=0.8024072216650305 1598513970000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal swap_free=0i,active=1066332160i,free=73179136i,huge_pages_free=0i,vmalloc_total=35184372087808i,write_back_tmp=0i,available=1066651648i,mapped=112635904i,sreclaimable=84611072i,buffered=2179072i,swap_cached=0i,huge_page_size=2097152i,low_total=0i,sunreclaim=39813120i,vmalloc_used=0i,write_back=0i,available_percent=52.03015360516599,dirty=73728i,shared=606208i,total=2050064384i,cached=1168691200i,high_free=0i,huge_pages_total=0i,swap_total=0i,used=806014976i,used_percent=39.31656889855026,commit_limit=1025032192i,low_free=0i,page_tables=11071488i,slab=124424192i,vmalloc_chunk=0i,committed_as=1294147584i,high_total=0i,inactive=721911808i 1598513980000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_softirq=0.050000000000000044,usage_steal=0.30000000000001137,usage_user=0.6499999999999773,usage_system=0.400000000000027,usage_iowait=0,usage_irq=0,usage_idle=98.60000000000582,usage_nice=0,usage_guest=0,usage_guest_nice=0 1598513980000000000


==> consumer-thread-2.csv <==
cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0.15037593984961609,usage_idle=98.74686716791962,usage_nice=0,usage_irq=0,usage_guest_nice=0,usage_user=0.7017543859650175,usage_softirq=0,usage_steal=0.4010025062657141,usage_guest=0,usage_iowait=0 1598513990000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_irq=0,usage_softirq=0.050025012506240064,usage_steal=0.25012506253120365,usage_user=0.5502751375686373,usage_system=0.20010005002499134,usage_idle=98.94947473735513,usage_nice=0,usage_iowait=0,usage_guest=0,usage_guest_nice=0 1598514000000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_steal=1.5920398009954297,usage_guest=0,usage_system=0.24875621890552152,usage_idle=97.31343283583396,usage_nice=0,usage_iowait=0.049751243781106076,usage_irq=0,usage_softirq=0.04975124378110718,usage_user=0.7462686567166353,usage_guest_nice=0 1598514010000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.6006006006006365,usage_nice=0,usage_guest_nice=0,usage_irq=0,usage_softirq=0,usage_steal=0.30030030030030047,usage_guest=0,usage_system=0.4004004004004006,usage_idle=98.69869869870668,usage_iowait=0 1598514020000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal high_free=0i,huge_pages_free=0i,huge_pages_total=0i,write_back_tmp=0i,total=2050064384i,available=1065304064i,buffered=2179072i,committed_as=1294372864i,write_back=0i,inactive=721854464i,shared=606208i,slab=124461056i,swap_cached=0i,swap_free=0i,dirty=434176i,low_free=0i,low_total=0i,mapped=112635904i,free=71766016i,huge_page_size=2097152i,vmalloc_chunk=0i,vmalloc_total=35184372087808i,used=807354368i,cached=1168764928i,commit_limit=1025032192i,page_tables=11071488i,swap_total=0i,sunreclaim=39833600i,vmalloc_used=0i,used_percent=39.381903041733935,available_percent=51.9644198647763,active=1066749952i,high_total=0i,sreclaimable=84627456i 1598514030000000000


==> consumer-thread-3.csv <==
cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_irq=0,usage_softirq=0,usage_guest=0,usage_system=0.30030030030028265,usage_idle=98.29829829829754,usage_nice=0,usage_iowait=0,usage_steal=0.4004004004004006,usage_guest_nice=0,usage_user=1.001001001001037 1598513990000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal used_percent=39.35992519540304,commit_limit=1025032192i,page_tables=11071488i,vmalloc_used=0i,write_back_tmp=0i,write_back=0i,active=1066524672i,low_total=0i,swap_free=0i,available=1065762816i,used=806903808i,buffered=2179072i,mapped=112635904i,slab=124436480i,sreclaimable=84611072i,total=2050064384i,free=72290304i,huge_page_size=2097152i,swap_cached=0i,vmalloc_chunk=0i,vmalloc_total=35184372087808i,cached=1168691200i,dirty=86016i,high_free=0i,low_free=0i,available_percent=51.98679730831322,high_total=0i,huge_pages_free=0i,huge_pages_total=0i,inactive=721911808i,shared=606208i,sunreclaim=39825408i,swap_total=0i,committed_as=1294147584i 1598514000000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_irq=0,usage_softirq=0,usage_steal=1.3944223107568565,usage_guest=0,usage_user=0.49800796812741843,usage_nice=0,usage_iowait=0,usage_system=0.09960159362546953,usage_idle=98.00796812748297,usage_guest_nice=0 1598514010000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest=0,usage_guest_nice=0,usage_user=0.9000000000000341,usage_idle=98.40000000000146,usage_iowait=0,usage_softirq=0,usage_steal=0.30000000000001137,usage_system=0.3999999999999915,usage_nice=0,usage_irq=0 1598514030000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_nice=0,usage_softirq=0,usage_guest=0,usage_guest_nice=0,usage_user=0.9004502251125183,usage_idle=98.49924962481617,usage_irq=0,usage_steal=0.35017508754374527,usage_system=0.2501250625312314,usage_iowait=0 1598514030000000000


==> consumer-thread-4.csv <==
cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.8964143426294381,usage_nice=0,usage_iowait=0,usage_irq=0,usage_guest=0,usage_system=0.2988047808764794,usage_idle=96.9123505976149,usage_softirq=0.09960159362548944,usage_steal=1.7928286852588053,usage_guest_nice=0 1598514010000000000

mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal vmalloc_chunk=0i,vmalloc_total=35184372087808i,available=1065299968i,buffered=2179072i,huge_pages_free=0i,huge_pages_total=0i,low_total=0i,free=71766016i,page_tables=11071488i,swap_total=0i,vmalloc_used=0i,used_percent=39.381903041733935,sunreclaim=39833600i,high_free=0i,high_total=0i,active=1066741760i,low_free=0i,swap_cached=0i,write_back=0i,used=807354368i,commit_limit=1025032192i,huge_page_size=2097152i,sreclaimable=84627456i,swap_free=0i,write_back_tmp=0i,total=2050064384i,available_percent=51.9642200661733,cached=1168764928i,committed_as=1294372864i,mapped=112635904i,dirty=397312i,inactive=721850368i,shared=606208i,slab=124461056i 1598514020000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=1.2024048096192654,usage_nice=0,usage_irq=0,usage_steal=0.30060120240481636,usage_guest_nice=0,usage_system=0.3006012024048342,usage_idle=98.19639278556815,usage_iowait=0,usage_softirq=0,usage_guest=0 1598514020000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.9013520280419736,usage_system=0.35052578868300555,usage_iowait=0,usage_softirq=0,usage_guest_nice=0,usage_idle=98.44767150725443,usage_nice=0,usage_irq=0,usage_steal=0.3004506760139971,usage_guest=0 1598514020000000000

cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest=0,usage_user=0.9999999999999432,usage_system=0.10000000000001563,usage_nice=0,usage_iowait=0,usage_irq=0,usage_steal=0.3999999999999915,usage_idle=98.50000000000364,usage_softirq=0,usage_guest_nice=0 1598514030000000000


==> consumer-thread-2.csv <==
cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_system=0.19980019980019117,usage_idle=97.15284715284866,usage_iowait=0.09990009990009559,usage_irq=0,usage_guest=0,usage_guest_nice=0,usage_user=1.6983016983016073,usage_softirq=0,usage_steal=0.8491508491508392,usage_nice=0 1598514040000000000


==> consumer-thread-1.csv <==
mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal available=1065304064i,used=807350272i,cached=1168769024i,free=71766016i,swap_total=0i,total=2050064384i,available_percent=51.9644198647763,shared=606208i,sunreclaim=39833600i,high_total=0i,low_total=0i,mapped=112635904i,sreclaimable=84627456i,active=1066786816i,dirty=90112i,high_free=0i,huge_pages_free=0i,huge_pages_total=0i,slab=124461056i,vmalloc_chunk=0i,vmalloc_used=0i,huge_page_size=2097152i,page_tables=11071488i,used_percent=39.38170324313092,buffered=2179072i,committed_as=1294372864i,vmalloc_total=35184372087808i,write_back_tmp=0i,commit_limit=1025032192i,inactive=721854464i,swap_cached=0i,write_back=0i,low_free=0i,swap_free=0i 1598514040000000000

cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest_nice=0,usage_idle=95.79579579579497,usage_nice=0,usage_iowait=0.2002002002002181,usage_softirq=0,usage_guest=0,usage_user=2.7027027027027217,usage_system=0.2002002002002003,usage_irq=0,usage_steal=1.1011011011011194 1598514040000000000


==> consumer-thread-4.csv <==
cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_user=0.6000000000000227,usage_system=0.19999999999999574,usage_idle=98.69999999998981,usage_nice=0,usage_irq=0,usage_softirq=0,usage_iowait=0,usage_steal=0.5000000000000071,usage_guest=0,usage_guest_nice=0 1598514040000000000


==> consumer-thread-0.csv <==
mem,host=ip-10-1-10-164.ap-northeast-2.compute.internal total=2050064384i,available=1065308160i,used=807350272i,free=71757824i,buffered=2179072i,low_free=0i,committed_as=1294372864i,page_tables=11120640i,shared=606208i,available_percent=51.96461966337932,active=1066946560i,cached=1168777216i,dirty=90112i,high_free=0i,high_total=0i,huge_pages_total=0i,mapped=112721920i,slab=124461056i,swap_free=0i,swap_total=0i,vmalloc_used=0i,huge_page_size=2097152i,sreclaimable=84627456i,swap_cached=0i,write_back_tmp=0i,write_back=0i,used_percent=39.38170324313092,commit_limit=1025032192i,huge_pages_free=0i,low_total=0i,vmalloc_total=35184372087808i,inactive=721858560i,sunreclaim=39833600i,vmalloc_chunk=0i 1598514050000000000


==> consumer-thread-2.csv <==
cpu,cpu=cpu0,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_idle=99.09729187562553,usage_iowait=0,usage_irq=0,usage_softirq=0,usage_user=0.4012036108325134,usage_system=0.20060180541622105,usage_guest=0,usage_guest_nice=0,usage_nice=0,usage_steal=0.30090270812433156 1598514050000000000

cpu,cpu=cpu-total,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_guest_nice=0,usage_system=0.25050100200404035,usage_irq=0,usage_softirq=0,usage_steal=0.3507014028056102,usage_guest=0,usage_user=0.5010020040080807,usage_idle=98.89779559117649,usage_nice=0,usage_iowait=0 1598514050000000000


==> consumer-thread-0.csv <==
cpu,cpu=cpu1,host=ip-10-1-10-164.ap-northeast-2.compute.internal usage_softirq=0,usage_guest_nice=0,usage_irq=0,usage_steal=0.3996003996003275,usage_guest=0,usage_user=0.6993006993005598,usage_system=0.2997002997002501,usage_idle=98.60139860138653,usage_nice=0,usage_iowait=0 1598514050000000000

...

# 기타 kafka 관련 FAQ

  • 토픽의 라이프 싸이클이 어떻게 되는지

데이터 파이프라인 관점에서 제3의 어떤 서비스 그룹이 토픽에 대해서 적재하고 싶다고 요청하면 신규 토픽을 만들어서 적재를 시작하고, 더이상 적재할 필요가 없다면 해당 데이터를 적재하는 컨슈머를 끄고, 토픽도 삭제하는 과정을 거친다.

  • 파티션내에만 메세지 순서가 보장이 되는 것으로 알고 있는데, 컨슈머가 여러개인 경우 저장(처리) 된 데이터의 순서를 어떻게 보장하는지

순서보장을 위해 레코드를 생성할때부터 코드 안에다가 타임스탬프를 박아버린다. 그러면 병렬처리 되더라도 이게 적재가 되면 그거를 쿼리구문으로 파임스탬프로 order by 해버리면 된다.

아니면 파티션을 한개만 사용하면 완벽하게 순서보장이 될것이다.

  • 대용량 처리에 있어 파티셔닝을 어떻게 설정하는게 효율적인지

sk 플래닛에서는 대용량처리를 하고 있지만 키처리를 따로 하고 있지는 않다. 파티션은 기본적으로 세개만 설정해서 토픽을 만들고 있다. 컨슈머 렉 추이에 따라서 컨슈머 렉이 너무많이 늘어난다 그러면 파티션을 늘리는 의사결정을 한다. 이 의사결정은 카프카 브로커를 운영하는 엔지니어와 프로듀서&컨슈머를 운영하는 엔지니어 모두 모여서 진행한다 왜냐하면 파티션을 늘리면 늘릴수록 관련해서 리벨런싱하는데 시간이 많이 걸리거나 프로듀서에서 데이터를 보낼때 이슈가 발생할수도 있고 이런 것들을 종합적으로 고려해야 하기 때문이다.

  • 카프카 클라이언트의 성능에 영향을 주는 것은 무엇이 있나요?

정말 다양한 요소가 영향을 준다. 브로커와 연결되어 있기 때문에 서버와 클라이언트 간에 네트워크 통신이 원활하지 않아서 또는 밴드위스가 작아서 통신에 이슈가 발생할 수도 있고, 카프카 클라이언트가 돌아가는 서버의 사양의 문제가 있을수도 있고, 또는 쓰레드가 몇개나 돌고 있는지도 영향을 줄수도 있다. 혹은 카프카 컨슈머 입장에서 배치를 얼마나 가져올 것인가 그리고 얼마나 기다렸다 가져올 것인가 등과 같은 상세옵션에 따라 성능이 달라질 것이다.

  • 카프카 브로커 장애가 났을 경우 클라이언트에 영향이 있나요?

영향이 있다. 커밋 씽크할때 씽크할때 브로커가 죽어버리면 커밋을 못하기 때문에 영향이 있을수 있다. 그래서 내부적으로 처리를 완료한 것을 따로 offset정보를 가지고 있도록 조치하는 방법도 있다. 새로 브로커가 올라와도 offset 중복처리하지 않도록 내부적으로 메모리에 정보를 갖고 있는 방법도 있다.

관련한 로그를 최대한 남겨서 확인하는게 좋다.

  • 카프카 클라이언트는 컨슈머 잡이 돌아가는 서버인가요?

그렇다. 보통은 브로커 따로 두고 컨슈머와 프로듀서 서버를 각각 따로 두게 된다.

  • 카프카 컨슈머 운영상에 주로 보는 기능은 무엇인가요?

크게 두가지를 본다. 카프카 버로우를 운영하여 관련해서 메트릭을 확인하고, 카프카 클라이언트의 성능을 봐야하기 때문에 메트릭스 메서드를 이용하면 다양한 메트릭을 체크한다.

  • 리밸런싱 상태를 체크하느 상태값이 있나요?

상태값이라기 보다는 리스너를 사용하기 때문에 이거를 로그로 남기면 각각의 컨슈머 그룹과 컨슈머가 리밸런스 상태인지 확인할 수 있다.

  • 추천서적

카프카, 데이터 플랫폼의 최강자 / 고승범 지음