AWS Lambda를 이용한 EMR spark batch job 자동화

2020-02-18

.

1. 참고자료

1) https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow

2) https://lamanus.kr/57

2. Lambda function 구현하기

step 1) Lambda 코드 작성 전에 ‘환경변수’ -> ‘편집’ 클릭

아래와 같이 내 계정의 access key와 secret access key 변수를 설정해준다.

2

step 2) 람다함수 구현

import json
import boto3
import datetime
import os

# Optional explicit credentials, read from the Lambda environment variables.
# A variable that is not set yields None instead of raising KeyError at import
# time; boto3 treats a None key as "not supplied" and resolves credentials
# through its default chain (for Lambda, the function's execution role).
aws_key = os.environ.get('AWS_KEY')
aws_skey = os.environ.get('AWS_SKEY')

def _instance_group(name, role, count, volume_gb, autoscaling_policy=None):
    """Build one SPOT m5.xlarge instance-group entry for ``run_job_flow``.

    Every instance in the group gets two EBS-optimized gp2 volumes of
    ``volume_gb`` GB each. ``autoscaling_policy`` is attached only when given.
    """
    group = {
        'Name': name,
        'Market': 'SPOT',
        'InstanceRole': role,
        'InstanceType': 'm5.xlarge',
        'InstanceCount': count,
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': 'gp2',
                        'SizeInGB': volume_gb,
                    },
                    'VolumesPerInstance': 2,
                },
            ],
            'EbsOptimized': True,
        },
    }
    if autoscaling_policy is not None:
        group['AutoScalingPolicy'] = autoscaling_policy
    return group


def _yarn_memory_rule(name, description, adjustment, comparison,
                      evaluation_periods, threshold):
    """Build one auto-scaling rule driven by YARNMemoryAvailablePercentage.

    ``adjustment`` is the signed change in instance count applied when the
    average of the metric satisfies ``comparison`` against ``threshold``
    (in percent) for ``evaluation_periods`` consecutive periods.
    """
    return {
        'Name': name,
        'Description': description,
        'Action': {
            'SimpleScalingPolicyConfiguration': {
                'AdjustmentType': 'CHANGE_IN_CAPACITY',
                'ScalingAdjustment': adjustment,
                'CoolDown': 300,
            },
        },
        'Trigger': {
            'CloudWatchAlarmDefinition': {
                'ComparisonOperator': comparison,
                'EvaluationPeriods': evaluation_periods,
                'MetricName': 'YARNMemoryAvailablePercentage',
                'Namespace': 'AWS/ElasticMapReduce',
                # EMR emits its CloudWatch metrics every five minutes, so the
                # period is 300 seconds for every rule.
                'Period': 300,
                'Statistic': 'AVERAGE',
                'Threshold': threshold,
                'Unit': 'PERCENT',
            },
        },
    }


def _spark_submit_step(name, script_uri):
    """Build a step that runs ``spark-submit`` on ``script_uri`` in client mode.

    A failing step terminates the cluster (``TERMINATE_CLUSTER``).
    """
    return {
        'Name': name,
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            # command-runner.jar is built into EMR by default; it is what makes
            # it possible to add a spark-submit step.
            'Jar': 'command-runner.jar',
            # NOTE(review): '--class', 'main class' looks like a placeholder
            # for a JVM job while the submitted resource is a .py script;
            # confirm whether it can be dropped.
            'Args': ['spark-submit',
                     '--master', 'yarn', '--deploy-mode', 'client',
                     '--class', 'main class',
                     script_uri],
        },
    }


def lambda_handler(event, context):
    """Launch the 'pms-emr-test' EMR cluster and queue two Spark steps on it.

    The cluster is created in us-west-2 with one master and four core SPOT
    nodes, runs emrtest.py and then emrtest2.py through spark-submit, and is
    configured with ``KeepJobFlowAliveWhenNoSteps`` set to False.

    Credentials come from the module-level ``aws_key`` / ``aws_skey`` values.

    Args:
        event: Lambda invocation payload; not used.
        context: Lambda runtime context; not used.

    Returns:
        The ``JobFlowId`` string of the cluster that was created.
    """
    print("Creating EMR")
    session = boto3.session.Session(region_name='us-west-2')
    emr_client = session.client(
        'emr',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_skey,
    )

    # Scale up while little YARN memory is free (< 20 %) and scale down while
    # most of it is free (> 85 %). The scale-down rule must use GREATER_THAN:
    # with LESS_THAN it would fire whenever less than 85 % is free, which
    # also covers the scale-up condition.
    # NOTE(review): 120 / 125 evaluation periods of 300 s mean roughly ten
    # hours of sustained breach before a rule fires - confirm this is intended.
    core_scaling_policy = {
        'Constraints': {
            'MinCapacity': 2,
            'MaxCapacity': 20,
        },
        'Rules': [
            _yarn_memory_rule('Compute-scale-up', 'scale up on YARNMemory',
                              1, 'LESS_THAN', 120, 20),
            _yarn_memory_rule('Compute-scale-down', 'scale down on YARNMemory',
                              -1, 'GREATER_THAN', 125, 85),
        ],
    }

    response = emr_client.run_job_flow(
        Name='pms-emr-test',
        LogUri='s3://lhw-s3-test/log-folder/',
        ReleaseLabel='emr-5.28.0',
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'Spark'},
        ],
        Instances={
            'InstanceGroups': [
                _instance_group('Master nodes', 'MASTER', 1, 400),
                _instance_group('Slave nodes', 'CORE', 4, 500,
                                autoscaling_policy=core_scaling_policy),
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2KeyName': 'pms_oregon_key',
            'Ec2SubnetId': 'subnet-03ab1e1a1ea3165e5',
            'EmrManagedMasterSecurityGroup': 'sg-017a3f873088b6641',
            'EmrManagedSlaveSecurityGroup': 'sg-0ebcd86afff99eece',
        },
        Steps=[
            _spark_submit_step('spark-submit', 's3://lhw-s3-test/emrtest.py'),
            _spark_submit_step('spark-submit2', 's3://lhw-s3-test/emrtest2.py'),
        ],
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        EbsRootVolumeSize=100,
        Tags=[
            {
                'Key': 'NAME',
                'Value': 'pms-emr-test',
            },
        ],
    )
    return response['JobFlowId']

3. 로컬 PC(리눅스 환경)에서 boto3를 이용한 EMR Spark job 자동화 구현하기

step 1) 로컬 PC에서 액세스 키(access key)와 시크릿 액세스 키(secret access key)를 아래 그림과 같이 환경변수로 설정하기

1

step 2) python을 이용한 EMR 구동 및 spark job 부여 코드

import os
import boto3
import json
import datetime


# For security, the credentials are read from environment variables instead of
# being hard-coded in the source.
# A variable that is not set yields None; boto3 treats a None key as "not
# supplied" and resolves credentials through its default chain.
aws_key = os.environ.get('AWS_KEY')
aws_skey = os.environ.get('AWS_SKEY')


## 람다에서 쓰일경우(람다핸들러) = def lambda_handler(event, context): 로 바꿔서 쓰면됨
def _instance_group(name, role, count, volume_gb, autoscaling_policy=None):
    """Build one SPOT m5.xlarge instance-group entry for ``run_job_flow``.

    Every instance in the group gets two EBS-optimized gp2 volumes of
    ``volume_gb`` GB each. ``autoscaling_policy`` is attached only when given.
    """
    group = {
        'Name': name,
        'Market': 'SPOT',
        'InstanceRole': role,
        'InstanceType': 'm5.xlarge',
        'InstanceCount': count,
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': 'gp2',
                        'SizeInGB': volume_gb,
                    },
                    'VolumesPerInstance': 2,
                },
            ],
            'EbsOptimized': True,
        },
    }
    if autoscaling_policy is not None:
        group['AutoScalingPolicy'] = autoscaling_policy
    return group


def _yarn_memory_rule(name, description, adjustment, comparison,
                      evaluation_periods, threshold):
    """Build one auto-scaling rule driven by YARNMemoryAvailablePercentage.

    ``adjustment`` is the signed change in instance count applied when the
    average of the metric satisfies ``comparison`` against ``threshold``
    (in percent) for ``evaluation_periods`` consecutive periods.
    """
    return {
        'Name': name,
        'Description': description,
        'Action': {
            'SimpleScalingPolicyConfiguration': {
                'AdjustmentType': 'CHANGE_IN_CAPACITY',
                'ScalingAdjustment': adjustment,
                'CoolDown': 300,
            },
        },
        'Trigger': {
            'CloudWatchAlarmDefinition': {
                'ComparisonOperator': comparison,
                'EvaluationPeriods': evaluation_periods,
                'MetricName': 'YARNMemoryAvailablePercentage',
                'Namespace': 'AWS/ElasticMapReduce',
                # EMR emits its CloudWatch metrics every five minutes, so the
                # period is 300 seconds for every rule.
                'Period': 300,
                'Statistic': 'AVERAGE',
                'Threshold': threshold,
                'Unit': 'PERCENT',
            },
        },
    }


def _spark_submit_step(name, script_uri):
    """Build a step that runs ``spark-submit`` on ``script_uri`` in client mode.

    A failing step terminates the cluster (``TERMINATE_CLUSTER``).
    """
    return {
        'Name': name,
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            # command-runner.jar is built into EMR by default; it is what makes
            # it possible to add a spark-submit step.
            'Jar': 'command-runner.jar',
            # NOTE(review): '--class', 'main class' looks like a placeholder
            # for a JVM job while the submitted resource is a .py script;
            # confirm whether it can be dropped.
            'Args': ['spark-submit',
                     '--master', 'yarn', '--deploy-mode', 'client',
                     '--class', 'main class',
                     script_uri],
        },
    }


def EMR_handler():
    """Launch the 'pms-emr-test' EMR cluster and queue two Spark steps on it.

    The cluster is created in us-west-2 with one master and four core SPOT
    nodes, runs emrtest.py and then emrtest2.py through spark-submit, and is
    configured with ``KeepJobFlowAliveWhenNoSteps`` set to False.

    Credentials come from the module-level ``aws_key`` / ``aws_skey`` values.

    Returns:
        The ``JobFlowId`` string of the cluster that was created.
    """
    print("Creating EMR")
    session = boto3.session.Session(region_name='us-west-2')
    emr_client = session.client(
        'emr',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_skey,
    )

    # Scale up while little YARN memory is free (< 20 %) and scale down while
    # most of it is free (> 85 %). The scale-down rule must use GREATER_THAN:
    # with LESS_THAN it would fire whenever less than 85 % is free, which
    # also covers the scale-up condition.
    # NOTE(review): 120 / 125 evaluation periods of 300 s mean roughly ten
    # hours of sustained breach before a rule fires - confirm this is intended.
    core_scaling_policy = {
        'Constraints': {
            'MinCapacity': 2,
            'MaxCapacity': 20,
        },
        'Rules': [
            _yarn_memory_rule('Compute-scale-up', 'scale up on YARNMemory',
                              1, 'LESS_THAN', 120, 20),
            _yarn_memory_rule('Compute-scale-down', 'scale down on YARNMemory',
                              -1, 'GREATER_THAN', 125, 85),
        ],
    }

    response = emr_client.run_job_flow(
        Name='pms-emr-test',
        LogUri='s3://lhw-s3-test/log-folder/',
        ReleaseLabel='emr-5.28.0',
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'Spark'},
        ],
        Instances={
            'InstanceGroups': [
                _instance_group('Master nodes', 'MASTER', 1, 400),
                _instance_group('Slave nodes', 'CORE', 4, 500,
                                autoscaling_policy=core_scaling_policy),
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2KeyName': 'pms_oregon_key',
            'Ec2SubnetId': 'subnet-03ab1e1a1ea3165e5',
            'EmrManagedMasterSecurityGroup': 'sg-017a3f873088b6641',
            'EmrManagedSlaveSecurityGroup': 'sg-0ebcd86afff99eece',
        },
        Steps=[
            _spark_submit_step('spark-submit', 's3://lhw-s3-test/emrtest.py'),
            _spark_submit_step('spark-submit2', 's3://lhw-s3-test/emrtest2.py'),
        ],
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        EbsRootVolumeSize=100,
        Tags=[
            {
                'Key': 'NAME',
                'Value': 'pms-emr-test',
            },
        ],
    )
    return response['JobFlowId']

# Run the launcher; it returns the JobFlowId of the cluster it created.
EMR_handler()
Creating EMR





'j-28VEZVYPPWHNT'

3. emrtest.py 코드내용

import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Load every CSV under source_temp/, treating the first row of each file as
# the header.
df_csv = spark.read.format("csv").option("header", "true").load("s3a://lhw-s3-test/source_temp/*.csv")

# Derive integer year / month / day columns from fixed character positions of
# event_time (assumes values start with "YYYY-MM-DD" - TODO confirm against
# the source data).
# The built-in substring() (1-based start, length) replaces the former Python
# UDFs: it yields null for a null event_time instead of raising TypeError, and
# it avoids shipping every row through a Python worker.
new_df_csv = (
    df_csv
    .withColumn('year', substring('event_time', 1, 4).cast(IntegerType()))
    .withColumn('month', substring('event_time', 6, 2).cast(IntegerType()))
    .withColumn('day', substring('event_time', 9, 2).cast(IntegerType()))
)

# Write the result as parquet, partitioned by the three derived columns.
new_df_csv.write.partitionBy("year", "month", "day").save("s3a://lhw-s3-test/destination_temp/", format='parquet', header=True)

4. emrtest2.py 코드내용

import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Read back the parquet data set stored under destination_temp/.
parquet_reader = spark.read.format("parquet").option("header", "true")
df_parquet = parquet_reader.load("s3a://lhw-s3-test/destination_temp/")

# Range-partition on 'year' into a single partition, then write the result as
# CSV with a header row.
single_partition_df = df_parquet.repartitionByRange(1, 'year')
single_partition_df.write.save(
    "s3a://lhw-s3-test/destination_temp2/",
    format='csv',
    header=True,
)