An example of implementing an EMR Spark workflow with Airflow

2020-09-17


[Goal]


When the DAG is run on the Airflow client (EC2),

the sequence EMR cluster create -> PySpark job submit -> EMR terminate runs as a single workflow

[Implementation]

step 0) Create an EC2 instance for the Airflow client

step 1) SSH into the Airflow client (EC2) and install Airflow as follows.

# Commands for installing Airflow
[ec2-user@ip-10-1-10-239 ~]$ sudo yum update -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install python3 -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install gcc python3-devel -y
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install apache-airflow==1.10.3
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install werkzeug==0.15.4
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install boto3
[ec2-user@ip-10-1-10-239 ~]$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
Default region name [None]: ap-northeast-2
Default output format [None]: json
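
Before moving on, it may be worth confirming that boto3 can actually see the credentials and region that aws configure wrote, since the Airflow EMR operators call boto3 under the hood. A quick sanity-check sketch (an empty cluster list simply means nothing is running yet):

# sanity_check.py - verify that boto3 picks up the configured credentials/region
import boto3

emr = boto3.client('emr', region_name='ap-northeast-2')
# lists only clusters in RUNNING/WAITING state; an empty 'Clusters' list is fine
print(emr.list_clusters(ClusterStates=['RUNNING', 'WAITING']))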

step 2) Start Airflow

# Initialize the metadata DB
[ec2-user@ip-10-1-10-239 ~]$ airflow initdb
                                                                                                              
# Start the webserver
[ec2-user@ip-10-1-10-239 ~]$ airflow webserver -p 8080
# After the webserver is up, open a web browser and go to [ec2 server public ip]:8080 to see the Airflow web UI.

# Open a new terminal window and enter the command below
# Start the scheduler. The scheduler is responsible for scheduling and running the DAGs.
[ec2-user@ip-10-1-10-239 ~]$ airflow scheduler                                                                                                                                                                                                                           
    
# Open yet another terminal window and enter the commands below
# Airflow-related files are stored under ~/airflow
[ec2-user@ip-10-1-10-239 ~]$ cd ~/airflow
[ec2-user@ip-10-1-10-239 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg
# airflow.cfg: Airflow configuration, airflow.db: SQLite metadata database (these settings can also be read programmatically; see the sketch below)

# If there is no dags folder under the airflow directory, create it with mkdir dags as shown below.
# The dags directory is where DAG files are stored.
[ec2-user@ip-10-1-10-239 airflow]$ mkdir dags
[ec2-user@ip-10-1-10-239 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  dags  logs  unittests.cfg
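
To double-check which directory the scheduler scans for DAGs and which executor and metadata DB are in use, the values from airflow.cfg can be read programmatically; a small sketch:

# check_config.py - print the key airflow.cfg settings
from airflow.configuration import conf

print(conf.get('core', 'dags_folder'))       # e.g. /home/ec2-user/airflow/dags
print(conf.get('core', 'executor'))          # SequentialExecutor by default
print(conf.get('core', 'sql_alchemy_conn'))  # sqlite:////home/ec2-user/airflow/airflow.db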

step 3) Open the Airflow UI and configure a 'connection'

  • Open a web browser and go to [ec2 public ip]:8080

  • In the web UI, click 'Admin' -> 'Connections' at the top

  • Click the pencil (edit) icon to the left of 'aws_default'

  • In 'Extra', change {"region_name": "us-east-1"} to {"region_name": "ap-northeast-2"} and save (the same change can also be made programmatically, as sketched below)
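
A minimal sketch of doing that change without the UI, assuming the default SQLite metadata DB created by airflow initdb:

# set_aws_region.py - point the aws_default connection at ap-northeast-2
from airflow import settings
from airflow.models import Connection

session = settings.Session()
conn = session.query(Connection).filter(Connection.conn_id == 'aws_default').one()
conn.extra = '{"region_name": "ap-northeast-2"}'   # same value as entered in the UI
session.commit()
session.close()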

step 4) Implement the DAG and register it with Airflow

[ec2-user@ip-10-1-10-144 airflow]$ cd dags
[ec2-user@ip-10-1-10-144 dags]$ sudo vim test.py
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    # start_date = the first DAG start time. Keep it STATIC and write it in UTC.
    'start_date': datetime(2020, 9, 17, 7, 30),
    'email': ['[email_address]@gmail.com'],
    'email_on_failure': True
}

dag = DAG('emr_job_flow_test',
         default_args=default_args,
         schedule_interval= dt.timedelta(hours=1),
         catchup=False
)

JOB_FLOW_OVERRIDES = {
    'Name' : 'pms-EMRtest-test',
    'LogUri' : 's3://[bucket_name]/',
    'ReleaseLabel' : 'emr-5.28.1',
    'Instances' : {
            'Ec2KeyName': '[keypair_name]',
            'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxx',
            'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxx',
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
            'InstanceGroups': [{
                'InstanceRole': 'MASTER',
                "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Master"
                }, {
                    'InstanceRole': 'CORE',
                    "InstanceCount": 3,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }, {
                    'InstanceRole': 'TASK',
                    "InstanceCount": 3,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }
            ]
        },
    'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
    'JobFlowRole':'EMR_EC2_DefaultRole',
    'ServiceRole':'EMR_DefaultRole',
    'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
              {'Key': 'expiry-date', 'Value': '2020-09-16'},
              {'Key': 'owner', 'Value': 'pms'}],
    'BootstrapActions':[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
                # https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
            }
    ],
    "VisibleToAllUsers": True
}
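
# Note: JOB_FLOW_OVERRIDES follows the request syntax of boto3's EMR run_job_flow() API;
# EmrCreateJobFlowOperator merges it over the extra config of the 'emr_default' connection,
# so any run_job_flow parameter (Instances, Applications, Tags, ...) can be set here.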

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

PRE_STEP = [
    {
        'Name': 'spark_job_01',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_01.py']
        }
    }
]

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="",
    aws_conn_id='aws_default',
    steps=PRE_STEP,
    dag=dag
)

step_checker_pre = EmrStepSensor(
        task_id='watch_step_of_pre',
        job_flow_id="",
        step_id="",
        aws_conn_id='aws_default',
        dag=dag
 )

ACTUAL_STEP = [
    {
        'Name': 'spark_job_02',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_02.py']
        }
    }
]

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="",
    aws_conn_id='aws_default',
    steps=ACTUAL_STEP,
    dag=dag
)

step_checker_actual = EmrStepSensor(
        task_id='watch_step_of_actual',
        job_flow_id="",
        step_id="",
        aws_conn_id='aws_default',
        dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator >> step_adder_pre_step >> step_checker_pre >> step_adder_actual_step >> step_checker_actual >> cluster_remover

# Check the DAG we just created
[ec2-user@ip-10-1-10-144 dags]$ airflow list_dags
[2020-09-17 02:55:14,141] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 02:55:14,388] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
emr_job_flow_test # the DAG we just created
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial
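
Besides airflow list_dags, it can be worth confirming that the file imported without errors and that the task chain is in the intended order; a small sketch using the DagBag API (run with python3 on the Airflow host):

# inspect_dag.py - parse the configured dags_folder and print the task order of our DAG
from airflow.models import DagBag

dagbag = DagBag()                       # parses the configured dags_folder
print(dagbag.import_errors)             # should print an empty dict
dag = dagbag.get_dag('emr_job_flow_test')
print([task.task_id for task in dag.topological_sort()])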

(image: python_script_explain)

** Things that were hard to understand

1) A scheduling issue that needs to be confirmed and resolved

When a schedule is attached, the very first DAG run is triggered twice.

See https://issues.apache.org/jira/browse/AIRFLOW-6207

2) When scheduling, the start_date and schedule_interval options must be chosen carefully (see the sketch below)

See https://stackoverflow.com/questions/46117211/apache-airflow-dag-executed-twice-before-start-date
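
To make the start_date / schedule_interval interplay concrete, here is a small sketch of how (as I understand the 1.10.x scheduler) the first run of the DAG above gets scheduled; times are UTC:

# first-run semantics for the DAG defined above
from datetime import datetime, timedelta

start_date = datetime(2020, 9, 17, 7, 30)        # default_args['start_date']
schedule_interval = timedelta(hours=1)           # the DAG's schedule_interval

# Airflow runs a DAG at the END of its schedule period, so the first run is
# triggered around start_date + schedule_interval (08:30 UTC) and carries
# execution_date == start_date (07:30 UTC).
print("first execution_date:", start_date)
print("first trigger time  :", start_date + schedule_interval)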

step 5) Run the DAG on its schedule and check the results

  • In the Airflow UI, click 'DAGs' in the top menu

  • Flip the Off toggle to the left of emr_job_flow_test to On

  • As the schedule is triggered, you can see the intended workflow running, as shown below.

(image: result_check)

The Airflow scheduler log for the DAG run above looks like this:

[2020-09-17 06:36:39,692] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.create_job_flow 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:36:39,696] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 06:36:39,703] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 06:36:39,704] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.create_job_flow 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:36:39,713] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.create_job_flow 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 06:36:39,713] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'create_job_flow', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 6 and queue default
[2020-09-17 06:36:39,713] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'create_job_flow', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:36:39,715] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'create_job_flow', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:36:40,432] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 06:36:40,661] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 06:36:40,715] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.create_job_flow 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 06:36:45,952] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.create_job_flow execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1
[2020-09-17 06:37:25,142] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.pre_step 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:37:25,144] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 06:37:25,145] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 06:37:25,145] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.pre_step 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:37:25,152] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.pre_step 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 06:37:25,152] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'pre_step', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 5 and queue default
[2020-09-17 06:37:25,152] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'pre_step', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:37:25,152] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'pre_step', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:37:25,736] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 06:37:25,989] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 06:37:26,061] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.pre_step 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 06:37:31,371] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.pre_step execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1
[2020-09-17 06:38:10,554] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:38:10,557] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 06:38:10,558] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 06:38:10,558] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:38:10,564] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 06:38:10,565] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'watch_step_of_pre', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 4 and queue default
[2020-09-17 06:38:10,565] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'watch_step_of_pre', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:38:10,565] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'watch_step_of_pre', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:38:11,264] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 06:38:11,499] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 06:38:11,554] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 06:57:18,639] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.watch_step_of_pre execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1
[2020-09-17 06:57:57,837] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.actual_step 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:57:57,839] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 06:57:57,840] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 06:57:57,840] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.actual_step 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:57:57,848] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.actual_step 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 06:57:57,848] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'actual_step', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-09-17 06:57:57,848] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'actual_step', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:57:57,850] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'actual_step', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:57:58,821] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 06:57:59,057] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 06:57:59,112] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.actual_step 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 06:58:04,387] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.actual_step execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1
[2020-09-17 06:58:43,575] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.watch_step_of_actual 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:58:43,579] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 06:58:43,581] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 06:58:43,581] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.watch_step_of_actual 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 06:58:43,589] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.watch_step_of_actual 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 06:58:43,589] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'watch_step_of_actual', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 2 and queue default
[2020-09-17 06:58:43,589] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'watch_step_of_actual', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:58:43,590] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'watch_step_of_actual', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 06:58:44,178] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 06:58:44,406] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 06:58:44,460] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.watch_step_of_actual 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 07:04:50,400] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.watch_step_of_actual execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1
[2020-09-17 07:05:29,581] {jobs.py:1106} INFO - 1 tasks up for execution:
        <TaskInstance: emr_job_flow_test.remove_cluster 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 07:05:29,583] {jobs.py:1144} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2020-09-17 07:05:29,585] {jobs.py:1182} INFO - DAG emr_job_flow_test has 0/16 running and queued tasks
[2020-09-17 07:05:29,585] {jobs.py:1223} INFO - Setting the follow tasks to queued state:
        <TaskInstance: emr_job_flow_test.remove_cluster 2020-09-17 05:36:33.885042+00:00 [scheduled]>
[2020-09-17 07:05:29,592] {jobs.py:1298} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: emr_job_flow_test.remove_cluster 2020-09-17 05:36:33.885042+00:00 [queued]>
[2020-09-17 07:05:29,593] {jobs.py:1334} INFO - Sending ('emr_job_flow_test', 'remove_cluster', datetime.datetime(2020, 9, 17, 5, 36, 33, 885042, tzinfo=<Timezone [UTC]>), 1) to executor with priority 1 and queue default
[2020-09-17 07:05:29,593] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'emr_job_flow_test', 'remove_cluster', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 07:05:29,594] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'emr_job_flow_test', 'remove_cluster', '2020-09-17T05:36:33.885042+00:00', '--local', '-sd', '/home/ec2-user/airflow/dags/test.py']
[2020-09-17 07:05:30,496] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-17 07:05:30,753] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/test.py
[2020-09-17 07:05:30,817] {cli.py:517} INFO - Running <TaskInstance: emr_job_flow_test.remove_cluster 2020-09-17T05:36:33.885042+00:00 [queued]> on host ip-10-1-10-144.ap-northeast-2.compute.internal
[2020-09-17 07:05:36,179] {jobs.py:1468} INFO - Executor reports execution of emr_job_flow_test.remove_cluster execution_date=2020-09-17 05:36:33.885042+00:00 exited with status success for try_number 1

[Reference material]

  • maximize-spark-default-config.sh

** Source: https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config

#!/bin/bash
# Configures spark-default.conf for dedicate/maximum cluster use
#   Set num executors to number to total instance count at time of creation (spark.executor.instances)
#   Set vcores per executor to be all the vcores for the instance type of the core nodes (spark.executor.cores)
#   Set the memory per executor to the max available for the node (spark.executor.memory)
#   Set the default parallelism to the total number of cores available across all nodes at time of cluster creation  (spark.default.parallelism)
#
# Limitations:
#   Assumes a homogenous cluster (all core and task instance groups the same instance type)
#   Is not dynamic with cluster resices
#
set -x
#
#determine the current region and place into REGION
EC2AZ=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
REGION="`echo \"$EC2AZ\" | sed -e 's:\([0-9][0-9]*\)[a-z]*\$:\\1:'`"


if [ "$SparkS3SupportingFilesPath" == "" ]
then
       if [ "$REGION" == "eu-central-1" ]
       then
               SparkS3SupportingFilesPath=s3://eu-central-1.support.elasticmapreduce/spark
       else
               SparkS3SupportingFilesPath=s3://support.elasticmapreduce/spark
       fi
fi
SparkS3SupportingFilesPath=${SparkS3SupportingFilesPath%/}

VCOREREFERENCE="$SparkS3SupportingFilesPath/vcorereference.tsv"
CONFIGURESPARK="$SparkS3SupportingFilesPath/configure-spark.bash"
#
echo "Configuring Spark default configuration to the max memory and vcore setting given configured number of cores nodes at cluster creation"

#Set the default yarn min allocation to 256 to allow for most optimum memory use
/usr/share/aws/emr/scripts/configure-hadoop -y yarn.scheduler.minimum-allocation-mb=256

#Gather core node count
NUM_NODES=$(grep /mnt/var/lib/info/job-flow.json -e "instanceCount" | sed 's/.*instanceCount.*:.\([0-9]*\).*/\1/g')
NUM_NODES=$(expr $NUM_NODES - 1)

if [ $NUM_NODES -lt 2 ]
then
    #set back to default to be safe
    NUM_NODES=2
fi

SLAVE_INSTANCE_TYPE=$(grep /mnt/var/lib/info/job-flow.json -e "slaveInstanceType" | cut -d'"' -f4 | sed  's/\s\+//g')

if [ "$SLAVE_INSTANCE_TYPE" == "" ]
then
    SLAVE_INSTANCE_TYPE="m3.xlarge"
fi

hadoop fs -get $VCOREREFERENCE

if [ ! -e "vcorereference.tsv" ]
then
    echo "Reference file vcorereference.tsv not available, failing quietly."
    exit 0
fi

NUM_VCORES=$(grep vcorereference.tsv -e $SLAVE_INSTANCE_TYPE | cut -f2)

MAX_YARN_MEMORY=$(grep /home/hadoop/conf/yarn-site.xml -e "yarn\.scheduler\.maximum-allocation-mb" | sed 's/.*<value>\(.*\).*<\/value>.*/\1/g')

EXEC_MEMORY=$(echo "($MAX_YARN_MEMORY - 1024 - 384) - ($MAX_YARN_MEMORY - 1024 - 384) * 0.07 " | bc | cut -d'.' -f1)
EXEC_MEMORY+="M"

PARALLEL=$(expr $NUM_VCORES \* $NUM_NODES) 

#--- Now use configure-spark.bash to set values

hadoop fs -get $CONFIGURESPARK

bash configure-spark.bash spark.executor.instances=$NUM_NODES spark.executor.cores=$NUM_VCORES spark.executor.memory=$EXEC_MEMORY 

if [ $PARALLEL -gt 2 ]
then
    #only set/change this if it looks reasonable
    bash configure-spark.bash spark.default.parallelism=$PARALLEL
fi


exit 0
  • spark_job_01.py (#01 spark job)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['A','B','C']

for elem in _list:
    
    df = spark.read.option("header","true").csv("s3a://pms-bucket-test/testdata/10G.csv")
    
    # Preprocessing: tag each existing column with a suffix marking whether it came from file A, B, or C
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
           .withColumnRenamed("event_type","event_type_{}".format(elem))\
           .withColumnRenamed("product_id","product_id_{}".format(elem))\
           .withColumnRenamed("category_id","category_id_{}".format(elem))\
           .withColumnRenamed("category_code","category_code_{}".format(elem))\
           .withColumnRenamed("brand","brand_{}".format(elem))\
           .withColumnRenamed("price","price_{}".format(elem))\
           .withColumnRenamed("user_id","user_id_{}".format(elem))\
           .withColumnRenamed("year","year_{}".format(elem))\
           .withColumnRenamed("user_session","user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id()) ## id라는 레코드별 고유넘버 컬럼을 추가해서 id를 기준으로 join할 예정
    
    # Save the preprocessed data to S3 in CSV format
    df.write.option("header", "true").csv("s3a://pms-bucket-test/{}".format(elem))
    
    
df.show()

print("record count : ", df.count())

spark.catalog.clearCache()
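
Before submitting spark_job_01.py as an EMR step, it can be handy to smoke-test the same transformations locally; a minimal sketch, assuming a small sample.csv with the same columns is available on the local machine:

# local_smoke_test.py - run the renaming/id logic on a local sample before using EMR
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .master("local[*]")            # local mode, no cluster required
         .appName("local-smoke-test")
         .getOrCreate())

df = spark.read.option("header", "true").csv("sample.csv")
df = (df.withColumnRenamed("event_time", "event_time_A")
        .withColumn("id", F.monotonically_increasing_id()))
df.show(5)
spark.stop()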
  • spark_job_02.py (#02 spark job)
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time
from datetime import datetime

# Record the code start time
start = time.time()

# Create a Spark session so that DataFrames can be used
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# Load the A data and register a temp view
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A/*.csv")
A_df.createOrReplaceTempView("Atable")

# Load the B data and register a temp view
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B/*.csv")
B_df.createOrReplaceTempView("Btable")

# Load the C data and register a temp view
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C/*.csv")
C_df.createOrReplaceTempView("Ctable")

# Join A and B into an AB DataFrame, persist the intermediate result to DISK, and register an AB temp view
AB_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id)").persist(StorageLevel.DISK_ONLY)
AB_df.createOrReplaceTempView("ABtable")

# Join C onto the AB DataFrame to create ABC, persist the intermediate result to DISK, and register an ABC temp view
ABC_df = spark.sql("SELECT * FROM ABtable JOIN Ctable USING(id)").persist(StorageLevel.DISK_ONLY)
ABC_df.createOrReplaceTempView("ABCtable")

# Count the records of the ABC DataFrame
spark.sql("SELECT COUNT(*) FROM ABCtable").show()

# Record the code end time
endTimeQuery = time.time()

# Elapsed time for the whole code above
print(" code running time (sec) : ", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# Clear the DataFrame caches
spark.catalog.clearCache()