Dataframe을 s3 특정경로에 csv형태로 저장하는 pyspark 함수 예시

2021-07-15

.

Data_Engineering_TIL(20210715)

##########################################################################################
# spark df convert to csv and upload s3
##########################################################################################

def spark_df_to_csv(spark_df, s3_location, sep):
    import pandas
    import boto3
    import StringIO
    pandas_df=spark_df.toPandas()
    bucket_name=s3_location.split("/")[2]
    obj_key='/'.join(s3_location.split("/")[3:])
    csv_buffer = StringIO()
    pandas_df.to_csv(csv_buffer,sep=sep,encoding='UTF-8',index=False)
    s3_resource=boto3.resource('s3')
    s3_resource.Object(bucket_name,obj_key).put(Body=csv_buffer.getvalue())
    
spark_df_to_csv(my_spark_df,'s3://my-bucket-test/testfolder/test_my_file.csv',',')

##########################################################################################
# load the csv file
##########################################################################################

schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)

df=spark.read.schema(schema).csv('s3://my-bucket-test/testfolder/test_my_file.csv',\
                                header=True,inferSchema='false',multiLine=True,quote='"',\
                                escape='\\', sep=',', ignoreLeadingWhiteSpace='true',\
                                ignoreTrailingWhiteSpace='true', mode='PERMISSIVE',encoding='UTF-8')

df.show()