AWS를 이용한 실시간 데이터 파이프라인 구축 기본개념

2021-12-05

.

Data_engineering_TIL(20211204)

[학습자료]

youtube 채널 Amazon Web Services Korea “AWS 서비스를 이용하여 실시간 분석 시스템 구축하기”를 공부하고 정리한 내용입니다.

참고자료 URL : https://youtu.be/i_X-VULpXDw

[학습내용]

  • 실시간 데이터 처리가 왜 중요한가

1

석유라는 자원이 있으면 이게 1년전 석유나 지금 상태의 석유나 가치는 똑같다. 다시말해서 1년전 휘발유를 지금 자동차에 주입하면 자동차는 잘 움직인다. 반면에 시간이 지날수록 데이터의 가치는 점점 떨어지는 특징을 갖고 있다. 지금 당장 의사결정에 필요한 데이터들이 가치가 있는것이고 시간이 지날수록 지금 당장의 의사결정에 큰 도움을 주지는 못하게 된다. 예를 들어서 지금당장의 내 집주변 지도의 각병원별 코로나 잔여백신 보유 현황이 의사결정에 중요한 것이지 하루전 이틀전 그 이상의 데이터는 지금 당장 의사결정에 큰 의미가 없다는 것이다.

  • 배치 데이터와 실시간 데이터의 차이

실시간은 말그대로 빨리빨리 처리를 해야하고 액세스 빈도도 상당이 자주 있기 때문에 리스펀스도 빨리줘야 한다. 그래서 인메모리 저장소에 데이터를 저장하기 때문에 비용이 상대적으로 비싸다.

2

  • 전통적인 배치 데이터 처리 아키텍처

3

  • 람다 아키텍처

4

전통적인 배치 아키텍처에서 람다 아키텍처로 고도화를 할때 먼저 실시간으로 들어오는 작은 단위의 데이터를 버퍼링해서 저장하는 스트리밍 스토리지가 우선 필요하고, 이 스트리밍 스토리지를 어떤거를 쓰느냐에 따라서 스트리밍 스토리지에 데이터를 넣는 모듈이 필요하다. 스트리밍 스토리지에 실시간 데이터를 버퍼로 쌓아두고 어느정도 일정한 크기가 된다던가 일정한 주기가 된다면 이거를 배치 레이어로 넘겨주는 스트림 딜리버리(데이터를 배치 레이어로 옮겨주는) 모듈도 필요하다. 이렇게 구성하면 실시간 데이터가 들어오더라도 배치레이어의 변경이 없이 데이터 처리가 가능하도록 구성이 가능한 것이다. 하지만 수초 이내에 내가 원하는 결과를 만들어내서 데이터를 소비하고 싶다면 버퍼링된 스트리밍 스토리지에서 실시간 데이터를 읽어와서 처리해주는 스트림 프로세스 모듈을 또 구성해줘야 한다. 이렇게 처리한 결과를 배치 레이어에서 데이터를 처리하고 서빙레이어 쪽의 저장소에 저장한 것처럼 마찬가지로 서비스 레이어에 저장소를 구축해서 컨슈머가 여기에 쿼리를 날리거나 분석을 할 수 있도록 구성을 해줘야 한다.

  • 실시간 데이터 처리 파이프라인의 키 컴포넌트

5

  • 스트리밍 데이터를 어디에 저장할 것인가 : 스트리밍 스토리지

6

스트리밍 인제스천 같은 경우에는 스트리밍 스토리지와 디펜던시가 있기 때문에 스트리밍 스토리지를 어떤거를 쓰느냐에 따라서 스트리밍 인제스천 방식이 결정된다. AWS에서는 키네시스와 매니지드 카프카 서비스를 이용해서 스트리밍 스토리지를 구축할 수 있다. 스트리밍 스토리지의 큰 개념 측면에서 아키텍처는 아래 그림과 같다.

7

** 참고사항

1) 키네시스 샤드 = 카프카 파티션

2) 일반적인 인메모리 큐에서는 컨슈머에서 데이터를 한번 가져가게 되면 그 데이터는 삭제되어 다른 컨슈머들이 가져갈 수 없는 구조인데 카프카에서는 컨슈머에서 데이터를 읽어 갔더라도 그 데이터를 바로 삭제하는게 아니라 컨슈머 오프셋만 넥스트 데이터로 바꿔주게 된다. 그리고 사용자가 설정한 기간동안 데이터를 유지하게 된다.

  • 왜 스트리밍 스토리지가 필요한가

8

  • 그러면 AWS SQS 서비스도 있는데 이것도 스트리밍 스토리지로 사용할 수 있지 않는가

9

SQS도 스트리밍 스토리지와 마찬가지로 프로듀서와 컨슈머를 디커플링해주고, 버퍼를 관리해주고, 여러 스트림 데이터를 다 받을 수 있지만 스트리밍 스토리지와 다른점은 데이터의 순서보장이 안된다. SQS도 겉으로 보기는 큐이기는 하지만 키네시스와 카프카처럼 이것도 성능을 위해서 많은 양의 데이터가 유입이되면 오토스케일링 되는 구조이기 때문에 여러개의 큐가 생기게 된다. 그렇기 때문에 컨슈머들이 소비해갈때 순서를 보장해주지 않는다. 그런데 스텐다드 SQS 말고 FIFO SQS라는 것도 있는데 성능은 손해를 보더라도 순서보장을 해주고 오토스케일링도 된다. 그래서 SQS를 사용해야 하는데 순서를 보장해줘야 한다면 FIFO SQS를 사용하는게 좋다.

또한 내부적으로 파티션 키가 있어서 특정 파티션으로만 즉, 특정서버로만 데이터를 보내주거나 하는 형태가 아니기 때문에 parallel consumption을 구현할 수 없다. 이런경우에는 어떻게 우회해서 구현할수가 있냐면 pubsub 구조로 아키텍처 관점에서 풀수 있다. AWS SNS를 이용해서 토픽을 만들고 거기에 publisher가 데이터를 보내주면 그 SNS를 람다나 또 다른 큐에서 읽어가서 처리할 수 있도록 우회해서 구현할 수 있다.

  • 카프카와 키네시스의 구조적인 차이

10

먼저 카프카는 클러스터로 구성이 되는 구조이다. 클러스터는 메세지 프로듀서와 컨슈머, 그리고 그 사이에서 메세지를 전달해주는 여러개의 브로커 서버로 구성된다. 그리고 이 브로커들이 잘 동작하고 있는지, 브로커들 현황은 어떻게 되는지 매니징 해주는 주키퍼도 있다. 그리고 데이터는 논리적인 단위인 토픽으로 이루어진다. 토픽을 정해서 프로듀서가 데이터를 생산하면 브로커는 이 데이터를 디스크에 저장해두는데 유실이 되는 경우를 대비해서 디폴트로 세카피로 리플리카로 분산해서 저장하는게 특징이다.

반면에 키네시스는 카프카의 토픽과 대응하는 개념이 stream이라는게 있다. 카프카의 파티션과 대응하는 개념은 shard라고 한다. 하나의 스트림 안에는 여러개의 샤드로 구성되어 있고 샤드 안에 데이터가 들어가는 개념이다. 사용자가 액세스해서 볼 수 있는 것은 스트림과 샤드까지만 보고 확인할 수 있다.

  • 카프카와 키네시스의 운영관점에서 차이

11

카프카는 클러스터 단위로 구성이 되기 때문에 엔지니어는 운영을 하거나 서비스를 개발할때 아주 큰 사이즈의 클러스터를 하나 만들어서 거기에 모든 스트리밍 데이터를 처리할 것이냐 작은 규모의 클러스터를 여러개 만들어서 목적에 맞게 클러스터들을 활용할 것인지 이런 고민을 해봐야 한다. 그리고 하나의 클러스터는 몇개의 브로커를 구성할 것인지, 각 브로커마다 최대 몇개의 토픽을 가져갈 것이냐 이런 고민도 필요하다. 토픽단위로 데이터를 저장하게 되는데 이게 파일로 저장되기 때문에 물리적으로 OS 레벨 차원에서 생각해본다면 이게 데이터를 몇개의 파일로 저장할 것이냐의 문제이기 때문이다. 서버에 오픈파일 갯수나 이런 요인들 때문에 토픽이 너무 많아지게 되면 브로커의 부하가 커질수 있다. 그리고 하나의 토픽은 몇개의 파티션으로 나누어서 운영할 것인지 이런것도 고민해야 한다.

반면에 키네시스는 클러스터나 브로커 같은 개념이 아니기 때문에 스트림과 샤드의 관점에서만 고민하면 된다. 예를 들어서 카프카에서 토픽을 몇개 운영할지 고민하는 것 처럼 키네시스에서는 스트림을 몇개를 운영할 것인가 고민하면 된다. 목적에 따라서 각각 다른 스트림을 만들것이냐 아니면 스트림에다 모든 데이터를 다 때려넣을 것이냐 이런 것이다. 그리고 하나의 스트림 안에는 몇개의 샤드를 사용해서 데이터를 처리할 것인지에 대해서는 데이터를 처리할때 속도라던가 쓰르풋이라던가 이런부분에서 고민해서 결정하면 된다.

성능관점에서는 카프카는 파티션이나 샤드로 튜닝을 할수 있다. 카프카는 파티션을 늘릴 수는 있지만 줄일 수는 없다는 점을 유의해야 한다. 반면에 키네시스는 샤드를 늘리거나 줄이는 것 모두 가능하다.

  • 메트릭 모니터링 관점에서 카프카와 키네시스 비교

12

카프카는 클러스터 형태로 운영되기 때문에 클러스터를 이루고 있는 주키퍼와 브로커가 잘 동작하고 있는지 항상 모니터링 해줘야 한다. 그리고 데이터 측면에서도 데이터를 리플리카로 복제해서 저장하기 때문에 특정 토픽에 리플리카 리더는 어떤 브로커인가 이 리더는 정상적으로 선출이 이루어지고 있느냐 하는 부분에서도 모니터링을 잘 해줘야 한다. 그리고 프로듀서는 데이터 생산속도를 컨슈머가 잘 따라가고 있는지 모니터링 해줘야 한다. 만약에 프로듀서 데이터 생산속도가 컨슈밍 속도보다 빠르면 불균형이 생기기 때문에 브로커가 프로듀서가 생산하는 데이터를 저장하기 전에 대기시키는 RequestQueue의 Length(RequestQueue의 길이)와 WaitTime을 튜닝해서 지연해켜줘야 한다. 반대의 상황으로 컨슈머의 속도가 프로듀서의 생산속도보다 빠르다면 ResponseQueue의 Length와 WaitTime을 튜닝해서 지연해켜줘야 한다. 그리고 대량의 데이터를 카프카에 저장하는 경우에는 데이터 패킷사이즈가 커져서 패킷이 네트워크를 타다가 드랍하는 경우도 발생할 수 있는 경우도 잘 봐야 한다. 그리고 데이터를 디스크에 저장하기 때문에 디스크의 현황도 잘 모니터링 해줘야 한다.

13

반면에 키네시스 데이터 스트림즈는 사용자가 현황을 확인할 수 있는 것은 스트림과 샤드까지만이다. 그래서 사용자는 프로듀서의 데이터 생산속도를 컨슈머가 못따라가지는 않는지 이정도만 관리해주면 된다. 이런 관점에서 위에 그림에서 GetRecords.IteratorAgeMilliseconds라는 메트릭은 얼마나 오랫동안 키네시스 샤드안에 대기했느냐. 즉, 마지막 레코드가 읽혀지기까지 얼마나 오랫동안 있었느냐라는 시간지표를 잘 봐줘야 한다.

  • How to Ingest Streaming Data?

14

Steaming Ingestion은 Strea Storage를 어떤거를 쓰느냐에 따라서 선택의 폭이 결정되게 된다. MSK 같은 경우에는 오픈소스 카프카에서 사용하는 대부분의 라이브러리나 서드파티 툴 들을 제공해준다. Kinesis는 AWS의 SDK를 이용해서 개발이 가능하다. 예를 들어서 EC2에 키네시스 에이전트라는 것을 설치하면 fluentd처럼 로그데이터를 읽어서 키네시스로 보내줄 수 있다. 아니면 라이브러리 형태로 쓰고싶으면 키네시스 프로듀서 라이브러리라는 것을 이용해서 작은데이터들이 여러건 생성되면 이것들을 설정하는 것에 따라 한번에 이 데이터들을 모아서 키네시스로 보내서 전체적인 쓰루풋을 높여주는 기능을 구현할 수 있다. 이외에 오픈소스 라이브러리인 카프카 컨넥트라던가 fluentd 등을 사용해서 키네시스에 데이터를 넣을 수 있다.

  • How to Process Streaming Data?

1) Stream Delivery

15

데이터 소스가 스트리밍 인제스천을 이용해서 데이터를 스트리밍 스토리지에 저장하고 스트리밍 프로세스가 스트리밍 스토리지에서 데이터를 읽어서 처리한 다음에 data sink로 보내는 일련의 단계로 처리할 수도 있지만 다른 요구사항이 있을수도 있다. 예를 들어서 데이터 소스 원본을 그대로 다른 데이터 씽크 영역으로 보내주기만 했으면 좋겠다라는 요구다. 아니면 스트림 스토리지로부터 복잡한 로직처리 이런거 필요없고 그대로 읽어서 데이터 싱크 쪽으로 전송만 했으면 좋겠다 라는 요구다. 이런 요구사항에 대한 구조가 스트림 딜리버리 구조이다. 이 구조가 왜 필요하냐면 위에서도 언급했지만 데이터 소스에서 데이터 싱크 영역으로 복잡한 로직 연산없이 거의 그대로 이동만 시키고 싶은 니즈도 있기 때문이다. 이런 스트리밍 딜리버리 역할을 해주는 AWS 서비스가 kinesis data firehose이다. 그래서 예를 들어서 Data Source에서 Data Sink 영역인 S3로 데이터를 그대로 보내주고 싶다. 아니면 redshift로 데이터를 바로 보내고 싶다. 아니면 ES로 데이터를 보내고 싶다. 아니면 kinesis data analytics나 Splunk로 데이터를 그대로 보내고 싶을때 사용할 수 있다. 이때 데이터의 큰 가공이 없이 굳이 번거롭게 Streaming Process 프레임워크로 처리해서 보내고 싶지 않을 경우 유용한 옵션이다. kinesis firehose로 데이터를 보낼 수 있는 데이터 소스로는 위에 그림과 같이 kinesis agent, cloudwatch 등이 있는데 kinesis data firehose에 자체 서버로 구성되어 있기 때문에 http api를 사용해서 바로 데이터를 firehose로 보낼수도 있다.

  • Kinesis Firehose : Filter, Enrich, Convert

16

Kinesis Firehose가 데이터 소스에서 데이터 싱크로 데이터를 거의 그대로 보내줄 수 있는데 간단한 ETL 작업도 처리할 수 있다. 크게 Filter, Enrich, Convert를 할 수 있다(엄밀하게 얘기하면 kinesis firehose 자체적으로 Filter, Enrich, Convert 할수는 없고 외부에 람다를 물려서 구현이 가능한 부분이다). 특정 데이터를 필터링 할 수도 있고, 특정 데이터가 들어왔을때 다른 데이터를 추가해서 좀 더 의미있는 데이터로 만들 수도 있고, 데이터의 포맷을 예를 들어서 csv –> json 또는 다른 형태로 바꿀수도 있다. 예를 들어서 위에 그림과 같이 데이터 소스에서 아파치 로그가 생산되어 오게 되는데 이 데이터를 데이터 싱크 영역 예를 들어서 s3나 redshift에 저장하고 싶다고 하자. 여기서 우리가 하고 싶은거는 아파치 로그에서 아이피 주소만 추출해서 지역정보를 추가해서 json 형태로 데이터 싱크영역으로 보내고 싶은 것이다. 이거를 어떻게 구현하냐 geo-ip(그냥 가상의 예시 API) 같이 아이피주소를 입력으로 넣으면 지역정보를 output으로 주는 API라는 것에 람다를 붙여서 데이터가 kinesis firehose로 오게 되면 이때 firehose가 람다를 호출해서 아파치 로그를 input으로 주면 람다가 geo-ip를 또 호출해서 지역정보를 받아오고, 지역정보를 받아올수 없는 내부IP(위에 그림에서 127.0.0.1)일 경우는 드랍시켜버린다. 그런 다음에 람다가 아파치 로그 형태의 데이터 형태를 json으로 바꾼 다음에 지역정보가 없는 데이터인 경우 필터링해버린 다음에 firehose로 다시 보내고 이렇게 처리한 데이터를 데이터 싱크 영역으로 저장하는 이런 형태로 구성을 하는게 가능하다.

17

그래서 Blueprints라는 코드 예제 템플릿을 AWS에서 제공해주어서 좀더 편리하게 람다 함수를 이용해서 위에 그림과 같이 간단한 ETL 작업을 구현할 수 있다.

18

또한 데이터를 단순히 예를들어서 csv –> json으로 바꾸는 것이 아니라 json 형태의 데이터를 파케이나 ORC 와 같은 포맷으로 변경해서 저장이 가능하다. 이거는 어떻게 하냐면 데이터 소스에서 json 형태로 만약에 데이터가 들어온다고 하면 firehose에서 데이터를 받은 다음에 glue 카탈로그 스키마 정보를 받아와서 이거를 내부적으로 parquet나 ORC로 변환해서 S3에 데이터를 저장할 수 있다. 또한 parquet나 ORC로 데이터 변환중에 에러가 발생했을때 S3 영역에 별도의 실패한 데이터 공간으로 저장하도록 별다른 코딩없이 kinesis firehose를 설정하는 것도 가능하다.

19

만약에 데이터 싱크영역쪽에 문제가 발생해서 firehose to data sink 구간에서 에러가 발생한다면 s3 같은 경우에는 최대 24시간 동안 retry를 할 수 있고, ES나 Redshift는 최대 2시간까지 retry가 가능하고 실패한 것들은 다른 s3 폴더 경로나 다른 S3 영역으로 대체 저장을 해주는 기능도 있다.

2) Stream Process

20

Stream Process 영역에서 할 수 있는 처리는 크게 Transform, Aggregation, Join이 가능하다. 예를 들어서 두개의 스트림 스토리지에 있는 데이터를 합쳐서 새로운 스트리밍 데이터를 만들고 싶다던가 아니면 외부의 레퍼런스 테이블이 있고 이거를 참조를 해서 기존에 스트리밍으로 들어오는 데이터와 합쳐서 새로운 스트리밍을 만들다던가 데이터 싱크로 보낸다던가 라는 것들을 할 수 있다. 이런 처리가 가능하도록 하는 서비스가 람다, EMR(Spark 또는 Flink), Glue(Spark을 풀 매니지먼트 서비스로 사용 가능), Kinesis Data Analytics(Flink를 풀 매니지먼트 서비스로 사용 가능)가 있다.