특정조건을 만족하는 task만 실행하는 Airflow DAG 예시

2021-05-30

.

Data_Engineering_TIL(20210529)

[참고한 자료]

베스핀글로벌 데이터 엔지니어 최정민님 ‘Airflow operator 활용자료’를 공부하고 정리한 자료입니다.

[학습내용]

ShortCircuitOperator, AirflowSkipException을 활용하여 특정 조건에 따라 하위 task들의 실행을 skip할 수 있음

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.exceptions import AirflowSkipException
import os
from airflow.models import Variable
my_module_path=Variable.get('my_module_path')
import sys
sys.path.append(my_module_path)
from my_custom_lib import *
from datetime import datetime, timedelta
import time

default_args={
    "owner":"minman",
    "depends_on_past":False,
    "start_date":datetime(2021,3,10),
    "provide_context":True,
    "max_active_runs":5
}

dag = DAG(
    "my_test_dag",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    concurrency=10
)

# 조건을 부여하는 함수
# 현재시간과 task case time이 일치하면 True이고 그 외의 경우에는 False를 반환
def get_time(time,check_time):
    time = str(time).split("+")[0]
    dt_time = datetime.strptime(time, '%Y-%m-%dT%H:%M:%S.%f') + timedelta(hours=9) + timedelta(minutes=1)
    print(dt_time)
    print(dt_time.strftime('%H:%M:%s'))
    return dt_time.strftime('%H:%M') == check_time


# task_first_case_time, task_second_case_time, task_third_case_time에서 시간을 체크하고 True일 경우에만 하위 task가 실행되도록 구현
task_first_case_time = "23:11"
task_second_case_time = "23:11"
task_third_case_time = "23:11"

# 각각의 케이스의 시간을 체크하여 일치하면 True, 그렇지 않으면 Exception 처리함
def task_case_time_check(task_case, **kwargs):
    if get_time(kwargs['execution_date'],task_case):
        return True
    else:
        raise AirflowSkipException("Skip this task and indivisual downstream tasks while respecting trigger rules")
   
# ShortCircuitOperator 는 PythonOperator처럼 사용가능
#  Return 값이 True or False
# True일 경우 하위 task 계속 진행
# False일 경우 하위 task들은 모두 skip

first_case = ShortCircuitOperator(
    task_id = 'first_case',
    python_callable = task_case_time_check,
    op_kwargs={'task_case': task_first_case_time},
    provide_context = True,
    dag = dag
)

second_case = ShortCircuitOperator(
    task_id = 'second_case',
    python_callable = task_case_time_check,
    op_kwargs={'task_case': task_second_case_time},
    provide_context = True,
    dag = dag,
    trigger_rule = 'none_failed'
)

third_case = ShortCircuitOperator(
    task_id = 'third_case',
    python_callable = task_case_time_check,
    op_kwargs={'task_case': task_third_case_time},
    provide_context = True,
    dag = dag
)

task_A = DummyOperator(task_id='task_A',dag=dag,depends_on_past=False)
task_B = DummyOperator(task_id='task_B',dag=dag,depends_on_past=False)
task_C = DummyOperator(task_id='task_C',dag=dag)
task_D = DummyOperator(task_id='task_D',dag=dag,depends_on_past=False)

first_case >> task_A >> second_case >> task_B >> task_C
third_case >> task_D