EMR pyspark를 이용한 S3 데이터 처리 예시

2020-08-09

.

실습 목적

EMR 서비스의 spark(sparkSQL)를 이용해서 S3내에 저장된 csv 형태의 데이터를 join 연산으로 처리해본다.

데이터 처리 케이스

  • case 1 : A, B, C 파일을 한번에 join

  • case 2 : A,B file을 join한 중간결과를 HDFS에 저장한 다음 다시 중간결과를 로딩하여 C file과 join

  • case 3 : A,B file을 먼저 join하고 중간결과를 메모리에 변수에 할당 후 C file과 join

** 참고사항 : A,B,C 데이터는 CSV 형태의 파일 데이터(약 8천 500만건, 12GB규모)를 이름만 각각 A,B,C 라고 다르게 지정하였을뿐 모두 동일한 데이터

테스트 환경

  • EMR 5.28.1의 spark 2.4.4

  • 각 노드는 일괄 m5.xlarge(4 vcpu, 16GB mem) 사양으로, 마스터노드 1EA, 코어노드(64GB의 하드볼륨) 4EA로 구성

  • EMR 서비스에서 제공하는 jupyter notebook 환경에서 테스트

테스트 데이터

  • 전자제품 구매이력 데이터

각 컬럼은 구매시간(event_time), 브랜드(brand), 가격(price), 구매자(user_id) 등으로 구성

  • 약 8천4백만건의 레코드, 12GB 용량

  • 아래 코드는 실제 원본 테스트 데이터에 대한 샘플 print

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ABC data test").getOrCreate()
df = spark.read.option("header","true").csv("s3a://pms-bucket-test/testdata/10G.csv")
df.show()
print("record count : ", df.count())

spark.catalog.clearCache()
VBox()


Starting Spark application
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
2application_1597017600341_0003pysparkidleLinkLink
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


SparkSession available as 'spark'.



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|year|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|2019|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|2019|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    null| 543.10|519107250|566511c2-e2e3-422...|2019|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|2019|
|2019-10-01 00:00:...|      view|   1004237|2053013555631882655|electronics.smart...|   apple|1081.98|535871217|c6bd7419-2748-4c5...|2019|
|2019-10-01 00:00:...|      view|   1480613|2053013561092866779|   computers.desktop|  pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...|2019|
|2019-10-01 00:00:...|      view|  17300353|2053013553853497655|                null|   creed| 380.96|555447699|4fe811e9-91de-46d...|2019|
|2019-10-01 00:00:...|      view|  31500053|2053013558031024687|                null|luminarc|  41.16|550978835|6280d577-25c8-414...|2019|
|2019-10-01 00:00:...|      view|  28719074|2053013565480109009|  apparel.shoes.keds|   baden| 102.71|520571932|ac1cd4e5-a3ce-422...|2019|
|2019-10-01 00:00:...|      view|   1004545|2053013555631882655|electronics.smart...|  huawei| 566.01|537918940|406c46ed-90a4-478...|2019|
|2019-10-01 00:00:...|      view|   2900536|2053013554776244595|appliances.kitche...|elenberg|  51.46|555158050|b5bdd0b3-4ca2-4c5...|2019|
|2019-10-01 00:00:...|      view|   1005011|2053013555631882655|electronics.smart...| samsung| 900.64|530282093|50a293fb-5940-41b...|2019|
|2019-10-01 00:00:...|      view|   3900746|2053013552326770905|appliances.enviro...|   haier| 102.38|555444559|98b88fa0-d8fa-4b9...|2019|
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|2019|
|2019-10-01 00:00:...|      view|  13500240|2053013557099889147|furniture.bedroom...|     brw|  93.18|555446365|7f0062d8-ead0-4e0...|2019|
|2019-10-01 00:00:...|      view|  23100006|2053013561638126333|                null|    null| 357.79|513642368|17566c27-0a8f-450...|2019|
|2019-10-01 00:00:...|      view|   1801995|2053013554415534427|electronics.video.tv|   haier| 193.03|537192226|e3151795-c355-4ef...|2019|
|2019-10-01 00:00:...|      view|  10900029|2053013555069845885|appliances.kitche...|   bosch|  58.95|519528062|901b9e3c-3f8f-414...|2019|
|2019-10-01 00:00:...|      view|   1306631|2053013558920217191|  computers.notebook|      hp| 580.89|550050854|7c90fc70-0e80-459...|2019|
|2019-10-01 00:00:...|      view|   1005135|2053013555631882655|electronics.smart...|   apple|1747.79|535871217|c6bd7419-2748-4c5...|2019|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
only showing top 20 rows

record count :  84897528
  • A,B,C 파일을 join 연산을 해보는 것이 목적이기 때문에 join key 역할을 하는 id라는 컬럼을 추가하고 각 파일의 컬럼에 어떤 파일의 컬림인지 구분자를 주기 위해서 위의 원본데이터를 아래와 같이 전처리하여 A,B,C 라는 파일을 생성해준다.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ABC data test").getOrCreate()

_list = ['A','B','C']

for elem in _list:
    
    df = spark.read.option("header","true").csv("s3a://pms-bucket-test/testdata/10G.csv")
    
    # 기존의 컬럼에 A 파일의 컬럼인지,B 인지,C 인지 구분자를 붙여주는 전처리과정
    df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
           .withColumnRenamed("event_type","event_type_{}".format(elem))\
           .withColumnRenamed("product_id","product_id_{}".format(elem))\
           .withColumnRenamed("category_id","category_id_{}".format(elem))\
           .withColumnRenamed("category_code","category_code_{}".format(elem))\
           .withColumnRenamed("brand","brand_{}".format(elem))\
           .withColumnRenamed("price","price_{}".format(elem))\
           .withColumnRenamed("user_id","user_id_{}".format(elem))\
           .withColumnRenamed("year","year_{}".format(elem))\
           .withColumnRenamed("user_session","user_session_{}".format(elem))\
           .withColumn("id", F.monotonically_increasing_id()) ## id라는 레코드별 고유넘버 컬럼을 추가해서 id를 기준으로 join할 예정
    
    # 1개의 csv 파일형태로 전처리한 데이터를 s3에 저장
    df.repartition(1).write.option("header", "true").csv("s3a://pms-bucket-test/{}".format(elem))
    
    
df.show()

print("record count : ", df.count())

spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
|        event_time_C|event_type_C|product_id_C|      category_id_C|     category_code_C| brand_C|price_C|user_id_C|      user_session_C|year_C| id|
+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
|2019-10-01 00:00:...|        view|    44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|  2019|  0|
|2019-10-01 00:00:...|        view|     3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|  2019|  1|
|2019-10-01 00:00:...|        view|    17200506|2053013559792632471|furniture.living_...|    null| 543.10|519107250|566511c2-e2e3-422...|  2019|  2|
|2019-10-01 00:00:...|        view|     1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|  2019|  3|
|2019-10-01 00:00:...|        view|     1004237|2053013555631882655|electronics.smart...|   apple|1081.98|535871217|c6bd7419-2748-4c5...|  2019|  4|
|2019-10-01 00:00:...|        view|     1480613|2053013561092866779|   computers.desktop|  pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...|  2019|  5|
|2019-10-01 00:00:...|        view|    17300353|2053013553853497655|                null|   creed| 380.96|555447699|4fe811e9-91de-46d...|  2019|  6|
|2019-10-01 00:00:...|        view|    31500053|2053013558031024687|                null|luminarc|  41.16|550978835|6280d577-25c8-414...|  2019|  7|
|2019-10-01 00:00:...|        view|    28719074|2053013565480109009|  apparel.shoes.keds|   baden| 102.71|520571932|ac1cd4e5-a3ce-422...|  2019|  8|
|2019-10-01 00:00:...|        view|     1004545|2053013555631882655|electronics.smart...|  huawei| 566.01|537918940|406c46ed-90a4-478...|  2019|  9|
|2019-10-01 00:00:...|        view|     2900536|2053013554776244595|appliances.kitche...|elenberg|  51.46|555158050|b5bdd0b3-4ca2-4c5...|  2019| 10|
|2019-10-01 00:00:...|        view|     1005011|2053013555631882655|electronics.smart...| samsung| 900.64|530282093|50a293fb-5940-41b...|  2019| 11|
|2019-10-01 00:00:...|        view|     3900746|2053013552326770905|appliances.enviro...|   haier| 102.38|555444559|98b88fa0-d8fa-4b9...|  2019| 12|
|2019-10-01 00:00:...|        view|    44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|  2019| 13|
|2019-10-01 00:00:...|        view|    13500240|2053013557099889147|furniture.bedroom...|     brw|  93.18|555446365|7f0062d8-ead0-4e0...|  2019| 14|
|2019-10-01 00:00:...|        view|    23100006|2053013561638126333|                null|    null| 357.79|513642368|17566c27-0a8f-450...|  2019| 15|
|2019-10-01 00:00:...|        view|     1801995|2053013554415534427|electronics.video.tv|   haier| 193.03|537192226|e3151795-c355-4ef...|  2019| 16|
|2019-10-01 00:00:...|        view|    10900029|2053013555069845885|appliances.kitche...|   bosch|  58.95|519528062|901b9e3c-3f8f-414...|  2019| 17|
|2019-10-01 00:00:...|        view|     1306631|2053013558920217191|  computers.notebook|      hp| 580.89|550050854|7c90fc70-0e80-459...|  2019| 18|
|2019-10-01 00:00:...|        view|     1005135|2053013555631882655|electronics.smart...|   apple|1747.79|535871217|c6bd7419-2748-4c5...|  2019| 19|
+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
only showing top 20 rows

record count :  84897528

데이터 처리해보기

1) case 1

A, B, C 파일을 한번에 join

image

from pyspark import SparkContext
from datetime import datetime
import time

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A,B,C 파일을 id 컬럼을 기준으로 join하여 ABC라는 데이터 프레임을 생성
ABC_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
        .join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')\
        .join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')

# ABC 프레임의 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())

# 코드 종료시간 체크
endTimeQuery = time.clock()

# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Number of the dataframe records :  84897528
code running time (sec): 128.2586009502411 , code finish time (GMT) :  2020-08-10 00:48
  • 위의 코드와 동일한 결과를 내는 코드로 sparkSQL 쿼리를 이용해서 처리해보자.
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")

# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")

# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")

# A,B,C 데이터를 한번에 JOIN하여 ABC라는 데이터 프레임 생성 후, createOrReplaceTempView를 이용하여 ABC 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id) JOIN Ctable USING(id)")
ABC_df.createOrReplaceTempView("ABCtable")

# ABC 데이터프레임의 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()

# 코드 종료시간 체크
endTimeQuery = time.clock()

# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+--------+
|count(1)|
+--------+
|84897528|
+--------+

code running time (sec): 112.01974630355835 , code finish time (GMT) :  2020-08-10 00:50
DataFrame[id: string, event_time_A: string, event_type_A: string, product_id_A: string, category_id_A: string, category_code_A: string, brand_A: string, price_A: string, user_id_A: string, user_session_A: string, year_A: string, event_time_B: string, event_type_B: string, product_id_B: string, category_id_B: string, category_code_B: string, brand_B: string, price_B: string, user_id_B: string, user_session_B: string, year_B: string, event_time_C: string, event_type_C: string, product_id_C: string, category_id_C: string, category_code_C: string, brand_C: string, price_C: string, user_id_C: string, user_session_C: string, year_C: string]
  • 아래와 같이 스파크 세션을 구동할때 아래와 같이 config 값도 부여할 수 있다.
spark = SparkSession.builder.appName("minmantest")\
        .config("spark.sql.broadcastTimeout", "36000")\
        .config("spark.executor.instances", "4")\
        .config("spark.executor.memory", "4g")\
        .config("spark.executor.cores", "3")\
        .getOrCreate()

참고자료

1) https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

2) https://aws.amazon.com/ko/blogs/korea/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

3) https://icthuman.tistory.com/entry/Spark%EB%A5%BC-YARN%EC%97%90-%EC%88%98%ED%96%89%ED%95%A0-%EB%95%8C-%EB%A9%94%EB%AA%A8%EB%A6%AC-%EC%84%B8%ED%8C%85-%EC%B6%94%EC%B2%9C-1

4) https://m.blog.naver.com/gyrbsdl18/220880041737

  • 만약에 연산한 데이터 프레임을 HDFS에 저장하고 싶을때는 주피터 노트북에 아래와 같은 코드를 실행하면 된다.
ABC_df.repartition(1).write.csv("hdfs:///test/ABC_df_folder")

# 주피터에서 위의 명령어를 이용해서 파일을 저장하고, 그런 다음에
# 마스터노드에 ssh 접속해서 아래와 같이 명령어를 입력하면 해당 파일을 확인할 수 있다.

[hadoop@ip-10-1-10-143 conf]$ hdfs dfs -ls /test/ABC_df_folder
Found 2 items
-rw-r--r--   2 livy hadoop           0 2020-08-09 09:23 /test/ABC_df_folder/_SUCCESS
-rw-r--r--   2 livy hadoop 36606928013 2020-08-09 09:23 /test/ABC_df_folder/part-00000-68558a96-ed42-4515-ae57-543d34d6fd88-c000.csv

2) Test case 2

A,B file을 join한 중간결과를 HDFS에 저장한 다음 다시 중간결과를 로딩하여 C file과 join

image

from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
from datetime import datetime

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A,B 파일을 join하여 AB라는 데이터 프레임을 생성
AB_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
        .join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')\
        .persist(StorageLevel.DISK_ONLY) # RDD를 디스크에만 상주시키는 옵션

# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임을 생성
ABC_df = AB_df.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')\
         .persist(StorageLevel.DISK_ONLY)

# ABC 데이터프레임 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())

# 코드 종료시간 체크
endTimeQuery = time.clock()

# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Number of the dataframe records :  84897528
code running time (sec): 959.8760902881622 , code finish time (GMT) :  2020-08-10 02:28
  • sparkSQL 쿼리를 이용해서 처리하는 코드
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time
from datetime import datetime

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")

# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")

# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")

# A,B 파일을 join하여 AB라는 데이터 프레임을 생성 후 DISK에 중간저장, AB 임시테이블 생성
AB_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id)").persist(StorageLevel.DISK_ONLY)
AB_df.createOrReplaceTempView("ABtable")

# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임 생성 후 DISK에 중간저장, AB 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM ABtable JOIN Ctable USING(id)").persist(StorageLevel.DISK_ONLY)
ABC_df.createOrReplaceTempView("ABCtable")

# ABC 데이터 프레임 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()

# 코드 종료시간 체크
endTimeQuery = time.clock()

# 위의 전체 코드에 대한 소요시간
print(" code running time (sec) : ", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+--------+
|count(1)|
+--------+
|84897528|
+--------+

code running time (sec): 926.3053381443024 , code finish time (GMT) :  2020-08-10 02:09
  • RDD Persistance, Stroage level 참고자료

1) https://bcho.tistory.com/1029

2) https://www.quora.com/How-will-you-select-the-storage-level-in-Apache-Spark

3) Test case 3

A,B file을 먼저 join하고 중간결과를 메모리에 변수에 할당 후 C file과 join

image

from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
from datetime import datetime

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A,B 파일을 join하여 AB라는 데이터 프레임을 생성
# 참고로 persist 미지정 시 메모리에 중간저장하게 되어있음
AB_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
        .join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')

# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임을 생성
ABC_df = AB_df.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')

# ABC 데이터프레임 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())

# 위의 전체 코드에 대한 소요시간
print(" code running time (sec) : ", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Number of the dataframe records :  84897528
code running time (sec): 113.00516510009766 , code finish time (GMT) :  2020-08-10 01:51
  • sparkSQL 쿼리를 이용해서 처리하는 코드
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time

# 코드 스타트 시간 체크
start = time.time()

# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()

# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")

# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")

# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")

# AB 데이터 JOIN, 임시테이블 생성
AB_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id)")
AB_df.createOrReplaceTempView("ABtable")

# JOIN한 AB데이터에 C 데이터를 JOIN, 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM ABtable JOIN Ctable USING(id)")
ABC_df.createOrReplaceTempView("ABCtable")

# join 한 ABC 데이터의 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()

# 코드 종료시간 체크
endTimeQuery = time.clock()

# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))

# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+--------+
|count(1)|
+--------+
|84897528|
+--------+

code running time (sec): 111.46066069602966 , code finish time (GMT) :  2020-08-10 01:54