Example of setting up e-mail notifications when a task fails while an Airflow DAG is running

2020-10-16


Data_Engineering_TIL(20201016)

[Hands-on]

step 1) Start the Airflow server

See https://minman2115.github.io/DE_TIL141/

step 2) Set up the Google SMTP app account the Airflow server will use (create a Gmail app password)

  • In Google Home, click your profile > Manage your Google Account.

  • Click Security in the left sidebar.

  • Under "Signing in to Google", turn on 2-Step Verification.

  • Create an app password.

  • Copy the 16-character password shown in the yellow box to a notepad (it can be checked with the sketch right after this list and is used again in step 3 below).
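
Before wiring it into Airflow, the app password can be checked with a short Python snippet. This is a minimal sketch; both values below are placeholders, not real credentials.

# quick check that the 16-character Gmail app password works
import smtplib

server = smtplib.SMTP("smtp.gmail.com", 587)
server.starttls()  # Gmail requires STARTTLS on port 587
server.login("[YOUR_EMAIL_ADDRESS]", "[16_DIGIT_APP_PASSWORD]")  # placeholders
print("login OK")
server.quit()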

step 3) Configure airflow.cfg so an e-mail is sent when a task fails

[ec2-user@ip-10-1-10-227 ~]$ cd ~/airflow

[ec2-user@ip-10-1-10-227 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg

[ec2-user@ip-10-1-10-227 airflow]$ sudo vim airflow.cfg
# Change the [smtp] section as shown below.
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [YOUR_EMAIL_ADDRESS]
# the 16-character Gmail app password copied to the notepad above
# (kept on its own line: inline comments are not reliably parsed in airflow.cfg)
smtp_password = [16_DIGIT_APP_PASSWORD]
smtp_port = 587
smtp_mail_from = [YOUR_EMAIL_ADDRESS]
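
After saving airflow.cfg, restart the Airflow webserver and scheduler so the new [smtp] settings are picked up. The settings can then be verified with Airflow's own mail helper. This is a minimal sketch; the recipient address is a placeholder.

# verify the [smtp] settings in airflow.cfg
from airflow.utils.email import send_email  # uses the [smtp] section of airflow.cfg

send_email(
    to=["[YOUR_EMAIL_ADDRESS]"],  # placeholder
    subject="Airflow SMTP test",
    html_content="<p>If this mail arrives, the [smtp] settings work.</p>",
)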

step 4) Create and register the DAG

Write the code so that a problem occurs while the task is running.

[ec2-user@ip-10-1-10-4 airflow]$ mkdir dags

[ec2-user@ip-10-1-10-4 airflow]$ cd dags

[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator


default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020,10,14, 1, 55),
    'email': ['[my_email_address]'],
    'email_on_failure': True
}

dag = DAG('emr_job_flow_test',
         default_args=default_args,
         schedule_interval= None, # schedule_interval is None, so the DAG runs only when triggered manually (on-demand)
         catchup=False
)

JOB_FLOW_OVERRIDES = {
    'Name' : 'pms-EMRtest-test',
    'LogUri' : 's3://[bucket_name]/',
    'ReleaseLabel' : 'emr-5.28.1',
    'Instances' : {
            'Ec2KeyName': '[key_name]',
            'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
            'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
            'InstanceGroups': [{
                'InstanceRole': 'MASTER',
                "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Master"
                }, {
                    'InstanceRole': 'CORE',
                    "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }, {
                    'InstanceRole': 'TASK',
                    "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }
            ]
        },
    'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
    'JobFlowRole':'EMR_EC2_DefaultRole',
    'ServiceRole':'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,
    'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
              {'Key': 'expiry-date', 'Value': '2020-10-13'},
              {'Key': 'owner', 'Value': 'pms'}],
    'BootstrapActions':[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
                # https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
            }
    ],
    "VisibleToAllUsers": True
}

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

PRE_STEP = [
    {
        'Name': 'spark_job_01',
        #'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_07.py'] # a Python script that deliberately raises an error
        }
    }
]

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    # cluster id pushed to XCom by the create_job_flow task
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=PRE_STEP,
    dag=dag
)

step_checker_pre = EmrStepSensor(
        task_id='watch_step_of_pre',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        # step id returned by the pre_step task
        step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

ACTUAL_STEP = [
    {
        'Name': 'spark_job_03',
         # 'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_03.py']
        }
    }
]

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=ACTUAL_STEP,
    dag=dag
)

step_checker_actual = EmrStepSensor(
        task_id='watch_step_of_actual',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover

When you run it, the EMR step spark-submit --deploy-mode cluster --master yarn s3://pms-bucket-test/spark_job_07.py will fail.

In the Airflow web UI, open the Graph View of the emr_job_flow_test DAG; the watch_step_of_pre task, which monitors that step, turns red and is marked failed.

An e-mail like the one below then arrives at the registered address.

Mail subject: Airflow alert: <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-10-16T04:15:46.233306+00:00 [failed]>

Body:

Try 1 out of 1

Exception:

EMR job failed for reason None with message Exception in thread "main" org.apache.spark.SparkException: Application application_1602822022591_0001 finished with failed status and log file s3://pms-bucket-test/j-3CTW0D7GR53SD/steps/s-308IQQ3TXMGKO/stderr.gz

Log: Link

Host: ip-10-1-10-227.ap-northeast-2.compute.internal

Log file: /home/ec2-user/airflow/logs/emr_job_flow_test/watch_step_of_pre/2020-10-16T04:15:46.233306+00:00.log

Mark success: Link
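
For reference, the same kind of failure notification can also be sent with a per-task on_failure_callback instead of email_on_failure. Below is a minimal sketch assuming the [smtp] settings above; the recipient address is a placeholder.

from airflow.utils.email import send_email

def notify_failure(context):
    # context carries the failed TaskInstance and the raised exception
    ti = context['task_instance']
    subject = "Airflow task failed: {}.{}".format(ti.dag_id, ti.task_id)
    body = "<p>Exception: {}</p>".format(context.get('exception'))
    send_email(to=['[my_email_address]'], subject=subject, html_content=body)

# used e.g. as default_args = { ..., 'on_failure_callback': notify_failure }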

[Notes]

  • Contents of spark_job_03.py used in the code above
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['D','E','F']

for elem in _list:
    
    df = spark.read.option("header","true").parquet("s3a://mybucket/testdata/merge_data/*.parquet")
    
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
           .withColumnRenamed("event_type","event_type_{}".format(elem))\
           .withColumnRenamed("product_id","product_id_{}".format(elem))\
           .withColumnRenamed("category_id","category_id_{}".format(elem))\
           .withColumnRenamed("category_code","category_code_{}".format(elem))\
           .withColumnRenamed("brand","brand_{}".format(elem))\
           .withColumnRenamed("price","price_{}".format(elem))\
           .withColumnRenamed("user_id","user_id_{}".format(elem))\
           .withColumnRenamed("year","year_{}".format(elem))\
           .withColumnRenamed("user_session","user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id())

    df.write.option("header", "true").csv("s3a://mybucket/{}".format(elem))
    
df.show()

print("record count : ", df.count())

spark.catalog.clearCache()
  • Contents of spark_job_07.py used in the code above (intentionally invalid Python so that the Spark step fails)
print) admka

# If you want e-mail only when a specific task fails, write the script as follows.

[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020,10,14, 1, 55),
    'email': ['[my_email_address]'],
    # if email_on_failure is not set to False, duplicate mails are sent on top of the mailing below
    'email_on_failure': False 
}

dag = DAG('emr_job_flow_test',
         default_args=default_args,
         schedule_interval= None, # schedule_interval is None, so the DAG runs only when triggered manually (on-demand)
         catchup=False
)

JOB_FLOW_OVERRIDES = {
    'Name' : 'pms-EMRtest-test',
    'LogUri' : 's3://[bucket_name]/',
    'ReleaseLabel' : 'emr-5.28.1',
    'Instances' : {
            'Ec2KeyName': '[key_name]',
            'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
            'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
            'InstanceGroups': [{
                'InstanceRole': 'MASTER',
                "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Master"
                }, {
                    'InstanceRole': 'CORE',
                    "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }, {
                    'InstanceRole': 'TASK',
                    "InstanceCount": 1,
                    "InstanceType": 'm5.2xlarge',
                    "Market": "SPOT",
                    "Name": "Core",
                }
            ]
        },
    'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
    'JobFlowRole':'EMR_EC2_DefaultRole',
    'ServiceRole':'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,
    'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
              {'Key': 'expiry-date', 'Value': '2020-10-13'},
              {'Key': 'owner', 'Value': 'pms'}],
    'BootstrapActions':[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
                # https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
            }
    ],
    "VisibleToAllUsers": True
}

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

PRE_STEP = [
    {
        'Name': 'spark_job_01',
        #'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_07.py'] # a Python script that deliberately raises an error
        }
    }
]

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    # cluster id pushed to XCom by the create_job_flow task
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=PRE_STEP,
    dag=dag
)

step_checker_pre = EmrStepSensor(
        task_id='watch_step_of_pre',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        # step id returned by the pre_step task
        step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

ACTUAL_STEP = [
    {
        'Name': 'spark_job_03',
         # 'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode', 'cluster',
                      '--master', 'yarn',
                      's3://[bucket_name]/spark_job_03.py']
        }
    }
]

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=ACTUAL_STEP,
    dag=dag
)

step_checker_actual = EmrStepSensor(
        task_id='watch_step_of_actual',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

# mail when the pre_step task fails
TaskFailed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="TaskFailed",
    to=["[email_address]"],
    subject="Task Failed",
    html_content='<h3>One of the tasks failed</h3>')

TaskFailed.set_upstream([step_adder_pre_step])

# mail when the EMR cluster has been created successfully
TaskSucceeded = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    task_id="TaskSucceeded",
    to=["[email_address]"],
    subject="Task Succeeded",
    html_content='<h3>One of the tasks succeeded</h3>')

TaskSucceeded.set_upstream([cluster_creator])

cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover
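
Note that subject and html_content of EmailOperator are templated fields, so run information can be put into the mail. A sketch that could be appended to the DAG above (the address is a placeholder; the macros are standard Airflow template variables):

# richer failure mail using template macros in the templated fields
TaskFailedDetail = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="TaskFailedDetail",
    to=["[email_address]"],
    subject="Task failed in {{ dag.dag_id }} on {{ ds }}",
    html_content="<h3>run_id: {{ run_id }}</h3><p>execution_date: {{ execution_date }}</p>")

TaskFailedDetail.set_upstream([step_adder_pre_step])
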
  • TriggerRule attributes (a usage sketch follows this list)

all_success: (default) all upstream tasks have succeeded

all_failed: all upstream tasks are in a failed or upstream_failed state

all_done: all upstream tasks have finished executing

one_failed: fires as soon as at least one upstream task has failed; it does not wait for all of them to finish

one_success: fires as soon as at least one upstream task has succeeded; it does not wait for all of them to finish
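
For example, trigger_rule='all_done' is useful for a cleanup or notification task that should run whether the upstream tasks succeeded or failed. A self-contained sketch (the DAG id, task ids, and address are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG('trigger_rule_example',
          start_date=datetime(2020, 10, 14),
          schedule_interval=None,
          catchup=False)

work_a = DummyOperator(task_id='work_a', dag=dag)
work_b = DummyOperator(task_id='work_b', dag=dag)

# runs after both upstream tasks finish, regardless of success or failure
notify_done = EmailOperator(
    task_id='notify_done',
    trigger_rule=TriggerRule.ALL_DONE,
    to=['[email_address]'],
    subject='All upstream tasks finished',
    html_content='<p>work_a and work_b are done (success or failure).</p>',
    dag=dag)

[work_a, work_b] >> notify_done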