Example Airflow DAG that periodically copies data between S3 buckets
2021-06-11
Data_Engineering_TIL(20210611)
[Summary]
An Airflow DAG script that copies data between S3 buckets at a fixed time every day
Related material : https://minman2115.github.io/DE_TIL209/
[Architecture]
Airflow ------> EMR ------> s3 to s3 copy
** Notes
1) The Airflow DAG runs every day at 23:00 (see the schedule check sketch after this list)
2) EMR is the component that actually executes the s3 copy command
3) If an error occurs while the s3 copy command is running, there is retry logic that re-triggers the DAG
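The schedule in the DAG below is the cron expression '0 23 * * *'. A quick way to sanity-check such an expression, assuming the croniter package is installed (it is not needed by the DAG itself), is shown here.

# minimal sketch: confirm that '0 23 * * *' fires once per day at 23:00
from datetime import datetime
from croniter import croniter

it = croniter('0 23 * * *', datetime(2021, 6, 11))
print(it.get_next(datetime))   # 2021-06-11 23:00:00
print(it.get_next(datetime))   # 2021-06-12 23:00:00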
[Implemented script]
- s3_bucket_data_copy.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import DagBag
from airflow.models import TaskInstance
import requests
import json
import time
import os
from airflow.models import Variable
from airflow.api.client.local_client import Client    # local API client used by the retry task below
module_path=Variable.get("my_module_path")
import sys
sys.path.append(module_path)
from my_custom_lib import *
from pprint import pprint
import logging
log = logging.getLogger(__name__)
###############################################################
# custom parameter settings
###############################################################
ENV = Variable.get('ENV') # prod or dev
current_time=check_current_time()
user_info = Variable.get("airflow_sys_user_info")
airflow_sys_user_id = json.loads(user_info)['id']    # json.loads: the Variable value is a JSON string, not a file object
airflow_sys_user_pwd = json.loads(user_info)['pwd']
airflow_master_node_ip = Variable.get("airflow_master_node_ip")
airflow_master_node_ip = json.loads(airflow_master_node_ip)['{}'.format(ENV)]
###############################################################
# DAG configuration
###############################################################
default_args = {
    'owner' : 'minman',
    'depends_on_past' : False,
    'start_date' : datetime(2021,2,11),
    'provide_context' : True
}
dag = DAG(
    's3_bucket_data_copy',
    default_args=default_args,
    # run once a day at 23:00 ('* 23 * * *' would fire every minute during hour 23)
    schedule_interval = '0 23 * * *',
    catchup=False,
    max_active_runs = 5,    # DAG-level setting, so it belongs here rather than in default_args
    concurrency = 3,
    tags=['data_management']
)
get_emr_cluster_id = PythonOperator(
    task_id = 'get_emr_cluster_id',
    provide_context=True,
    python_callable=get_my_emr_cluster_id,
    op_kwargs={"cluster_name" : "minman_emr_test"},    # op_kwargs must be a dict, not a set
    dag=dag
)
s3_copy = EmrAddStepsOperator(
    task_id = 's3_copy',
    # cluster id pushed to XCom by the get_emr_cluster_id task
    job_flow_id="{{ task_instance.xcom_pull(task_ids='get_emr_cluster_id', key='jobflow_id') }}",
    aws_conn_id='aws_seoul',
    steps=[
        {
            'Name':'s3_copy',
            'ActionOnFailure':'CONTINUE',
            'HadoopJarStep':{
                'Jar':'command-runner.jar',
                'Args':["aws","s3","cp","s3://my_data_lake/raw_data/","s3://my_warehouse/data/","--recursive"]
            }
        }
    ],
    trigger_rule='all_done',
    retries=5,
    dag=dag
)
check_s3_copy = EmrStepSensor(
    task_id = 'check_s3_copy',
    # EmrAddStepsOperator returns the list of created step ids as its XCom return value
    job_flow_id= "{{ task_instance.xcom_pull(task_ids='get_emr_cluster_id', key='jobflow_id') }}",
    step_id = "{{ task_instance.xcom_pull(task_ids='s3_copy', key='return_value')[0] }}",
    aws_conn_id='aws_seoul',
    trigger_rule='all_done',
    retries=5,
    dag=dag
)
def trigger_external_dag(**kwargs):
    execution_date = kwargs['execution_date']
    dag_instance = kwargs['dag']
    operator_instance = dag_instance.get_task("check_s3_copy")
    task_status = TaskInstance(operator_instance,execution_date).current_state()
    # if the check_s3_copy task ended up in the failed state, trigger this DAG again
    # (the commented-out block below shows the REST API alternative)
    if task_status == 'failed':
        # result = requests.post(
        #     # on a single-node Airflow the API can be called via localhost as below,
        #     # but on a clustered Airflow it must be called with the Airflow master node's IP
        #     "http://localhost:8080/api/experimental/dags/"+DAG_NAME+"/dag_runs",
        #     #"http://{Airflow_master_server_IP}:8080/api/experimental/dags/"+DAG_NAME+"/dag_runs",
        #     data = json.dumps("{}"),
        #     auth = (username,password)
        # )
        # pprint(result.content.decode('utf-8'))
        c = Client(None,None)
        c.trigger_dag(dag_id='s3_bucket_data_copy',run_id='trigger_retry_logic_{}'.format(current_time),conf={})
    else:
        pprint(task_status)
retry_logic = PythonOperator(
task_id = 'retry_logic',
provide_context=True,
python_callable=trigger_external_dag,
trigger_rule = 'all_done',
dag=dag
)
##################################################################################
# Design dag structure
##################################################################################
get_emr_cluster_id >> s3_copy >> check_s3_copy >> retry_logic
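For a quick manual run, the same local API client used in retry_logic can trigger the DAG from a Python shell on the Airflow node. A minimal sketch, where the run_id value is arbitrary:

# minimal sketch: manually trigger s3_bucket_data_copy outside its schedule
from airflow.api.client.local_client import Client
from datetime import datetime

c = Client(None, None)
c.trigger_dag(
    dag_id='s3_bucket_data_copy',
    run_id='manual_test_{}'.format(datetime.utcnow().strftime('%Y%m%d%H%M%S')),
    conf={}
)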
- my_custom_lib.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import boto3
import json
import datetime
from airflow.models import Variable
airflow_env=Variable.get('ENV') # prod or dev
def check_current_time():
    utcnow = datetime.datetime.utcnow()
    time_gap = datetime.timedelta(hours=9)
    kor_time = utcnow + time_gap    # convert UTC to KST (UTC+9)
    return kor_time.strftime('%Y%m%d%H%M%S')
def get_my_emr_cluster_id(cluster_name,**kwargs):
    from datetime import datetime, timedelta
    client=boto3.client('emr',region_name='us-west-2')
    # list_clusters (not list_cluster) returns active clusters created in the given window
    response=client.list_clusters(
        CreatedAfter=datetime(2020,1,1),
        CreatedBefore=datetime(2999,1,1),
        ClusterStates=["WAITING","RUNNING"]
    )
    for key in response['Clusters']:
        res = client.describe_cluster(ClusterId=key['Id'])
        name = res['Cluster']['Name']
        name = name.lower()
        if cluster_name.lower() in name:
            print(name)
            print(key['Id'])
            # push the matching cluster id so the templated job_flow_id fields can pull it from XCom
            kwargs['ti'].xcom_push(key='jobflow_id',value=key['Id'])
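For a quick check of get_my_emr_cluster_id outside a DAG run, a small stand-in object with an xcom_push method can replace the real task instance. A minimal sketch, assuming Airflow is installed with the ENV Variable set (my_custom_lib reads it at import time), AWS credentials are available to boto3, and FakeTaskInstance is a hypothetical helper; 'minman_emr_test' is the example cluster name from the DAG above:

# minimal sketch: call get_my_emr_cluster_id from a Python shell and print what it would push to XCom
from my_custom_lib import get_my_emr_cluster_id

class FakeTaskInstance:
    def xcom_push(self, key, value):
        print('xcom_push -> {}: {}'.format(key, value))

get_my_emr_cluster_id('minman_emr_test', ti=FakeTaskInstance())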
- Airflow Variable settings
1) airflow_master_node_ip : {"prod":"10.342.38.10","dev":"10.281.23.12"}
2) airflow_sys_user_info : {"id":"minman","pwd":"mypwd123@"}
3) ENV : 'dev'
4) my_module_path : '/home/myfolder/airflow/utils'
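These Variables can be registered in the Airflow web UI (Admin > Variables) or programmatically. A minimal sketch using Variable.set from a Python shell on the Airflow node, with the same example values as above:

# minimal sketch: register the four Variables used by the DAG
import json
from airflow.models import Variable

Variable.set("airflow_master_node_ip", json.dumps({"prod": "10.342.38.10", "dev": "10.281.23.12"}))
Variable.set("airflow_sys_user_info", json.dumps({"id": "minman", "pwd": "mypwd123@"}))
Variable.set("ENV", "dev")
Variable.set("my_module_path", "/home/myfolder/airflow/utils")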