Automating EMR Spark batch jobs with AWS Lambda
2020-02-18
1. References
1) https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
2) https://lamanus.kr/57
2. Implementing the Lambda function
step 1) Before writing the Lambda code, click 'Environment variables' -> 'Edit'
Set your account's access key and secret access key as environment variables, as shown below.
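As a minimal sketch, the handler below reads these two variables when the module is loaded; using os.environ.get instead of direct indexing is an optional variation that fails with a clearer message when a key is missing:
import os

# Keys expected in the Lambda environment variables screen:
#   AWS_KEY  = access key ID
#   AWS_SKEY = secret access key
aws_key = os.environ.get('AWS_KEY')
aws_skey = os.environ.get('AWS_SKEY')
if not aws_key or not aws_skey:
    raise RuntimeError('AWS_KEY / AWS_SKEY environment variables are not set')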
step 2) Implement the Lambda function
import json
import boto3
import datetime
import os
aws_key = os.environ['AWS_KEY']
aws_skey = os.environ['AWS_SKEY']
def lambda_handler(event, context):
    print("Creating EMR")
    session = boto3.session.Session(region_name='us-west-2')
    emr_client = session.client('emr', aws_access_key_id=aws_key, aws_secret_access_key=aws_skey)
    cluster_id = emr_client.run_job_flow(
        Name='pms-emr-test',
        LogUri='s3://lhw-s3-test/log-folder/',
        ReleaseLabel='emr-5.28.0',
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'Spark'}],
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                    'EbsConfiguration': {
                        'EbsBlockDeviceConfigs': [
                            {
                                'VolumeSpecification': {
                                    'VolumeType': 'gp2',
                                    'SizeInGB': 400
                                },
                                'VolumesPerInstance': 2
                            },
                        ],
                        'EbsOptimized': True,
                    }
                },
                {
                    'Name': 'Slave nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 4,
                    'EbsConfiguration': {
                        'EbsBlockDeviceConfigs': [
                            {
                                'VolumeSpecification': {
                                    'VolumeType': 'gp2',
                                    'SizeInGB': 500
                                },
                                'VolumesPerInstance': 2
                            },
                        ],
                        'EbsOptimized': True,
                    },
                    'AutoScalingPolicy': {
                        'Constraints': {
                            'MinCapacity': 2,
                            'MaxCapacity': 20
                        },
                        'Rules': [
                            {
                                'Name': 'Compute-scale-up',
                                'Description': 'scale up on YARNMemory',
                                'Action': {
                                    'SimpleScalingPolicyConfiguration': {
                                        'AdjustmentType': 'CHANGE_IN_CAPACITY',
                                        'ScalingAdjustment': 1,
                                        'CoolDown': 300
                                    }
                                },
                                'Trigger': {
                                    'CloudWatchAlarmDefinition': {
                                        'ComparisonOperator': 'LESS_THAN',
                                        'EvaluationPeriods': 120,
                                        'MetricName': 'YARNMemoryAvailablePercentage',
                                        'Namespace': 'AWS/ElasticMapReduce',
                                        'Period': 300,
                                        'Statistic': 'AVERAGE',
                                        'Threshold': 20,
                                        'Unit': 'PERCENT'
                                    }
                                }
                            },
                            {
                                'Name': 'Compute-scale-down',
                                'Description': 'scale down on YARNMemory',
                                'Action': {
                                    'SimpleScalingPolicyConfiguration': {
                                        'AdjustmentType': 'CHANGE_IN_CAPACITY',
                                        'ScalingAdjustment': -1,
                                        'CoolDown': 300
                                    }
                                },
                                'Trigger': {
                                    'CloudWatchAlarmDefinition': {
                                        'ComparisonOperator': 'LESS_THAN',
                                        'EvaluationPeriods': 125,
                                        'MetricName': 'YARNMemoryAvailablePercentage',
                                        'Namespace': 'AWS/ElasticMapReduce',
                                        'Period': 250,
                                        'Statistic': 'AVERAGE',
                                        'Threshold': 85,
                                        'Unit': 'PERCENT'
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2KeyName': 'pms_oregon_key',
            'Ec2SubnetId': 'subnet-03ab1e1a1ea3165e5',
            'EmrManagedMasterSecurityGroup': 'sg-017a3f873088b6641',
            'EmrManagedSlaveSecurityGroup': 'sg-0ebcd86afff99eece'
        },
        Steps=[
            {
                'Name': 'spark-submit',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    ## command-runner.jar ships with EMR by default; it is what lets us add a spark-submit step.
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit',
                             '--master', 'yarn', '--deploy-mode', 'client',
                             '--class', 'main class',
                             's3://lhw-s3-test/emrtest.py'
                             ]
                }
            },
            {
                'Name': 'spark-submit2',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit',
                             '--master', 'yarn', '--deploy-mode', 'client',
                             '--class', 'main class',
                             's3://lhw-s3-test/emrtest2.py'
                             ]
                }
            }
        ],
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        EbsRootVolumeSize=100,
        Tags=[
            {
                'Key': 'NAME',
                'Value': 'pms-emr-test',
            },
        ],
    )
    return cluster_id['JobFlowId']
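run_job_flow returns the new cluster's JobFlowId, which the handler passes back as its result. As a rough follow-up sketch (assuming the same region and credentials), the launch can be verified by polling the cluster state with boto3's describe_cluster:
import boto3

# 'j-XXXXXXXXXXXXX' is a placeholder for the JobFlowId returned above.
emr_client = boto3.client('emr', region_name='us-west-2')
state = emr_client.describe_cluster(ClusterId='j-XXXXXXXXXXXXX')['Cluster']['Status']['State']
print(state)  # e.g. STARTING, RUNNING, WAITING, TERMINATING, TERMINATED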
3. Automating EMR Spark jobs with boto3 on a local PC (Linux)
step 1) On the local PC, set the access key and secret access key environment variables as shown in the figure below
step 2) Python code to launch EMR and submit the Spark jobs
import os
import boto3
import json
import datetime
# aws_key = os.environ['AWS_KEY']
# aws_skey = os.environ['AWS_SKEY']
# For security, the keys should be taken from environment variables as above.
aws_key = '*****'
aws_skey = '*****'
## To use this in Lambda, just rename the handler to def lambda_handler(event, context):
def EMR_handler():
    print("Creating EMR")
    session = boto3.session.Session(region_name='us-west-2')
    emr_client = session.client('emr', aws_access_key_id=aws_key, aws_secret_access_key=aws_skey)
    cluster_id = emr_client.run_job_flow(
        Name='pms-emr-test',
        LogUri='s3://lhw-s3-test/log-folder/',
        ReleaseLabel='emr-5.28.0',
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'Spark'}],
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                    'EbsConfiguration': {
                        'EbsBlockDeviceConfigs': [
                            {
                                'VolumeSpecification': {
                                    'VolumeType': 'gp2',
                                    'SizeInGB': 400
                                },
                                'VolumesPerInstance': 2
                            },
                        ],
                        'EbsOptimized': True,
                    }
                },
                {
                    'Name': 'Slave nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 4,
                    'EbsConfiguration': {
                        'EbsBlockDeviceConfigs': [
                            {
                                'VolumeSpecification': {
                                    'VolumeType': 'gp2',
                                    'SizeInGB': 500
                                },
                                'VolumesPerInstance': 2
                            },
                        ],
                        'EbsOptimized': True,
                    },
                    'AutoScalingPolicy': {
                        'Constraints': {
                            'MinCapacity': 2,
                            'MaxCapacity': 20
                        },
                        'Rules': [
                            {
                                'Name': 'Compute-scale-up',
                                'Description': 'scale up on YARNMemory',
                                'Action': {
                                    'SimpleScalingPolicyConfiguration': {
                                        'AdjustmentType': 'CHANGE_IN_CAPACITY',
                                        'ScalingAdjustment': 1,
                                        'CoolDown': 300
                                    }
                                },
                                'Trigger': {
                                    'CloudWatchAlarmDefinition': {
                                        'ComparisonOperator': 'LESS_THAN',
                                        'EvaluationPeriods': 120,
                                        'MetricName': 'YARNMemoryAvailablePercentage',
                                        'Namespace': 'AWS/ElasticMapReduce',
                                        'Period': 300,
                                        'Statistic': 'AVERAGE',
                                        'Threshold': 20,
                                        'Unit': 'PERCENT'
                                    }
                                }
                            },
                            {
                                'Name': 'Compute-scale-down',
                                'Description': 'scale down on YARNMemory',
                                'Action': {
                                    'SimpleScalingPolicyConfiguration': {
                                        'AdjustmentType': 'CHANGE_IN_CAPACITY',
                                        'ScalingAdjustment': -1,
                                        'CoolDown': 300
                                    }
                                },
                                'Trigger': {
                                    'CloudWatchAlarmDefinition': {
                                        'ComparisonOperator': 'LESS_THAN',
                                        'EvaluationPeriods': 125,
                                        'MetricName': 'YARNMemoryAvailablePercentage',
                                        'Namespace': 'AWS/ElasticMapReduce',
                                        'Period': 250,
                                        'Statistic': 'AVERAGE',
                                        'Threshold': 85,
                                        'Unit': 'PERCENT'
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2KeyName': 'pms_oregon_key',
            'Ec2SubnetId': 'subnet-03ab1e1a1ea3165e5',
            'EmrManagedMasterSecurityGroup': 'sg-017a3f873088b6641',
            'EmrManagedSlaveSecurityGroup': 'sg-0ebcd86afff99eece'
        },
        Steps=[
            {
                'Name': 'spark-submit',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    ## command-runner.jar ships with EMR by default; it is what lets us add a spark-submit step.
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit',
                             '--master', 'yarn', '--deploy-mode', 'client',
                             '--class', 'main class',
                             's3://lhw-s3-test/emrtest.py'
                             ]
                }
            },
            {
                'Name': 'spark-submit2',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit',
                             '--master', 'yarn', '--deploy-mode', 'client',
                             '--class', 'main class',
                             's3://lhw-s3-test/emrtest2.py'
                             ]
                }
            }
        ],
        AutoScalingRole='EMR_AutoScaling_DefaultRole',
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        EbsRootVolumeSize=100,
        Tags=[
            {
                'Key': 'NAME',
                'Value': 'pms-emr-test',
            },
        ],
    )
    return cluster_id['JobFlowId']
EMR_handler()
Running EMR_handler() prints "Creating EMR" and returns the new job flow ID, e.g. 'j-28VEZVYPPWHNT'.
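Because KeepJobFlowAliveWhenNoSteps is False, the cluster shuts itself down once both spark-submit steps finish. A rough sketch for watching that from the local PC (reusing the credentials above; list_steps and the cluster_terminated waiter are standard boto3 EMR calls):
import boto3

session = boto3.session.Session(region_name='us-west-2')
emr_client = session.client('emr', aws_access_key_id=aws_key, aws_secret_access_key=aws_skey)
cluster_id = 'j-28VEZVYPPWHNT'  # value returned by EMR_handler() above

# Print the state of each step (spark-submit, spark-submit2).
for step in emr_client.list_steps(ClusterId=cluster_id)['Steps']:
    print(step['Name'], step['Status']['State'])

# Block until the cluster terminates on its own after the steps complete.
emr_client.get_waiter('cluster_terminated').wait(ClusterId=cluster_id)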
4. Contents of emrtest.py
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
df_csv = spark.read.format("csv").option("header", "true").load("s3a://lhw-s3-test/source_temp/*.csv")
udf_year = udf(lambda record:record[0:4],StringType())
udf_month = udf(lambda record:record[5:7],StringType())
udf_day = udf(lambda record:record[8:10],StringType())
new_df_csv = df_csv.withColumn('year',udf_year('event_time').cast(IntegerType()))
new_df_csv = new_df_csv.withColumn('month',udf_month('event_time').cast(IntegerType()))
new_df_csv = new_df_csv.withColumn('day',udf_day('event_time').cast(IntegerType()))
new_df_csv.write.partitionBy("year","month","day").save("s3a://lhw-s3-test/destination_temp/", format='parquet', header=True)
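The three UDFs simply slice the year, month, and day out of the event_time string (characters 1-4, 6-7, and 9-10). As a side note, the same columns can be derived without Python UDFs by using the built-in substring function on df_csv, which does the identical character slicing but stays inside the JVM; a rough equivalent sketch:
from pyspark.sql.functions import substring
from pyspark.sql.types import IntegerType

# Same slicing as the UDFs above, applied to the df_csv loaded from S3.
new_df_csv = (df_csv
              .withColumn('year', substring('event_time', 1, 4).cast(IntegerType()))
              .withColumn('month', substring('event_time', 6, 2).cast(IntegerType()))
              .withColumn('day', substring('event_time', 9, 2).cast(IntegerType())))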
5. Contents of emrtest2.py
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
df_parquet = spark.read.format("parquet").option("header", "true").load("s3a://lhw-s3-test/destination_temp/")
df_parquet.repartitionByRange(1,'year').write.save("s3a://lhw-s3-test/destination_temp2/", format='csv', header=True)
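To sanity-check the two steps end to end, the intermediate parquet output and the final csv output can be read back and their row counts compared; a rough verification sketch using the same buckets as above:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Row counts should match if both steps completed successfully.
parquet_df = spark.read.parquet("s3a://lhw-s3-test/destination_temp/")
csv_df = spark.read.option("header", "true").csv("s3a://lhw-s3-test/destination_temp2/")
print(parquet_df.count(), csv_df.count())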