REST API로 Airflow DAG 호출하기

2021-05-29

.

Data_Engineering_TIL(20210529)

[참고한 자료]

베스핀글로벌 데이터 엔지니어 최정민님 ‘Airflow operator 활용자료’를 공부하고 정리한 자료입니다.

[학습내용]

REST API로 Airflow DAG 호출하기

step 1) REST API용 Airflow User 생성

** 유의사항 : Airflow CLI 또는 Airflow UI에서 생성한 User는 Airflow Web server를 이용하기 위한 User임. REST API를 이용해서 DAG를 컨트롤 하기 위해서는 별도의 REST API 용 User(Airflow를 사용하는 계정이 아니라 별도로 Airflow 디비에 접근할 수 있는 시스템 유저라고 생각하면됨)를 생성해야함

  • Create for Airflow REST API user
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
User = PasswordUser(models.User())

# User Info
user.username = 'minsupark'
user.email = 'minsu@korea.com'
user.password = 'mypassword1234'

session = settings.Session()
user_exits = session.query(models.User.id).filter_by(username=user.username).scalar() is not None
if not user_exits:
    session.add(user)
    session.commit()
session.close()

step 2) REST API로 DAG 호출하기

  • 위에서 생성한 User info를 이용해서 REST API로 DAG를 호출하는 예시
import requests
import json
from pprint import pprint

username = 'minsupark'
password = 'mypassword1234'

result = requests.post(
    "http://{Airflow_master_server_IP}:8080/api/experimental/dags/{DAG_NAME}/dag_runs",
    data = json.dumps("{}"),
    auth = (username,password)
)

# REST API는 8080 포트를 이용하기 때문에 당연한 얘기지만 
# Airflow master server의 8080포트가 API를 날리는 쪽에는 개방이 되어 있어야함

pprint(result.content.decode('utf-8'))
  • Airflow DAG 활용예시

# 전체 DAG 스크립트중 일부만 발췌

username=Variable.get("minsupark_username")
password=Variable.get("minsupark_password")


# my_airflow_task_checker 라는 DAG 내에 task가 실행하다가 fail이 나면 restapi로 이 DAG를 재실행하는 함수
# DAG_NAME 라는 변수는 지금 이 스크립트를 돌리는 DAG이름임, 외부의 DAG를 트리거 하는것도 가능
def call_external_dag(**kwargs):
    # 실행시간 정보 조회
    execution_date = kwargs['execution_date']
    # DAG 정보 조회
    dag_instance = kwargs['dag']
    # 특정 TASK 정보 조회
    operator_instance = dag_instance.get_task('my_airflow_task_checker')
    # TASK 상태조회
    task_status = TaskInstance(operator_instance,execution_date).current_state()
    # my_airflow_task_checker TASK 상태가 fail이 나면 REST API를 이용해서 해당 DAG를 다시 RUNNING 시킴
    if task_status == 'failed':
        result = requests.post(
            # 참고로 아래와 같이 단일 Airflow에서 실행할 경우 localhost 주소로 api를 날릴 수 있지만,
            # 클러스터 형태의 airflow는 반드시 airflow master node의 ip로 호출해야 한다.
            "http://localhost:8080/api/experimental/dags/"+DAG_NAME+"/dag_runs",
            #"http://{Airflow_master_server_IP}:8080/api/experimental/dags/"+DAG_NAME+"/dag_runs",
            data = json.dumps("{}"),
            auth = (username,password)
        )
        pprint(result.content.decode('utf-8'))
    else:
        pprint(task_status)
        
retry_dag = PythonOperator(
    task_id = 'retry_dag',
    provide_context=True,
    python_callable=call_external_dag,
    trigger_rule = 'all_done',
    dag=dag
)

my_airflow_task >> my_airflow_task_checker >> retry_dag