Example of using a custom Python module with PySpark on EMR

2020-12-01


Data_Engineering_TIL(20201201)

STEP 1) Write your own custom Python module.

  • my_python_lib.py
import boto3

def read_spark_conf(bucketname, location):
    """Read a text/JSON file from S3 and return it flattened into a single line."""
    s3 = boto3.resource('s3')
    obj = s3.Object(bucketname, location)
    spark_conf = obj.get()['Body'].read().decode("utf-8")
    # Strip newlines/carriage returns and remove runs of consecutive spaces.
    spark_conf = spark_conf.replace('\n', '').replace('\r', '').replace('     ', '').replace('    ', '').replace('   ', '').replace('  ', '')
    return spark_conf
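
To see what the replace chain does, here is a quick local illustration (the sample JSON is made up for this example; no S3 access is involved):

sample = '{\n    "spark.executor.memory": "4g",\n    "spark.executor.cores": "2"\n}\n'
flattened = sample.replace('\n', '').replace('\r', '').replace('     ', '').replace('    ', '').replace('   ', '').replace('  ', '')
print(flattened)  # {"spark.executor.memory": "4g","spark.executor.cores": "2"}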

STEP 2) Upload the module to s3://pms-bucket-test/mylib/my_python_lib.py.
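
For example, the file can be uploaded with the AWS CLI (assuming the pms-bucket-test bucket already exists and your credentials can write to it):

aws s3 cp my_python_lib.py s3://pms-bucket-test/mylib/my_python_lib.py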

STEP 3) When creating the EMR cluster from the console, open ‘Edit software settings’, select ‘Enter configuration’, and paste the configuration below before launching the cluster.

[{"classification":"spark-defaults","properties":{"spark.minman.lib":"s3://pms-bucket-test/mylib/my_python_lib.py"}}]

STEP 4) SSH into the master node of the EMR cluster you just created.
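
For example (the key pair and host name are placeholders; use the master node's public DNS, or its private IP if you connect from inside the VPC):

ssh -i ~/my-key-pair.pem hadoop@ec2-xx-xx-xx-xx.ap-northeast-2.compute.amazonaws.com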

STEP 5) Create the PySpark test script below. Note that it assumes a configuration file named my_custom_conf.json already exists at the root of the pms-bucket-test bucket.

  • test.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("minmantest").getOrCreate()

# Read the S3 path of the custom module from the spark-defaults property set in
# STEP 3, then distribute the file to the driver and every executor.
spark.sparkContext.addPyFile(spark.conf.get("spark.minman.lib"))

# The import only works after addPyFile has put the module on the Python path.
from my_python_lib import read_spark_conf

spark_conf = read_spark_conf('pms-bucket-test', 'my_custom_conf.json')
print(spark_conf)
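
Since addPyFile ships my_python_lib.py to every executor as well as the driver, the module can also be imported inside tasks. A minimal sketch of that (a hypothetical continuation of test.py, not part of the original; it assumes boto3 and S3 read access are available on the worker nodes, which is normally the case on EMR with an appropriate instance profile):

def read_conf_in_partition(_):
    # Import the shipped module on the executor and re-read the conf file there.
    from my_python_lib import read_spark_conf
    yield read_spark_conf('pms-bucket-test', 'my_custom_conf.json')

print(spark.sparkContext.parallelize(range(2), 2)
          .mapPartitions(read_conf_in_partition)
          .collect())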

STEP 6) Run the script with spark-submit test.py.

The execution output is shown below. Note the "SparkContext: Added file" and "Utils: Fetching" lines, which show the custom module being pulled from S3 before the import succeeds.

[hadoop@ip-10-0-1-182 ~]$ spark-submit test.py
20/12/01 04:49:20 INFO SparkContext: Running Spark version 3.0.1-amzn-0
20/12/01 04:49:20 INFO ResourceUtils: ==============================================================
20/12/01 04:49:20 INFO ResourceUtils: Resources for spark.driver:

20/12/01 04:49:20 INFO ResourceUtils: ==============================================================
20/12/01 04:49:20 INFO SparkContext: Submitted application: minmantest
20/12/01 04:49:20 INFO SecurityManager: Changing view acls to: hadoop
20/12/01 04:49:20 INFO SecurityManager: Changing modify acls to: hadoop
20/12/01 04:49:20 INFO SecurityManager: Changing view acls groups to:
20/12/01 04:49:20 INFO SecurityManager: Changing modify acls groups to:
20/12/01 04:49:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
20/12/01 04:49:21 INFO Utils: Successfully started service 'sparkDriver' on port 40771.
20/12/01 04:49:21 INFO SparkEnv: Registering MapOutputTracker

            
...
20/12/01 04:49:30 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@365bbcbc{/SQL/execution/json,null,AVAILABLE,@Spark}
20/12/01 04:49:30 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
20/12/01 04:49:30 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2cb8b564{/static/sql,null,AVAILABLE,@Spark}
20/12/01 04:49:31 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
20/12/01 04:49:32 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 4096, script: , vendor: , cores -> name: cores, amount: 5, script: , vendor: , memory -> name: memory, amount: 32768, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
20/12/01 04:49:32 INFO SparkContext: Added file s3://pms-bucket-test/mylib/my_python_lib.py at s3://pms-bucket-test/mylib/my_python_lib.py with timestamp 1606798172505
20/12/01 04:49:32 INFO S3NativeFileSystem: Opening 's3://pms-bucket-test/mylib/my_python_lib.py' for reading
20/12/01 04:49:32 INFO Utils: Fetching s3://pms-bucket-test/mylib/my_python_lib.py to /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05/userFiles-9c232207-ef87-4eab-812d-a429debbadce/fetchFileTemp2628366074051225520.tmp
...
20/12/01 04:49:32 INFO SparkContext: Invoking stop() from shutdown hook
20/12/01 04:49:32 INFO AbstractConnector: Stopped Spark@66f56f6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/12/01 04:49:32 INFO SparkUI: Stopped Spark web UI at http://ip-10-0-1-182.ap-northeast-2.compute.internal:4040
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: Shutting down all executors
20/12/01 04:49:32 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
20/12/01 04:49:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/01 04:49:32 INFO MemoryStore: MemoryStore cleared
20/12/01 04:49:32 INFO BlockManager: BlockManager stopped
20/12/01 04:49:33 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/01 04:49:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/01 04:49:33 INFO SparkContext: Successfully stopped SparkContext
20/12/01 04:49:33 INFO ShutdownHookManager: Shutdown hook called
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05/pyspark-5b80cb66-ae5a-4e4b-b270-d0c51fdfbcf4
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-89a4e755-e99f-45ef-9237-40a05bf1627c
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05