Airflow 기초개념 및 구현실습

2019-04-20

Data_Engineering_TIL_(20190420)

study program : https://www.fastcampus.co.kr/extension_des

[학습목표]

  • 데이터 파이프라인 개요 이해

  • Apache Airflow 기초개념 이해

[학습기록]

  • 실무관련 팁

GCP Big Query는 장점이 많은 툴이라 실무에서도 쓰기 좋다.

200 lines 정도 되고 10기가가 넘어가는 데이터도 쿼리도 빠르게 돌아감, 신입 팀원들도 어렵지 않게 수정하고 실행함. 그리고 데이터 용량 1기가바이트 당 5원씩 정도로 상당히 저렴함

실제 스파크 클러스터를 구축해서 저런 쿼리를 날리는 비용과 비교했을때 매우 저렴함고 편리하다고 할 수 있다.

보통 저런 200 lines 정도 되고 10기가가 넘어가는 데이터를 쿼리하려면 적어도 4~5개의 클러스터를 구성하고 컴퓨팅을 해야하는데 코스팅이 많이 든다.

ETL 관리는 정말 어렵고 비용이 많이 들기 때문에 이런 Big query이 view를 활용하면 상당히 이점이 있다.

SQL의 view 쿼리 같은것도 빅쿼리에서 자유롭게 할 수 있다.

view란

가상 테이블로 특정 데이터만 보고자 할때 사용한다. 실제 데이터를 저장하고 있지는 않다. 한마디로 특정컬럼의 데이터를 보여주는 역할만 한다. 뷰를 사용 함으로 쿼리를 더 단순하게 만들수 있다. 한번 생성된 뷰는 수정이 불가능 하며 인덱스설정이 불가능하다.

view 문법

CREATE VIEW <뷰이름> AS

(QUERY)

view 사용 예시

(국가코드와 국가이름이 있는 뷰 생성하고 싶을때)

CREATE VIEW code_name AS

SELECT code, name

FROM country

  • 구글 클라우드 플랫폼 Data proc의 활용방안

제플린이나 주피터를 띄워서 분석하는 클러스터를 만들 수 있다. 스파크 submit 했던 것 처럼 잡을 등록해서 실행하는 용도로 쓸 수도 있는 등 다양한 활용방법이 있다.

  • 구글의 pub/sub은 카프카랑 유사한 툴이라고 보면 된다. 펍섭으로 데이터를 모아서, 람다 아키텍처를 안쓰고 스트리밍 처리해서 빅쿼리에 저장해놓고 빅쿼리에서 분석할 수 있다.

  • 데이터 파이프라인 개요

과거에는 데이터를 모으고, 분석하던 비교적 단순한 과정이었으나, 현재는 훨씬 복잡졌다. 예를 들어서 여러군데에서 데이터를 조합하고 변환하거나, 처리된 데이터를 다시 파이프에 넣는다거나, 배치처리, 스트리밍 등 이런 복잡한 작업을 하기위해 안정적으로 복잡한 작업들을 처리할 수 있는 시스템이 필요해졌다.

다시말하면

통상 데이터 ETL(Extract, Transform, Load) 과정을 통해 머신러닝 모델을 위한 Dataset을 만드는 경우가 많다. 또한 다양한 데이터베이스를 사용할 경우 한곳으로 모아서 작업을 해야하는 경우가 있다. 여러개의 앞단의 output이 뒷단의 input이 되는 Sequential한 로직이 존재하는데 이런 로직들을 한번에 관리할 필요가 있다.

그래서 이렇게 관리할 로직이 적다면 크론텝 등을 활용하여 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있기는 하지만 점점 관리할 태스크들이 많아지면 헷갈리는 경우가 생겨 관리의 어려움이 많기 때문에 나온 아이디어로 만든 툴이라고 할 수 있다.

Airflow는 Python으로 코드를 작성할 수 있기 때문에 데이터 분석을 하는 사람들이 쉽게 코드를 작성할 수 있는 장점이 있다.

Airflow 콘솔이 따로 존재해 Task 관리를 서버에서 들어가 관리하지 않아도 되고, 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용하다. 또한 GCP의 BigQuery나 Dataflow에 접근 할 수 있도록 잘 구성되어 있다.

  • Apache Airflow

쉽게 말해서 data 처리 플로우 관리 툴이다.

1) Airbnb에서 개발한 오픈소스 프로젝트

2) Workflow를 Directed Acyclic Graphs (DAG) 로 표현

DAG은 그래프인데 진행방향이 있는 그래프를 말한다.

통상 에어플로우에서 DAG은 task들의 묶음이라고 생각하면 된다.

3) 순차적으로 연결된 작업을 실행하거나 상태를 감시하는 도구들을 구현

4) 많이 쓰이는 데이터소스나 시스템(슬랙 등)과 연동 가능한 Python으로 된 컴포넌트들을 제공

5) Scalable한 클러스터로 구동

에어플로우도 클러스터가 있다.

클러스터에서 계산을 하거나 작업을 해야하는 task가 있을 수 있다.

무언가 큰 계산을 하거나 머신러닝을 할때 부하가 에어플로우 컴퓨터에 몰리면 안되기 때문이다.

  • 에어플로우 인터페이스

1) 웹 콘솔에서 사용하는 방법

2) 커멘드라인으로 사용하는 방법

[실습프리뷰]

실습1) 에어플로우 튜토리얼 실습

실습2) 에어플로우상에서 슬랙을 이용한 에러메시지 알림시스템 구현실습

[실습 상세내용]

실습1) 에어플로우 튜토리얼 실습

STEP1) airflow를 설치하고 구동한다. (아래 그림 참고)

관련 URL : airflow.apache.org/start.html

1-1) 터미널 열어서 pip install로 설치해준다.

1-2) airflow initdb 명령어 실행

DB를 초기화 하는 명령어

1-3) airflow webserver -p 8080 명령어 실행

웹서버와 스케쥴러를 별도로 실행해줘야 한다.

1-4) 터미널을 새로하나 열어서 airflow scheduler 명령어 실행

1-5) 웹브라우저를 하나열어서 localhost:8080 으로 접속해서 아래 그림과 같이 웹콘솔이 잘 뜨는지 확인

1

STEP2) 간단한 파이썬 코드를 작성하여 DAG폴더에 적재하기

에어플로우 작동을 위한 파이썬 코드를 작성하여 DAG폴더에 올려주면 실행가능한 형태가 된다.

활용할 코드관련 URL : https://airflow.apache.org/tutorial.html

그래서 먼저 tutorial.py를 생성해주고 아래의 코드를 복붙해준다.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# 아래는 여러가지 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    # 이거를 명시해놓으면 명시해 놓은 특정 날짜부터 task를 시행한다.
    # 2015. 6. 1.부터 시행한다.
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# dag 아이디 및 각종정보설정
# schedule_interval의 옵션은 여러가지이다. schedule_interval=timedelta(days=1) 이런식으로 넣어줘도 되고,
# cron expression도 가능하다.  

# t1, t2 and t3 are examples of tasks created by instantiating operators
# 각각의 작업 선언
# 배시오퍼레이터는 배시쉘에서 간단한 베시커멘드를 수행하는 작업을 말한다. 
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# 에어플로우의 템플릿 기능
# 이부분은 베시셸에서는 명령어가 먹지는 않고, 에어플로우에서 처리해주는 부분이다.
templated_command = """
    
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# 테스크간의 dependency 설정
t2.set_upstream(t1)
t3.set_upstream(t1)

위의 코드에서 set_upstream은 아래 그림처럼 task 간의 DAG를 셋팅해주는 것을 말한다.

t1이 먼저 실행되고, 그 다음에 t2, t3 이렇게 실행된다.

2

그래서 그 다음에 터미널을 하나 열어준다.

~/airflow 폴더로 이동해서 dags 이라는 이름의 폴더를 하나 생성해준다.

dags 폴더로 이동한다.

vi tutorial.py 를 실행해준다. 그 다음에 위의 코드를 복붙해준다.

그리고 dag = DAG(‘tutorial’ .. 의 코드에서 tutorial 만 minman으로 바꿔준다.

dag = DAG(‘minman’ ..

그리고 저장한 다음에 python tutorial.py 명령어를 실행해준다.

아무것도 안뜨면 잘 실행이 된 것이다.

그리고 airflow list_dags로 DAGS 목록에 minman이 들어가 있나 확인해본다.

3

airflow list_tasks tutorial_1 –tree 명령어를 입력하여 트리형태로도 확인해본다.

아래의 그림처럼 print_date, sleep, templated 3개의 태스크 확인

4

STEP3) 이제 에어플로우를 실행을 해볼차례이다.

통상 DAG 전체를 실행하기전에 task 하나하나가 잘 작동되는지 먼저 확인한다.

터미널에서 airflow test tutorial print_date 2015-06-01 명령어를 입력하여 아래 그림과 같이 정상적으로 출력이 되는지 확인한다.

print_date 테스크는 베시쉘에서 date 정보를 출력한다.

5

그 다음에 testing sleep task가 잘 되는지도 확인하고자

airflow test tutorial sleep 2015-06-01 명령어도 실행해서 확인해보고,

(실행결과는 아래 그림과 같다)

5-2

airflow test tutorial templated 2015-06-01 명령어도 실행하여 templated_command 결과가 정상적으로 나오는지 확인해본다.

(실행결과는 아래와 같음)

5-3

STEP4)

터미널에서 airflow backfill minman -s 2015-06-01 -e 2015-06-07 을 실행하여 아래 그림과 같이 잘 출력이 되는지 확인한다.

start_date를 현재날자보다 과거로 설정하면, backfill(과거 데이터를 채워넣는 액션)이 진행된다.

위의 명령어에서 -s 옵션은 스타트를 의미하고 -e옵션은 엔드를 의미한다.

아래 결과출력을 자세히보면 backfill 기능을 통해서 날짜를 2015-06-01부터 2015-06-07까지 넣게된다. airflow, run, minman, 2015-06-02 등의 작업들이 하나의 덩어리로 큐 단위로 넣어지게 된다.

6

STEP5) 웹 콘솔에서도 확인해본다. 실행이 잘 되었는지

7

참고로 아래 그림처럼 해당버튼을 누르면 위에서 실행한 backfill을 실행한 것과 동일하다.

8

그리고 아래 그림처럼 온오프 스위치를 눌러주면 크론텝과 유사하게 정해진 시간에 작업을 실시하는 스케쥴링 같은 기능도 설정 할 수 있다.

9

  • 기타 각종 에어플로우 명령어 정리

9-2

9-3

실습2) 에어플로우상에서 슬랙을 이용한 에러메시지 알림시스템 구현실습

실습내용 관련 URL : https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105

STEP1)

https://api.slack.com/custom-integrations/legacy-tokens 로 접속해서 내가 원하는 슬랙채널의 토근을 생성해준다.

STEP2)

터미널 하나 열어서 pip install apache-airflow[slack] 를 실행

STEP3)

airflow/dags 에서 tutorial_error_slack.py 파일 생성

코드내용은 아래와 같음

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

from airflow.hooks.base_hook import BaseHook

from airflow.operators.slack_operator import SlackAPIPostOperator

def slack_failed_task(context):  
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#dss",
        token="획득한토큰삽입",
        text = ':red_circle: Task Failed',
        username = 'airflow',)
    return failed_alert.execute(context=context)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=30),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial_error_slack', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='exit 123',
    provide_context=True,
    on_failure_callback=slack_failed_task,
    dag=dag)
# 위에 bash_command에서 exit 0은 성공적으로 실행이 된것이라는 신호이고,
# 0이 아닌 수 예를들어 exit 1이나 exit 123 같은 것들은 이 명령어를 실행하면
# bash command가 실행을 실패한 것으로 처리를 해준다.
# 이렇게 배쉬커멘드가 실행 실패를 하면서 on_faulure_callback이 처리가 되는것이다.

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

STEP4)

터미널에서 python tutorial_error_slack.py 명령어 실행

실행이 완료되면 airflow backfill tutorial_error_slack -s 2015-06-01 -e 2015-06-07 명령어 실행

그리고 슬랙에 위에 설정한 지정된 채널로 접속하면 아래 그림과 같은 메세지가 수신되었을 것이다.

10