AWS ECS를 이용한 Airflow 운영사례 - 원티드랩

2022-12-25

.

Data_Engineering_TIL(20221224)

ECS로 Airflow 홀로서기 by 최종원님 - AWSKRUG 데이터사이언스 소모임 221214 자료를 공부하고 정리한 내용입니다.

** URL : https://youtu.be/s6NMpOA8TUU

[공부한 내용]

최초에는 작업이 몇개 없으니까 EC2 위에다가 python 프로그램을 직접 만들었다. 크론탭을 이용해서 멀티쓰레드로 구현을 했었다. 이때 요구사항은 대부분 SQL file을 다루는 것들이었다. SQL file을 다뤄서 google 스프레드 시트에 자동화를 한다던지 그런것들이었다.

1

근데 이제 문제가 생기기 시작했다. 위에 그림과 같이 데이터팀이 세시에 데이터를 주면 서버팀이 네시에 이메일 전송을 진행하고 다섯시에는 마케팅팀이 이어서 작업을 진행한다고 하면 업무에 시간의존성이 생기게 되는데 이런 시간의존성에 대한 스케쥴링을 용이하게 할 수 있는 툴이 에어플로우이다. 에어플로우를 사용하는 결정적인 이유가 바로 시간의존성 업무 때문인 것이다.

2

그래서 에어플로우를 도입하게 되었는데 도입 초기에는 EC2위에 로컬익스큐터로해서 airflow 시스템을 구축하게 되었다. 하지만 활성화된 DAG수가 많아지게 되면 메모리 점유율이 올라가고 여러개의 DAG가 한꺼번에 여러개가 동시에 실행되었을때 성능이슈 저하 문제가 발생하게 되었다. 또한 개발계, 스테이징, 운영계를 구분하지 못하고 그냥 시스템 하나에 한꺼번에 사용했던 문제도 있었다.

3

현재는 위와 같이 ECS 아키텍처를 운영하고 있다. 서비스와 워커를 분리하여 모두 fargate로 띄우게 된다. 서비스는 항상 실행되는 거고, 워커는 작업이 다 끝나면 terminate 되도록하여 띄우게 된다.

사실 에어플로우 도큐먼트에 보면 컨테이너 환경에서 에어플로우를 띄울때 쿠버네티스 사용을 권장하고 있다.

Airflow KubernetesExecutor가 있어서 적용검토를 해봤지만 아래와 같은 문제들로 인해 보류하게 되었다.

  • 쿠버네티스에 익숙하지 않음

  • 당장 우선순위 높은 다른 업무들 때문에 혼자서 PoC부터 다 하기가 시간이 없었다.

  • 혼자 운영하는 입장에서 설정 오류와 확장을 다 잡아낸다는게 어려울 것으로 판단했다.

  • 안정화되는데 상당히 오래걸릴 것으로 예상했다.

  • 에어플로우 아키텍처는 제대로 이해하고 있지 못하고 있다고 판단했고 거기에 쿠버네티스 아키텍처는 잘 이해하고 있지 않다고 생각했다.

ECS를 채택하게 된 이유는 완전관리형 서비스이고, Docker-compose 명령어로 비교적 쉽게 작업정의하고 로컬에서 테스트하는게 용이하기 때문이다.

lambda도 생각을 했는데 CPU&memory 제약이 있고, 무엇보다도 판다스 라이브러리 쓰기가 너무 불편해서 포기했다.

4

그래서 로컬에서 테스트 중이었던 airflow의 yaml file을 서버리스 서비스를 최대한 이용하면서 마이그레이션을 하였다.

이때 boto3를 이용해서 api 자동화를 구현했다.

5

위에 그림과 같이 ECS로 마이그레이션할때 docker-compose yaml file에서 설정한 방법이 거의 흡사해서 쉬운부분이 있었다.

분석과들과 협업할때 Airflow를 아래와 같이 추상화하여 가이드하였다. (DAG 템플릿을 만들어서 가이드 했다.)

  • 언제, 무엇을, 어떻게

언제 : Airflow DAG

6

무엇을 : business logic

어떻게 : command, instance specification

7

ECSOperator라는 것을 커스텀하게 만들어서 command(실제로 뭘 실행할건지)를 지정하도록 파라미터를 받아서 가이드 하였다. 위에 그림과 같이 리눅스 로컬에서 명령어를 날리는 구조기 때문에 테스트 및 운영환경 적용이 쉽다.

아래와 같이 폴더구조와 같이 개발하도록 가이드하였다.

8

하지만 중복관리가 안되고, 일손이 오히려 더 딸리게 되었다. 이 추상화 방법은 별로 좋지 못한 방법이라고 생각했다.

가만히 분석가들이 DAG 개발하는거를 보니까 아래와 같은 Event Sourcing 패턴이다라는 것을 알게되었다.

데이터를 Core와 ad-hoc으로 추상화를 시켰는데 그 중에 이벤트 소싱이라는 개념이 있었다. 이게 뭐냐면 대부분의 DMS 서비스와 read replica 디비를 만드는 과정에서 이런 소싱방법을 많이 쓴다. 처음 스냅샷이 있고, 모든 트랜젝션를 부어넣게 되면 현재 데이터가 된다는 것이다. 데이터양이 어마어마하게 커지는 대신에 모든 데이터를 다 추적할 수 있다는 장점이 있다.

그래서 스냅샷 데이터를 하루에 한번 덤핑을 했고, CDC를 구축해서 해당 디비의 모든 히스토리를 추적하도록 했다. 그래서 예를들어서 어떤 데이터베이스가 계속 덮어쓰기가 되는데 스냅샷을 계속 뜨지 말고, 지금 스냅샷 뜬거랑 CDC와 join해서 보라고 했다. 서버팀에서 과거데이터를 굳이 생성안해도 추적이 가능한 개념이 이벤트 소싱이다.

9

위에 그림과 같이 DB 미러링을 여러방식으로 진행했다. 미러링할 수 있는 디비들은 위와 같이 화살표 명령어로 해당시점에 데이터를 s3로 전부 덤프를 떴다. 이렇게 S3로 모인 데이터는 빅쿼리 API를 이용해서 빅쿼리로 보냈다.

10

에어플로우 2.0부터 DAG 추상화 개념이 등장했다. 그래서 위에 그림과 같이 데이터를 처리하는 모든 DAG들을 디펜던시를 걸어서 한눈에 파악하도록 구현했다.

위에 DAG와 같이 Core영역에서 덤프가 완료가 되었는지 external task senser로 체크하고 이 작업에 대한 의존성을 체크해서 다음 ad-hoc 작업을 실행하도록 했다.

11

그러면 테스트를 어떻게 진행했는가. 위에서 언급했던것처럼 분석가들이 언제, 어디서, 무엇을 이라는 추상화된 개념에 대해서 특히 무엇을 부분을 분석가들이 중점적으로 작업 하는걸로 협업을 하였다.

12

그래서 분석가들과 같이 협업하는 깃헙 레포지토리의 코드리뷰 과정은 위와 같다.

HEAD에서 지내기란 무슨말이냐 git head에서 모든 패키지 버전과 모든 버전이 최신이다. git head에서 항상 최신을 유지하는데 에러가 발생하면 그때그때 해결하는 방식으로 운영했다. 실제 어플리케이션 사용자들에게 제공하는 서비스가 아니라 전사를 대상으로 하는 서비스이기 때문에 가능했다. 이슈가 있으면 feature 브랜치를 따서 해결을 하고 다시 머지해서 사용했다.

그렇다면 ECS 인프라는 어떻게 구현했나

  • ECS 컨테이너의 컴포넌트를 service와 worker의 개념으로 구분해서 추상화했다.

13

  • worker의 경우에는 프로세스가 끝나면 terminate해서 날렸다. (도커에 명시된 cmd 명령어가 끝나면 자동으로 꺼지게 설정함)

  • ECS는 각 작업정의에서 격리환경을 만들어 낼수있다.

아래와 같이 ECSOperator를 커스텀하게 만들어서 airflow DAG에서 활용할수 있도록 분석가들에게 제공했다.

14

아래와 같이 CloudWatch를 Airflow log로 만들었다.

처음에는 print로 추적하려고 하였으나 아주 간혹 ECS에 같은 시간에 대한 로깅 순서보장 이슈가 있었다.

그래서 redis를 이용해서 순서보장을 하도록 했다.

def check_redis(self) :
    """
    DATARO-3285 이슈
    ECS성공체크를 ECS_CONTAINER_METADATA_URI를 Redis 데이터베이스를 통해 기록하고
    이는 containers['runtimeId']에 동일하게 기록되어있다.
    """
    rd = redis.StrictRedis(host='redis-data-wanted', port=6379, db=2)
    # worker가 뜨게되면 작업에 대한 hash_id가 생성되는데 hash_id/timestamp형태로 조합해서 runtime_id로 정의했다.
    return rd.get(self.runtime_id) 

이렇게 하니까 runtime_id가 고유값이 만들어지게 되었는데 이거를 활용해서 에어플로우 로그로 만들었다.

그리고 private pip를 만들어서 커스텀하게 환경을 구성할 수 있도록 만들었다.

아래에 그림이 분석가들이 사용하는 컨테이너 이미지 아키텍처를 추상화 한 것이다.

pip를 프라이빗으로 만들어서 airflow 에서도 쓰고, worker에도 쓰도록 도커 file을 두개를 만들었다. 이 두개를 주축으로 하는 순수 pip 이미지를 만든 다음에 worker와 service를 분리하였다.

15

근데 이슈가 여기서 또 생기는데 pip 컨테이너 이미지에 싸이킷런이랑 코넬파이, 셀레니움, 텐서플로우 별거 다 쓸텐데 이거 컨테이너 이미지 하나에 다 깔아야 하나 라는 것이다.

16

그래서 컨테이너에서 dumb-init이라는 것을 쓰게 되었다.

  • Docker 는 CMD 설정에 여러 프로세스를 지정할 수 없다! 예를 들어서 A 프로세스 AND B 프로세스를 정의해서 동시에 실행할 경우 작업이 먼저 끝난게 있으면 컨테이너가 종료되어 버린다. 도커는 PID 1로 정의가 되기 때문에 그렇다.

  • dumb-init은 먼저 끝나는 작업이 아니라 먼저 데몬으로 실행되고 있는 작업이라고 생각하면 된다. 이 데몬 프로세스가 도커의 프로세스가 다 끝났는지 안끝났는지 체크를 한다.

  • ENTRYPOINT 에 dumb-init을 PID 1 프로세스를 설정하면 CMD 는 자식 프로세스가 되어 모든 정상 종료를 기다리게 된다. 그러면 몇개의 CMD가 들어가던지 기 CMD들이 자식 프로세스들이 되어서 이것들이 다 종료될때까지 기다리게 된다.

그러면 이 dumb-init을 어떻게 활용했냐면 동적으로 pip 패키지를 설치후에 작업을 실행해도록 했다.

그러면 공통 모듈만 셋팅해주게 되면 분석가는 필요한 requirements.txt를 정의해서 추가적으로 설치하게 하면 된다.

17

다음은 CI/CD에 대해서 얘기해보자.

Airflow의 경우 스케쥴러가 주기적으로 DAG bag을 polling하여 최신화 하고 있는 구조이다. 일반적으로는 그래서 DAG bag에 있는 DAG만 최신버전으로 바꿔주면 배포가 끝나게 된다. 하지만 ECS의 경우에는 블루그린 배포만 가능하다. 따라서 도커 노드를 새로 띄워가면서 배포를 하게 되는데 EC2 인스턴스에서 띄운 에어플로우 아키텍처와의 차이점이 여기에서 또 드러나게 된다.

따라서 서비스 배포 시간이 오래걸렸고 이거를 단축시키고 싶었다.

18

그래서 아이디어를 찾은게 위에 그림과 같이 github action이 돌때 codebuild 소스디렉토리의 $CODEBUILD_SRC_DIR 라는 환경변수를 이용해서 이거를 EFS를 마운트시켜서 바라보도록 지정했다. 이 EFS를 모든 컨테이너가 바라보게 아래 그림과 같이 설정하게 되면 굳이 컨테이너를 내렸다 다시 띄울 필요가 없이 배포를 할 수 있게 된 것이다.

참고로 서비스의 배포는 아직 수동으로 하고 있고 블루그린으로 배포하고 있다.

또 에어플로우 인프라에 대한 레포지토리, DAG에 대한 레포지토리를 따로 만들었다. 그래서 DAG들은 push가 되면 EFS에 저장시키고 스크립트가 도는 식으로 해서 빠른 배포를 하도록 했다.

19

그 다음에는 아래와 같이 에어플로우에 설치된 기본 패키지를 잘 활용하면 좋다.

20

21

테너시티는 위에 그림과 같이 함수위에 @retry만 달아주면 재실행이 된다. 리트라이에 괄호를 열어서 try, exception 설정을 해주면 예를 들어서 커넥션 에러일때 재시도를 한다던지 상황에 따라 유용하게 쓸수 있다. 참고로 위에 예시같이 설정하면 무한대로 돌게 되니까 출구조건을 잘 걸어줘야 한다.

그 다음에 airflow를 ECS로 올릴때 federman이라는 것으로 엔드포인트를 아래 그림과 같이 지정했다. 이 federman을 지정하게 되면 route 53 서비스와 연결이 되어서 내부 VPC 안에서만 조회가 가능하도록 서비스 엔드포인트를 설정할 수 있다. 내부서비스에 접근할 수 있도록 설정하는 좋은 방법중에 하나다.

22

MLOps와의 협업은 아래와 같은 아키텍처로 했다.

에어플로우를 이용해서 빅쿼리를 feature store로 저장한 다음에 데이터를 추출하게 되는데 가장 윗부분이 dev 쪽이고 중간쪽은 오토 파이프라인으로 구축해서 소스코드가 데브쪽에서 commit이 되면 코트 파이프라인이 돌아서 자동화가 된다.

23