광고 데이터 수집을 위한 데이터 파이프라인 운영 사례 - 매드업

2022-12-11

.

Data_Engineering_TIL(20221211)

AWS Community Day 2022 - Seoul 자료를 공부하고 정리한 내용입니다.

** URL : https://www.youtube.com/playlist?list=PLX2fs3661XpN1mBctkVosU5jxkusdBRxC

[세미나 정보]

  • 제목 : 광고 데이터 수집을 위한 인프라 구축

  • URL : https://youtu.be/8wIEYFzZmAs

[세미나 아젠다]

1 ) 광고 도메인 설명

2 ) 수집 : 대용량 광고 데이터 수집

3 ) 수집 : ElasticCache for Redis(stream)으로 stateless 수집

4 ) 데이터 적재 및 Redshift 튜닝

5 ) Terraform으로 ECS 인프라 구축

6 ) 기타 : DataHub & Grafana

7 ) Wrap up

[공부한 내용]

광고 도메인 설명

매드업이라는 회사는 ad tech 기반의 퍼포먼스 마케팅 회사이다. 우리가 유투브를 보게 되면 광고영상을 자연스럽게 보게 되는데 이 광고들을 매드업이라는 회사가 대행해서 내보낸다. (물론 회사마다 광고를 직접 집행하는 경우가 있다.). 매드업의 데이터 플랫폼팀은 이 광고가 클릭되었을때 전환값 지표와 사용자의 행동 패턴을 트레킹하기 위해 데이터를 수집하고 필요한 팀에 공급하는 역할을 한다. 매드업에서 연동해서 다루고 있는 광고 매체의 종류는 수십개에 이른다. 이렇게 다양한 광고 매체로 부터 여러종류의 데이터를 다운로드 받아서 활용하게 된다.

0

데이터 플랫폼팀에서 특히 관심있게 보고있는 데이터는 성과데이터이다. 광고를 집행했으면 그에 따른 성과리포트를 내려받을 수 있는데 여기에는 까다로운점이 있다. 광고주마다 성과 측정이 되는 기여기간 기준이 다르고, 각 매체가 관리하는 광고 지면들로부터 뒤늦게 데이터가 집계되는 경우가 있기 때문이다.

예를 들어서 2022년 9월 10일에 앱설치를 의미하는 install이라는 지표가 2501 이었다. 이게 시간이 흐르면서 아래 그림과 같이 동일한 엔드포인트와 파라미터로 조회했을때 점점 증가하게 된다.

1

install 이라는 단편적인 지표지만 실제로는 다운로드 되어지는 데이터의 행의 개수가 달라지기도 한다. 일반적인 매드업의 고객들은 한달정도의 리포트를 원하기 때문에 매드업의 DW에는 최소 한달치 데이터를 최신으로 유지해야하는 미션이 있는 것이다.

2

매드업 광고 데이터 파이프라인의 간소화된 아키텍처이다. 다양한 광고매체로 부터 시간당 수십만개의 csv, tsv, json file을 ECS에서 수집해서 S3로 저장하고, 저장된 정보를 elasticcache로 보내게 된다. 그런 다음에 airflow는 이들을 읽어서 redshift에 적재한다. 매체별로 사용하는 광고주들이 다양하게 존재하기 때문에 처리해야할 file의 수가 수십만개에 이르게 된다.

대용량 광고 데이터 수집

데이터 수집의 중심에는 AWS ECS 서비스가 있다. ECS는 컨테이너화된 어플리케이션의 손쉬운 배포, 관리 및 크기 조정을 지원하는 완전관리형 컨테이너 오케스트레이션 툴이다. ECS는 EC2와 Fargate에서 호스팅 될 수 있다. 매드업의 데이터 수집 부분에서는 EC2를 사용하고 있고, 데이터 분석을 위해 jupyterhub를 fargate에 호스팅해서 지원하고 있다. fargate는 서버리스이기 때문에 관리 관리 리소스가 들지 않는 장점이 있는데 비싸다는게 단점이다. 서버리스라는 단어 때문에 람다와 같이 짧은 콜드스타트를 기대할수 있지만 반드시 그렇지 않다.

3

구글이라는 광고 매체로 부터 데이터를 수집하는 과정을 알아보자. ECS는 사용자의 어플리케이션을 여러 서비스로 구성을 할 수 있는데 광고데이터 수집을 위해서 위에 그림과 같이 scheduler와 collector라는 것을 구현을 했다. 스케쥴러는 1개만 구동하고, 콜랙터는 상황에 따라 수백개까지 오토스케일링 되도록 설정을 해두었다.

4

스케쥴러는 수집해야하는 광고주의 정보가 저장되어 있는 secret manager 정보를 RDS로부터 읽어와서 secret manager로 부터 API 호출이 필요한 token 정보를 가져오게 된다. 그리고 그 정보를 elastic cache로 보내게 된다.

5

그러면 elastic cache로 보내지게 된 정보를 컬랙터들이 전부 가져가게 된다. 컬랙터는 매체에 데이터 요청 API를 호출하게 된다. 이때 한가지 이슈가 있다. 아래 그림과 같이 동시에 너무 많은 API를 호출하게 되면 429 too many requery Error가 발생한다.

6

그래서 이에 대한 대안으로 throttling 어플리케이션을 구현하여 적용하게 되었다. 스케쥴러가 RDS에서 정보를 읽어와서 secret manager를 호출하고, elastic chache로 보내게 된다. throttling 어플리케이션은 elastic chache에 저장된 정보를 읽어서 광고 매체별로 쓰로틀링을 제어하게 된다. 구글은 구글대로, 페북은 페북대로, 유툽은 유툽대로 .. 왜냐하면 광고매체별로 초당 API 호출 제한이 다르기 때문이다.

7

아래 그림과 같이 쓰로틀링에 따라서 콜랙터가 데이터를 수집을 하여 S3에 저장하게 된다. 그런 다음에 S3에 저장한 file에 대한 정보(file이 저장된 s3 경로를 포함한 정보)를 elastic chache로 보내게 된다. 여기서 쓰로틀링 처리를 스케쥴러가 해도 되는거 아닌가 라고 생각할 수도 있는데 일반적인 마이크로 서비스를 지향해서 한개의 어플레이션은 하나의 목적으로만 동작하도록 설계를 하였다. 이외에도 데이터 수집외에 많은 서비스가 이 ECS 클러스터 안에서 동작하고 있다.

8

9

10

11

12

13

ElasticCache for Redis(stream)으로 stateless 수집

앞선 시나리오와 동일하게 데이터를 수집하고 있는 상황이다. 편의상 시크릿 매니저와 쓰로틀링은 표시하지 않았다. 스케쥴러가 RDS에서 정보를 읽어서 elastic cache로 보내고 컬랙터가 수집을 시작한다. 이때 개발자가 콜랙터의 소스코드를 수정해서 깃헙에 푸쉬를 했다고 가정하자 그러면 깃헙의 워크플로우에 따라서 배포가 될 것이다. 그러면 이때 무슨일이 일어날까. 콜랙터가 갑자기 내려가게 된다. 또한 이때 만약에 컬랙터가 데이터를 수집하고 있었다면 중단이 될것이다. 그리고 새로 배포된 서비스들이 올라오게 될 것이다. 물론 ECS에서 어플리케이션이 내려가기전에 그 시그널을 받아서 셧다운을 다르게 유도할 수도 있다. 하지만 우리가 데이터를 다운로드 받을때 데이터의 크기에 따라서 길게는 몇시간씩 걸리는 데이터들도 있다. 그 몇시간 동안 시그널을 붙잡고 있기는 곤란하기 때문에 그냥 종료를 시켜버리고 새로 뜬 컨테이너에서 데이터를 새로 수집할 수 있도록 처리가 필요했다.

14

개발자들은 눈치를 보지 않고 배포를 하는 것을 지향하고 있기 때문에 하루에 몇번 배포를 해도 무너지지 않는 시스템을 만들어야 했다. 이런 시스템의 중심에 Redis stream이 있다. Stream은 redis에서 카프카를 겨냥해서 만든 데이터 타입이다. 메세지가 들어오면 해당 키를 바라보는 레디스 클라이언트 하나가 메세지를 읽어가고 처리가 끝났으면 ack와 함께 메세지를 삭제할 수 있다. 만약에 ack와 함께 삭제하는게 에러가 났다면 해당 메세지에 대해 xclaim 처리하고 다른 클라이언트가 메세지를 가져가도록 지원을 한다.

15

16

17

즉 컨테이너가 배포로 인해 중단이 되었거나 혹은 어떤 이유로 죽게되는 경우 ack를 보내지 못하면 xclaim 처리를 통해 다른 컨테이너에서 데이터를 수집할 수 있도록 시스템을 구축해놓았다. 컨테이너가 데이터를 처리중이었다는 상태를 가질 필요 없게 만들어 둔 것이다. 카프카도 마찬가지이지만 스트림을 사용하기 위해서는 어느정도 데이터 타입에 대한 이해가 필요하다.

18

스트림을 조금 더 살펴보면 스트림에 메세지가 쌓여있고, xgroup을 read로 읽어가게 되면 pending 리스트로 관리가 된다. 이때 ack를 처리하게 되면 pending 리스트에서 삭제가 된다. 한편 xdel을 통해서 스트림 자체에서 메세지 자체를 아예 삭제하는 것도 가능하다.

데이터 적재 & Redshift 튜닝

앞서 수집한 데이터를 DW에 적재하고 DW를 튜닝하는 내용이다. 계속해서 변경되는 광고 데이터를 최신으로 유지하기 위해서는 로그 데이터 처럼 무작정 append only로 하기에는 애매하다. 물론 컴퓨팅 파워를 어마어마하게 구매하고, 스토리지에 다 때려넣으면 가능은 할텐데 이게 비용과 직결되는 문제라 이렇게 하기에는 쉽지 않다. 매드업이 대응하는 광고주의 수와 광고주가 사용하는 매체의 갯수, 그리고 그 매체에서 제공하는 리포트를 모두 수집을 해야하기 때문에 매시간 수십만건의 데이터를 다운로드 받게 된다. 이때 이 수십만개의 file의 크기가 킬로바이트 부터 기가바이트까지 스펙트럼이 다양하게 있다. 매드업은 언제 어디에서 쿼리를 하던지 쿼리한 시점에 데이터가 가장 최신이기를 지향한다. 이말은 적어도 매시간 데이터를 꾸준히 다시 수집을 해서 기존의 데이터를 갈아치울 필요가 있었다.

19

현실적으로는 일반적인 디비는 사용하기 어려웠다. 그래서 DW의 서비스를 Redshift로 하게 되었다. Redshift에서 기존데이터를 더 빠르게 삭제하기 위해서 트런케이트를 고민할 수 있는데 트런케이트는 implicit commit를 수행하기 때문에 다른 쿼리와 트렌젝션으로 묶일수가 없다. 즉 트런케이트가 되어도 새로운 데이터가 쌓이기 전까지는 그 데이터를 조회할 수 없다는 문제가 있다. 그래서 기존에 저장되어 있는 대용량 데이터를 삭제하고 새로운 데이터를 넣는 방식을 선택했다. 이렇게 거대한 작업도 하나의 트렌젝션에서 가능하기 때문이다. 트렌젝션을 세분화 해서 쿼리를 날릴수록 레드시프트에는 더 큰 부하가 갈 수도 있다고 한다. 비지니스 상황에 따라서 트렌젝션을 적절하게 쪼개야 한다. 레드시프트로 카피하는 file의 형식도 선택을 해야한다. 대표적으로는 json, csv, parquet가 있다. csv와 parquet을 사용하는 경우에는 레드시프트에 정의된 컬럼과 file에 있는 키가 일치해야 카피가 된다. copy 속도는 일반적으로는 파케이가 가장 빠른 편이다. 레드시프트와 동일한 컬럼기반의 데이터 타입이기 떄문이다. file의 크기에 따라서 파케이가 제대로 성능을 못내는 경우도 있기 때문에 테스트를 해봐야 한다.

20

21

광고 매체로 부터 다운받는 데이터 형식은 사전 공지도 없이 변경되는 경우가 있다. 예를 들어서 어제는 json에 2개의 키가 있었는데 오늘은 3개가 되고, 그 다음날에는 다시 두개가 되었는데 두개의 이름이 이전과 다르다던지 하는 경우가 종종 있다. 이런경우에는 csv와 parquet에는 키 문제 때문에 copy가 안되는 경우가 있다. 그래서 매드업에서는 주로 json을 메인으로 사용한다. 물론 json 자체를 사용하는 것 보다는 압축해서 copy하는 것이 큰 효율을 내게 된다.

redshift에 copy를 하기 위해서는 먼저 menifest를 만들어줘야 한다. menifest 통해서 s3에 있는 복수개의 file을 한방에 redshift로 copy 할 수 있다.

{
    "entries": [
        {
            "mandatory": true,
            "url": "s3: //bucket-path/year=2022/month-09/day=11/file.json.bz2"
        },
        {
            "mandatory": true,
            "url": "s3://bucket-path/year=2022/month-09/day=12/file.json.bz2"
        },
        {   
            "mandatory": true,
            "url": "s3://bucket-path/year=2022/month=09/day=13/file.json.bz2"
        },
        {
            "mandatory": true,
            "Url":"s3://bucket-path/year=2022/month=09/day=14/file.json.bz2"
        },
    ]
}

위와 같이 만든 menifest file의 경로를 copy 쿼리에다가 기입을 하고, 이때 IAM 권한을 사용해도 되고, 크레덴셜을 직접 써줘도 된다. 이때 mandatory 옵션으로는 s3에 file이 없는 경우에 에러를 발생시킬지에 대한 옵션이다. 이 카피 쿼리는 airflow에서 수행을 하게 된다.

COPY public.facebook_ad_daily
    FROM 's3://bucket/path/to/manifest/facebook_ad_daily.json'
    with credentials
    'aws_access _key_id=***;aws_secret_access _key=***'
    TIMEFORMAT 'auto' MAXERROR 10 bzip2 FORMAT JSON 'auto ignorecase' manifest;

22

레드시프트 카피를 위해 사용하는 에어플로우 DAG 구조는 아래와 같다. 레드시프트로 카피 해야하는 file 리스트를 aggregate해서 쿼리를 하는 간단한 구조이다. 물론 카피할때 transaction 범위를 적절하게 잡는게 중요하다. 이런 종류의 DAG가 광고매체별로 수십개씩 존재한다. 카피쿼리 자체에 대한 부하는 레드시프트가 가져가기 때문에 에어플로우가 설치된 환경은 큰 리소스가 필요없다.

23

AWS에서 airflow를 구동할수 있는 환경은 크게 세가지가 있다.

  • EC2 위에 설치해서 운영

DAG 안에 task 간의 반응 속도가 EKS 위에서 운영하는 방식이나 MWAA 보다 훨씬 빠르다. 관리해야하는 DAG 수가 적당히 있고, task와 task 간에 반응속도에 민감하다면 EC2 위에 설치해서 사용하는 것도 나쁘지 않은 선택이다. 다만 가용성이 떨어진다는 단점이 있다.

  • EKS 위에 위에서 운영

생각보다 관리해야하는 부분도 많고, DAG의 스케쥴 주기가 짧다면 pod를 죽였다 살렸다 하는 주기도 그만큼 짧기 때문에 EKS 운영관점상 안좋을 수도 있다.

  • MWAA

Redshift 성능 튜닝

레드시프트 성능 튜닝의 가장 기본은 분산키 설정이다. 분산키 설정은 필수이다. 어떤키를 분산키로 잡을지는 비지니스 의사결정에 따라 다르기 때문에 잘 선택하면 된다.

24

래드시프트는 워크로드 매니지먼트 시스템을 통해서 슬롯이라는 개념을 제공한다. 한개의 큐에는 여러개의 슬롯이 존재하게 된다. 슬롯은 쿼리실행의 동시성을 나타낸다. 즉 슬롯 개수만큼 쿼리도 동시에 수행이 된다는 것이다. 그렇다고 쿼리 속도가 슬롯에 독립적으로 보장된다는 말은 아니다. 클러스터 전체에서 사용하는 메모리를 나누어서 사용하는 구조이기 때문이다.

큐 종류는 삭제할 수 없는 default 큐 한개와 추가로 정의할수 있는 큐 7개, 그리고 AWS 콘솔상에 나타나지 않는 superuser를 위한 큐 한개가 있다. superuser를 위한 큐는 일반적으로 포함해서 얘기하지 않는다. 그래서 WLM 큐는 8개의 큐를 갖고 있다고 얘기한다. 한개의 큐 안에는 5개의 슬롯이 존재한다.

25

26

WLM은 자동과 수동을 지원하는데 기본적으로는 자동으로 설정된다. 자동과 수동의 가장 큰 차이점은 큐의 메모리를 사용자가 직접 컨트롤 할수 있냐 없냐의 차이점이다. ETL을 위한 큐, 분석을 위한 큐, 서비스를 위한 큐로 일반적으로는 분리해서 사용한다. 이때 서비스 우선순위에 따라 메모리 크기를 할당해서 사용한다는 개념이다. 레드시프트 튜닝에서 가장 중요한거는 클러스터 노드에 있는 슬라이스에 일을 잘 분배해서 일을 시키는 것이다. 슬라이스가 고르게 일을 하고 있지 않다면 레드시프트가 제대로 된 성능을 내지 못하기 떄문에 반드시 신경써야 하는 부분이다.

테라폼으로 ECS 인프라 구축

서비스가 복잡하고, 개발, 스테이징, 운영처럼 환경이 많아지면 인프라 설정과정에서 휴먼에러가 종종 발생하게 된다. 더 난감한 점은 리소스의 형상이 추적이 안된다는 것이다. 이외에도 아래 그림과 같이 다양한 인프라적인 문제가 발생할 수 있다.

27

28

테라폼과 같이 코딩 인프라 스트럭처를 이용하면 위와 같은 문제를 해결할 수 있다. 처음에 테라폼으로 인프라를 구축할때 고민하는게 모듈 구조를 어떻게 가져갈 것이냐이다. 아래와 같이 환경과 모듈을 분리해서 하는 경우도 있다. 모듈 안에 정의되어 있는 리소스를 개발과 운영환경의 main.tf에서 임포트해서 사용한다고 보면 된다.

29

모듈 사용예시는 아래와 같다.

module "ecs_cluster" {
    source= "../modules/ecs-cluster"    # 리소스 경로 지정
    name= local. name   # 인프라 전반에서 사용되는 리소스 이름
    tags= local. tags   # 인프라 전반에서 사용되는 리소스 태그
    env= local.env  # 리소스 이름을 환경 구분하는 용도
    max_size= 5 # ASG 컨테이너 인스턴스 최대 개수
    min_size= 3 # ASG 컨테이너 인스턴스 최소 개수
    default_cooldown=60
    ...
module "ecr_scheduler" {
    source ="../modules/ecr"    # 리소스 경로 지정
    name = "scheduler/${local.branch}"  # 인프라 전반에서 사용되는 리소스 이름
    tags = local.tags   # 인프라 전반에서 사용되는 리소스 태그
}
module "ecs _service_scheduler" {
    cluster_id = module.ecs_cluster.cluster_ id
    iam_role_arn = module. ecs_cluster.iam_role_arn
    target_group_arn = module.ecs_cluster.dmp_tg_arn
    source = ".../modules/ecs-service" # 리소스 경로 지정
    name = local.name    # 인프라 전반에서 사용되는 리소스 이름
    tags = local.tags    # 인프라 전반에서 사용되는 리소스 태그
    
    ...

    task_cpu = 512
    ask_memory = 900
    max_capacity = 1    # Maximum tasks
    min_capacity = 1    # Minimum tasks
    image= format("%s:latest", module.ecr_scheduler.repo_url)   # 컨테이너 이미지 경로
}

매드업에서는 클러스터의 오토스케일링 그룹 설정에는 비용절감을 위해 스팟 인스턴스를 할당하고, weight capacity 설정을 하여 t타입과 m 타입을 사용하도록 설정했다. t 타입 인스턴스를 먼저 할당받고 이게 실패하면 m 타입 인스턴스를 할당받도록 시도할 것이다.

resource "aws_autoscaling_group" "this" {
    name= var.name
    max_size=var.max_size
    min_size=var.min_size

    ...

    mixed_instances_policy {
        instances_distribution {
            on_demand_percentage_above_base_capacity = 0
        }
        launch_template {
            override {
                instance_type = var.instance_type_weighted_1
                weighted_capacity = 1
            }
            override {    
                instance_type = var.instance_type_weighted_2
                weighted_capacity = 2
            }
        }
    }

30

인프라 배포는 테라폼 클라우드를 통해서 이루어지게 된다. 테라폼 클라우드는 5인 이하에서는 무료로 사용이 가능하다. 형상관리 도구로 PR 이 올라가게 되면 테라폼 클라우드에서 테라폼 플랜 명령이 동작해서 그 결과를 받아올 수 있다. 또한 push가 되면 테라폼 apply를 통해서 자동으로 인프라가 배포될 수 있다. 테라폼 클라우드 안에서는 테라폼 플랜과 어플라이 상태를 모두 살필수가 있다. 테라폼 status 파일을 테라폼 클라우드에서 관리하기 때문에 쉽고 안정적으로 인프라를 스텍을 배포할 수 있다.

31

위에 그림은 매드업 데이터 플랫폼팀의 데이터 수집과 주피터 허브 지원을 위한 아키텍처이다.

wrap up : 매드업의 도입 실패 사례

데이터 수집과 처리를 처음에는 EMR spark으로 하려고 했다. 시간당 수십만개에 file이 들어오게 되는데 map reduce 연산으로 이 file들을 쪼개고 합치는 시간이 실제 transform 시간 보다 지나치게 오래걸려서 이거는 포기하고 그냥 수집된 데이터를 최대한 빠르게 비지니스 목적에 맞게 DW로 적재하기 위해서 file 변형 작업은 어플리케이션에서 별도로 처리하도록 했다.

wrap up : 넥스트 스텝

인프라 코드 레벨부터 업그레이드를 진행하고 있다. 먼저 일라스텍 캐시 대신에 SQS와 MSK로 대체할 예정이다. 스트림으로 처리하던 메세지 상태를 SQS의 visibility timeout으로 대체를 하고, 수집결과 처리는 MSK 에서 하려고 한다. 그리고 EC2위에 올려서 사용하던 airflow를 MWAA로 전환중이다. DAG의 개수가 점점 늘어나고 있어서 인스턴스를 스케일업해야하는 상황까지 왔는데 이럴거면 그냥 매니지드 서비스를 사용하는게 낫다고 판단했다. 그리고 마지막으로 보다 복잡한 컨테이너 구조를 다루기 위해서 ECS에서 EKS로 전환하고 있다.

32