Airflow 기초실습

2020-09-14

.

Data_Engineering_TIL(20200914)

‘어쩐지 오늘은’ 블로그의 ‘Apache Airflow - Workflow 관리 도구(1)’ 을 공부하고 정리한 내용입니다.

URL : https://zzsza.github.io/data/2018/01/04/airflow-1/

# airflow 아키텍처

구성요소 : Webserver, Scheduler, metaDB, worker

Webserver : 웹 UI를 표현하고, workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인 등 가능

Scheduler :

1) 작업 기준이 충족되는지 여부를 확인

2) 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등

3) 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함

# airflow 설치 및 구동

# airflow 설치를 위한 명령어
[ec2-user@ip-10-1-10-239 ~]$ sudo yum update -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install python3 -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install gcc python3-devel -y
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install apache-airflow==1.10.3
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install werkzeug==0.15.4

# airflow DB를 생성하는 작업
[ec2-user@ip-10-1-10-239 ~]$ airflow initdb
                                                                                                              
# airflow webserver구동.
[ec2-user@ip-10-1-10-239 ~]$ airflow webserver -p 8080
# 위와 같이 웹서버를 구동하고 나서 웹브라우져로 이동한 다음 [ec2 server public ip]:8080 으로 접속하면 airflow WebUI화면을 확인할 수 있다.

# 터미널 새 창을 열어서 아래 커맨드 입력                                    
# airflow scheduler 구동. DAG들의 스케쥴링 및 실행 담당
[ec2-user@ip-10-1-10-239 ~]$ airflow scheduler                                                                                                              
                                                   
# ~/airflow 위치에 airflow 관련 파일이 저장됨
[ec2-user@ip-10-1-10-239 ~]$ cd ~/airflow
[ec2-user@ip-10-1-10-239 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg
# airflow.cfg : Airflow 관련 설정, airflow.db : sqlite 데이터베이스, dags : 없다면 mkdir dags로 생성한다. DAG 파일이 저장되는 장소
[ec2-user@ip-10-1-10-239 airflow]$ mkdir dags
[ec2-user@ip-10-1-10-239 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  dags  logs  unittests.cfg

# DAG 생성하는 흐름

STEP 1) default_args 정의

누가 생성했는지, start_date에 대한 설정 등

STEP 2) DAG 객체 생성

dag id, schedule interval 정의

STEP 3) DAG 안에 Operator를 활용해 Task 생성

STEP 4) Task들을 연결함( », « 활용)

요약하자면 DAG 객체 생성 -> Operator를 활용해 Task 작성 -> Task를 연결하는 방식

  • Airflow는 $AIRFLOW_HOME(default는 ~/airflow)의 dags 폴더에 있는 dag file을 지속적으로 체크함

Operator를 사용해 Task를 정의함

Operator가 인스턴스화가 될 경우 Task라고 함

Python Operator, Bash Operator, BigQuery Operator, Dataflow Operator 등

Operator 관련 자료는 https://airflow.apache.org/docs/stable/howto/operator/index.html 참고

Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음

# airflow python code 샘플예시

아래와 같은 코드를 dags 폴더 아래에 test.py로 저장하고 웹서버에서 test DAG 옆에 있는 toggle 버튼을 ON으로 변경

templated_command에서 % 앞뒤의 # 제거필요

from airflow import models
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# start_date를 현재날자보다 과거로 설정하면, 
# backfill(과거 데이터를 채워넣는 액션)이 진행됨

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=10)}

# dag 객체 생성
with models.DAG(
    dag_id='minman_test', description='airflow test DAG', 
    schedule_interval = '55 14 * * *', 
    default_args=default_args) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    # BashOperator를 사용
    # task_id는 unique한 이름이어야 함
    # bash_command는 bash에서 date를 입력한다는 뜻

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

    templated_command="""
           # #을 삭제해주세요
          {#% for i in range(5) %#}
              echo "{#{ ds }#}"
              echo "{#{ macros.ds_add(ds, 7)}#}"
              echo "{#{ params.my_param }#}"
          {#% endfor %#}
      """
    
    # {#{ ds }#}, {#{ macros }#}는 jinja template을 의미함
    # 실제 사용시엔 #를 제외해야함
    # Macros reference, Jinja Template 참고하면 자세한 내용이 있음
    # scheduler를 실행시켜 둔 상태라면 DAG들이 실행됨

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

    # set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
    t2.set_upstream(t1)
    # t1.set_downstream(t2)와 동일한 표현입니다
    # t1 >> t2 와 동일 표현
    t3.set_upstream(t1)

# airflow code를 작성하고 테스트를 해보자

  • 아래와 같이 bash 명령어 task들을 일련의 dag로 실행하는 airflow python code를 작성하고 테스트한다.
[ec2-user@ip-10-1-10-239 airflow]$ cd dags

[ec2-user@ip-10-1-10-239 dags]$ pwd
/home/ec2-user/airflow/dags

# 지금은 간단한 bash command를 사용했지만, bash로 파이썬 파일도 실행할 수 있다.
[ec2-user@ip-10-1-10-239 dags]$ sudo vim airflow_test.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['minmantest@mail.com'],
    'email_on_failure': False,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('bash_dag',
          default_args=default_args,
          schedule_interval='* * * * *') # 매 1분마다 실행

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

task2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=2,
    dag=dag)

task3 = BashOperator(
    task_id='pwd',
    bash_command='pwd',
    dag=dag)


task1 >> task2
task1 >> task3

[ec2-user@ip-10-1-10-239 dags]$ airflow list_dags
[2020-09-14 03:18:45,156] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 03:18:45,384] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bash_dag # 방금 생성한 DAG이고, 밑에 있는 것들은 airflow에서 제공하는 샘플용 DAG들임
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial

# bash_dag라는 DAG의 task 출력
[ec2-user@ip-10-1-10-239 dags]$ airflow list_tasks bash_dag
[2020-09-14 03:22:43,274] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 03:22:43,508] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
print_date
pwd
sleep

# bash_dag라는 DAG의 task를 Tree 형태로 출력
[ec2-user@ip-10-1-10-239 dags]$ airflow list_tasks bash_dag --tree
[2020-09-14 03:23:28,077] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 03:23:28,310] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
<Task(BashOperator): sleep>
    <Task(BashOperator): print_date>
<Task(BashOperator): pwd>
    <Task(BashOperator): print_date>
  • 웹브라우저를 열고 [ec2 server public ip]:8080 에 접속하여 DAGs 메뉴 이동한다. 그런 다음에 bash_dag에 대하여 좌측에 off 버튼을 눌러 on으로 바꾸면 1분마다 해당 DAG가 돌것이다.

  • 파이썬 코드를 실행하는 airflow python code를 짜고 실행해보자.

[ec2-user@ip-10-1-10-239 dags]$ sudo vim airflow_test2.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['your@maile.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('python_dag1',
          default_args=default_args,
          schedule_interval='* * * * *')


def print_current_date():
    date_kor = ["월", "화", "수", "목", "금", "토", "일"]
    date_now = datetime.now().date()
    datetime_weeknum = date_now.weekday()
    print(f"{date_now}{date_kor[datetime_weeknum]}요일입니다")


python_task = PythonOperator(
    task_id='print_current_date',
    python_callable=print_current_date,
    dag=dag,
)

python_task

[ec2-user@ip-10-1-10-239 dags]$ airflow list_dags
[2020-09-14 04:02:58,732] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:02:59,071] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bash_dag
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
python_dag1 # 방금만든 dag
test_utils
tutorial
  • 마찬가지로 웹브라우저를 열고 [ec2 server public ip]:8080 에 접속하여 DAGs 메뉴 이동한다. 그런 다음에 python_dag1에 대하여 좌측에 off 버튼을 눌러 on으로 바꾸면 1분마다 해당 DAG가 돌것이다

  • 아래의 내용은 python_dag1을 실행한 결과 로그

*** Reading local file: /home/ec2-user/airflow/logs/python_dag1/print_current_date/2020-09-14T00:04:00+00:00/1.log
[2020-09-14 04:10:24,503] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag1.print_current_date 2020-09-14T00:04:00+00:00 [queued]>
[2020-09-14 04:10:24,507] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag1.print_current_date 2020-09-14T00:04:00+00:00 [queued]>
[2020-09-14 04:10:24,507] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:10:24,507] {__init__.py:1354} INFO - Starting attempt 1 of 2
[2020-09-14 04:10:24,507] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:10:24,515] {__init__.py:1374} INFO - Executing <Task(PythonOperator): print_current_date> on 2020-09-14T00:04:00+00:00
[2020-09-14 04:10:24,516] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'python_dag1', 'print_current_date', '2020-09-14T00:04:00+00:00', '--job_id', '35', '--raw', '-sd', 'DAGS_FOLDER/airflow_test2.py', '--cfg_path', '/tmp/tmp_nwmlsm9']
[2020-09-14 04:10:25,031] {base_task_runner.py:101} INFO - Job 35: Subtask print_current_date [2020-09-14 04:10:25,030] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:10:25,262] {base_task_runner.py:101} INFO - Job 35: Subtask print_current_date [2020-09-14 04:10:25,262] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test2.py
[2020-09-14 04:10:25,277] {base_task_runner.py:101} INFO - Job 35: Subtask print_current_date [2020-09-14 04:10:25,277] {cli.py:517} INFO - Running <TaskInstance: python_dag1.print_current_date 2020-09-14T00:04:00+00:00 [running]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 04:10:25,290] {python_operator.py:104} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=python_dag1
AIRFLOW_CTX_TASK_ID=print_current_date
AIRFLOW_CTX_EXECUTION_DATE=2020-09-14T00:04:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-09-14T00:04:00+00:00
[2020-09-14 04:10:25,290] {logging_mixin.py:95} INFO - 2020-09-14 월요일입니다
[2020-09-14 04:10:25,291] {python_operator.py:113} INFO - Done. Returned value was: None
[2020-09-14 04:10:29,506] {logging_mixin.py:95} INFO - [2020-09-14 04:10:29,506] {jobs.py:2562} INFO - Task exited with return code 0
  • 그러면 조금 더 복잡한 파이썬 코드를 실행하는 airflow python code를 짜고 실행해보자.

PythonOperator(task_id, python_callable, op_args, dag, provide_context, templates_dict)로 사용함

1) task_id는 task의 id(예 : print_current_date)

2) python_callable는 호출 수 있는 python 함수를 인자로 넣음

3) op_args : callable 함수가 호출될 때 사용할 함수의 인자

4) dag : DAG 정의한 객체 넣으면 됨

5) provide_context : True로 지정하면 Airflow에서 기본적으로 사용되는 keyword arguments 등이 사용 가능하게 됨

6) templates_dict : op_args 등과 비슷하지만 jinja template이 변환됨

Airflow의 기본 context 변수 사용한 것으로 PythonOperator에서 provide_context=True일 경우 사용 가능하며 kwargs에 값이 저장된다.

예를 들면

  provide_context=True로 지정하면 kwargs 다양한 값들이 저장됨
  {'dag': <DAG: python_dag_with_jinja>,
  'ds': '2020-02-10',
  'next_ds': '2020-02-11',
  'next_ds_nodash': '20200211',
  'prev_ds': '2020-02-09',
  'prev_ds_nodash': '20200209',
  'ds_nodash': '20200210',
  'ts': '2020-02-10T00:30:00+00:00',
  'ts_nodash': '20200210T003000',
  'ts_nodash_with_tz': '20200210T003000+0000',
  'yesterday_ds': '2020-02-09',
  'yesterday_ds_nodash': '20200209',
  'tomorrow_ds': '2020-02-11',
  'tomorrow_ds_nodash': '20200211',
  'end_date': '2020-02-10',
  'execution_date': <Pendulum [2020-02-10T00:30:00+00:00]> ...}

아래와 같이 파이썬 코드를 짜고 실행해보자.

[ec2-user@ip-10-1-10-239 dags]$ sudo vim airflow_test3.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['your@mail.com'],
    'email_on_failure': False,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('python_dag_with_context',
          default_args=default_args,
          schedule_interval='* * * * *')


def print_current_date_provide_context(*args, **kwargs):
    """
    provide_context=True로 지정하면 kwargs 다양한 값들이 저장됨
    {'dag': <DAG: python_dag_with_jinja>,
    'ds': '2020-02-10',
    'next_ds': '2020-02-11',
    'next_ds_nodash': '20200211',
    'prev_ds': '2020-02-09',
    'prev_ds_nodash': '20200209',
    'ds_nodash': '20200210',
    'ts': '2020-02-10T00:30:00+00:00',
    'ts_nodash': '20200210T003000',
    'ts_nodash_with_tz': '20200210T003000+0000',
    'yesterday_ds': '2020-02-09',
    'yesterday_ds_nodash': '20200209',
    'tomorrow_ds': '2020-02-11',
    'tomorrow_ds_nodash': '20200211',
    'end_date': '2020-02-10',
    'execution_date': <Pendulum [2020-02-10T00:30:00+00:00]> ...}
    """
    print(f"kwargs :{kwargs}")
    execution_date = kwargs['ds']
    execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date()
    date_kor = ["월", "화", "수", "목", "금", "토", "일"]
    datetime_weeknum = execution_date.weekday()
    print(f"{execution_date}{date_kor[datetime_weeknum]}요일입니다")


python_task_context = PythonOperator(
    task_id='print_current_date_with_context_variable',
    python_callable=print_current_date_provide_context,
    provide_context=True,
    dag=dag,
)


python_task_context

[ec2-user@ip-10-1-10-239 dags]$ airflow list_dags
[2020-09-14 04:18:15,491] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:18:15,763] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bash_dag
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
python_dag1
python_dag_with_context # 방금 생성한 dag
test_utils
tutorial
  • 마찬가지로 웹브라우저를 열고 [ec2 server public ip]:8080 에 접속하여 DAGs 메뉴 이동한다. 그런 다음에 python_dag_with_context에 대하여 좌측에 off 버튼을 눌러 on으로 바꾸면 1분마다 해당 DAG가 돌것이다

  • 아래의 내용은 python_dag_with_context를 실행한 결과 로그

*** Reading local file: /home/ec2-user/airflow/logs/python_dag_with_context/print_current_date_with_context_variable/2020-09-14T00:03:00+00:00/1.log
[2020-09-14 04:23:59,499] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag_with_context.print_current_date_with_context_variable 2020-09-14T00:03:00+00:00 [queued]>
[2020-09-14 04:23:59,502] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag_with_context.print_current_date_with_context_variable 2020-09-14T00:03:00+00:00 [queued]>
[2020-09-14 04:23:59,503] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:23:59,503] {__init__.py:1354} INFO - Starting attempt 1 of 2
[2020-09-14 04:23:59,503] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:23:59,511] {__init__.py:1374} INFO - Executing <Task(PythonOperator): print_current_date_with_context_variable> on 2020-09-14T00:03:00+00:00
[2020-09-14 04:23:59,511] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'python_dag_with_context', 'print_current_date_with_context_variable', '2020-09-14T00:03:00+00:00', '--job_id', '41', '--raw', '-sd', 'DAGS_FOLDER/airflow_test3.py', '--cfg_path', '/tmp/tmpwo9nq1v3']
[2020-09-14 04:24:00,035] {base_task_runner.py:101} INFO - Job 41: Subtask print_current_date_with_context_variable [2020-09-14 04:24:00,034] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:24:00,278] {base_task_runner.py:101} INFO - Job 41: Subtask print_current_date_with_context_variable [2020-09-14 04:24:00,278] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test3.py
[2020-09-14 04:24:00,294] {base_task_runner.py:101} INFO - Job 41: Subtask print_current_date_with_context_variable [2020-09-14 04:24:00,294] {cli.py:517} INFO - Running <TaskInstance: python_dag_with_context.print_current_date_with_context_variable 2020-09-14T00:03:00+00:00 [running]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 04:24:00,309] {python_operator.py:104} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=python_dag_with_context
AIRFLOW_CTX_TASK_ID=print_current_date_with_context_variable
AIRFLOW_CTX_EXECUTION_DATE=2020-09-14T00:03:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-09-14T00:03:00+00:00
[2020-09-14 04:24:00,310] {logging_mixin.py:95} INFO - kwargs :{'dag': <DAG: python_dag_with_context>, 'ds': '2020-09-14', 'next_ds': '2020-09-14', 'next_ds_nodash': '20200914', 'prev_ds': '2020-09-14', 'prev_ds_nodash': '20200914', 'ds_nodash': '20200914', 'ts': '2020-09-14T00:03:00+00:00', 'ts_nodash': '20200914T000300', 'ts_nodash_with_tz': '20200914T000300+0000', 'yesterday_ds': '2020-09-13', 'yesterday_ds_nodash': '20200913', 'tomorrow_ds': '2020-09-15', 'tomorrow_ds_nodash': '20200915', 'END_DATE': '2020-09-14', 'end_date': '2020-09-14', 'dag_run': <DagRun python_dag_with_context @ 2020-09-14 00:03:00+00:00: scheduled__2020-09-14T00:03:00+00:00, externally triggered: False>, 'run_id': 'scheduled__2020-09-14T00:03:00+00:00', 'execution_date': <Pendulum [2020-09-14T00:03:00+00:00]>, 'prev_execution_date': <Pendulum [2020-09-14T00:02:00+00:00]>, 'next_execution_date': <Pendulum [2020-09-14T00:04:00+00:00]>, 'latest_date': '2020-09-14', 'macros': <module 'airflow.macros' from '/usr/local/lib/python3.7/site-packages/airflow/macros/__init__.py'>, 'params': {}, 'tables': None, 'task': <Task(PythonOperator): print_current_date_with_context_variable>, 'task_instance': <TaskInstance: python_dag_with_context.print_current_date_with_context_variable 2020-09-14T00:03:00+00:00 [running]>, 'ti': <TaskInstance: python_dag_with_context.print_current_date_with_context_variable 2020-09-14T00:03:00+00:00 [running]>, 'task_instance_key_str': 'python_dag_with_context__print_current_date_with_context_variable__20200914', 'conf': <module 'airflow.configuration' from '/usr/local/lib/python3.7/site-packages/airflow/configuration.py'>, 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': [], 'templates_dict': None}
[2020-09-14 04:24:00,311] {logging_mixin.py:95} INFO - 2020-09-14 월요일입니다
[2020-09-14 04:24:00,311] {python_operator.py:113} INFO - Done. Returned value was: None
[2020-09-14 04:24:04,502] {logging_mixin.py:95} INFO - [2020-09-14 04:24:04,502] {jobs.py:2562} INFO - Task exited with return code 0

  • 방금 작성한 PythonOperator의 아쉬운 점

실행한 Task의 로그를 보면 => 모두 같은 결과가 나옴

언제 실행해도 무조건 datetime.now()를 사용해서 현재 날짜를 사용함

어제 일자에서 이 함수를 실행했다면?

2020-02-13는 목요일입니다가 출력되었을 것

이런 경우 Python Code에서 시간에 대한 컨트롤을 가진 케이스

Python Code에서 컨트롤을 가지면 과거 작업을 돌리기 힘듬

Airflow에서 Date를 컨트롤하는게 좋음

이럴 때 Airflow에서 제공되는 기본 context 변수 또는 Jinja Template 사용

Flask에서 Jinja Template을 사용함

  • Jinja Template을 사용하여 코드를 짜보자.

”” 이런 형태로 사용함 : execution_date

PythonOperator는 기본 context 변수 사용이 더 쉽지만, 다른 Operator는 Jinja Template이 편하다.

PythonOperator는 templates_dict에 변수를 넣어서 사용

‘Macros Default Variables Document에 정의되어 있음

아래와 같이 파이썬 코드를 짜고 실행해보자.

[ec2-user@ip-10-1-10-239 dags]$ sudo vim airflow_test4.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['your@mail.com'],
    'email_on_failure': False,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('python_dag_with_jinja',
          default_args=default_args,
          schedule_interval='* * * * *')


def print_current_date_jinja(*args, **kwargs):
    """
    jinja template(today)가 templates_dict으로 저장되서 kwargs에서 사용할 수 있음
    """
    execution_date = kwargs.get('templates_dict').get('today', None)
    execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date()
    date_kor = ["월", "화", "수", "목", "금", "토", "일"]
    datetime_weeknum = execution_date.weekday()
    print(f"{execution_date}{date_kor[datetime_weeknum]}요일입니다")


today = ""

python_task_jinja = PythonOperator(
    task_id='print_current_date_with_jinja',
    python_callable=print_current_date_jinja,
    provide_context=True,
    templates_dict={
        'today': today,

    },
    dag=dag,
)


python_task_jinja

[ec2-user@ip-10-1-10-239 dags]$ airflow list_dags
[2020-09-14 04:45:47,193] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:45:47,443] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bash_dag
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
python_dag1
python_dag_with_context
python_dag_with_jinja # 방금 생성한 dag
test_utils
tutorial
  • 마찬가지로 웹브라우저를 열고 [ec2 server public ip]:8080 에 접속하여 DAGs 메뉴 이동한다. 그런 다음에 python_dag_with_jinja에 대하여 좌측에 off 버튼을 눌러 on으로 바꾸면 1분마다 해당 DAG가 돌것이다

  • 아래의 내용은 python_dag_with_jinja를 실행한 결과 로그

*** Reading local file: /home/ec2-user/airflow/logs/python_dag_with_jinja/print_current_date_with_jinja/2020-09-14T00:00:00+00:00/1.log
[2020-09-14 04:52:08,885] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-14T00:00:00+00:00 [queued]>
[2020-09-14 04:52:08,889] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-14T00:00:00+00:00 [queued]>
[2020-09-14 04:52:08,890] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:52:08,890] {__init__.py:1354} INFO - Starting attempt 1 of 2
[2020-09-14 04:52:08,890] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2020-09-14 04:52:08,899] {__init__.py:1374} INFO - Executing <Task(PythonOperator): print_current_date_with_jinja> on 2020-09-14T00:00:00+00:00
[2020-09-14 04:52:08,899] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-14T00:00:00+00:00', '--job_id', '42', '--raw', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmpskohviqp']
[2020-09-14 04:52:09,630] {base_task_runner.py:101} INFO - Job 42: Subtask print_current_date_with_jinja [2020-09-14 04:52:09,630] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:52:09,909] {base_task_runner.py:101} INFO - Job 42: Subtask print_current_date_with_jinja [2020-09-14 04:52:09,909] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test4.py
[2020-09-14 04:52:09,931] {base_task_runner.py:101} INFO - Job 42: Subtask print_current_date_with_jinja [2020-09-14 04:52:09,930] {cli.py:517} INFO - Running <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-14T00:00:00+00:00 [running]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 04:52:09,952] {python_operator.py:104} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=python_dag_with_jinja
AIRFLOW_CTX_TASK_ID=print_current_date_with_jinja
AIRFLOW_CTX_EXECUTION_DATE=2020-09-14T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-09-14T00:00:00+00:00
[2020-09-14 04:52:09,954] {logging_mixin.py:95} INFO - 2020-09-14 월요일입니다
[2020-09-14 04:52:09,954] {python_operator.py:113} INFO - Done. Returned value was: None
[2020-09-14 04:52:13,889] {logging_mixin.py:95} INFO - [2020-09-14 04:52:13,889] {jobs.py:2562} INFO - Task exited with return code 0

# Backfill

Context Variable이나 Jinja Template을 사용하면 Backfill을 제대로 사용할 수 있음

Backfill : 과거 날짜 기준으로 실행

airflow backfill -s START_DATE -e END_DATE dag_id

아래와 같이 airflow_test4.py를 수정하여 backfill을 실행해보자.

[ec2-user@ip-10-1-10-239 dags]$ sudo vim airflow_test4.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 14),
    'email': ['your@mail.com'],
    'email_on_failure': False,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('python_dag_with_jinja',
          default_args=default_args,
          schedule_interval='30 0 * * *')


def print_current_date_jinja(*args, **kwargs):
    """
    jinja template(today)가 templates_dict으로 저장되서 kwargs에서 사용할 수 있음
    """
    execution_date = kwargs.get('templates_dict').get('today', None)
    execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date()
    date_kor = ["월", "화", "수", "목", "금", "토", "일"]
    datetime_weeknum = execution_date.weekday()
    print(f"{execution_date}{date_kor[datetime_weeknum]}요일입니다")


today = ""

python_task_jinja = PythonOperator(
    task_id='print_current_date_with_jinja',
    python_callable=print_current_date_jinja,
    provide_context=True,
    templates_dict={
        'today': today,

    },
    dag=dag,
)


python_task_jinja

[ec2-user@ip-10-1-10-239 dags]$ airflow list_dags
[2020-09-14 04:59:57,100] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 04:59:57,354] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
bash_dag
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
python_dag1
python_dag_with_context
python_dag_with_jinja  # 방금 생성한 dag
test_utils
tutorial

# 아래 명령어를 입력해보고 Webserver에 가보자.
[ec2-user@ip-10-1-10-239 dags]$ airflow backfill -s 2020-09-10 -e 2020-09-13 python_dag_with_jinja
[2020-09-14 05:01:02,650] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 05:01:02,919] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-09-14 05:01:03,350] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-10T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmpcum4zeyt']
[2020-09-14 05:01:03,379] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-11T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmpfaxe5n3a']
[2020-09-14 05:01:03,429] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-12T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmp04ce5d8v']
[2020-09-14 05:01:08,117] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-10T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmpcum4zeyt']
[2020-09-14 05:01:09,754] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 05:01:10,061] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test4.py
[2020-09-14 05:01:10,077] {cli.py:517} INFO - Running <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-10T00:30:00+00:00 [queued]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 05:01:15,361] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-11T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmpfaxe5n3a']
[2020-09-14 05:01:16,257] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 05:01:16,488] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test4.py
[2020-09-14 05:01:16,504] {cli.py:517} INFO - Running <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-11T00:30:00+00:00 [queued]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 05:01:21,730] {sequential_executor.py:45} INFO - Executing command: ['airflow', 'run', 'python_dag_with_jinja', 'print_current_date_with_jinja', '2020-09-12T00:30:00+00:00', '--local', '-sd', 'DAGS_FOLDER/airflow_test4.py', '--cfg_path', '/tmp/tmp04ce5d8v']
[2020-09-14 05:01:22,389] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-09-14 05:01:22,621] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/airflow_test4.py
[2020-09-14 05:01:22,638] {cli.py:517} INFO - Running <TaskInstance: python_dag_with_jinja.print_current_date_with_jinja 2020-09-12T00:30:00+00:00 [queued]> on host ip-10-1-10-239.ap-northeast-2.compute.internal
[2020-09-14 05:01:28,151] {__init__.py:4854} INFO - Marking run <DagRun python_dag_with_jinja @ 2020-09-10 00:30:00+00:00: backfill_2020-09-10T00:30:00+00:00, externally triggered: False> successful
[2020-09-14 05:01:28,169] {__init__.py:4854} INFO - Marking run <DagRun python_dag_with_jinja @ 2020-09-11 00:30:00+00:00: backfill_2020-09-11T00:30:00+00:00, externally triggered: False> successful
[2020-09-14 05:01:28,189] {__init__.py:4854} INFO - Marking run <DagRun python_dag_with_jinja @ 2020-09-12 00:30:00+00:00: backfill_2020-09-12T00:30:00+00:00, externally triggered: False> successful
[2020-09-14 05:01:28,197] {jobs.py:2124} INFO - [backfill progress] | finished run 3 of 3 | tasks waiting: 0 | succeeded: 3 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2020-09-14 05:01:28,197] {jobs.py:2495} INFO - Backfill done. Exiting.

# Airflow로 토이 ETL 파이프라인 코드 샘플

시나리오

Google Cloud Storage에 매일 하루에 1번씩 주기적으로 csv 파일이 저장됨

csv 파일을 BigQuery에 Load

BigQuery에서 쿼리를 돌린 후, 일자별로 사용량 쿼리해서 Table 저장

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# 시나리오
# Google Cloud Storage에 매일 하루에 1번씩 주기적으로 csv 파일이 저장됨
# - csv 파일을 BigQuery에 Load
# - BigQuery에서 쿼리를 돌린 후, 일자별로 사용량 쿼리해서 Table 저장

default_args = {
    'owner': 'minman',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 9),
    'email': ['your@mail.com'],
    'email_on_failure': False,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'end_date': datetime(2020, 2, 13),
    'project_id': 'my-project-1541645429744'
}

dag = DAG('simple_etl_storage_to_bigquery',
          default_args=default_args,
          schedule_interval='30 0 * * *')

execution_date = ''


storage_to_bigquery_task = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    google_cloud_storage_conn_id='google_cloud_default',
    bigquery_conn_id='google_cloud_default',
    task_id='storage_to_bigquery',
    schema_object='data/bike_schema.json',
    bucket='minman_test', # 생성한 bucket 이름을 넣으세요
    source_objects=[f"data/bike_data_{execution_date}.csv"],
    source_format='CSV',
    destination_project_dataset_table=f'my-project-1541645429744.temp.bike_{execution_date}', # 맨 앞 project_id 변경하세요
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1
)

agg_query = f"""
SELECT 
  dummy_date, start_station_id, end_station_id, COUNT(bikeid) as cnt
FROM `my-project-1541645429744.temp.bike_{execution_date}`
GROUP BY dummy_date, start_station_id, end_station_id
"""

query_task = BigQueryOperator(
        dag=dag,
        task_id="query_to_table",
        bigquery_conn_id='google_cloud_default',
        sql=agg_query,
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        destination_dataset_table=f"temp.bike_agg_{execution_date}"
)

storage_to_bigquery_task >> query_task
  • 아래와 같이 설정을 확인해준다.

STEP 1) APIs & Services - Create Credentials - Service account

STEP 2) Service account permissions에서 BigQuery Admin, Storage Admin

STEP 3) Create key (optional) 밑에 있는 CREATE KEY 클릭

STEP 4) JSON 선택하고 CREATE

STEP 5) 다운로드된 project_name-123123.json 확인

이 Key는 외부유출 시 보안상 큰 문제가 될 수 있기 때문에 잘 다루어야 함

  • airflow to GCP 인증 연결해야 하는데 아래와 같이 해주면 된다.

STEP 1) Airflow Webserver - Admin - connection 이동

STEP 2) Conn Id가 google_cloud_default 찾기

STEP 3) 왼쪽에 있는 연필 버튼 클릭

STEP 4) Project Id에 자신의 프로젝트 ID 입력(name 아님!)

STEP 5) Google Cloud Platform Console에 있음

STEP 6) Keyfile JSON에 아까 위에서 만든 JSON key 내용 통째로 복사해서 붙여넣기

STEP 7) Scopes는 https://www.googleapis.com/auth/cloud-platform 입력

STEP 8) save 클릭

  • Google Cloud Storage에 데이터 업로드

STEP 1) Google Cloud Storage로 이동

STEP 2) CREATE BUCKET(이미 있다면 그거 사용해도 무방) 클릭

STEP 3) 지역 선택후 나머지 그냥 다 Continue 클릭

STEP 4) bucket 만듬

STEP 5) https://github.com/zzsza/kyle-school/tree/master/week6/data 에서 데이터 다운로드

bike_data_20200209 ~ bike_data_20200212.csv

STEP 6) 방금 만든 Bucket의 data 폴더 안에 방금 받은 파일 업로드

GoogleCloudStorageToBigQueryOperator, BigQueryOperator 사용할 예정

** 참고사항 : BigQueryOperator

간단히 생각하면 쿼리를 날려서 Table에 저장

agg_query에 간단한 쿼리 작성함

BigQueryOperator는 destination_dataset_table만 잘 정의하면 됨

# 기타 참고사항

  • Already running on PID XXXX Error가 발생할 경우

Airflow webserver 실행시 ~/airflow/airflow-webserver.pid에 process id를 저장하는데 쉘에서 빠져나왔지만 종료되지 않는 경우가 있다.

이런 경우 Webserver가 제대로 종료되지 않은 상황으로 아래와 같이 명령어를 실행하여 강제종료 시키면 된다.

kill -9 $(lsof -t -i:8080)

  • pip로 설치하면 생기는 기본 설정값은 아래와 같다.

sequential executor : 기본적으로 1개만 실행할 수 있음. 병렬 실행 불가능

celery executor를 사용해 병렬로 실행이 가능하나 RabbitMQ나 Redis가 필요하다.

  • sqlite : 기본 meta store는 sqlite인데, 동시 접근이 불가능해서 병렬처리는 불가능하다.

병렬 실행하기 위해 mysql이나 postgresql을 사용해야한다.

  • 위 설정을 서버에 매번 하는 일은 번거로운 일이기 때문에 Docker로 만들면 쉽게 할 수 있다.

docker-airflow Github에 보면 이미 만들어진 Dockerfile이 공유되어 있음

docker-compose로 실행하는 방법도 있고, Airflow 버전이 올라가면 빠르게 업데이트하는 편이다.

  • Airflow의 DAG 폴더(default는 ~/airflow/dags)를 더 쉽게 사용하려면 해당 폴더를 workspace로 하는 jupyter notebook을 띄워도 좋다. 데이터 분석가도 쉽게 DAG 파일 생성 가능하다.

  • Task간 데이터를 주고 받아야 할 경우 xcom 사용해야 한다.

Admin - xcom에 들어가면 값이 보임

아래와 같이 xcom에 데이터 저장(xcom_push) 할 수 있다.

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

아래와 같이 다른 task에서 데이터 불러오기(xcom_pull)를 할 수 있다.

task_instance.xcom_pull(task_ids='my_task', key='the_key')

참고로 PythonOperator에서 사용하는 python_callable 함수에서 return하는 값은 xcom에 자동으로 push된다.

  • DAG에서 다른 DAG에 종속성이 필요한 경우 ExternalTaskSensor 사용해야 한다.

–> https://airflow.readthedocs.io/en/stable/_api/airflow/sensors/external_task_sensor/

1개의 DAG에서 하면 좋지만, 여러 사람이 만든 DAG이 있고 그 중 하나를 사용해야 할 경우도 있음

  • 특정 DAG을 Trigger하고 싶은 경우 TriggerDagRunOperator 사용해야 한다.

–> https://airflow.readthedocs.io/en/stable/_api/airflow/operators/dagrun_operator/index.html#airflow.operators.dagrun_operator.TriggerDagRunOperator.execute

예제 URL : https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py

내용은 아래 코드와 같음

"""
Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example']
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",  # Ensure this equals the dag_id of the DAG to trigger
    conf={"message": "Hello World"},
    dag=dag,
)
  • 특정 Task의 성공/실패에 따라 다른 Task를 실행시키고 싶은 경우 Airflow Trigger Rule 사용해야 한다.

예를 들어 앞의 두 작업중 하나만 실패한 경우에 trigger rule을 사용할 수 있다.

관련 Document 참고(https://airflow.apache.org/docs/stable/concepts.html#trigger-rules)

  • Jinja Template이 작동하지 않는 경우 아래와 같이 조치한다.

STEP 1) 우선 provide_context=True 조건을 주었는지 확인

STEP 2) Jinja Template이 있는 함수의 파라미터가 어떤 것인지 확인

STEP 3) Operator마다 Jinja Template을 해주는 template_fields가 있는데, 기존 정의가 아닌 파라미터에서 사용하고 싶은 경우 새롭게 정의

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict','op_args')
  • Airflow 변수를 저장하고 싶은 경우 Variable 사용해야 한다.

Admin - Variables에서 볼 수 있음

json 파일로 변수 저장해서 사용하는 방식을 자주 사용한다.

from airflow.models import Variable

config=Variable.get(f"{HOME}/config.json", deserialize_json=True)

environment=config["environment"]
project_id=config["project_id"]
  • Task를 그룹화하고 싶은 경우 dummy_operator 사용해야 한다.

workflow 목적으로 사용하는 경우도 있음. 일부 작업을 건너뛰고 싶은 경우, 빈 작업 경로를 가질 수 없어서 dummy_operator를 사용하기도 함

  • 1개의 Task이 완료된 후에 2개의 Task를 병렬로 실행하고 싶을때 예시
task1 >> [task2_1, task_2_2]
  • 앞선 Task의 결과에 따라 True인 경우엔 A Task, False인 경우엔 B Task를 실행해야 하는 경우 BranchPythonOperator 사용해야 한다.

python_callable에 if문을 넣어 그 다음 task_id를 정의할 수 있고,

단순하게 앞선 task가 성공, 1개만 실패 등이라면 trigger_rule만 정의해도 된다.

  • Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우 Papermill 사용해야 한다.

참고 문서 : https://airflow.readthedocs.io/en/latest/howto/operator/papermill.html

  • airflow에서 기준 시는 UTC 시간대임을 주의해야 한다.

시간대 관련 참고할 만한 자료 : https://blog.naver.com/gyrbsdl18/221561318823

” execution_date은 실행날짜가 아니라 주문번호(run id)다. 굳이 시간으로 이해하고 싶다면 예약 시간이 아니라 “예약을 잡으려고 시도한 시간” 이라고 이해해야 한다. “

  • Task가 실패했을 경우 슬랙 메세지 전송하기

https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105 참고 할 것

  • Hook

외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스

대부분 Operator가 실행되기 전에 Hook을 통해 통신함

공식 문서 참고 (https://airflow.apache.org/docs/stable/concepts.html#hooks)

  • 머신러닝에서 사용한 예시는 https://github.com/zzsza/fastcampus-machine-learning-project/tree/master/03-Taxi-Demand-Prediction 참고

  • https://github.com/apache/airflow/tree/master/airflow/example_dags 에 많은 airflow 활용예제들이 있음

  • Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음

과거값 기준으로 재실행한다. 단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임

아래는 Backfill 명령어 예시이다.

airflow backfill -s 2020-09-10 -e 2020-09-13 dag_id

backfill은 기본적으로 실행되지 않은 Task들만 실행하기 때문에 DAG을 아예 다시 실행하고 싶다면 --reset_dagruns를 같이 인자로 줘야한다.

airflow backfill -s 2020-09-10 -e 2020-09-13 --reset_dagruns dag_id

실패한 Task만 재실행하고 싶다면 아래 코드와 같이 --rerun_failed_task를 사용한다.

airflow backfill -s 2020-09-10 -e 2020-09-13 --reset_failed_task dag_id

airflow backfill이 아닌 강제로 trigger를 하고 싶다면 아래와 같이 명령어를 실행한다.

airflow trigger_dag dag_id -e 2020-01-13
  • airflow 1.10.8, 1.10.9 버전부터 tag 기능이 생김. DAG을 더 관리하기 용이해졌다.
dag = DAG(
    dag_id='example_dag_tag',
    schedule_interval='0 0 * * *',
    tags=['example']
  )