Example: running EMR Spark jobs in parallel with Airflow
2020-10-14
Data_Engineering_TIL(20201014)
# Objective
Use Airflow to launch an AWS EMR cluster (via boto3 under the hood), run two Spark jobs on it in parallel, and terminate the EMR cluster once both jobs finish.
# Walkthrough
** Environment: Amazon Linux 2 AMI
step 1) Configure LocalExecutor in Airflow so that tasks can run in parallel
See https://minman2115.github.io/DE_TIL141/ for the setup.
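The gist of that setup is switching the executor and pointing Airflow at a real database backend, since the default SQLite metadata DB only supports SequentialExecutor. A minimal sketch of the relevant airflow.cfg changes, assuming a local MySQL metadata DB (the connection string is an assumption; follow the linked post for the actual procedure):

[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[ec2-user@ip-10-1-10-4 ~]$ airflow initdb
[ec2-user@ip-10-1-10-4 ~]$ airflow scheduler &
[ec2-user@ip-10-1-10-4 ~]$ airflow webserver -p 8080 &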
step 2) Write and run the DAG
[ec2-user@ip-10-1-10-4 ~]$ cd ~/airflow
[ec2-user@ip-10-1-10-4 airflow]$ ls
airflow.cfg airflow.db airflow-webserver.pid logs unittests.cfg
[ec2-user@ip-10-1-10-4 airflow]$ mkdir dags
[ec2-user@ip-10-1-10-4 airflow]$ cd dags
[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 14, 1, 55),
    'email': ['[email_address]'],
    'email_on_failure': True
}

dag = DAG('emr_job_flow_test',
          default_args=default_args,
          schedule_interval=dt.timedelta(hours=1),
          catchup=False
          )
JOB_FLOW_OVERRIDES = {
    'Name': 'pms-EMRtest-test',
    'LogUri': 's3://[bucket_name]/',
    'ReleaseLabel': 'emr-5.28.1',
    'Instances': {
        'Ec2KeyName': '[key_name]',
        'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
        'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
        'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'InstanceGroups': [{
            'InstanceRole': 'MASTER',
            'InstanceCount': 1,
            'InstanceType': 'm5.2xlarge',
            'Market': 'SPOT',
            'Name': 'Master'
        }, {
            'InstanceRole': 'CORE',
            'InstanceCount': 3,
            'InstanceType': 'm5.2xlarge',
            'Market': 'SPOT',
            'Name': 'Core',
        }, {
            'InstanceRole': 'TASK',
            'InstanceCount': 3,
            'InstanceType': 'm5.2xlarge',
            'Market': 'SPOT',
            'Name': 'Task',
        }]
    },
    'Applications': [{'Name': 'Spark'}, {'Name': 'Hadoop'}, {'Name': 'Hive'}],
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    # StepConcurrencyLevel > 1 lets EMR run the two submitted steps at the same time
    'StepConcurrencyLevel': 10,
    'Tags': [{'Key': 'name', 'Value': 'pms-EMR-test'},
             {'Key': 'expiry-date', 'Value': '2020-10-13'},
             {'Key': 'owner', 'Value': 'pms'}],
    'BootstrapActions': [
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
            # https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
        }
    ],
    'VisibleToAllUsers': True
}
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)
PRE_STEP = [
    {
        'Name': 'spark_job_01',
        # 'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                     '--deploy-mode', 'cluster',
                     '--master', 'yarn',
                     's3://[bucket_name]/spark_job_01.py']
        }
    }
]
step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    # pull the cluster id pushed to XCom by the create_job_flow task
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=PRE_STEP,
    dag=dag
)

step_checker_pre = EmrStepSensor(
    task_id='watch_step_of_pre',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    # EmrAddStepsOperator returns the list of added step ids; watch the first (only) one
    step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)
ACTUAL_STEP = [
    {
        'Name': 'spark_job_03',
        # 'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                     '--deploy-mode', 'cluster',
                     '--master', 'yarn',
                     's3://[bucket_name]/spark_job_03.py']
        }
    }
]
step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=ACTUAL_STEP,
    dag=dag
)

step_checker_actual = EmrStepSensor(
    task_id='watch_step_of_actual',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)
cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover
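On the boto3 mention in the objective: EmrCreateJobFlowOperator is a thin wrapper that merges JOB_FLOW_OVERRIDES over the emr_default connection's extra config and passes the result to the EMR client's run_job_flow call, whose JobFlowId response is what gets pushed to XCom. A rough sketch of the equivalent direct boto3 call, just to show the mapping (the region name here is an assumption):

import boto3

emr = boto3.client('emr', region_name='ap-northeast-2')
response = emr.run_job_flow(**JOB_FLOW_OVERRIDES)  # same request body the operator builds
print(response['JobFlowId'])                       # the cluster id the operator pushes to XCom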
DAG execution result
[Notes on the code above]
- Scheduling
Suppose the current time is 11:50 AM KST and you want the DAG to run every hour starting at 11:55 AM. Airflow keeps time in UTC, so from Airflow's point of view the current time is 2:50 AM. On top of that, Airflow only triggers a run once a full schedule_interval has elapsed after start_date, so start_date needs to sit one hour before the desired first run. Setting 'start_date': datetime(2020, 10, 14, 1, 55) in the code above therefore makes the first run fire at 02:55 UTC (11:55 AM KST), with a new run every hour after that.
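A small calculation that spells out the start_date choice (naive datetimes, matching the DAG code):

from datetime import datetime, timedelta

desired_first_run_kst = datetime(2020, 10, 14, 11, 55)
desired_first_run_utc = desired_first_run_kst - timedelta(hours=9)  # KST = UTC+9, so 02:55 UTC
# Airflow triggers a run only after one full schedule_interval has passed since start_date,
# so start_date must sit one interval (1 hour) before the desired first run:
start_date = desired_first_run_utc - timedelta(hours=1)
print(start_date)  # 2020-10-14 01:55:00 -> matches datetime(2020, 10, 14, 1, 55) in default_args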
- spark_job_01.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['A', 'B', 'C']
for elem in _list:
    df = spark.read.option("header", "true").parquet("s3a://[bucket_name]/testdata/merge_data/*.parquet")
    df = df.withColumnRenamed("event_time", "event_time_{}".format(elem))\
           .withColumnRenamed("event_type", "event_type_{}".format(elem))\
           .withColumnRenamed("product_id", "product_id_{}".format(elem))\
           .withColumnRenamed("category_id", "category_id_{}".format(elem))\
           .withColumnRenamed("category_code", "category_code_{}".format(elem))\
           .withColumnRenamed("brand", "brand_{}".format(elem))\
           .withColumnRenamed("price", "price_{}".format(elem))\
           .withColumnRenamed("user_id", "user_id_{}".format(elem))\
           .withColumnRenamed("year", "year_{}".format(elem))\
           .withColumnRenamed("user_session", "user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id())
    df.write.option("header", "true").csv("s3a://[bucket_name]/{}".format(elem))
    df.show()
    print("record count : ", df.count())
    spark.catalog.clearCache()
- spark_job_03.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['D', 'E', 'F']
for elem in _list:
    df = spark.read.option("header", "true").parquet("s3a://[bucket_name]/testdata/merge_data/*.parquet")
    df = df.withColumnRenamed("event_time", "event_time_{}".format(elem))\
           .withColumnRenamed("event_type", "event_type_{}".format(elem))\
           .withColumnRenamed("product_id", "product_id_{}".format(elem))\
           .withColumnRenamed("category_id", "category_id_{}".format(elem))\
           .withColumnRenamed("category_code", "category_code_{}".format(elem))\
           .withColumnRenamed("brand", "brand_{}".format(elem))\
           .withColumnRenamed("price", "price_{}".format(elem))\
           .withColumnRenamed("user_id", "user_id_{}".format(elem))\
           .withColumnRenamed("year", "year_{}".format(elem))\
           .withColumnRenamed("user_session", "user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id())
    df.write.option("header", "true").csv("s3a://[bucket_name]/{}".format(elem))
    df.show()
    print("record count : ", df.count())
    spark.catalog.clearCache()