카프카 기초개념 및 트위터 스트림 수집 실습

2019-04-08

Data_Engineering_TIL_(20190406)

study program : https://www.fastcampus.co.kr/extension_des

[학습목표]

  • 카프카 기초개념 이해

  • 카프카를 이용한 트위터 스트림 수집

[학습기록]

번외로 오픈소스란

오픈소스 사전적 의미: 소스코드가 공개된 소프트웨어, 따라서 소스의 수정이나 공유가 가능하다

Open Source Software의 반대말은 Proprietary Software인데 주의해야할 점은 오픈소스는 무료, Proprietary는 유료인것만은 아니다.

오픈소스이면서도 유료일수도 있고, Proprietary이면서도 무료일수도 있다.

카프카란

현업에서 사용하는 데이터 파이프라인은 점점 복잡해지고 있고, 대두분 회사들의 파이프라인은 아래처럼 매우 복잡할 것이다.

이렇게 복잡하면 회사입장에서는 관리비용이 증가하는 문제가 발생한다.

11

그래서 메세지를 모으고 다시 분배하는 통로를 잘 만들면 데이터 파이프라인의 복잡한 의존성을 줄일 수 있겠다는 아이디어로 카프카가 나오게 되었다.

12

카프카는 크게 프로듀서, 브로커, 컨슈머로 구성되어 있다.

데이터나 메세지를 만들어내는 시스템들이 여러가지가 있을텐데 그것들을 프로듀서라고 하고, 생산된 메세지를 저장하거나 활용해서 처리하는 것들을 컨슈머라고 한다. 이 프로듀서와 컨슈머 사이에 중간에서 관장하는 브로커 역할을 카프카가 한다.

중간에서 관장을 하려면 매우 효율적이어야 하고, 매우 고성능이어야 하고, 안정적이어야 하고, 스케일을 감당할 수 있어야 한다. 카프카는 그런 조건을 잘 만족시켰다.

  • 카프카 용어

1) Topic : 메세지 피드를 topic이라는 카테고리로 정리한다.

카프카는 메세지를 타픽이라는 카테고리로 정리한다.

예를 들어서 1번서버와 2번서버에서 사용자 로그 데이터가 들어온다고 하면 ‘user log’이라는 토픽을 만들어서 1번서버와 2번서버를 유저로그라는 토픽으로 묶는 개념이다.

2) Producer : Kafka의 topic에 메세지를 publish하는 프로세스들을 producer라고 한다.

3) Consumer : Topic을 subscribe하는 프로세스들을 consumer라고 한다.

4) Broker : Kafka는 1개 이상의 서버로 이루어진 클러스터로 동작하며, 이를 broker라고 한다.

브로커는 클러스터로 동작을 하게된다. 얘가 죽었을때 백업이 있어야하기도 하고, 메세지 양이 많아질때 그럴때도 활용해야한다.

  • 카프카의 성능이 좋은 이유

1) Kafka는 고성능의 메세지 큐(High throughput, low latency)

2) 네트웍 효율을 위해 메세지를 배치로 전송, append, consume을 묶음 단위로 수행

네트워크에서 여러 시스템에서 메시지를 모으는 것이기 때문에 메세지 한건한건을 보내면 네트워크로 보내는 오버헤드가 커지기 때문에 그 메세지들을 어느정도 단위로 묶어서 메세지를 보내게 된다. 그래서 한건한건 보낼때 보다 효율성이 높아진다.

3) 매우 효율적인 I/O를 구현 (Copy없는 I/O, 캐쉬 사용의 극대화)

파일이 뭔가 카피되는 것은 매우 느리기 때문에 그런 형태구조를 아예 없애서 들어오는대로 저장하거나 카피하는 것이 아니라 바로 빠져나간다.

캐시사용도 자동적으로 시스템에 있는 메모리를 다 사용하게 되어있다.

시스템을 빠르게 만들어서 프로듀서에서 생산된 메시지가 브로커로와서 어딘가에 저장되기 전에 빨리빨리 컨슈머들한테 뿌려지게한다. 즉 어딘가에 저장했다 다시 읽어드리는 것이 아니라 들어오자마자 메모리에 들고 있다가 바로 뿌려주는 캐시사용의 극대화를 추구했다.

  • 메세지를 중복이나 유실에 관한 처리도 중요하다.

이런 고성능을 추구하는 시스템에서는 메세지 중복이나 유실에 관한 처리도 같이 커버하기는 어렵다.

카프카에서는 설정을 통해 다음과 같은 세가지 옵션을 제공한다.

Exactly once는 두가지를 같이 처리하는 것을 말한다. 구현하기도 어렵고 효율도 떨어지는 편이다. 모든 메세지가 정확하게 한번처리되려면 하나하나 잘 되었는지 체크해야하는 과정이 필요하기 때문이다.

Exactly once는 메세지를 보내고 그걸 잘 받았다는 ack을 받고, 또 다른 메세지를 보내고 이런방식이다. 사실 이런방식이 상당히 비효율적일 수 있는데 카프카에서는 효율적으로 잘 구현한 것이 특징이다.

그래서 보통은 At least once와 At most once가 많이 쓰인다.

At least once는 카프카에서 디폴트이다. 이것은 최소 한번이라는 뜻이기 때문에 메세지를 100퍼센트 딜리버리를 보장한다는 뜻이기 때문에 유실은 없다는 말이다. 그 대신 메세지가 두번 딜리버리 될 수도 있다.

At most once는 메세지가 유실될 수 있지만 중복 처리되지는 않는다는 것이다.

  • 토픽안에 파티션이라는 개념이 있다. 토픽이 하나의 메세지 줄이라고 생각하면 되는데 그걸 시간별로 파티션을 여러개 나누어서 처리하면 된다. 파티션 단위로 ack을 하기 때문에 파티션 안에서는 순서가 바뀔 수 있다. 파티션끼리는 순서가 정해져있다.

  • 카프카의 활용처는 상당히 많은데 실시간 스트리밍 프로세싱을 할때 스파크 스트리밍과 결합해서 쓸때도 있고, 메세지 버스, 사용자 로그 수집 등 다양하다.

  • 카프카와 비슷한 개념의 서비스로는 아마존 웹서비스에서 Amazon Managed Streaming for Kafka(MSK) 라는 것이 있고, AWS 키네시스, 구글의 pubsub이 있다. aws 키네시스 뒤에는 카프카가 숨어있다는 썰이 있다.

카프카를 이용한 트위터 스트림 수집 실습

** 실습 전 참고사항 : 이번 실습에서 자바버전은 7이나 8로 한다.

스텝 1) 카프카 다운 및 압축해제

다운로드 URL : https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.11-2.2.0.tgz

스텝 2) 트위터 스트림을 카프카로 보내줄 것인데 그렇게 해주는 툴을 어떤 유저가 개발했다. 그래서 우리는 https://github.com/Eneco/kafka-connect-twitter 에 접속한다.

스텝 3) 터미널을 실행해서 git clone https://github.com/Eneco/kafka-connect-twitter 명령어 입력 및 실행으로 코드 다운로드

스텝 4) 다운로드 받은 코드 빌드를 해야하는데 빌드하는 방법은 https://github.com/Eneco/kafka-connect-twitter 하단에 running 메뉴를 참고하면 된다.

빌드라는 개념은 한마디로 소스코드를 실행가능하게 바꿔준다는 것이다. 빌드를 한다는 것이 코드 하나를 실행가능한 형태로 바꿔주는 것인데 최근에는 빌드해야 할 코드가 너무 많고, 여러가지 다른 프로젝트들의 코드들도 import해서 써야하기 때문에 빌드과정이 복잡해졌다. 그래서 메이븐이라는 도구를 쓰면 복잡한 이런 빌드과정을 쉽게 할 수 있다.

그래서 우리는 터미널을 열고 카프카-컨넥트 폴더에서 mvn clean package 입력 및 실행한다.

위와 같이 메이븐 클린 패키지를 입력해서 실행한다는 것은 이 패키지들을 총채적으로 빌드한다는 것이다.

  • 주의사항 : 다운로드 받은 카프카-컨넥트 폴더 안에서 실행해야함

정상적으로 실행이 완료되면 아래의 그림처럼 빌드 석세스라는 매세지를 확인할 수 있다.

1

스텝 5) 아래의 명령어 입력 및 실행

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

자바기반의 프로젝트에서 빌드라는 것을 하면 실행가능한 바이너리들과 필요한 파일들이 압축된 형태로 .jar라는 실행파일이 생성된다. 이거를 CLASSPATH라고 해서 실행가능한 클래스패스들이 있는 패스에다 추가를 해놓으면 이거를 카프카에서 읽어 연결해서 사용할 수 있다.

그 다음에 echo $CLASSPATH 명령어를 입력했을때

/home/minman/다운로드/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar 처럼 지정된 경로가 잘 출력이 되면 성공한 것이다.

스텝 6) 카프카가 정상적으로 실행되는지 확인해야한다.

결론적으로는 https://kafka.apache.org/quickstart 에서 스텝 5까지 정상적으로 작동되면 된다.

카프카가 실행되는 것은 두가지가 실행이 되야 한다. 주키퍼 서버랑 카프카 서버가 실행되야 한다.

참고로 주키퍼는 서버들을 매니지먼트하는 툴이다. 카프카 서버를 매니지 하는 툴을 주키퍼를 쓰고 있다.

창을 여러개 띄워서 할 것이다.

스텝 하나하나 해보면

먼저 터미널을 하나 띄워서 카프카 폴더 최상단에서

bin/zookeeper-server-start.sh config/zookeeper.properties

위의 명령어 입력 및 실행

(주키퍼 서버 구동)

스텝 7) 새로운 터미널 창을 열어서 마찬가지로 카프카 최상단 폴더에서

bin/kafka-server-start.sh config/server.properties

위의 명령어 입력 및 실행

(카프카 서버 구동)

스텝 8) 또 다른 새로운 터미널 창을 열어서 마찬가지로 카프카 최상단 폴더에서

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test 

위의 명령어 입력 및 실행

(토픽 생성)

그리고

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

위의 명령어 입력 및 실행

(토픽 생성확인)

결과로 아래 그림과 같이 test가 출력되면 정상이다.

그리고 다른 터미널 창 열어서

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

위의 명령어 입력 및 실행

아래에 This is a message 치고 엔터, This is another message 치고 엔터한다.

(프로듀서 생성 및 테스트 메세지 발송)

2

그리고 다른 터미널 창 열어서

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

위의 명령어를 입력 및 실행하면 위에 입력한 두개의 메세지가 아래의 그림과 같이 출력된다.

(컨수머 구동)

3

정리하면 방금 입력하고 실행한 것을 요약한 아키텍쳐가 아래 그림과 같고

15

우리가 이번실습에서 최종적으로 구현하고자 하는 아키텍처는 아래 그림과 같다.

14

스텝 9) 그래서 카프카-트위터 컨넥터 폴더에가서

$KAFKA/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties 

위의 명령어 입력 및 실행을 하는데 실행을 하면 오류가 발생하면서 꺼질것이다.

각종 설정을 안해줘서 그렇다.

그래서 우리는 각종 설정을 해줄 것이다.

스텝 10) 카프카서버나 주키퍼서버, 컨수머 켜져있는 터미널에서 다 일단 실행되고 있는거 전부 종료해준다.

그런 다음에 kafka-connect-twitter 디렉토리에서 들어가서 설정파일을 만들어야 한다.

그래서 kafka-connect-twitter 폴더 들어가서

cp twitter-source.properties.example twitter-source.properties 

위의 명령어 입력 및 실행해준다.

스텝 11) 그 다음 스텝으로는 스위터 스트림을 가져오기 위한 몇가지 작업을 해야한다.

참고로 트위터에서는 트위터 스트림을 무료로 제공하고 있다.

트위터 스트림 관련 api 문서 URL : https://dev.twitter.com/streaming/overview

결론적으로 우리는 트위터 스트림을 가져오기 위해 Twitter Developer Site에서 App을 만들고 Access Token을 취득해야한다.

사전에 트위터, 트위터 디벨로퍼 가입하고, 전화번호와 이메일을 먼저 인증해준다.

그래서 우리는 트위터에 로그인하고 https://developer.twitter.com/en/docs 에 접속해준다.

스텝 12) 우측 상단에 apps 접속

스텝 13) create an apps 클릭

스텝 14) 아래 그림과 같이 입력 후 create 클릭

앱이름은 충분이 유니크한 이름으로 써주면 된다.

5

6

스텝 15) keys and tokens을 아래의 그림과 같이 확인

10

스텝 16) kafka-connect-twitter 폴더에 들어가서 vi twitter-source.properties 로 들가서

twitter.consumerkey= 위의 그림에서 API 키 복붙

twitter.consumersecret= 위의 그림에서 API 시크릿 키 복붙

twitter.token=위의 그림에서 엑세스토큰 복붙

twitter.secret=위의 그림에서 엑세스토큰 시크릿 복붙

위와 같이 수정해준다.

스텝 17) 그리고 또 우리가 해줘야할게 Kafka는 계속 업데이트가 되었는데 kafka-connect-twitter 는 변화없이 오래되었기 때문에 커넥터를 그냥 써버리면 버전이 안맞는 현상으로 오류가 발생할 것이다. 그래서 일부 코드를 수정해줘야 한다.

위와 같은 문제점을 어떤 유저가 수정해서 올린 것이 있다. https://github.com/Eneco/kafka-connect-twitter/pull/56 를 참고하면 되고,

kafka-connect-twitter 폴더 내에 pom.xml 과 TwitterSourceConnector.scala 파일을 아래 그림 파일과 같이 수정한다.

  • pom.xml은 아래 그림과 같이 빨간색 부분을 초록색으로 수정해주면 된다.

7

  • 마찬가지로 TwitterSourceConnector.scala도 아래 그림과 같이 빨간색 부분을 초록색으로 수정해주면 된다.

8

스텝 18) kafka-connect-twitter 폴더내에 twitter-source.properties 파일의 다음 항목을 수정해준다.

output.format=string 주석 해제

track.terms=… 라인 주석 해제

스텝 19) 카프카서버나 주키퍼서버, 컨수머 켜져있는 터미널에서 다 일단 실행되고 있는거 전부 종료해준다.

그리고 새 터미널을 열어서 트위터 컨넥트 폴더에서 mvn clean package 입력 및 실행해서 다시 빌드

실행이 완료되었으면 빌드 석세스 메세지가 잘 뜨는지 확인하고, 아래의 명령어 실행

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

그 다음에 echo $CLASSPATH 명령어를 입력했을때

/home/minman/다운로드/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar 처럼 지정된 경로가 잘 출력이 되는지 확인

스텝 20) 새 터미널을 열어서 카프카 폴더에서 다음과 같은 명령어를 실행하여 주키퍼 서버를 구동한다.

bin/zookeeper-server-start.sh config/zookeeper.properties

스텝 21) 새 터미널을 열어서 카프카 폴더에서 다음과 같은 명령어를 실행하여 카프카 서버를 구동한다.

bin/kafka-server-start.sh config/server.properties

스텝 22) 새 터미널 열어 카프카 폴더에서 다음과 같은 명령어를 실행하여 토픽을 생성한다.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

명령어를 실행이 완료되면 특별히 출력되는 메세지는 없다. 그래서 실행이 되고나면 바로 다음줄에

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

명령어를 실행하고 결과로 ‘test’가 잘 출력이 되는지 확인한다.

(생성한 토픽 정상출력여부 확인)

그리고 그 터미널에서 바로 다음줄에

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  실항하여 프로듀서를 구동한다.

그리고 아래 그림과 같이 프로듀서에서 테스트 메세지를 보내본다.

This is a message

This is another message

2

step 23) 그 다음에 새로 터미널을 하나 띄운 다음에 카프카 폴더에서

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

위의 명령어를 실행하여 컨수머를 구동한다.

결과는 아까 프로듀서에서 입력했던 테스트 메세지가 아래 그림과 같이 출력된다.

3

스텝 24) 이제 드디어 카프카-트위터 컨넥터를 구동해볼 것이다.

새 터미널 열어서 카프카-컨넥트 폴더에서

mvn clean package 명령어를 먼저 실행하고 실행결과가 석세스 인것을 확인한 다음에 아래와 같은 명령어 실행한다.

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

그다음에 echo $CLASSPATH 입력 및 실행으로 다음과 같은 경로가 잘 나오는지 확인

/home/minman/다운로드/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

그런다음에 카프카-컨넥트 폴더에서

/home/minman/다운로드/kafka_2.11-2.2.0/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties

위의 명령어 입력 및 실행

그 다음에 컨수머를 띄운 터미널로 이동하면 아래 그림처럼 트위터 스트림이 실시간으로 전시가 될 것이다.

9

마지막으로 위에 실행한 주키퍼 서버나 카프카 서버 등을 종료하려면 그냥 해당 터미널에서 컨트롤+c로 종료하는 방법도 있고, 아래와 같이 카프카 폴더안에 bin 폴더에서 stop 류의 스크립트로 종료해도 되고, ps aux 류의 명령어로 process 잡아서 프로세스를 kill해도 된다.

13