Example: setting up email alerts when a Task fails while an Airflow DAG is running
Data_Engineering_TIL(20201016)
[Hands-on]
step 1) Start the Airflow server
See https://minman2115.github.io/DE_TIL141/
step 2) Set up the Google SMTP account the Airflow server will use (create a Gmail app password)
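- On the Google homepage, click your profile > Manage your Google Account.
- Click Security in the left sidebar.
- Under "Signing in to Google", turn on 2-Step Verification.
- Generate an app password.
- Copy the 16-character password shown in the yellow box and keep it in a notepad.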
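Before pointing Airflow at Gmail, one quick way to confirm the app password works is to send a single mail with the standard library's smtplib, using the same host, port and STARTTLS settings that Airflow's [smtp] section uses. This is a minimal sketch; the function names and the subject/body text are made up for illustration.

```python
import smtplib
from email.message import EmailMessage

# Same values as the [smtp] section used by Airflow
SMTP_HOST = "smtp.gmail.com"
SMTP_PORT = 587  # STARTTLS port (smtp_starttls = True, smtp_ssl = False)


def build_test_message(sender: str, recipient: str) -> EmailMessage:
    """Build a small plain-text test mail (hypothetical subject and body)."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = "SMTP test for Airflow alerts"
    msg.set_content("If you can read this, the Gmail app password works.")
    return msg


def send_test_message(user: str, app_password: str, recipient: str) -> None:
    """Open a connection, upgrade it to TLS, log in with the app password and send."""
    msg = build_test_message(user, recipient)
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as server:
        server.starttls()  # switch the plain connection to TLS before logging in
        server.login(user, app_password)
        server.send_message(msg)
```

Calling `send_test_message("you@gmail.com", "<16-character app password>", "you@gmail.com")` goes over the network, so run it only with real credentials; if login fails here, Airflow will fail the same way.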
step 2-2) Configure airflow.cfg so that Airflow sends an email when a task fails
[ec2-user@ip-10-1-10-227 ~]$ cd ~/airflow
[ec2-user@ip-10-1-10-227 airflow]$ ls
airflow.cfg airflow.db airflow-webserver.pid logs unittests.cfg
[ec2-user@ip-10-1-10-227 airflow]$ sudo vim airflow.cfg
# Change the [smtp] section as follows.
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [YOUR_EMAIL_ADDRESS]
# the 16-character Gmail app password copied above (keep comments on their own line, not after the value)
smtp_password = [16_DIGIT_APP_PASSWORD]
smtp_port = 587
smtp_mail_from = [YOUR_EMAIL_ADDRESS]
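Keep comments in airflow.cfg on their own lines. Python's configparser, which Airflow's config parser builds on, does not strip inline `#` comments by default (`inline_comment_prefixes=None`), so a line like `smtp_password = xxxx # note` yields a password that includes the comment. The sketch below shows this with the standard library, assuming Airflow does not override that default; the password value is a fake placeholder.

```python
import configparser

# Fake [smtp] section with an inline comment after the password
SMTP_SECTION = """
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_password = abcdabcdabcdabcd # inline comment
"""


def read_smtp(text, inline_prefixes=None):
    """Parse the text and return the [smtp] section.

    inline_prefixes=None is configparser's default: inline comments are kept.
    """
    parser = configparser.ConfigParser(inline_comment_prefixes=inline_prefixes)
    parser.read_string(text)
    return parser["smtp"]


default_view = read_smtp(SMTP_SECTION)
print(repr(default_view["smtp_password"]))   # comment is part of the value

stripped_view = read_smtp(SMTP_SECTION, inline_prefixes=("#",))
print(repr(stripped_view["smtp_password"]))  # comment removed
```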
step 3) Create and register the DAG
Write the code so that a task fails while it is running.
[ec2-user@ip-10-1-10-4 airflow]$ mkdir dags
[ec2-user@ip-10-1-10-4 airflow]$ cd dags
[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
default_args = {
'owner': 'minman',
'depends_on_past': False,
'start_date': datetime(2020,10,14, 1, 55),
'email': ['[my_email_address]'],
'email_on_failure': True
}
dag = DAG('emr_job_flow_test',
default_args=default_args,
schedule_interval=None, # no schedule: the DAG runs only when triggered (on demand)
catchup=False
)
JOB_FLOW_OVERRIDES = {
'Name' : 'pms-EMRtest-test',
'LogUri' : 's3://[bucket_name]/',
'ReleaseLabel' : 'emr-5.28.1',
'Instances' : {
'Ec2KeyName': '[key_name]',
'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'InstanceGroups': [{
'InstanceRole': 'MASTER',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Master"
}, {
'InstanceRole': 'CORE',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Core",
}, {
'InstanceRole': 'TASK',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Task",
}
]
},
'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole',
'StepConcurrencyLevel': 10,
'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
{'Key': 'expiry-date', 'Value': '2020-10-13'},
{'Key': 'owner', 'Value': 'pms'}],
'BootstrapActions':[
{
'Name': 'Maximize Spark Default Config',
'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
# https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
}
],
"VisibleToAllUsers": True
}
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
PRE_STEP = [
{
'Name': 'spark_job_01',
#'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn',
's3://[bucket_name]/spark_job_07.py'] # Python script that deliberately raises an error
}
}
]
step_adder_pre_step = EmrAddStepsOperator(
task_id='pre_step',
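job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}", # cluster id returned by create_job_flow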
aws_conn_id='aws_default',
steps=PRE_STEP,
dag=dag
)
step_checker_pre = EmrStepSensor(
task_id='watch_step_of_pre',
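job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",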
aws_conn_id='aws_default',
dag=dag
)
ACTUAL_STEP = [
{
'Name': 'spark_job_03',
# 'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn',
's3://[bucket_name]/spark_job_03.py']
}
}
]
step_adder_actual_step = EmrAddStepsOperator(
task_id='actual_step',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=ACTUAL_STEP,
dag=dag
)
step_checker_actual = EmrStepSensor(
task_id='watch_step_of_actual',
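job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",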
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover
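At run time, job_flow_id and step_id must resolve to the real cluster id and step id. With these contrib EMR operators that is usually done with Jinja templates (both parameters are templated fields) that pull XCom values: EmrCreateJobFlowOperator returns the cluster id, and EmrAddStepsOperator returns a list of step ids, so the sensor takes element [0]. The helper below is a hypothetical convenience, not part of Airflow, that builds those template strings from the task ids used in this DAG.

```python
from typing import Optional


def xcom_template(task_id: str, index: Optional[int] = None) -> str:
    """Return a Jinja string that pulls the return_value XCom of `task_id`.

    `index` selects one element when the upstream task returned a list,
    e.g. the step ids returned by EmrAddStepsOperator.
    """
    expr = f"task_instance.xcom_pull(task_ids='{task_id}', key='return_value')"
    if index is not None:
        expr += f"[{index}]"
    # Concatenate the braces instead of using an f-string, to avoid {{ }} escaping
    return "{{ " + expr + " }}"


# Template strings for the task ids in this DAG
CLUSTER_ID = xcom_template("create_job_flow")
PRE_STEP_ID = xcom_template("pre_step", index=0)
ACTUAL_STEP_ID = xcom_template("actual_step", index=0)
```

For example: `EmrStepSensor(task_id='watch_step_of_pre', job_flow_id=CLUSTER_ID, step_id=PRE_STEP_ID, aws_conn_id='aws_default', dag=dag)`. Airflow renders the strings just before the task runs, so the ids exist only at that point.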
When you run it, the EMR step spark-submit --deploy-mode cluster --master yarn s3://pms-bucket-test/spark_job_07.py will fail.
In the Airflow web UI, open the Graph View of the emr_job_flow_test DAG you created: the watch_step_of_pre task, which monitors that step, turns red and shows failed.
An alert mail like the following then arrives at the registered address.
Mail subject: Airflow alert: <TaskInstance: emr_job_flow_test.watch_step_of_pre 2020-10-16T04:15:46.233306+00:00 [failed]>
Body:
Try 1 out of 1
Exception:
EMR job failed for reason None with message Exception in thread "main" org.apache.spark.SparkException: Application application_1602822022591_0001 finished with failed status and log file s3://pms-bucket-test/j-3CTW0D7GR53SD/steps/s-308IQQ3TXMGKO/stderr.gz
Log: Link
Host: ip-10-1-10-227.ap-northeast-2.compute.internal
Log file: /home/ec2-user/airflow/logs/emr_job_flow_test/watch_step_of_pre/2020-10-16T04:15:46.233306+00:00.log
Mark success: Link
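If these alerts need to be filtered or routed automatically, the subject line carries the DAG id, task id, execution date and state. The sketch below parses the subject format seen in the sample mail; it assumes the DAG id contains no dots, and other Airflow versions or a custom subject template may format the subject differently. The names are hypothetical.

```python
import re
from typing import NamedTuple, Optional

# "Airflow alert: <TaskInstance: dag_id.task_id execution_date [state]>"
# Assumption: dag_id has no dots, so the first dot separates dag_id and task_id.
SUBJECT_RE = re.compile(
    r"Airflow alert: <TaskInstance: (?P<dag_id>[^.\s]+)\.(?P<task_id>\S+) "
    r"(?P<execution_date>\S+) \[(?P<state>\w+)\]>"
)


class AlertInfo(NamedTuple):
    dag_id: str
    task_id: str
    execution_date: str
    state: str


def parse_alert_subject(subject: str) -> Optional[AlertInfo]:
    """Return the fields of an alert subject, or None if it does not match."""
    m = SUBJECT_RE.search(subject)
    if m is None:
        return None
    return AlertInfo(**m.groupdict())
```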
[Notes]
- Contents of spark_job_03.py used in the code above
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("ABC data test").getOrCreate()
_list = ['D','E','F']
for elem in _list:
    df = spark.read.option("header","true").parquet("s3a://mybucket/testdata/merge_data/*.parquet")
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
        .withColumnRenamed("event_type","event_type_{}".format(elem))\
        .withColumnRenamed("product_id","product_id_{}".format(elem))\
        .withColumnRenamed("category_id","category_id_{}".format(elem))\
        .withColumnRenamed("category_code","category_code_{}".format(elem))\
        .withColumnRenamed("brand","brand_{}".format(elem))\
        .withColumnRenamed("price","price_{}".format(elem))\
        .withColumnRenamed("user_id","user_id_{}".format(elem))\
        .withColumnRenamed("year","year_{}".format(elem))\
        .withColumnRenamed("user_session","user_session_{}".format(elem))\
        .withColumn("id", F.monotonically_increasing_id())
    df.write.option("header", "true").csv("s3a://mybucket/{}".format(elem))
    df.show()
    print("record count : ", df.count())
    spark.catalog.clearCache()
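The ten chained withColumnRenamed calls in spark_job_03.py follow one pattern (append `_<elem>` to each column name), so the renames can be generated from a list. A sketch with hypothetical helper names; rename_all relies only on DataFrame.withColumnRenamed, so on a PySpark DataFrame it performs the same renames as the original chain.

```python
from typing import Dict, Iterable

# Columns renamed one by one in spark_job_03.py
COLUMNS = [
    "event_time", "event_type", "product_id", "category_id", "category_code",
    "brand", "price", "user_id", "year", "user_session",
]


def suffixed_names(columns: Iterable[str], suffix: str) -> Dict[str, str]:
    """Map each column name to '<name>_<suffix>'."""
    return {col: f"{col}_{suffix}" for col in columns}


def rename_all(df, mapping: Dict[str, str]):
    """Call df.withColumnRenamed(old, new) once per mapping entry and return the result."""
    for old, new in mapping.items():
        df = df.withColumnRenamed(old, new)
    return df
```

Inside the loop this would read `df = rename_all(df, suffixed_names(COLUMNS, elem)).withColumn("id", F.monotonically_increasing_id())`.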
- Contents of spark_job_07.py used in the code above
print) admka
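spark_job_07.py is deliberately not valid Python, so the driver fails while Python parses the file, before any Spark code runs; that is consistent with the "finished with failed status" message in the alert mail. The standard library's compile() reproduces the parse error locally (a small sketch; parse_error is a hypothetical helper name).

```python
def parse_error(source: str):
    """Return the SyntaxError raised while compiling `source`, or None if it compiles."""
    try:
        compile(source, "spark_job_07.py", "exec")
    except SyntaxError as err:
        return err
    return None


err = parse_error("print) admka")
print(type(err).__name__, "at line", err.lineno)
```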
# If you want a mail only when a specific task fails, you can write the script as follows.
[ec2-user@ip-10-1-10-4 dags]$ sudo vim test.py
from airflow import DAG
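from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule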
import datetime as dt
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
default_args = {
'owner': 'minman',
'depends_on_past': False,
'start_date': datetime(2020,10,14, 1, 55),
'email': ['[my_email_address]'],
# set email_on_failure to False; otherwise the default failure alert is sent in addition to the EmailOperator mail below (duplicate mails)
'email_on_failure': False
}
dag = DAG('emr_job_flow_test',
default_args=default_args,
schedule_interval=None, # no schedule: the DAG runs only when triggered (on demand)
catchup=False
)
JOB_FLOW_OVERRIDES = {
'Name' : 'pms-EMRtest-test',
'LogUri' : 's3://[bucket_name]/',
'ReleaseLabel' : 'emr-5.28.1',
'Instances' : {
'Ec2KeyName': '[key_name]',
'Ec2SubnetId': 'subnet-xxxxxxxxxxxxxxxxxx',
'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxxx',
'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxxx',
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'InstanceGroups': [{
'InstanceRole': 'MASTER',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Master"
}, {
'InstanceRole': 'CORE',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Core",
}, {
'InstanceRole': 'TASK',
"InstanceCount": 1,
"InstanceType": 'm5.2xlarge',
"Market": "SPOT",
"Name": "Task",
}
]
},
'Applications':[{'Name': 'Spark'},{'Name': 'Hadoop'},{'Name': 'Hive'}],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole',
'StepConcurrencyLevel': 10,
'Tags' : [{'Key': 'name', 'Value': 'pms-EMR-test'},
{'Key': 'expiry-date', 'Value': '2020-10-13'},
{'Key': 'owner', 'Value': 'pms'}],
'BootstrapActions':[
{
'Name': 'Maximize Spark Default Config',
'ScriptBootstrapAction': {'Path': 's3://[bucket_name]/maximize-spark-default-config.sh'}
# https://github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/maximize-spark-default-config
}
],
"VisibleToAllUsers": True
}
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
PRE_STEP = [
{
'Name': 'spark_job_01',
#'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn',
's3://[bucket_name]/spark_job_07.py'] # Python script that deliberately raises an error
}
}
]
step_adder_pre_step = EmrAddStepsOperator(
task_id='pre_step',
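job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}", # cluster id returned by create_job_flow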
aws_conn_id='aws_default',
steps=PRE_STEP,
dag=dag
)
step_checker_pre = EmrStepSensor(
task_id='watch_step_of_pre',
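job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='pre_step', key='return_value')[0] }}",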
aws_conn_id='aws_default',
dag=dag
)
ACTUAL_STEP = [
{
'Name': 'spark_job_03',
# 'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode', 'cluster',
'--master', 'yarn',
's3://[bucket_name]/spark_job_03.py']
}
}
]
step_adder_actual_step = EmrAddStepsOperator(
task_id='actual_step',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=ACTUAL_STEP,
dag=dag
)
step_checker_actual = EmrStepSensor(
task_id='watch_step_of_actual',
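job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='actual_step', key='return_value')[0] }}",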
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
# send a mail when the pre step fails
# (the Spark job failure shows up in watch_step_of_pre; pre_step only submits the step and succeeds even if the job later fails)
TaskFailed = EmailOperator(
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="TaskFailed",
to=["[email_address]"],
subject="Task Failed",
html_content='<h3>One of the tasks failed</h3>')
TaskFailed.set_upstream([step_checker_pre])
# send a mail when the EMR cluster is created successfully
TaskSucceded = EmailOperator(
dag=dag,
trigger_rule=TriggerRule.ONE_SUCCESS,
task_id="TaskSucceded",
to=["[email_address]"],
subject="Task Succeeded",
html_content='<h3>One of the tasks succeeded</h3>')
TaskSucceded.set_upstream([cluster_creator])
cluster_creator >> step_adder_pre_step >> step_checker_pre >> cluster_remover
cluster_creator >> step_adder_actual_step >> step_checker_actual >> cluster_remover
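- TriggerRule values
all_success: (default) runs only when all upstream tasks have succeeded
all_failed: runs when all upstream tasks are in the failed or upstream_failed state
all_done: runs when all upstream tasks have finished, regardless of outcome
one_failed: runs as soon as at least one upstream task has failed, without waiting for all upstream tasks to finish
one_success: runs as soon as at least one upstream task has succeeded, without waiting for all upstream tasks to finish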
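The rules above can be modeled as a small function of the upstream task states. This is a simplified sketch, not Airflow's TriggerRuleDep implementation (which also handles skipped upstreams, upstream_failed propagation and additional rules); the state strings mirror Airflow's state names and can_fire is a hypothetical name.

```python
from typing import Iterable

# Finished upstream states considered in this sketch; anything else (e.g. "running") is unfinished
SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED = "success", "failed", "upstream_failed", "skipped"
FINISHED = {SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED}


def can_fire(rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified check of whether a task with trigger rule `rule` may start."""
    states = list(upstream_states)
    done = all(s in FINISHED for s in states)
    if rule == "all_success":
        return done and all(s == SUCCESS for s in states)
    if rule == "all_failed":
        return done and all(s in (FAILED, UPSTREAM_FAILED) for s in states)
    if rule == "all_done":
        return done
    if rule == "one_failed":
        # does not wait for the remaining upstreams
        return any(s in (FAILED, UPSTREAM_FAILED) for s in states)
    if rule == "one_success":
        return any(s == SUCCESS for s in states)
    raise ValueError(f"unknown trigger rule: {rule}")
```

In this model, remove_cluster (default all_success) does not run when watch_step_of_pre fails, so with KeepJobFlowAliveWhenNoSteps set to True the EMR cluster may keep running; giving the terminate task a rule such as all_done is one way to make sure it still runs.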