Example of launching an EMR cluster with Python boto3

2020-09-17


  • Reference URL: https://stackoverflow.com/questions/26314316/how-to-launch-and-configure-an-emr-cluster-using-boto

  • Implement an EMR job flow that launches an EMR cluster and submits a PySpark job.

  • Write and run Python code following the template below.

import boto3


def create_EMR_cluster():

    # Instance types, counts, and cluster name (bracketed values are placeholders)
    masterInstanceType = 'm5.xlarge'
    coreInstanceType = 'm5.xlarge'
    taskInstanceType = 'm5.xlarge'
    coreInstanceNum = 1
    taskInstanceNum = 1
    clusterName = '[cluster_name]'

    # EMR client; replace the bracketed credentials or rely on an AWS profile
    connection = boto3.client('emr', region_name='ap-northeast-2',
                              aws_access_key_id='[access_key]',
                              aws_secret_access_key='[secret_key]')

    logUri = 's3://[bucket_name]/'  # S3 location for cluster logs
    releaseLabel = 'emr-5.28.1'     # EMR release version
    # Cluster hardware: master, core, and task instance groups, all on spot instances
    instances = {
        'Ec2KeyName': '[keypair_name]',
        'Ec2SubnetId': 'subnet-xxxxxxxxxxxxx',
        'EmrManagedMasterSecurityGroup': 'sg-xxxxxxxxxxxxxxxxxxx',
        'EmrManagedSlaveSecurityGroup': 'sg-xxxxxxxxxxxxxxxxx',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'InstanceGroups': [
            {
                'InstanceRole': 'MASTER',
                'InstanceCount': 1,
                'InstanceType': masterInstanceType,
                'Market': 'SPOT',
                'Name': 'Master'
            },
            {
                'InstanceRole': 'CORE',
                'InstanceCount': coreInstanceNum,
                'InstanceType': coreInstanceType,
                'Market': 'SPOT',
                'Name': 'Core'
            },
            {
                'InstanceRole': 'TASK',
                'InstanceCount': taskInstanceNum,
                'InstanceType': taskInstanceType,
                'Market': 'SPOT',
                'Name': 'Task'
            }
        ]
    }

    # Applications installed on the cluster
    applications = [{'Name': 'Hive'},
                    {'Name': 'Spark'}]
    
    # Default EMR IAM roles; create them with `aws emr create-default-roles` if they do not exist yet
    serviceRole = 'EMR_DefaultRole'
    jobFlowRole = 'EMR_EC2_DefaultRole'
    
    tags = [{'Key': 'name', 'Value': 'emrtest'},
            {'Key': 'expiry-date', 'Value': '2020-09-15'},
            {'Key': 'owner', 'Value': 'pms'}
            ]
    
    # Bootstrap action that runs on every node before the applications start
    BootstrapActions = [
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://[bucket_name]/maximize-spark-default-config.sh',
            }
        }
    ]
    
    # Step that spark-submits the PySpark script on YARN in cluster mode;
    # the whole cluster is terminated if the step fails
    Steps = [
        {
            'Name': 'spark_job_01',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         '--deploy-mode', 'cluster',
                         '--master', 'yarn',
                         's3://[bucket_name]/spark_job_01.py']
            }
        }
    ]
    
    VisibleToAllUsers = True

    # Launch the cluster; the step above runs once the bootstrap action finishes
    response = connection.run_job_flow(
        Name=clusterName,
        LogUri=logUri,
        ReleaseLabel=releaseLabel,
        Instances=instances,
        Applications=applications,
        ServiceRole=serviceRole,
        JobFlowRole=jobFlowRole,
        Tags=tags,
        BootstrapActions=BootstrapActions,
        Steps=Steps,
        VisibleToAllUsers=VisibleToAllUsers
    )

    return response

response = create_EMR_cluster()
print(response['JobFlowId'])  # ID of the newly launched cluster
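
  • The run_job_flow response carries the new cluster's JobFlowId, which the other EMR APIs take as ClusterId. Below is a minimal sketch of waiting for the cluster and the launch-time step to finish using boto3's built-in EMR waiters; the helper name wait_for_cluster_and_step is mine (not part of the original post) and the client here assumes credentials come from the environment rather than explicit keys.

import boto3

def wait_for_cluster_and_step(job_flow_id):
    # Same region as create_EMR_cluster(); credentials assumed from the environment or an AWS profile
    connection = boto3.client('emr', region_name='ap-northeast-2')

    # Block until the cluster reaches the RUNNING/WAITING state
    connection.get_waiter('cluster_running').wait(ClusterId=job_flow_id)

    # Wait for the step submitted at launch, then print its final state
    step_id = connection.list_steps(ClusterId=job_flow_id)['Steps'][0]['Id']
    connection.get_waiter('step_complete').wait(ClusterId=job_flow_id, StepId=step_id)
    step = connection.describe_step(ClusterId=job_flow_id, StepId=step_id)
    print(step['Step']['Status']['State'])  # COMPLETED, FAILED, ...

    # KeepJobFlowAliveWhenNoSteps is True, so terminate the cluster explicitly when done
    connection.terminate_job_flows(JobFlowIds=[job_flow_id])

wait_for_cluster_and_step(response['JobFlowId'])

  • Note that if the step fails, the step_complete waiter raises a WaiterError; with ActionOnFailure='TERMINATE_CLUSTER' the cluster is already shutting down in that case.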
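
  • For reference, a minimal sketch of what the spark_job_01.py uploaded to S3 might look like. The original post does not include the actual script, so the input/output paths and the aggregation here are purely illustrative placeholders.

# spark_job_01.py - illustrative only; the real job script is not part of this post
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('spark_job_01').getOrCreate()

    # Read CSV data from S3, run a trivial aggregation, and write the result back
    df = spark.read.csv('s3://[bucket_name]/input/', header=True)
    result = df.groupBy(df.columns[0]).count()
    result.write.mode('overwrite').parquet('s3://[bucket_name]/output/')

    spark.stop()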