트위터 스트림 실시간 대시보드 구현실습

2019-04-15

Data_Engineering_TIL_(20190413)

study program : https://www.fastcampus.co.kr/extension_des

[학습목표]

  • kafka와 spark, dash를 이용한 실시간 대시보드 구현실습

[학습기록]

실습프리뷰

카프카에서 오는 스트림 데이터를 파이스파크로 처리하여 Dash를 이용해 실시간 대시보드를 구현하는 실습을 진행 할 것이다.

다시말해 카프카에서 오는 스트림 데이터를 스파크에서 간단한 워드카운트 처리해서 대시라는 것을 이용해서 실시간으로 업데이트해서 시각화하는 실습이다.

Dash는 파이썬으로 코드를 짜고 서버를 구동해 시각화 웹페이지를 띄워주는 툴이다.

카프카로 실시간으로 트위터 스트림을 받아오는 작업을 먼저 set up하고(관련 내용 URL : https://minman2115.github.io/DE_TIL12),

./spark-submit –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 hashtag_ranking.py 명령어를 실행하여 카프카기능을 포함한 상태로 파이스파크를 띄우는 작업을 추가로 할 것이다.

그러면 아래와 같은 아키텍처로 데이터가 흐르면서 Dash에서 실시간으로 대시보드를 구성할 것이다.

0

#### 실습 상세내용

step 1)

우리는 먼저 대시를 다운받아서 getting start라는 예제를 구동해봄으로써 정상적으로 실행이 되는지 확인하고자 한다.

그래서 step 1) ~ step 5)는 dash를 설치하고, 정상적으로 실행이 되는지 확인할 것이다.

getting start 예제는 로컬호스트로 서버를 띄워서 대시의 예제를 구동하고, 만든 로컬호스트로 접속하여 확인해보는 간단한 테스트이다.

우리가 지금하고자 하는 내용의 관련 URL : https://dash.plot.ly/getting-started

터미널에서 ‘pip install dash’ 입력 및 실행

dash 설치관련 URL : https://dash.plot.ly/installation

step 2) 설치가 완료되면 임의의 폴더에서 텍스트 편집기(메모장)을 이용해서 ‘app.py’라는 파일생성

step 3) app.py 내용은 아래의 코드를 복붙하고, 저장을 해준다.

# html 그리는 것을 파이썬 언어로 할 수 있음

import dash
import dash_core_components as dcc
import dash_html_components as html

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
# dash app 초기화

# dash app에 무엇을 띄울 것인가에 대한 정의
# app.layout은 레이아웃을 지정
# html 그리는 것을 html, 자바스크립트가 아니라 아래와 같이 파이썬 코드로 작성하는 것이 dash의 특징이다.
app.layout = html.Div(children=[
    html.H1(children='Hello Dash'),

    html.Div(children='''
        Dash: A web application framework for Python.
    '''),

    dcc.Graph(
        id='example-graph',
        figure={
            'data': [
                {'x': [1, 2, 3], 'y': [4, 1, 2], 'type': 'bar', 'name': 'SF'},
                {'x': [1, 2, 3], 'y': [2, 4, 5], 'type': 'bar', 'name': u'Montréal'},
            ],
            'layout': {
                'title': 'Dash Data Visualization'
            }
        }
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)
    # 실행을하면 서버가 뜸

step 4) app.py가 있는 위치에서 ‘python app.py’ 명령어를 입력하고 실행한다.

step 5) 웹브라우저에서 ‘http:127.0.0.1:8050/’ 로 접속하여 아래 그림과 같이 접속이 잘 되는지 확인한다.

1

step 6) 본격적으로 실시간 대시보드 구현해보자 먼저 트위터 스트림을 받아오는 카프카 서버 set up 해야한다.

관련 URL 1: https://minman2115.github.io/DE_TIL12

(카프카 관련 학습자료)

관련 URL 2: https://kafka.apache.org/quickstart

(카프카 공식 홈페이지 quickstart 가이드 페이지)

위의 관련 URL 2에서 STEP 2) ~ STEP 5)을 수행해주면 되는데 아래의 내용과 같다.

먼저 터미널 하나를 띄워서 카프카-트위터-컨넥트 폴더로 이동한 다음에 mvn clean package 입력 및 실행한다.

그리고 아래 그림과 같이 build success가 정상적으로 출력이 되는지 확인할 것.

(카프카-트위터-컨넥트 패키지 빌드)

2

step 7) 그 다음에 방금 그 터미널에서 바로 다음줄에 아래와 같은 명령어 입력

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

그 다음에 echo $CLASSPATH 명령어를 입력했을때

/home/minman/다운로드/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar 처럼 지정된 경로가 잘 출력이 되는 지 확인한다.

그 다음에 카프카 폴더로 이동한 다음에 bin/zookeeper-server-start.sh config/zookeeper.properties 실행

(주키퍼 서버 구동)

실행하면 아래 그림과 같이 전시가 될 것이다.

3

그 다음에 터미널 하나를 새로 띄워서 카프카 폴더로 이동한 다음에 bin/kafka-server-start.sh config/server.properties 실행

(카프카 서버 구동)

실행하면 아래 그림과 같이 전시가 될 것이다.

4

또 터미널 하나를 새로 띄워서 카프카 폴더로 이동한 다음에 bin/kafka-topics.sh –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic test 실행

(토픽 생성)

실행하면 특별하게 뭐가 출력되는 것은 없다.(아래 그림 참조)

그리고 그 터미널에서 바로 다음줄에 bin/kafka-topics.sh –list –bootstrap-server localhost:9092 실행하여 아래 그림과 같이 test라는 문자가 출력되는지 확인한다.

(토픽 생성여부 확인)

그리고 그 터미널에서 바로 다음줄에 bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test 를 실행해준다.

그리고 아래 그림과 같이 테스트 메세지를 보내본다.

This is a message

This is another message

(producer 구동 및 테스트 메세지 발송)

5

그 다음에 새로 터미널을 하나 띄운 다음에 카프카 폴더에서 bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning 를 실행해준다.

(컨수머 구동)

결과는 아까 프로듀서에서 입력했던 테스트 메세지가 아래 그림과 같이 출력된다.

6

그 다음에 새로 터미널을 하나 띄운 다음에 카프카-트위터 컨넥트 폴더로 이동해서 아래와 같은 명령어 입력

export CLASSPATH=`pwd`/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar

그 다음에 echo $CLASSPATH 명령어를 입력했을때

/home/minman/다운로드/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar 처럼 지정된 경로가 잘 출력이 되는 지 확인한다.

그리고 그 터미널 바로 다음줄에 kafka-connect-twitter 폴더에서 /home/minman/다운로드/kafka_2.11-2.2.0/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties 명령어를 실행하고 컨수머 구동한 터미널창에서 트위터 스트림이 잘 들어오는지 확인한다.

(카프카-트위터 컨넥트 구동)

카프카-트위터 컨넥터를 구동하면 아래 그림과 같은 결과가 출력된다.

7

그리고 그 터미널 바로 다음줄에 kafka-connect-twitter 폴더에서 /home/minman/다운로드/kafka_2.11-2.2.0/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties 명령어를 실행하고 컨수머 구동한 터미널창에서 트위터 스트림이 잘 들어오는지 확인한다.

** 주의사항 **

/home/minman/다운로드/kafka_2.11-2.2.0/bin/connect-standalone.sh connect-simple-source-standalone.properties twitter-source.properties 명령어를 넣어주는 터미널 창에서 사전에 export CLASSPATH=’pwd’/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar를 실행해줘야 하고 echo $CLASSPATH를 통해 경로가 출력되는 것을 확인하고 해줘야 한다.

**

step 8) 그리고 위에서 언급한 ./spark-submit –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 hashtag_ranking.py 명령어를 실행하여 카프카기능을 포함한 상태로 파이스파크를 띄우는 작업을 이제 할 것이다.

** 참고사항 **

스파크 폴더의 bin 폴더에서 ./pyspark –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 명령어를 입력하여 실행하면 카프카모듈이 포함된 상태로 파이스파크 쉘이 실행된다.

./spark-submit –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 hashtag_ranking.py는 어떤 의미냐면 spark-submit –packages에다가 똑같이 카프카 패키기 포함하고, 실행하고자 하는 파일이름을 넣어주면 그 파일까지 실행이 된다.

**

그래서 그 다음에 우리는 터미널을 하나 띄워서 스파크 폴더의 bin 폴더로 가서 텍스트 편집기(메모장)로 app3.py를 생성해준다.

그 다음에 그 파일 내용은 아래와 코드와 같이 복붙하고 저장한다.

# -*- coding: utf-8 -*-
from flask import Flask, jsonify, abort, make_response, request, url_for
import ast

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.graph_objs as go
from dash.dependencies import Input, Output

labels = ['Waiting']
values = [0]

server = Flask(__name__)

@server.route('/test', methods=['GET'])
def test_api_request():
    print('IM IN ROUTE /TEST')
    return 'success'

@server.route('/update_data', methods=['POST'])
def update_data():
    print('IM IN ROUTE /UPDATE_DATA')
    
    global labels, values
    if not request.form or 'tag_count' not in request.form:
        return "error",400
    print(request)
    print(request.form)
    labels = ast.literal_eval(request.form['tag'])
    labels.reverse()
    values = ast.literal_eval(request.form['tag_count'])
    values.reverse()
    print("labels received: " + str(labels))
    print("data received: " + str(values))
    return "success",201
# labels, values로 초기화해주고, 플라스크로 만들어놓은 api에 의해서 value랑 label을
# 입력을 받아서 넣어주는 형태가 된다.
# @server.route('/update_data', methods=['POST'])
# def update_data():
# 위에 두줄처럼 하면 update_data라는 api가 만들어진다.
# request가 들어왔을때 처리방법을 구현해놓았다.
# 리퀘스트가 들어오면 검사같은걸하고 디버깅을 위한 프린팅을 한다음에
# ast.literal_eval(request.form['tag']) 또는 ast.literal_eval(request.form['tag_count'])해서
# hashtag_ranking.py에서 보내준 post의 데이터를 받아서 labels랑 values에 넣어주게 된다.

app = dash.Dash(__name__, server=server)
# 대시보드를 여는 것으로 server=server는 
# 위에서 수동으로 코드로 작성한 flask를 임의로 초기화를 시켜서 서버를 넣어준다.

app.layout = html.Div(children=[
    html.H1('Twitter stream DashBoard'),

    dcc.Graph(
        id='live-update-graph'
    ),
    dcc.Interval(
        id='interval-component',
        interval=1*1000, # in milliseconds
        n_intervals=0
    )
])
# 그 다음에 대시보드 layout에 하나의 그래프를 넣어주는데 
# 그 그래프는 라이브 업데이트 그래프라고해서 인터벌을 넣어주면
# 얘를 1초마다 업데이트하는 그래프이다.
# 그래서 dcc.Graph를 넣어주고 그 밑에는 눈에 보이지는 않지만 
# 인터벌 업데이트를 담당하는 컴포넌트를 넣어준다.

# 인터널 컴포넌트가 인풋아웃풋으로해서 인터널컴포넌트에 업데이트가 되었을때
# 라이브업데이트그래프를 업데이트하라는 의미
# 여기에다 figure에 데이터를 넣어서 피큐어를 리턴을 해주면 이게 그래프에 들어가게된다.
# 들어오는 데이터는 벨류와 레이블로 넣을것인데 가로방향으로 그릴것이다.
# x에 value, y에 label을 넣을 것이다.
# 벨류와 레이블은 위에 update_data 함수있는곳에서 초기화 한다.
# 그리고 이 api는 update_data라는 api를 호스트로 호출하면 그 데이터를 뽑아서
# label이랑 value로 넣어주는 api이다.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):

    figure={
        'data': [
            go.Bar(x=values, 
                    y=labels,
                    orientation='h',
                    width=0.2)
        ],
        'layout': {
            'title': 'Dash Data Visualization 13'
        }
    }
    return figure

if __name__ == '__main__':
    app.run_server(debug=True, dev_tools_hot_reload=True)
# dev_tools_hot_reload=True하면 코드 업데이트하면 바로 업데이트가 자동으로 된다.

step 9) 다시 스파크 폴더의 bin 폴더로 가서 텍스트 편집기(메모장)로 hashtag_ranking.py를 생성해준다.

그 다음에 그 파일 내용은 아래와 코드와 같이 복붙하고 저장한다.

import requests

import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql.types import *
from pyspark.sql.functions import *

spark.sparkContext.setLogLevel("ERROR")

kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
# 스파크 데이터프레임에 spark.readStream.format("kafka") 이런식으로 명령어를 입력하면 카프카를 읽어드릴 수 있다.
# 그 다음에 .option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test") 명령어를 보면 콘솔컨수머를 띄울때랑 똑같은
# 옵션들이 들어간다. 부트스트랩 서버에 로컬호스트 9092, 그다음에 test 토픽에다 올려준다. 이런뜻이고 이렇게 한다음에 로드하면
# 데이터 프레임이 만들어 지는 것이다.
kafka_df_string = kafka_df.select(col("key").cast("STRING").alias("key"),col("value").cast("STRING").alias("value"))
# 그 다음에 데이터가 키벨류 형태로 들어오는데 

kafka_df_string_2 = kafka_df_string.select(col("value"))
# 그거를 키는 무시고 벨류만 가져오게 select을 하고

kafka_df_tags = kafka_df_string_2.select(explode(split('value', ' ')).alias('tag')).filter(col('tag').startswith('#'))
# #(해시테그)으로 시작하는 것만 select함
kafka_df_tag_count = kafka_df_tags.groupBy('tag').count().withColumnRenamed('count', 'tag_count').orderBy(col('count').desc())
# #(해시테그) 개수가 많은것만 뽑아진다.
# 결론적으로 위에 두줄의 코드는 들어오는 데이터에 대해 #(해시테그)로 시작하는 것을 wordcount해 갯수가 많은 단어를 순서대로 보여주게 된다.

output = kafka_df_tag_count.writeStream.outputMode("complete").format("console").option("truncate", "false").trigger(processingTime="3 seconds").start()
# kafka_df_tag_count 변수를 writeStream.outputMode에서 콘솔에다가 업데이트 모드로 뿌려주는 것을 의미한다. 
# 그리고 3초 주기로 배치테이블을 뿌려준다는 것이다.

def send_df_to_dashboard(df, id):
    tag = [str(t.tag) for t in df.select("tag").take(10)]
    tag_count = [str(t.tag_count) for t in df.select("tag_count").take(10)]
    url = 'http://localhost:8050/update_data'
    request_data = {'tag': str(tag), 'tag_count': str(tag_count)}
    print('update dashboard')
    response = requests.post(url, data=request_data)
# 데이터프레임의 값을 가져와서 df.select("tag"), df.select("tag_count") 두개를 뽑아준다.
# 얘가 로우라는 클래스로 나오는데 얘를 뽑으려면 .take()를 해줘야한다.
# 이렇게 뽑은 형태를 딕셔너리 형태로 만들어서(request_data)
# localhost:8050/update_data라는 곳에 
# requests라는 파이썬의 기본 모듈을 이용해서 post를 해준다.
# post한건 어디서 받냐면 dash 띄웠던 곳 app3.py에 넘긴다.

kafka_df_tag_count.writeStream.outputMode("complete").foreachBatch(send_df_to_dashboard).trigger(processingTime="3 seconds").start()
# kafka_df_tag_count 변수를 writeStream.outputMode에서 foreachBatch라는 api를 이용해서
# batch를 할때마다 send_df_to_dashboard 함수를 실행하게 한다.
# 그래서 dash에다가 업데이트 모드로 3초 주기로 뿌려주는 것을 의미한다. 

output.awaitTermination()

step 10) 스파크 폴더의 bin 폴더로 이동해서 python app3.py를 실행하여 플라스크 서버를 띄워준다.

그 다음에 이 플라스크 api를 스파크 스트리밍에서 주기적으로 호출을 해서 데이터를 업데이트 해줘야 한다.

그래서 우리는 터미널을 하나 새로 띄워서 역시 스파크 폴더의 bin 폴더로 이동해서

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 hashtag_ranking.py

위의 명령어를 실행해준다.

** 참고사항 **

이때 참고로 오류가 발생하면 m2, ivy 로컬 리포지토리 캐시 지워보자.

먼저 rm -rf ~/.m2/ 실행

그 다음에 rm -rf ~/.ivy2/cache/ 실행

그 다음에 다시 ./spark-submit –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 hashtag_ranking.py 실행

**

실행하면 해당 터미널에서 아래와 같은 결과가 전시된다.

8

위와같은 화면이 전시되다가 조금만 기라디면 아래 그림과 같이 배치테이블이 실시간으로 업데이트 된다.

9

step 11) 웹브라우저를 실행하고 http:127.0.0.1:8050 에 접속하면 대시보드가 아래와 같이 실시간으로 전시된다.

10

위의 컨슈머가 작업하는 job을 pyspark 쉘을 통해서 데이터 흐름들을 눈으로 보고 싶을때

step 12) 터미널을 하나 띄워서 스파크 폴더의 bin 폴더로 가서

./pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

위의 명령어를 실행해준다.

step 13) spark가 구동되면 아래 코드들을 입력하고 실행해준다.

방법 1) 아래의 코드들도 실행해보고

from pyspark.sql.types import *
from pyspark.sql.functions import *

kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
kafka_df_string = kafka_df.select(col("key").cast("STRING").alias("key"),col("value").cast("STRING").alias("value"))

kafka_df_string_2 = kafka_df_string.select(col("value"))

output = kafka_df_string_2.writeStream.outputMode("update").format("console").option("truncate", "false").trigger(processingTime="5 seconds").start()

방법 2) 아래의 코드들을 실행해서 어떻게 다른지 확인해본다.

from pyspark.sql.types import *
from pyspark.sql.functions import *

kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
kafka_df_string = kafka_df.select(col("key").cast("STRING").alias("key"),col("value").cast("STRING").alias("value"))

kafka_df_string_2 = kafka_df_string.select(col("value"))

kafka_df_tags = kafka_df_string_2.select(explode(split('value', ' ')).alias('name')).filter(col('name').startswith('#'))
kafka_df_tag_count = kafka_df_tags.groupBy('name').count().orderBy(col('count').desc())

output2 = kafka_df_tag_count.writeStream.outputMode("complete").format("console").option("truncate", "false").trigger(processingTime="5 seconds").start()