EMR pyspark에서 custom module 사용예시
2020-12-01
.
Data_Engineering_TIL(20201201)
STEP 1) 나만의 custom한 python module을 개발한다.
- my_python_lib.py
import boto3
def read_spark_conf(bucketname, location):
s3 = boto3.resource('s3')
obj = s3.Object(bucketname, location)
spark_conf = obj.get()['Body'].read().decode("utf-8")
spark_conf = spark_conf.replace('\n','').replace('\r','').replace(' ','').replace(' ','').replace(' ','').replace(' ','')
return spark_conf
STEP 2) 개발한 모듈을 s3://pms-bucket-test/mylib/my_python_lib.py 경로로 저장해준다.
STEP 3) EMR을 콘솔에서 생성하는데 ‘Edit software settings’에서 ‘Enter configuration’으로 아래와 같이 설정해서 생성한다.
[{"classification":"spark-defaults","properties":{"spark.minman.lib":"s3://pms-bucket-test/mylib/my_python_lib.py"}}]
STEP 4) 생성한 EMR master node에 SSH 접속한다.
STEP 5) 테스트할 pyspark script를 아래와 같이 생성한다.
- test.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("minmantest").getOrCreate()
spark.sparkContext.addPyFile(spark.conf.get("spark.minman.lib"))
from my_python_lib import read_spark_conf
spark_conf = read_spark_conf('pms-bucket-test', 'my_custom_conf.json')
print(spark_conf)
STEP 6) spark-submit test.py
명령어를 실행한다.
실행결과는 아래와 같다.
[hadoop@ip-10-0-1-182 ~]$ spark-submit test.py
20/12/01 04:49:20 INFO SparkContext: Running Spark version 3.0.1-amzn-0
20/12/01 04:49:20 INFO ResourceUtils: ==============================================================
20/12/01 04:49:20 INFO ResourceUtils: Resources for spark.driver:
20/12/01 04:49:20 INFO ResourceUtils: ==============================================================
20/12/01 04:49:20 INFO SparkContext: Submitted application: minmantest
20/12/01 04:49:20 INFO SecurityManager: Changing view acls to: hadoop
20/12/01 04:49:20 INFO SecurityManager: Changing modify acls to: hadoop
20/12/01 04:49:20 INFO SecurityManager: Changing view acls groups to:
20/12/01 04:49:20 INFO SecurityManager: Changing modify acls groups to:
20/12/01 04:49:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
20/12/01 04:49:21 INFO Utils: Successfully started service 'sparkDriver' on port 40771.
20/12/01 04:49:21 INFO SparkEnv: Registering MapOutputTracker
...
20/12/01 04:49:30 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@365bbcbc{/SQL/execution/json,null,AVAILABLE,@Spark}
20/12/01 04:49:30 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
20/12/01 04:49:30 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2cb8b564{/static/sql,null,AVAILABLE,@Spark}
20/12/01 04:49:31 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
20/12/01 04:49:32 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 4096, script: , vendor: , cores -> name: cores, amount: 5, script: , vendor: , memory -> name: memory, amount: 32768, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
20/12/01 04:49:32 INFO SparkContext: Added file s3://pms-bucket-test/mylib/my_python_lib.py at s3://pms-bucket-test/mylib/my_python_lib.py with timestamp 1606798172505
20/12/01 04:49:32 INFO S3NativeFileSystem: Opening 's3://pms-bucket-test/mylib/my_python_lib.py' for reading
20/12/01 04:49:32 INFO Utils: Fetching s3://pms-bucket-test/mylib/my_python_lib.py to /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05/userFiles-9c232207-ef87-4eab-812d-a429debbadce/fetchFileTemp2628366074051225520.tmp
...
20/12/01 04:49:32 INFO SparkContext: Invoking stop() from shutdown hook
20/12/01 04:49:32 INFO AbstractConnector: Stopped Spark@66f56f6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/12/01 04:49:32 INFO SparkUI: Stopped Spark web UI at http://ip-10-0-1-182.ap-northeast-2.compute.internal:4040
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: Shutting down all executors
20/12/01 04:49:32 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/12/01 04:49:32 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
20/12/01 04:49:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/01 04:49:32 INFO MemoryStore: MemoryStore cleared
20/12/01 04:49:32 INFO BlockManager: BlockManager stopped
20/12/01 04:49:33 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/01 04:49:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/01 04:49:33 INFO SparkContext: Successfully stopped SparkContext
20/12/01 04:49:33 INFO ShutdownHookManager: Shutdown hook called
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05/pyspark-5b80cb66-ae5a-4e4b-b270-d0c51fdfbcf4
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-89a4e755-e99f-45ef-9237-40a05bf1627c
20/12/01 04:49:33 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2729b2b7-74fb-4df6-807b-2574cfea4d05
Concurrent marking:
0 init marks: total time = 0.00 s (avg = 0.00 ms).
3 remarks: total time = 0.01 s (avg = 4.69 ms).
[std. dev = 1.04 ms, max = 5.87 ms]
3 final marks: total time = 0.00 s (avg = 0.52 ms).
[std. dev = 0.22 ms, max = 0.83 ms]
3 weak refs: total time = 0.01 s (avg = 4.17 ms).
[std. dev = 1.04 ms, max = 5.50 ms]
3 cleanups: total time = 0.00 s (avg = 1.00 ms).
[std. dev = 0.04 ms, max = 1.03 ms]
Final counting total time = 0.00 s (avg = 0.23 ms).
RS scrub total time = 0.00 s (avg = 0.30 ms).
Total stop_world time = 0.02 s.
Total concurrent time = 0.07 s ( 0.00 s marking).