AWS Elasticsearch를 중심으로 하는 데이터 파이프라인 구축 기초
2020-07-29
.
Data_Engineering_TIL(20200729)
실습요약
로컬 PC(data producer) –> Elasticsearch –> kibana 구조의 데이터 파이프라인을 간단하게 구축해본다.
** 데이터 : key,value 형태의 json파일
해당 json 파일 데이터 구조
{“header”:[{“key1”:”data1”, … ,”key2”:”data2”}],”category2”:[{“key3”:”data3”, … ,”key4”:”data4”}],”category3”:[{“key5”:”data5”, … ,”key6”:”data6”}],”category4”:[],”category5”:[],”category6”:[{“key7”:”data7”, … ,”key8”:”data8”}]}
step 1) AWS Elasticsearch 도메인 생성
아래와 같은 옵션으로 도메인을 생성해준다.
step 2) data producing 및 elasticsearch 전송을 위한 파이썬 모듈 작성
s3에 저장된 샘플 데이터를 로딩해서 데이터 내용의 일부분을 정제하고, 5000건을 랜덤으로 생성하여 elasticsearch에 전송하는 로직
from elasticsearch import Elasticsearch
import boto3
import pandas as pd
import json
import random
from random import randint
import datetime
import time
import string
import logging
import sys
host = 'elasticsearch URL 주소'
region = 'ap-northeast-2'
service = 'es'
def load_json_sample_file():
# s3에 저장된 sample json 파일 load
s3 = boto3.resource('s3')
json_object = s3.Object('bucket_name', 'tran.json')
get_json_object = json_object.get()['Body'].read().decode('utf-8')
json_data = json.loads(get_json_object)
return json_data
def make_sample_test_data(json_data):
## 데이터를 랜덤으로 수정해서 샘플데이터 생성
# header 부분만 추출해서 sample data 만들기
data = json_data['header'][0]
data['00'] = str(random.randint(0,9))
data['00'] = '20200'+str(random.randint(1, 9))+'0'+str(random.randint(1,9))
data['00'] = '0'+str(random.randint(0, 4))
data['0000'] = float(random.randint(1000, 100000))
data['0000'] = '000'+str(random.randint(1, 9))
data['0000'] = str(random.randint(1, 1000000))
# Kibana가 인식할 수 있는 timedata 형태로 샘플데이터의 일부 time 데이터를 변환하는 작업
dtime = (datetime.datetime.now()+ datetime.timedelta(hours=randint(-5,5))).strftime('%Y/%m/%d %H:%M:%S')
data['time'] = dtime
data['time2'] = dtime
return data
def make_es_index_id():
## index의 고유아이디를 생성하기 위한
## 30자리의 랜덤한 문자열 생성
# es id에 넣을 30자리의 랜덤한 문자열 만들기
_LENGTH = 30 # 30자리
string_pool = string.ascii_letters # 대소문자
result = "" # 결과 값
for i in range(_LENGTH):
result += random.choice(string_pool) # 랜덤한 문자열 하나 선택
return result
def main():
# ES 연결설정
es = Elasticsearch(
hosts = [{'host': host, 'port': 443}],
http_auth = ('user_id', 'Password321!'),
scheme="https"
)
# s3로부터 샘플데이터 로딩
json_data = load_json_sample_file()
# Elasticsearch 클러스터에 for문으로 샘플데이터 5000개씩 저장
try:
for number in range(1,5001):
test_data = make_sample_test_data(json_data)
json_id = make_es_index_id()
es.index(index="data5", doc_type="header_data", id=json_id, body=test_data)
print("job done")
except:
logging.error("something wrong")
sys.exit(1)
if __name__=='__main__':
main()
작성후 아래와 같이 파이썬 모듈 실행
user@DESKTOP-QKRJSLG MINGW64 ~/Desktop/aa/bb
$ python new_consumer_v2.py
job done