Example: running EMR Spark jobs in parallel with Airflow

2020-10-14


Data_Engineering_TIL(20201014)

# Objective

Use Airflow to launch an AWS EMR cluster with boto3, run two Spark jobs on it, and terminate the EMR cluster once the jobs are finished.

# Walkthrough

** Environment: Amazon Linux 2 AMI

step 1) Configure the LocalExecutor in Airflow so tasks can run in parallel

See https://minman2115.github.io/DE_TIL141/
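For quick reference, the switch to the LocalExecutor comes down to two settings in airflow.cfg. The snippet below is only a rough sketch; the MySQL connection string is an assumed example, so use whatever metadata database you set up in the link above.

# airflow.cfg, [core] section (values below are illustrative)
# SequentialExecutor runs one task at a time, so parallel steps need LocalExecutor (or Celery/K8s)
executor = LocalExecutor

# LocalExecutor requires a real database backend instead of the default SQLite
sql_alchemy_conn = mysql://airflow:[password]@localhost:3306/airflow

# then re-initialize the metadata DB and restart the scheduler/webserver, e.g.
# $ airflow initdb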

step 2) Write and run the DAG

[ec2-user@ip-10-1-10-4 ~]$ cd ~/airflow

[ec2-user@ip-10-1-10-4 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg

[ec2-user@ip-10-1-10-4 airflow]$ mkdir dags

[ec2-user@ip-10-1-10-4 airflow]$ cd dags

[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator


default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020,10,14, 1, 55),
    'email': ['[email_address]'],
    'email_on_failure': True
}

dag = DAG('emr_job_flow_test',
         default_args=default_args,
         schedule_interval= dt.timedelta(hours=1),
         catchup=False
)

JOB_FLOW_OVERRIDES = {
    'Name' : 'pms-EMRtest-test',
    'LogUri' : 's3://[bucket_name]/',
    'ReleaseLabel' : 'emr-5.28.1',
    'Instances' : {
            'Ec2KeyName': '[key_name]',
            'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
            'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
            'InstanceGroups': [
                {
                    'InstanceRole': 'MASTER',
                    'InstanceCount': 1,
                    'InstanceType': 'm5.2xlarge',
                    'Market': 'SPOT',
                    'Name': 'Master'
                },
                {
                    'InstanceRole': 'CORE',
                    'InstanceCount': 3,
                    'InstanceType': 'm5.2xlarge',
                    'Market': 'SPOT',
                    'Name': 'Core'
                },
                {
                    'InstanceRole': 'TASK',
                    'InstanceCount': 3,
                    'InstanceType': 'm5.2xlarge',
                    'Market': 'SPOT',
                    'Name': 'Task'
                }
            ]
        },
    'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
    'JobFlowRole':'EMR_EC2_DefaultRole',
    'ServiceRole':'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,    # lets up to 10 steps run on the cluster at the same time (supported from EMR 5.28.0)
    'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
              {'Key': 'expiry-date', 'Value': '2020-10-13'},
              {'Key': 'owner', 'Value': 'pms'}],
    'BootstrapActions':[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
                # https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
            }
    ],
    "VisibleToAllUsers": True
}

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

PRE_STEP = [
    {
        'Name': 'spark_job_01',
        #'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_01.py']
        }
    }
]

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="",
    aws_conn_id='aws_default',
    steps=PRE_STEP,
    dag=dag
)

step_checker_pre = EmrStepSensor(
        task_id='watch_step_of_pre',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        # EmrAddStepsOperator pushes the list of step ids it added, so watch the first (and only) one
        step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

ACTUAL_STEP = [
    {
        'Name': 'spark_job_03',
         # 'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_03.py']
        }
    }
]

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="",
    aws_conn_id='aws_default',
    steps=ACTUAL_STEP,
    dag=dag
)

step_checker_actual = EmrStepSensor(
        task_id='watch_step_of_actual',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover
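Once test.py is saved under ~/airflow/dags the scheduler picks it up on its own. Note that create_job_flow pushes the new cluster id to XCom, which is why the other tasks reference it through xcom_pull templates rather than a hard-coded cluster id. As an optional sanity check from the CLI (Airflow 1.10-style commands):

[ec2-user@ip-10-1-10-4 dags]$ airflow list_dags
[ec2-user@ip-10-1-10-4 dags]$ airflow list_tasks emr_job_flow_test --tree
[ec2-user@ip-10-1-10-4 dags]$ airflow trigger_dag emr_job_flow_test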

DAG run result

(screenshot of the DAG run result)

[Notes on the code above]

  • Scheduling

Suppose the current time in Korea is 11:50 AM and you want the DAG to run every hour starting at 11:55 AM.

Since Airflow works in UTC, the current time as far as Airflow is concerned is 2:50 AM.

Airflow 1.x triggers a scheduled run at the end of its interval, i.e. at start_date + schedule_interval.

So with 'start_date': datetime(2020,10,14, 1, 55) in the code above, the DAG runs every hour starting at 11:55 AM KST (02:55 UTC).
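A small sketch of that arithmetic (KST = UTC + 9 hours):

from datetime import datetime, timedelta

start_date = datetime(2020, 10, 14, 1, 55)            # UTC, as in default_args above
schedule_interval = timedelta(hours=1)

first_run_utc = start_date + schedule_interval        # 2020-10-14 02:55 UTC
first_run_kst = first_run_utc + timedelta(hours=9)    # 2020-10-14 11:55 KST
print(first_run_utc, first_run_kst)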

  • spark_job_01.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['A','B','C']

# for each suffix: read the merged parquet data, append the suffix to every column name,
# add a monotonically increasing id column, and write the result back to S3 as CSV
for elem in _list:
    
    df = spark.read.option("header","true").parquet("s3a://[bucket_name]/testdata/merge_data/*.parquet")
    
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
           .withColumnRenamed("event_type","event_type_{}".format(elem))\
           .withColumnRenamed("product_id","product_id_{}".format(elem))\
           .withColumnRenamed("category_id","category_id_{}".format(elem))\
           .withColumnRenamed("category_code","category_code_{}".format(elem))\
           .withColumnRenamed("brand","brand_{}".format(elem))\
           .withColumnRenamed("price","price_{}".format(elem))\
           .withColumnRenamed("user_id","user_id_{}".format(elem))\
           .withColumnRenamed("year","year_{}".format(elem))\
           .withColumnRenamed("user_session","user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id())
    
    df.write.option("header", "true").csv("s3a://[bucket_name]/{}".format(elem))
    
    
df.show()

print("record count : ", df.count())

spark.catalog.clearCache()
  • spark_job_03.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['D','E','F']   # same logic as spark_job_01.py, only the suffix list differs

for elem in _list:
    
    df = spark.read.option("header","true").parquet("s3a://[bucket_name]/testdata/merge_data/*.parquet")
    
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
           .withColumnRenamed("event_type","event_type_{}".format(elem))\
           .withColumnRenamed("product_id","product_id_{}".format(elem))\
           .withColumnRenamed("category_id","category_id_{}".format(elem))\
           .withColumnRenamed("category_code","category_code_{}".format(elem))\
           .withColumnRenamed("brand","brand_{}".format(elem))\
           .withColumnRenamed("price","price_{}".format(elem))\
           .withColumnRenamed("user_id","user_id_{}".format(elem))\
           .withColumnRenamed("year","year_{}".format(elem))\
           .withColumnRenamed("user_session","user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id())

    df.write.option("header", "true").csv("s3a://[bucket_name]/{}".format(elem))
    
df.show()

print("record count : ", df.count())

spark.catalog.clearCache()
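To check afterwards that the two spark-submit steps really did overlap (which is what StepConcurrencyLevel enables), you can compare their start times in the EMR console or with a short boto3 snippet like the one below; the region and cluster id are placeholders, not values from this walkthrough.

import boto3

# placeholder region / cluster id -- use the values of the cluster the DAG created
emr = boto3.client('emr', region_name='ap-northeast-2')
steps = emr.list_steps(ClusterId='j-XXXXXXXXXXXXX')['Steps']

for s in steps:
    # each step reports its state and timestamps under Status / Status.Timeline
    print(s['Name'], s['Status']['State'], s['Status']['Timeline'].get('StartDateTime'))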