Implementing a SQL-query-centric DAG jobflow as EMR PySpark code

2020-08-23


Data_Engineering_TIL(20200823)

Purpose of this exercise

An example of implementing a DAG jobflow composed mostly of SQL query jobs as EMR PySpark code

  • AS-IS : a DAG jobflow in an external system, composed mostly of SQL queries

  • TO-BE : EMR PySpark code

Shape of the AS-IS DAG jobflow

A jobflow that loads data, processes it mostly with SQL queries, and writes the results back out, roughly as shown below

(figure: AS-IS DAG jobflow diagram)

TO-BE EMR PySpark code example

  • EMR test environment (a provisioning sketch follows this list)

1) EMR version : emr-5.30.1 (Spark 2.4.5)

2) 1 master node, 10 core nodes

3) Instance type : r5.4xlarge across the board (16 vCPU, 128 GB RAM)
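
For reference, a comparable test cluster could be provisioned with boto3 roughly as sketched below. This is not the setup script actually used in this post; the region, key pair name, log URI, and the assumption that the default EMR roles already exist are all placeholders.

import boto3

# Minimal sketch: spin up an EMR cluster matching the test environment above.
# region_name, Ec2KeyName, and LogUri are hypothetical values.
emr = boto3.client('emr', region_name='ap-northeast-2')

response = emr.run_job_flow(
    Name='pyspark-jobflow-test',
    ReleaseLabel='emr-5.30.1',                      # Spark 2.4.5
    Applications=[{'Name': 'Spark'}],
    LogUri='s3://minman/emr/logs/',
    Instances={
        'InstanceGroups': [
            {'Name': 'master', 'InstanceRole': 'MASTER',
             'InstanceType': 'r5.4xlarge', 'InstanceCount': 1},
            {'Name': 'core', 'InstanceRole': 'CORE',
             'InstanceType': 'r5.4xlarge', 'InstanceCount': 10},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'my-key-pair',                # hypothetical key pair
    },
    JobFlowRole='EMR_EC2_DefaultRole',              # assumes default EMR roles exist
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])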

STEP 1) Migrate the AS-IS flow into TO-BE-style PySpark code as-is, without any tuning

  • example_01.py
from pyspark.sql import SparkSession
from pyspark import SparkContext
import boto3
import time
from datetime import datetime

# python code start time check
start = time.time()

DF_NAME = 'example_01'
s3_bucket = 'minman'
s3_sql_file = 'emr/example.sql'
hdfs_temp_dir = "/tmp/spark/temp"

def sqlfile_read(bucketname, filename):
    '''
    [running process]
    step 1) read the sql file from s3
    step 2) split it into individual sql commands
    step 3) return the sql commands as a python list (used as sqlcommand_list)
    '''
    query_list = []
    
    # Open and read the file as a single buffer
    s3 = boto3.resource('s3')
    content_object = s3.Object(bucketname, filename)
    sql_file = content_object.get()['Body'].read().decode('utf-8')

    # all SQL commands (split on ';')
    sqlCommands = sql_file.split(';')

    # make list from sql commands
    for command in sqlCommands:
        query_list.append(command)
        
    return query_list

sqlcommand_list = sqlfile_read(s3_bucket,s3_sql_file)
spark = SparkSession.builder.appName(DF_NAME).getOrCreate()

# FID : a_table
# FName : a_table
a_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_a/*.parquet')
a_table.createOrReplaceTempView('a_table')

# FID : b_table
# FName : b_table
b_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_b/*.parquet')
b_table.createOrReplaceTempView('b_table')

# FID : c_table
# FName : c_table
c_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_c/*.parquet')
c_table.createOrReplaceTempView('c_table')


...


# FID : query_1
# FName : query_1
query_1 = spark.sql('{}'.format(sqlcommand_list[0]))
query_1.createOrReplaceTempView('query_1')

# FID : query_2
# FName : query_2
query_2 = spark.sql('{}'.format(sqlcommand_list[1]))
query_2.createOrReplaceTempView('query_2')

# FID : query_3
# FName : query_3
query_3 = spark.sql('{}'.format(sqlcommand_list[2]))
query_3.createOrReplaceTempView('query_3')


...



# FID : query_6
# FName : query_6
query_6 = spark.sql('{}'.format(sqlcommand_list[5]))
query_6.createOrReplaceTempView('query_6')

# FID : d_table
# FName : d_table
d_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_d/*.parquet')
d_table.createOrReplaceTempView('d_table')

# FID : query_7
# FName : query_7
query_7 = spark.sql('{}'.format(sqlcommand_list[6]))
query_7.createOrReplaceTempView('query_7')

# FID : query_8
# FName : query_8
query_8 = spark.sql('{}'.format(sqlcommand_list[7]))
query_8.createOrReplaceTempView('query_8')

...

# FID : query_10
# FName : query_10
query_10 = spark.sql('{}'.format(sqlcommand_list[9]))
query_10.createOrReplaceTempView('query_10')

# FID : query_11
# FName : query_11
# used: 1
query_11 = spark.sql('{}'.format(sqlcommand_list[10]))
query_11.createOrReplaceTempView('query_11')

...

# FID : query_13
# FName : query_13
query_13 = spark.sql('{}'.format(sqlcommand_list[12]))
query_13.createOrReplaceTempView('query_13')

# FID : query_14
# FName : query_14
query_14 = spark.sql('{}'.format(sqlcommand_list[13]))
query_14.createOrReplaceTempView('query_14')

# FID : query_15
# FName : query_15
query_15 = spark.sql('{}'.format(sqlcommand_list[14]))
query_15.createOrReplaceTempView('query_15')

# FID : query_16
# FName : query_16
query_16 = spark.sql('{}'.format(sqlcommand_list[15]))
query_16.createOrReplaceTempView('query_16')

# FID : query_17
# FName : query_17
# used: 1
query_17 = spark.sql('{}'.format(sqlcommand_list[16]))
query_17.createOrReplaceTempView('query_17')

# FID : query_18
# FName : query_18
query_18 = spark.sql('{}'.format(sqlcommand_list[17]))
query_18.createOrReplaceTempView('query_18')

# FID : query_19
# FName : query_19
# used: 1
query_19 = spark.sql('{}'.format(sqlcommand_list[18]))
query_19.createOrReplaceTempView('query_19')

...

# FID : query_20
# FName : query_20
query_20 = spark.sql('{}'.format(sqlcommand_list[19]))
query_20.createOrReplaceTempView('query_20')

# FID : query_21
# FName : query_21
query_21 = spark.sql('{}'.format(sqlcommand_list[20]))
query_21.createOrReplaceTempView('query_21')

# FID : f_table
# FName : f_table
f_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_f/*.parquet')
f_table.createOrReplaceTempView('f_table')

# FID : g_table
# FName : g_table
g_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_g/*.parquet')
g_table.createOrReplaceTempView('g_table')


# FID : query_22
# FName : query_22
query_22 = spark.sql('{}'.format(sqlcommand_list[21]))
query_22.createOrReplaceTempView('query_22')

# FID : query_23
# FName : query_23
query_23 = spark.sql('{}'.format(sqlcommand_list[22]))
query_23.createOrReplaceTempView('query_23')


...


# FID : query_25
# FName : query_25
query_25 = spark.sql('{}'.format(sqlcommand_list[24]))
query_25.createOrReplaceTempView('query_25')

# FID : h_table
# FName : h_table
h_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_h/*.parquet')
h_table.createOrReplaceTempView('h_table')

...

# FID : python_code_01
# FName : python_code_01
python_code_01 = query_24.filter(query_24['City'] == 'Manchester')
python_code_01.createOrReplaceTempView('python_code_01')

...

# FID : query_27
# FName : query_27
query_27 = spark.sql('{}'.format(sqlcommand_list[26]))
query_27.createOrReplaceTempView('query_27')

# FID : python_code_02
# FName : python_code_02
# Python script: fill numeric NAs with 0 and string NAs with ''
input_dataframe = query_27
temp_dataframe = input_dataframe.na.fill(0)
outdata_dataframe = temp_dataframe.na.fill('')
outdata_dataframe.createOrReplaceTempView('python_code_02')

...

# FID : query_29
# FName : query_29
query_29 = spark.sql('{}'.format(sqlcommand_list[28]))
query_29.createOrReplaceTempView('query_29')

# FID : Unload
# FName : Unload
query_29.write.mode('overwrite').option('header','true').parquet('s3://minman/emr/result')

# python code end time check
end = time.time()

# python code running time print
print("code running time (sec):", end - start, ', code finish time (GMT) : ', datetime.now().strftime('%Y-%m-%d %H:%M'))

# remove spark dataframe cache
spark.catalog.clearCache()

Running example_01.py written as above on the EMR master node with the command spark-submit example_01.py reproduces the same sequence of processing as the AS-IS flow.

** Shape of the SQL file referenced by the Python code above

  • example.sql

The query statements are delimited by ';' (semicolons)

-- FID : query_1
-- FName : query_1
SQL QUERY ;
-- FID : query_2
-- FName : query_2
SQL QUERY ;
-- FID : query_3
-- FName : query_3
SQL QUERY ;

...

-- FID : query_29
-- FName : query_29
SQL QUERY ;
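
Note that sql_file.split(';') also returns a trailing empty string as the last "command" (the scripts above never index that far, so it is harmless there). A slightly more defensive reader, shown below as a hypothetical variant rather than the code actually used, drops empty or whitespace-only fragments; the indices of the real queries stay the same as long as no empty fragments sit in the middle of the file.

import boto3

def sqlfile_read_strict(bucketname, filename):
    """Read a ';'-delimited SQL file from S3 and drop empty/whitespace-only fragments."""
    s3 = boto3.resource('s3')
    sql_file = s3.Object(bucketname, filename).get()['Body'].read().decode('utf-8')

    # keep only fragments that still contain text after stripping whitespace
    return [command.strip() for command in sql_file.split(';') if command.strip()]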

STEP 2) Tune the code that was migrated to PySpark as-is to improve its execution performance

1) Problems with the example_01.py code

  • In most cases a given SQL query node references SQL query nodes executed in earlier steps. Because the upstream results are only registered as temp views, the referenced upstream queries get folded into the downstream query and executed all over again together with it.

  • As a result, unnecessarily complex SQL computation is repeated, which hurts performance and also makes debugging hard to trace (the sketch right after this list shows why).
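
To see why this happens, recall that createOrReplaceTempView only registers a name for a lazy query plan; nothing is materialized. A small self-contained sketch (synthetic data, not one of the actual queries in this job) makes the repeated work visible via explain():

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lineage_demo').getOrCreate()

# synthetic stand-in for a heavy upstream query node
upstream = spark.range(0, 1000000).selectExpr('id', 'id % 7 AS bucket')
upstream.createOrReplaceTempView('upstream')

# two downstream nodes that both reference the upstream view
down_a = spark.sql('SELECT bucket, COUNT(*) AS cnt FROM upstream GROUP BY bucket')
down_b = spark.sql('SELECT MAX(id) AS max_id FROM upstream')

# Each physical plan still contains the entire upstream computation:
# a temp view is just a named query plan, so every downstream action re-runs it.
down_a.explain()
down_b.explain()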

2) Tuning points

  • For the DataFrame of a node that is referenced multiple times, add logic that saves that DataFrame to HDFS and then reloads it.

  • In other words, when a particular SQL query node's DataFrame is referenced by several other nodes, we deliberately cut that DataFrame's lineage (the graph-shaped ancestry of how the RDD is transformed by each computation).

  • Concretely, as in the example below, a DataFrame that is referenced several times is temporarily saved to HDFS and then read back.

Example)

def save_df_and_rad(sparkSession, df, df_path_name):
    '''
    [running process]
   
    step 1) dataframe write to HDFS
    step 2) read dataframe from HDFS
    '''
    df.write.mode("overwrite").parquet("{}/{}".format(hdfs_temp_dir, df_path_name))
    return sparkSession.read.parquet("{}/{}".format(hdfs_temp_dir, df_path_name))

# FID : query_example
# FName : query_example
# used (number of times this dataframe is referenced): 3
query_example = spark.sql('{}'.format(sqlcommand_list[17]))
query_example = save_df_and_rad(spark, query_example, "query_example")
query_example.createOrReplaceTempView('query_example')

  • When a frequently referenced DataFrame drags around a long data lineage, the amount of computation grows and the performance of the code as a whole inevitably degrades. Adding the save-to-HDFS-and-reload logic for such DataFrames cuts out that unnecessary recomputation.

  • Some DataFrames that are referenced multiple times are handled with cache() instead (see the note right after this list).
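
One caveat about the cache() approach (a general Spark behavior, not something specific to this job): cache() is itself lazy, so nothing is pinned in memory until the first action touches the DataFrame. If the cache should be populated up front, it can be forced with a cheap action, for example:

# Sketch only: eagerly materialize a cached DataFrame before downstream queries use it.
# Assumes the SparkSession `spark` from example_02.py; same table path as in that script.
d_table = spark.read.parquet('s3://minman/prefix1/prefix_d/*.parquet')
d_table.cache()
d_table.count()                      # action that actually populates the cache
d_table.createOrReplaceTempView('d_table')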

3) example_02.py code with the above changes applied

from pyspark.sql import SparkSession
from pyspark import SparkContext
import boto3
import time
from datetime import datetime

# python code start time check
start = time.time()

DF_NAME = 'example_02'
s3_bucket = 'minman'
s3_sql_file = 'emr/example.sql'
hdfs_temp_dir = "/tmp/spark/temp"

def sqlfile_read(bucketname, filename):
    '''
    [running process]
    step 1) read the sql file from s3
    step 2) split it into individual sql commands
    step 3) return the sql commands as a python list (used as sqlcommand_list)
    '''
    query_list = []
    
    # Open and read the file as a single buffer
    s3 = boto3.resource('s3')
    content_object = s3.Object(bucketname, filename)
    sql_file = content_object.get()['Body'].read().decode('utf-8')

    # all SQL commands (split on ';')
    sqlCommands = sql_file.split(';')

    # make list from sql commands
    for command in sqlCommands:
        query_list.append(command)
        
    return query_list

def save_df_and_rad(sparkSession, df, df_path_name):
    '''
    [running process]
    
    step 1) dataframe write to HDFS
    step 2) read dataframe from HDFS
    '''
    df.write.mode("overwrite").parquet("{}/{}".format(hdfs_temp_dir, df_path_name))
    return sparkSession.read.parquet("{}/{}".format(hdfs_temp_dir, df_path_name))

sqlcommand_list = sqlfile_read(s3_bucket,s3_sql_file)

spark = SparkSession.builder.appName(DF_NAME).getOrCreate()

# FID : a_table
# FName : a_table
a_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_a/*.parquet')
a_table.createOrReplaceTempView('a_table')

# FID : b_table
# FName : b_table
b_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_b/*.parquet')
b_table.createOrReplaceTempView('b_table')

# FID : c_table
# FName : c_table
c_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_c/*.parquet')
c_table.createOrReplaceTempView('c_table')


...


# FID : query_1
# FName : query_1
# used: 2
query_1 = spark.sql('{}'.format(sqlcommand_list[0]))
query_1.createOrReplaceTempView('query_1')

# FID : query_2
# FName : query_2
# used: 1
query_2 = spark.sql('{}'.format(sqlcommand_list[1]))
query_2.createOrReplaceTempView('query_2')

# FID : query_3
# FName : query_3
# used: 1
query_3 = spark.sql('{}'.format(sqlcommand_list[2]))
query_3.createOrReplaceTempView('query_3')


...



# FID : query_6
# FName : query_6
# used: 1
query_6 = spark.sql('{}'.format(sqlcommand_list[5]))
query_6.createOrReplaceTempView('query_6')

# FID : d_table
# FName : d_table
# used: 5 
d_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_d/*.parquet')
# d_table is referenced by 5 other SQL queries, so it is cached as below
d_table.cache()
d_table.createOrReplaceTempView('d_table')

# FID : query_7
# FName : query_7
# used: 1
# query that joins several small tables
query_7 = spark.sql('{}'.format(sqlcommand_list[6]))
query_7.createOrReplaceTempView('query_7')

# FID : query_8
# FName : query_8
# used: 3
# looks up multiple conditions in an IN clause, but the table itself is small
query_8 = spark.sql('{}'.format(sqlcommand_list[7]))
query_8 = save_df_and_rad(spark, query_8, "query_8")
query_8.createOrReplaceTempView('query_8')

...

# FID : query_10
# FName : query_10
# used: 2
query_10 = spark.sql('{}'.format(sqlcommand_list[9]))
query_10 = save_df_and_rad(spark, query_10, "query_10")
query_10.createOrReplaceTempView('query_10')

# FID : query_11
# FName : query_11
# used: 1
query_11 = spark.sql('{}'.format(sqlcommand_list[10]))
query_11.createOrReplaceTempView('query_11')

...

# FID : query_13
# FName : query_13
# used: 3
query_13 = spark.sql('{}'.format(sqlcommand_list[12]))
query_13 = save_df_and_rad(spark, query_13, "query_13")
query_13.createOrReplaceTempView('query_13')

# FID : query_14
# FName : query_14
# used: 2
query_14 = spark.sql('{}'.format(sqlcommand_list[13]))
query_14 = save_df_and_rad(spark, query_14, "query_14")
query_14.createOrReplaceTempView('query_14')

# FID : query_15
# FName : query_15
# used: 7
query_15 = spark.sql('{}'.format(sqlcommand_list[14]))
query_15 = save_df_and_rad(spark, query_15, "query_15")
query_15.createOrReplaceTempView('query_15')

# FID : query_16
# FName : query_16
# used: 3
query_16 = spark.sql('{}'.format(sqlcommand_list[15]))
query_16 = save_df_and_rad(spark, query_16, "query_16")
query_16.createOrReplaceTempView('query_16')

# FID : query_17
# FName : query_17
# used: 1
query_17 = spark.sql('{}'.format(sqlcommand_list[16]))
query_17.createOrReplaceTempView('query_17')

# FID : query_18
# FName : query_18
# used: 1
query_18 = spark.sql('{}'.format(sqlcommand_list[17]))
query_18.createOrReplaceTempView('query_18')

# FID : query_19
# FName : query_19
# used: 1
query_19 = spark.sql('{}'.format(sqlcommand_list[18]))
query_19.createOrReplaceTempView('query_19')

...

# FID : query_20
# FName : query_20
# used: 1
query_20 = spark.sql('{}'.format(sqlcommand_list[19]))
query_20.createOrReplaceTempView('query_20')

# FID : query_21
# FName : query_21
# used: 1
query_21 = spark.sql('{}'.format(sqlcommand_list[20]))
query_21.createOrReplaceTempView('query_21')

# FID : f_table
# FName : f_table
# used: 3
f_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_f/*.parquet')
f_table.cache()
f_table.createOrReplaceTempView('f_table')

# FID : g_table
# FName : g_table
# used: 3
g_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_g/*.parquet')
g_table.cache()
g_table.createOrReplaceTempView('g_table')


# FID : query_22
# FName : query_22
# used: 1
query_22 = spark.sql('{}'.format(sqlcommand_list[21]))
query_22.createOrReplaceTempView('query_22')

# FID : query_23
# FName : query_23
# used: 3
query_23 = spark.sql('{}'.format(sqlcommand_list[22]))
query_23 = save_df_and_rad(spark, query_23, "query_23")
query_23.createOrReplaceTempView('query_23')


...


# FID : query_25
# FName : query_25
# used: 3
query_25 = spark.sql('{}'.format(sqlcommand_list[24]))
query_25 = save_df_and_rad(spark, query_25, "query_25")
query_25.createOrReplaceTempView('query_25')

# FID : h_table
# FName : h_table
# used: 2
h_table = spark.read.option('header','true').parquet('s3://minman/prefix1/prefix_h/*.parquet')
h_table.createOrReplaceTempView('h_table')

...

# FID : python_code_01
# FName : python_code_01
python_code_01 = query_24.filter(query_24['City'] == 'Manchester')
python_code_01.createOrReplaceTempView('python_code_01')

...

# FID : query_27
# FName : query_27
# used: 1
query_27 = spark.sql('{}'.format(sqlcommand_list[26]))
query_27.createOrReplaceTempView('query_27')

# FID : python_code_02
# FName : python_code_02
# Python script: fill numeric NAs with 0 and string NAs with ''
input_dataframe = query_27
temp_dataframe = input_dataframe.na.fill(0)
outdata_dataframe = temp_dataframe.na.fill('')
outdata_dataframe.createOrReplaceTempView('python_code_02')

...

# FID : query_29
# FName : query_29
query_29 = spark.sql('{}'.format(sqlcommand_list[28]))
query_29.createOrReplaceTempView('query_29')

# FID : Unload
# FName : Unload
query_29.write.mode('overwrite').option('header','true').parquet('s3://minman/emr/result')

# python code end time check
end = time.time()

# python code running time print
print("code running time (sec):", end - start, ', code finish time (GMT) : ', datetime.now().strftime('%Y-%m-%d %H:%M'))

# remove spark dataframe cache
spark.catalog.clearCache()

Running example_02.py written as above on the EMR master node with the command below reproduces the same sequence of processing as the AS-IS flow, but with better performance.

spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 4g \
  --num-executors 17 \
  --executor-cores 5 \
  --executor-memory 20g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.default.parallelism=170 \
  --conf spark.sql.crossJoin.enabled=true \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
  example_02.py

** Spark application-level options

driver cores: 1

driver memory: 4 GB

executor cores: 85 in total (17 executors x 5 cores each)

executor memory: 340 GB in total (20 GB per executor)
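
For reference, most of these executor-side settings can also be declared on the SparkSession builder inside the script instead of on the command line. This is a sketch, not the configuration actually used in this post; note that in client mode, driver settings such as driver memory still have to be passed to spark-submit, because the driver JVM is already running by the time this code executes.

from pyspark.sql import SparkSession

# Equivalent executor-side settings applied in code rather than via spark-submit flags
spark = (
    SparkSession.builder
    .appName('example_02')
    .config('spark.executor.instances', '17')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '20g')
    .config('spark.dynamicAllocation.enabled', 'false')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.default.parallelism', '170')
    .config('spark.sql.crossJoin.enabled', 'true')
    .getOrCreate()
)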

** Reference material for Spark memory settings

URL : https://aws.amazon.com/ko/blogs/korea/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/