특정시간까지 Task를 대기하도록하는 Airflow custom operator 예시

2021-05-29

.

Data_Engineering_TIL(20210529)

[참고한 자료]

베스핀글로벌 데이터 엔지니어 최정민님 ‘Airflow operator 활용자료’를 공부하고 정리한 자료입니다.

[학습내용]

  • task를 pending 시키다가 특정시간이 되면 하위 task를 실행하는 custom operator 예시

  • 활용예시

DAG process

create transient EMR cluster –> wait_execution_time –> if 15:00 –> running spark job workflow

만약에 정확히 15:00에 spark job workflow를 실행해야하는 상황이라고 하자 그러나 DAG process에는 해당 spark job workflow를 실행하기 앞서서 Transient한 EMR 클러스터를 띄우는 task가 있는데 구동하는 시간이 20분 ~ 40분정도 소요된다. 그렇다고 하면 14:00에 미리 EMR 클러스터를 띄워놓고, 대기하다가 15:00에 정확하게 workflow를 실행하도록 하면 된다. 이를 가능하게 해주는게 아래와 같은 operator라고 할 수 있다.

# execution time 설정
execution_time = "15:00"

def wait_execution_time(**kwargs):
    while True:
        cur_time = (datetime.now()).strftime("%H:%M")
        # cur_time 과 execution_time 이 일치하는지 확인
        if cur_time == execution_time:
            break
        else :
            # 1분씩 poking 하면서 대기함
            print("poking for {} on {}".format(kwargs['task_instance_key_str'],cur_time))
            time.sleep(60)

wait_execution_time = PythonOperator(
    task_id = 'wait_execution_time',
    python_callable = wait_execution_time,
    provide_context = True,
    dag=dag
)