EMR pyspark를 이용한 S3 데이터 처리 예시
.
실습 목적
EMR 서비스의 spark(sparkSQL)를 이용해서 S3내에 저장된 csv 형태의 데이터를 join 연산으로 처리해본다.
데이터 처리 케이스
-
case 1 : A, B, C 파일을 한번에 join
-
case 2 : A,B file을 join한 중간결과를 HDFS에 저장한 다음 다시 중간결과를 로딩하여 C file과 join
-
case 3 : A,B file을 먼저 join하고 중간결과를 메모리에 변수에 할당 후 C file과 join
** 참고사항 : A,B,C 데이터는 CSV 형태의 파일 데이터(약 8천 500만건, 12GB규모)를 이름만 각각 A,B,C 라고 다르게 지정하였을뿐 모두 동일한 데이터
테스트 환경
-
EMR 5.28.1의 spark 2.4.4
-
각 노드는 일괄 m5.xlarge(4 vcpu, 16GB mem) 사양으로, 마스터노드 1EA, 코어노드(64GB의 하드볼륨) 4EA로 구성
-
EMR 서비스에서 제공하는 jupyter notebook 환경에서 테스트
테스트 데이터
- 전자제품 구매이력 데이터
각 컬럼은 구매시간(event_time), 브랜드(brand), 가격(price), 구매자(user_id) 등으로 구성
-
약 8천4백만건의 레코드, 12GB 용량
-
아래 코드는 실제 원본 테스트 데이터에 대한 샘플 print
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ABC data test").getOrCreate()
df = spark.read.option("header","true").csv("s3a://pms-bucket-test/testdata/10G.csv")
df.show()
print("record count : ", df.count())
spark.catalog.clearCache()
VBox()
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|---|---|---|---|---|---|
2 | application_1597017600341_0003 | pyspark | idle | Link | Link | ✔ |
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
SparkSession available as 'spark'.
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
| event_time|event_type|product_id| category_id| category_code| brand| price| user_id| user_session|year|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
|2019-10-01 00:00:...| view| 44600062|2103807459595387724| null|shiseido| 35.79|541312140|72d76fde-8bb3-4e0...|2019|
|2019-10-01 00:00:...| view| 3900821|2053013552326770905|appliances.enviro...| aqua| 33.20|554748717|9333dfbd-b87a-470...|2019|
|2019-10-01 00:00:...| view| 17200506|2053013559792632471|furniture.living_...| null| 543.10|519107250|566511c2-e2e3-422...|2019|
|2019-10-01 00:00:...| view| 1307067|2053013558920217191| computers.notebook| lenovo| 251.74|550050854|7c90fc70-0e80-459...|2019|
|2019-10-01 00:00:...| view| 1004237|2053013555631882655|electronics.smart...| apple|1081.98|535871217|c6bd7419-2748-4c5...|2019|
|2019-10-01 00:00:...| view| 1480613|2053013561092866779| computers.desktop| pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...|2019|
|2019-10-01 00:00:...| view| 17300353|2053013553853497655| null| creed| 380.96|555447699|4fe811e9-91de-46d...|2019|
|2019-10-01 00:00:...| view| 31500053|2053013558031024687| null|luminarc| 41.16|550978835|6280d577-25c8-414...|2019|
|2019-10-01 00:00:...| view| 28719074|2053013565480109009| apparel.shoes.keds| baden| 102.71|520571932|ac1cd4e5-a3ce-422...|2019|
|2019-10-01 00:00:...| view| 1004545|2053013555631882655|electronics.smart...| huawei| 566.01|537918940|406c46ed-90a4-478...|2019|
|2019-10-01 00:00:...| view| 2900536|2053013554776244595|appliances.kitche...|elenberg| 51.46|555158050|b5bdd0b3-4ca2-4c5...|2019|
|2019-10-01 00:00:...| view| 1005011|2053013555631882655|electronics.smart...| samsung| 900.64|530282093|50a293fb-5940-41b...|2019|
|2019-10-01 00:00:...| view| 3900746|2053013552326770905|appliances.enviro...| haier| 102.38|555444559|98b88fa0-d8fa-4b9...|2019|
|2019-10-01 00:00:...| view| 44600062|2103807459595387724| null|shiseido| 35.79|541312140|72d76fde-8bb3-4e0...|2019|
|2019-10-01 00:00:...| view| 13500240|2053013557099889147|furniture.bedroom...| brw| 93.18|555446365|7f0062d8-ead0-4e0...|2019|
|2019-10-01 00:00:...| view| 23100006|2053013561638126333| null| null| 357.79|513642368|17566c27-0a8f-450...|2019|
|2019-10-01 00:00:...| view| 1801995|2053013554415534427|electronics.video.tv| haier| 193.03|537192226|e3151795-c355-4ef...|2019|
|2019-10-01 00:00:...| view| 10900029|2053013555069845885|appliances.kitche...| bosch| 58.95|519528062|901b9e3c-3f8f-414...|2019|
|2019-10-01 00:00:...| view| 1306631|2053013558920217191| computers.notebook| hp| 580.89|550050854|7c90fc70-0e80-459...|2019|
|2019-10-01 00:00:...| view| 1005135|2053013555631882655|electronics.smart...| apple|1747.79|535871217|c6bd7419-2748-4c5...|2019|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+
only showing top 20 rows
record count : 84897528
- A,B,C 파일을 join 연산을 해보는 것이 목적이기 때문에 join key 역할을 하는 id라는 컬럼을 추가하고 각 파일의 컬럼에 어떤 파일의 컬림인지 구분자를 주기 위해서 위의 원본데이터를 아래와 같이 전처리하여 A,B,C 라는 파일을 생성해준다.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("ABC data test").getOrCreate()
_list = ['A','B','C']
for elem in _list:
df = spark.read.option("header","true").csv("s3a://pms-bucket-test/testdata/10G.csv")
# 기존의 컬럼에 A 파일의 컬럼인지,B 인지,C 인지 구분자를 붙여주는 전처리과정
df = df.withColumnRenamed("event_time","event_time_{}".format(elem))\
.withColumnRenamed("event_type","event_type_{}".format(elem))\
.withColumnRenamed("product_id","product_id_{}".format(elem))\
.withColumnRenamed("category_id","category_id_{}".format(elem))\
.withColumnRenamed("category_code","category_code_{}".format(elem))\
.withColumnRenamed("brand","brand_{}".format(elem))\
.withColumnRenamed("price","price_{}".format(elem))\
.withColumnRenamed("user_id","user_id_{}".format(elem))\
.withColumnRenamed("year","year_{}".format(elem))\
.withColumnRenamed("user_session","user_session_{}".format(elem))\
.withColumn("id", F.monotonically_increasing_id()) ## id라는 레코드별 고유넘버 컬럼을 추가해서 id를 기준으로 join할 예정
# 1개의 csv 파일형태로 전처리한 데이터를 s3에 저장
df.repartition(1).write.option("header", "true").csv("s3a://pms-bucket-test/{}".format(elem))
df.show()
print("record count : ", df.count())
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
| event_time_C|event_type_C|product_id_C| category_id_C| category_code_C| brand_C|price_C|user_id_C| user_session_C|year_C| id|
+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
|2019-10-01 00:00:...| view| 44600062|2103807459595387724| null|shiseido| 35.79|541312140|72d76fde-8bb3-4e0...| 2019| 0|
|2019-10-01 00:00:...| view| 3900821|2053013552326770905|appliances.enviro...| aqua| 33.20|554748717|9333dfbd-b87a-470...| 2019| 1|
|2019-10-01 00:00:...| view| 17200506|2053013559792632471|furniture.living_...| null| 543.10|519107250|566511c2-e2e3-422...| 2019| 2|
|2019-10-01 00:00:...| view| 1307067|2053013558920217191| computers.notebook| lenovo| 251.74|550050854|7c90fc70-0e80-459...| 2019| 3|
|2019-10-01 00:00:...| view| 1004237|2053013555631882655|electronics.smart...| apple|1081.98|535871217|c6bd7419-2748-4c5...| 2019| 4|
|2019-10-01 00:00:...| view| 1480613|2053013561092866779| computers.desktop| pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...| 2019| 5|
|2019-10-01 00:00:...| view| 17300353|2053013553853497655| null| creed| 380.96|555447699|4fe811e9-91de-46d...| 2019| 6|
|2019-10-01 00:00:...| view| 31500053|2053013558031024687| null|luminarc| 41.16|550978835|6280d577-25c8-414...| 2019| 7|
|2019-10-01 00:00:...| view| 28719074|2053013565480109009| apparel.shoes.keds| baden| 102.71|520571932|ac1cd4e5-a3ce-422...| 2019| 8|
|2019-10-01 00:00:...| view| 1004545|2053013555631882655|electronics.smart...| huawei| 566.01|537918940|406c46ed-90a4-478...| 2019| 9|
|2019-10-01 00:00:...| view| 2900536|2053013554776244595|appliances.kitche...|elenberg| 51.46|555158050|b5bdd0b3-4ca2-4c5...| 2019| 10|
|2019-10-01 00:00:...| view| 1005011|2053013555631882655|electronics.smart...| samsung| 900.64|530282093|50a293fb-5940-41b...| 2019| 11|
|2019-10-01 00:00:...| view| 3900746|2053013552326770905|appliances.enviro...| haier| 102.38|555444559|98b88fa0-d8fa-4b9...| 2019| 12|
|2019-10-01 00:00:...| view| 44600062|2103807459595387724| null|shiseido| 35.79|541312140|72d76fde-8bb3-4e0...| 2019| 13|
|2019-10-01 00:00:...| view| 13500240|2053013557099889147|furniture.bedroom...| brw| 93.18|555446365|7f0062d8-ead0-4e0...| 2019| 14|
|2019-10-01 00:00:...| view| 23100006|2053013561638126333| null| null| 357.79|513642368|17566c27-0a8f-450...| 2019| 15|
|2019-10-01 00:00:...| view| 1801995|2053013554415534427|electronics.video.tv| haier| 193.03|537192226|e3151795-c355-4ef...| 2019| 16|
|2019-10-01 00:00:...| view| 10900029|2053013555069845885|appliances.kitche...| bosch| 58.95|519528062|901b9e3c-3f8f-414...| 2019| 17|
|2019-10-01 00:00:...| view| 1306631|2053013558920217191| computers.notebook| hp| 580.89|550050854|7c90fc70-0e80-459...| 2019| 18|
|2019-10-01 00:00:...| view| 1005135|2053013555631882655|electronics.smart...| apple|1747.79|535871217|c6bd7419-2748-4c5...| 2019| 19|
+--------------------+------------+------------+-------------------+--------------------+--------+-------+---------+--------------------+------+---+
only showing top 20 rows
record count : 84897528
데이터 처리해보기
1) case 1
A, B, C 파일을 한번에 join
from pyspark import SparkContext
from datetime import datetime
import time
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A,B,C 파일을 id 컬럼을 기준으로 join하여 ABC라는 데이터 프레임을 생성
ABC_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')\
.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')
# ABC 프레임의 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())
# 코드 종료시간 체크
endTimeQuery = time.clock()
# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
Number of the dataframe records : 84897528
code running time (sec): 128.2586009502411 , code finish time (GMT) : 2020-08-10 00:48
- 위의 코드와 동일한 결과를 내는 코드로 sparkSQL 쿼리를 이용해서 처리해보자.
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")
# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")
# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")
# A,B,C 데이터를 한번에 JOIN하여 ABC라는 데이터 프레임 생성 후, createOrReplaceTempView를 이용하여 ABC 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id) JOIN Ctable USING(id)")
ABC_df.createOrReplaceTempView("ABCtable")
# ABC 데이터프레임의 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()
# 코드 종료시간 체크
endTimeQuery = time.clock()
# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
+--------+
|count(1)|
+--------+
|84897528|
+--------+
code running time (sec): 112.01974630355835 , code finish time (GMT) : 2020-08-10 00:50
DataFrame[id: string, event_time_A: string, event_type_A: string, product_id_A: string, category_id_A: string, category_code_A: string, brand_A: string, price_A: string, user_id_A: string, user_session_A: string, year_A: string, event_time_B: string, event_type_B: string, product_id_B: string, category_id_B: string, category_code_B: string, brand_B: string, price_B: string, user_id_B: string, user_session_B: string, year_B: string, event_time_C: string, event_type_C: string, product_id_C: string, category_id_C: string, category_code_C: string, brand_C: string, price_C: string, user_id_C: string, user_session_C: string, year_C: string]
- 아래와 같이 스파크 세션을 구동할때 아래와 같이 config 값도 부여할 수 있다.
spark = SparkSession.builder.appName("minmantest")\
.config("spark.sql.broadcastTimeout", "36000")\
.config("spark.executor.instances", "4")\
.config("spark.executor.memory", "4g")\
.config("spark.executor.cores", "3")\
.getOrCreate()
참고자료
1) https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html
2) https://aws.amazon.com/ko/blogs/korea/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
3) https://icthuman.tistory.com/entry/Spark%EB%A5%BC-YARN%EC%97%90-%EC%88%98%ED%96%89%ED%95%A0-%EB%95%8C-%EB%A9%94%EB%AA%A8%EB%A6%AC-%EC%84%B8%ED%8C%85-%EC%B6%94%EC%B2%9C-1
4) https://m.blog.naver.com/gyrbsdl18/220880041737
- 만약에 연산한 데이터 프레임을 HDFS에 저장하고 싶을때는 주피터 노트북에 아래와 같은 코드를 실행하면 된다.
ABC_df.repartition(1).write.csv("hdfs:///test/ABC_df_folder")
# 주피터에서 위의 명령어를 이용해서 파일을 저장하고, 그런 다음에
# 마스터노드에 ssh 접속해서 아래와 같이 명령어를 입력하면 해당 파일을 확인할 수 있다.
[hadoop@ip-10-1-10-143 conf]$ hdfs dfs -ls /test/ABC_df_folder
Found 2 items
-rw-r--r-- 2 livy hadoop 0 2020-08-09 09:23 /test/ABC_df_folder/_SUCCESS
-rw-r--r-- 2 livy hadoop 36606928013 2020-08-09 09:23 /test/ABC_df_folder/part-00000-68558a96-ed42-4515-ae57-543d34d6fd88-c000.csv
2) Test case 2
A,B file을 join한 중간결과를 HDFS에 저장한 다음 다시 중간결과를 로딩하여 C file과 join
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
from datetime import datetime
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A,B 파일을 join하여 AB라는 데이터 프레임을 생성
AB_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')\
.persist(StorageLevel.DISK_ONLY) # RDD를 디스크에만 상주시키는 옵션
# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임을 생성
ABC_df = AB_df.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')\
.persist(StorageLevel.DISK_ONLY)
# ABC 데이터프레임 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())
# 코드 종료시간 체크
endTimeQuery = time.clock()
# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
Number of the dataframe records : 84897528
code running time (sec): 959.8760902881622 , code finish time (GMT) : 2020-08-10 02:28
- sparkSQL 쿼리를 이용해서 처리하는 코드
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time
from datetime import datetime
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")
# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")
# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")
# A,B 파일을 join하여 AB라는 데이터 프레임을 생성 후 DISK에 중간저장, AB 임시테이블 생성
AB_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id)").persist(StorageLevel.DISK_ONLY)
AB_df.createOrReplaceTempView("ABtable")
# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임 생성 후 DISK에 중간저장, AB 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM ABtable JOIN Ctable USING(id)").persist(StorageLevel.DISK_ONLY)
ABC_df.createOrReplaceTempView("ABCtable")
# ABC 데이터 프레임 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()
# 코드 종료시간 체크
endTimeQuery = time.clock()
# 위의 전체 코드에 대한 소요시간
print(" code running time (sec) : ", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
+--------+
|count(1)|
+--------+
|84897528|
+--------+
code running time (sec): 926.3053381443024 , code finish time (GMT) : 2020-08-10 02:09
- RDD Persistance, Stroage level 참고자료
1) https://bcho.tistory.com/1029
2) https://www.quora.com/How-will-you-select-the-storage-level-in-Apache-Spark
3) Test case 3
A,B file을 먼저 join하고 중간결과를 메모리에 변수에 할당 후 C file과 join
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
from datetime import datetime
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A,B 파일을 join하여 AB라는 데이터 프레임을 생성
# 참고로 persist 미지정 시 메모리에 중간저장하게 되어있음
AB_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")\
.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/B.csv"),on='id',how='inner')
# AB 데이터 프레임에 C파일 join하여 ABC라는 데이터 프레임을 생성
ABC_df = AB_df.join(spark.read.options(header='true').csv("s3a://pms-bucket-test/C.csv"),on='id',how='inner')
# ABC 데이터프레임 레코드 카운트
print("Number of the dataframe records : ", ABC_df.count())
# 위의 전체 코드에 대한 소요시간
print(" code running time (sec) : ", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
Number of the dataframe records : 84897528
code running time (sec): 113.00516510009766 , code finish time (GMT) : 2020-08-10 01:51
- sparkSQL 쿼리를 이용해서 처리하는 코드
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pyspark
import time
# 코드 스타트 시간 체크
start = time.time()
# 스파크 세션을 생성하여 데이터 프레임을 구동할 수 있도록 정의
spark = SparkSession.builder.appName("minmantest").getOrCreate()
# A 데이터 로딩, 임시테이블 생성
A_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/A.csv")
A_df.createOrReplaceTempView("Atable")
# B 데이터 로딩, 임시테이블 생성
B_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/B.csv")
B_df.createOrReplaceTempView("Btable")
# C 데이터 로딩, 임시테이블 생성
C_df = spark.read.option("header","true").csv("s3a://pms-bucket-test/C.csv")
C_df.createOrReplaceTempView("Ctable")
# AB 데이터 JOIN, 임시테이블 생성
AB_df = spark.sql("SELECT * FROM Atable JOIN Btable USING(id)")
AB_df.createOrReplaceTempView("ABtable")
# JOIN한 AB데이터에 C 데이터를 JOIN, 임시테이블 생성
ABC_df = spark.sql("SELECT * FROM ABtable JOIN Ctable USING(id)")
ABC_df.createOrReplaceTempView("ABCtable")
# join 한 ABC 데이터의 레코드 카운트
spark.sql("SELECT COUNT(*) FROM ABCtable").show()
# 코드 종료시간 체크
endTimeQuery = time.clock()
# 위의 전체 코드에 대한 소요시간
print("code running time (sec):", time.time() - start,', code finish time (GMT) : ',datetime.now().strftime('%Y-%m-%d %H:%M'))
# 데이터 프레임 캐시제거
spark.catalog.clearCache()
VBox()
FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…
+--------+
|count(1)|
+--------+
|84897528|
+--------+
code running time (sec): 111.46066069602966 , code finish time (GMT) : 2020-08-10 01:54