AWS Elasticsearch를 중심으로 하는 데이터 파이프라인 구축 기초

2020-07-29

.

Data_Engineering_TIL(20200729)

실습요약

로컬 PC(data producer) –> Elasticsearch –> kibana 구조의 데이터 파이프라인을 간단하게 구축해본다.

** 데이터 : key,value 형태의 json파일

해당 json 파일 데이터 구조

{“header”:[{“key1”:”data1”, … ,”key2”:”data2”}],”category2”:[{“key3”:”data3”, … ,”key4”:”data4”}],”category3”:[{“key5”:”data5”, … ,”key6”:”data6”}],”category4”:[],”category5”:[],”category6”:[{“key7”:”data7”, … ,”key8”:”data8”}]}

step 1) AWS Elasticsearch 도메인 생성

아래와 같은 옵션으로 도메인을 생성해준다.

1

step 2) data producing 및 elasticsearch 전송을 위한 파이썬 모듈 작성

s3에 저장된 샘플 데이터를 로딩해서 데이터 내용의 일부분을 정제하고, 5000건을 랜덤으로 생성하여 elasticsearch에 전송하는 로직

from elasticsearch import Elasticsearch
import boto3
import pandas as pd
import json
import random
from random import randint
import datetime
import time
import string
import logging
import sys

host = 'elasticsearch URL 주소'
region = 'ap-northeast-2'
service = 'es'

def load_json_sample_file():
    # s3에 저장된 sample json 파일 load
    s3 = boto3.resource('s3')
    json_object = s3.Object('bucket_name', 'tran.json')
    get_json_object = json_object.get()['Body'].read().decode('utf-8')
    json_data = json.loads(get_json_object)
    return json_data

def make_sample_test_data(json_data):
    ## 데이터를 랜덤으로 수정해서 샘플데이터 생성

    # header 부분만 추출해서 sample data 만들기
    data = json_data['header'][0]
    data['00'] = str(random.randint(0,9))
    data['00'] = '20200'+str(random.randint(1, 9))+'0'+str(random.randint(1,9))
    data['00'] = '0'+str(random.randint(0, 4))
    data['0000'] = float(random.randint(1000, 100000))
    data['0000'] = '000'+str(random.randint(1, 9))
    data['0000'] = str(random.randint(1, 1000000))

    # Kibana가 인식할 수 있는 timedata 형태로 샘플데이터의 일부 time 데이터를 변환하는 작업
    dtime = (datetime.datetime.now()+ datetime.timedelta(hours=randint(-5,5))).strftime('%Y/%m/%d %H:%M:%S')
    data['time'] = dtime
    data['time2'] = dtime
    return data

def make_es_index_id():
    ## index의 고유아이디를 생성하기 위한
    ## 30자리의 랜덤한 문자열 생성

    # es id에 넣을 30자리의 랜덤한 문자열 만들기
    _LENGTH = 30 # 30자리
    string_pool = string.ascii_letters # 대소문자
    result = "" # 결과 값
    for i in range(_LENGTH):
        result += random.choice(string_pool)  # 랜덤한 문자열 하나 선택
    return result

def main():
    # ES 연결설정
    es = Elasticsearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = ('user_id', 'Password321!'),
        scheme="https"
    )

    # s3로부터 샘플데이터 로딩
    json_data = load_json_sample_file()

    # Elasticsearch 클러스터에 for문으로 샘플데이터 5000개씩 저장
    try:
        for number in range(1,5001):
            test_data = make_sample_test_data(json_data)
            json_id = make_es_index_id()
            es.index(index="data5", doc_type="header_data", id=json_id, body=test_data)
        print("job done")
    except:
        logging.error("something wrong")
        sys.exit(1)

if __name__=='__main__':
    main()

작성후 아래와 같이 파이썬 모듈 실행

user@DESKTOP-QKRJSLG MINGW64 ~/Desktop/aa/bb
$ python new_consumer_v2.py
job done

step 3) 키바나에 접속하여 방금 파이썬 모듈로 실행하여 생성된 5000건의 데이터가 잘 들어왔는지 확인

2