Spotify 음악데이터 추출, 파케이 변환 및 s3 저장

2020-06-14

.

Data_Engineering_TIL(20200614)

[참고사항]

  • study program : Fastcampus Data Engineering 온라인

** URL : https://www.fastcampus.co.kr/data_online_engineering

  • ‘Python Boto3를 이용한 Spotify 음악데이터 DynamoDB 저장&활용’를 이어서 참고할 것

  • URL : https://minman2115.github.io/DE_TIL92

[학습내용]

  • 아래와 같이 top-tracks data를 parquet 파일로 만드는 코드를 만들어서 실행했다.
import sys
import os
import logging
import boto3
import requests
import base64
import json
import pymysql
from datetime import datetime
import pandas as pd
import jsonpath

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxx"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

def main():
    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    headers = get_headers(client_id, client_secret)

    ## RDS에서 아티스트 ID data를 Load
    cursor.execute("SELECT id FROM artists LIMIT 10")
    
    ## jsonpath를 이용해서 해당 path안에 어떤 data를 insert를 했을때 그 안에서 key path를 찾는다.
    ## 그 path를 찾아서 그 value값을 사용자에게 줄 수도 있다.
    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }
    
    ## Top Tracks 정보를 Spotify API로 부터 Load
    top_tracks = []
    for (id, ) in cursor.fetchall():
        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {'country': 'US'}
        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)
        top_tracks.extend(raw['tracks'])

    top_tracks = pd.DataFrame(top_tracks)
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

    sys.exit(0)

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'test/dt={}/top-tracks.parquet'.format(dt))
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)

    return None

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers


if __name__=='__main__':
    main()
Traceback (most recent call last):
  File "pmstest6.py", line 77, in <module>
    main()
  File "pmstest6.py", line 52, in main
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')
  File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\frame.py", line 2222, in to_parquet
    **kwargs
  File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\parquet.py", line 254, in to_parquet
    **kwargs
  File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\parquet.py", line 117, in write
    **kwargs
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\parquet.py", line 1344, in write_table
    writer.write_table(table, row_group_size=row_group_size)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\parquet.py", line 474, in write_table
    self.writer.write_table(table, row_group_size=row_group_size)
  File "pyarrow\_parquet.pyx", line 1375, in pyarrow._parquet.ParquetWriter.write_table
  File "pyarrow\error.pxi", line 78, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Nested column branch had multiple children: struct<album_type: string, artists: list<item: struct<external_urls: struct<spotify: string>, href: string, id: string, name: string, type: string, uri: string>>, external_urls: struct<spotify: string>, href: string, id: string, images: list<item: struct<height: int64, url: string, width: int64>>, name: string, release_date: string, release_date_precision: string, total_tracks: int64, type: string, uri: string>
  • 그러면 위와 같은 에러가 발생하는데 parquet화 하는 과정에서 에러가 발생한 것을 알 수 있다. 데이터내의 어떤 value가 리스트형식의 데이터가 있기 때문이다. 그런걸 struct type 형식이라고 하는데 이 스트럭 타입형식은 파케이화를 하는데 문제가 있다.

  • 파케이화 하면 데이터 퍼포먼스는 빨라지지만 그렇게 하려면 파케이 포맷에 맞게 데이터를 정확하게 정의해줘야 한다.

  • 그래서 일반적으로는 1차적으로 이런 가장 로우한 데이터를 먼저 s3에 저장해놓고, 파케이화를 하고 싶은 몇개의 데이터만 뽑아서 그거를 가공하는 프로세싱 잡이 돌아서 가공데이터가 다시 s3에 떨어지게 하는 파이프라이닝을 구축한다.

  • 그래서 아래와 같이 json 파일을 만들어 파케이 파일로 변환한 다음 s3에 떨구는 코드를 작성하고 실행해본다.

import sys
import os
import logging
import boto3
import requests
import base64
import json
import pymysql
from datetime import datetime
import pandas as pd
import jsonpath

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxx"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

def main():
    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    headers = get_headers(client_id, client_secret)

    ## RDS에서 10명의 아티스트 ID data를 Load
    cursor.execute("SELECT id FROM artists LIMIT 10")
    
    ## jsonpath를 이용해서 해당 path안에 어떤 data를 insert를 했을때 그 안에서 key path를 찾는다.
    ## 그 path를 찾아서 그 value값을 사용자에게 줄 수도 있다. 이거를 아래와 같이 활용할 수 있다.
    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }
    
    ## Top Tracks 정보를 Spotify API로 부터 Load
    top_tracks = []
    for (id, ) in cursor.fetchall():
        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {'country': 'US'}
        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)

        for i in raw['tracks']: 
            ## track 하나하마나다 딕셔너리 형태의 데이터가 떨어질 것이다.
            top_track = {}
            for k, v in top_track_keys.items():
                top_track.update({k: jsonpath.jsonpath(i, v)})
                ## k는 top_track_keys에서 key를 얘기하는 것이다.
                ## v는 top_track_keys에서 value를 얘기하는 것이다.
                ## jsonpath.jsonpath(i, v)는 i 딕셔너리 데이터에서
                ## v에 해당하는 path의 데이터를 가져오라는 것이다.
                ## 예를 들어 위와 같이 제이스 패스를 이용해서 id만 
                ## 가져온다고 치면 아래와 같이 
                ## "id" : "aksdnc20202192029" 처럼 들어올것이다.
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)

    top_tracks = pd.DataFrame(top_tracks)
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'test/dt={}/top-tracks.parquet'.format(dt))
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)

    return None

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers

if __name__=='__main__':
    main()

아래 그림과 같이 지정된 s3 버킷에 파케이 파일이 떨어지는 것을 확인할 수 있다.

2

  • 여기에 아래와 같이 코딩하여 오디오 피쳐스 정보도 추가해보자.
import sys
import os
import logging
import boto3
import requests
import base64
import json
import pymysql
from datetime import datetime
import pandas as pd
import jsonpath

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxx"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

def main():
    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    headers = get_headers(client_id, client_secret)

    ## RDS에서 10명의 아티스트 ID data를 Load
    cursor.execute("SELECT id FROM artists LIMIT 10")

    ## jsonpath를 이용해서 해당 path안에 어떤 data를 insert를 했을때 그 안에서 key path를 찾는다.
    ## 그 path를 찾아서 그 value값을 사용자에게 줄 수도 있다. 이거를 아래와 같이 활용할 수 있다.
    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }

    ## Top Tracks 정보를 Spotify API로 부터 Load
    top_tracks = []
    for (id, ) in cursor.fetchall():
        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {'country': 'US'}
        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)

        for i in raw['tracks']:
            ## track 하나하마나다 딕셔너리 형태의 데이터가 떨어질 것이다.
            top_track = {}
            for k, v in top_track_keys.items():
                top_track.update({k: jsonpath.jsonpath(i, v)})
                ## k는 top_track_keys에서 key를 얘기하는 것이다.
                ## v는 top_track_keys에서 value를 얘기하는 것이다.
                ## jsonpath.jsonpath(i, v)는 i 딕셔너리 데이터에서
                ## v에 해당하는 path의 데이터를 가져오라는 것이다.
                ## 예를 들어 위와 같이 제이스 패스를 이용해서 id만
                ## 가져온다고 치면 아래와 같이
                ## "id" : "aksdnc20202192029" 처럼 들어올것이다.
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)

    ## track_ids 가져오기
    track_ids = [i['id'][0] for i in top_tracks]

    top_tracks = pd.DataFrame(top_tracks)
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'top-tracks/dt={}/top-tracks.parquet'.format(dt))
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)


    ## audio features 데이터도 파케이 파일로 만들어보자

    ## tracks_batch 정의하기
    tracks_batch = [track_ids[i:i+100] for i in range(0,len(track_ids),100)]

    audio_features = []
    for i in tracks_batch:
        ids = ','.join(i)
        URL = 'https://api.spotify.com/v1/audio-features/?ids={}'.format(ids)
        r = requests.get(URL, headers=headers)
        raw = json.loads(r.text)
        audio_features.extend(raw['audio_features'])

    audio_features = pd.DataFrame(audio_features)
    audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'audio-features/dt={}/audio-features.parquet'.format(dt))
    data = open('audio-features.parquet', 'rb')
    object.put(Body=data)

    return None

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers

if __name__=='__main__':
    main()

실행하면 아래 그림과 같이 날짜를 기준으로 파티션된 폴더가 만들어지고 그 안에 파케이 파일이 생성이 된다.

image