Class101 사례 - AWS EMR과 Airflow를 이용한 Batch Data Processing 요약정리
.
Data_Engineering_TIL(20210605)
[학습자료]
‘AWS EMR과 Airflow를 이용한 Batch Data Processing (Class101 데이터팀에서 Spark를 이용해 빅데이터를 ETL하는 법)’ 블로그글을 공부하고 정리한 내용입니다.
URL : https://medium.com/class101-dev/aws-emr%EA%B3%BC-airflow%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-batch-data-processing-a100fc2f4f10
[학습내용]
- Class 101의 Batch data processing을 구성하는 인프라
1) 분산처리 프레임워크 - EMR
대용량 데이터를 빠르게 처리하기 위해 사용하는 맵리듀스 프레임워크
하둡대비하여 개발공수측면에서 간편
인메모리 처리로 빠른 처리성능
스트리밍 처리 인프라도 비교적 쉽게 셋팅가능
2) 데이터 레이크 - S3
방대한 데이터를 디스크에 I/O 제한 없이 여러서버에서 동시에 데이터를 읽고 쓸 수 있는 시스템
Raw 데이터, 단계별 정제 데이터, 스냅샷 등을 적재하기 위한 공간
3) workflow 관리시스템 - Airflow
여러개의 task를 연결해서 정해진 시간마다 workflow를 실행하는 경우 적용
ex) data ETL 작업, 머신러닝 작업
(Preprocessing from rawdata –> ML training from training data –> ML Prediction from Tranined model –> Prediction result)
에러가 났을때 재처리를 하거나, 수행결과에 따라 분기를 할 수 있다는 것이 장점임
4) 데이터 웨어하우스 - Redshift
데이터기반 의사결정을 돕기위한 데이터 저장&분석 플랫폼
데이터 품질보장
집계 / 통합테이블 (데이터 마트) 적재
ex) Redshift, BigQuery
- Class 101의 Batch data ETL 파이프라인 아키텍처
- Class 101의 Batch data ETL 파이프라인 구성
1) airflow - workflow scheduling
2) EMR (spark) - Batch processing
master node : cluster resource manager, spark application master가 위치하는 노드
core node : HDFS가 구성된 노드, 직접 job을 수행하는 executor들을 launch해서 일을하는 노드
task node : 직접 job을 수행하는 executor들을 launch해서 일을하는 노드
3) Glue - Hive Metastore
4) S3 - Parquet file storage
5) Redshift - Data Warehouse
- Class 101의 EMR 활용방안
spark job 마다 클러스터 하나 (Transient EMR 운영방식) ? or 모든 spark job을 위한 클러스터 하나(Permanent EMR 운영방식) ?
–> ‘모든 spark job을 위한 클러스터 하나’ 방식을 채택함
이를 가능하게 하는게 Yarn resource manager에서 할 수 있는 ‘Dynamic resource allocation’ 옵션이다.
- spark submit processing
step 1) 이미 실행되고 있는 AWS EMR cluster 마스터 노드에 SSH 접속
step 2) 직접 shell에서 spark-submit커맨드를 실행
** 이때 실행될 커맨드에 parameter로 PySpark job 이름과 그 job의 application code가 들어있는 Docker Image를 같이 제공
step 3) PySpark job이 완료되면 Airflow Task역시 완료되며 종료
- 참고사항
블로그글 일부 발췌내용
” PySpark job은 Python dependency들을 내재하고 있기 때문에 Spark 클러스터에 Python dependency들을 미리 설치해 Python환경을 조성해주어야 한다. 만약 PySpark Job들이 서로 다른 Python library들을 사용한다면 같은 머신 (클러스터)에서 Job들을 실행할 수 없을 것이다. 이를 해결해주기 위해 Amazon EMR이 6.0.0 release에서 Docker Image Support를 발표했다 (참조). 이제 각 PySpark Job들이 같은 클러스터이지만 서로 다른 Docker 컨테이너 위에 실행될 수 있다!”
참고자료 : https://aws.amazon.com/ko/blogs/big-data/run-spark-applications-with-docker-using-amazon-emr-6-0-0-beta/
- Class101에서 사용하는 PySpark Codebase 구축과정 요약
1) 준비물 : pyspark-etl repo, my-airflow repo
2) process
step 1) pyspark method를 pyspark-etl 코드베이스에 정의한 후 CLI로 등록한다. (python의 click 라이브러리라는 것을 이용함)
step 2) CI/CD 파이프라인을 통해 pyspark-etl docker image를 push한다.
step 3) airflow 코드베이스에 DAG task를 정의한다. 이때 파라미터 넣어줄 변수들도 정의한다.
파라미터로 넣어줄 변수들 : pyspark-etl docker image 이름과 tag, CLI로 등록된 커맨드 이름과 거기에 필요한 파라미터들
step 4) 위와 같이 진행하면 airflow dag의 스케쥴에 맞춰 pyspark job이 실행된다.
- Class101에서 사용하는 PySpark Codebase
Airflow Repo에는 PySpark Job들을 Scheduling 해줄 코드만이 존재할 뿐 PySpark코드를 일체 포함하지 않음. PySpark 애플리케이션 코드들은 다른 Repo에 구성되어있다.
아래는 그 Repo의 디렉토리 구성임
pyspark-etl/
│ README.md
│ task_runner.py
│ Dockerfile
│
└───etl_codes/
│ │
│ │ users_transform.py
│ │ marketing_transform.py
│
│
└───cli_etl_codes/
│ cli_users_transform.py
│ cli_marketing_transform.py
위의 리포에 정의한 pyspark 코드를 EMR에서 실행시키려면 정의한 method를 CLI커맨드 형태로 실행시키도록 하였는데 이를 위해 Python의 Click이라는 라이브러리를 활용하였다. Pyspark를 코드를 CLI로 정의하는 방법을 users_transform.py를 참고한다.
[소스코드 구현 과정]
step 1) pyspark-etl/etl_codes/cli_users_transform.py (메소드 정의) 작성
def users_transform(
spark: SparkSession,
dest_db_prefix: str,
dest_table_name: str,
execution_datetime_tag: Union[date, datetime],
) -> None:
"""
1. Read "raw_users" and "raw_user_infos" tables
2. preprocess "raw_users"
3. preprocess "raw_user_infos"
4. join on "user_id"
5. write "users"
"""
# some pyspark codes
s3_path = os.path.join(
S3_ROOT_PATH,
f"{dest_db_prefix}_{db_suffix}",
f"{dest_table_name}",
f"{execution_datetime_tag.strftime('%Y_%m_%d_%H_%M_%S')}",
)
df.repartition(partition_num).write.mode("overwrite").option(
"path", s3_path
).saveAsTable(f"{dest_db_prefix}_{db_suffix}.{dest_table_name}")
step 2) pyspark-etl/cli_etl_codes/cli_users_transform.py(Cli command 정의) 작성
@click.command(name="users_transform")
@click.option("--dest_db_prefix", type=click.STRING, required=True)
@click.option("--dest_table_name", type=click.STRING, required=True)
@click.option("--airflow_execution_datetime_tag",type=click_utils.DateTimeParamType(),required=True,help="Airflow Execution Datetime",)
def cli_users_transform(dest_db_prefix: str,dest_table_name: str,airflow_execution_datetime_tag: Union[date, datetime],) -> None:
"""
Join raw lectures and raw steps table, transform, write lectures
"""
with spark_session(app_name="users_transform") as spark:
users_transform(spark=spark,dest_db_prefix=dest_db_prefix,dest_table_name=dest_table_name,execution_datetime_tag=airflow_execution_datetime_tag,)
step 3) pyspark-etl/task_runner.py의 run_task라는 main method에 위에 형성한 cli를 더해줌
from cli_etl_codes import cli_users_transform
@click.group()
def run_task():
"""Simple group task that will group all sub-tasks together."""
pass
# pyspark jobs
run_task.add_command(cli_users_transform)
step 4) Docker Image를 build하여 push한다.
그러면 EMR master 노드 shell에서 아래와 같은 커맨드를 실행이 가능하다.
spark-submit
--master yarn
--deploy-mode cluster
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=my_imgage/pyspark-etl:latest
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=my_imgage/pyspark-etl:latest
--executor-memory 3G
--executor-cores 1
--driver-memory 2G
--driver-cores 1
run_task.py users_transform --dest_db_prefix='etl_production'
--dest_table_name='users' --execution_datetime_tag='2020-07-19T00:00:00+00:00'
step 5) Airflow에서 위에 PySpark job을 Airflow Task로 정의한다.
Class101 데이터팀이 Airflow에서 EMR로 PySpark job을 submit하는 방법 : EMRSSHOperator 라는 커스텀한 Operator를 만들어서 사용함
dag = DAG(
"transform-dag",
schedule_interval="30 15 * * *", # every 00:30 AM KST
catchup=False,
max_active_runs=1,
default_args={
**PYSPARK_ETL_DEFAULT_ARGS,
"start_date": datetime(2020, 8, 6),
},
)
users_transform = EMRSSHOperator(
dag=dag,
task_id="users_transform",
image="my_imgage/pyspark-etl:latest",
spark_program_args=shell_command_args(
"users_transform",
dest_db_prefix="etl",
dest_table_name="users",
execution_datetime_tag="",
),
)
- EMRSSHOperator
class EMRSSHOperator(ssh_operator.SSHOperator):
"""
Operator that inherits SSHOperator to run spark-submit command on remote EMR cluster
"""
@decorators.apply_defaults
def __init__(
self,
spark_program_args: str,
driver_cores: str = "1",
driver_memory: str = "2G",
executor_cores: str = "1",
executor_memory: str = "3G",
spark_confs: list = [],
extra_spark_conf_overrides: typing.Optional[dict] = None,
image: str = None,
*args: typing.Any,
**kwargs: typing.Any,
) -> None:
self.driver_cores = driver_cores
self.driver_memory = driver_memory
self.executor_cores = executor_cores
self.executor_memory = executor_memory
self.image = image
self.spark_program_args = spark_program_args
super().__init__(
ssh_conn_id=f"my_emr_conn_ssh", command="", *args, **kwargs,
)
def execute(self, context: typing.Dict[str, typing.Any]) -> None:
self.command = f"spark-submit \\
--master yarn \\
--deploy-mode cluster \\
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={self.image} \\
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={self.image} \\
--executor-memory {self.executor_memory} \\
--executor-cores {self.executor_cores} \\
--driver-memory {self.driver_memory} \\
--driver-cores {self.driver_cores} \\
{self.spark_confs} \\
run_task.py \\
{self.spark_program_args}"
super().execute(context)
self.cleanup()
- 위에 pyspark-etl/etl_codes/cli_users_transform.py에서 users라는 ETL 결과 테이블을 S3에 write하였는데 이 테이블을 어떻게 SQL을 통해 쿼리할 수 있을까?
s3_path = os.path.join(
"s3://my-data-lake",
f"{dest_db_prefix}_{db_suffix}",
f"{dest_table_name}",
f"{execution_datetime_tag.strftime('%Y_%m_%d_%H_%M_%S')}",
)
df.repartition(partition_num).write.mode("overwrite").option(
"path", s3_path
).saveAsTable(f"{dest_db_prefix}_{db_suffix}.{dest_table_name}")
users라는 테이블을 s3://my-data-lake/etl_production/users/2020-08_06_00_00_00/ 에 parquet형태로 write한다. 그런다음에 Redshift/Athena에서 쿼리가 가능하도록 이 테이블에 대한 metadata를 Glue 카탈로그에 싱크하고 테이블을 etl_production이라는 external schema에 등록한다. 이는 etl_production이라는 external schema가 Redshift에 존재하고 EMR클러스터가 hive metastore로 Glue Catalog를 사용하다면 Pyspark의 saveAsTableoperation에서 자동으로 이루어진다. 그러면 이 테이블은 Redshift에서 etl_production.users의 형태로 존재하게 되어 SQL통하여 쿼리가 가능해진다.