spotify 데이터 유사도 모델링

2020-06-24

.

Data_Engineering_TIL(20200624)

[참고사항]

  • study program : Fastcampus Data Engineering 온라인

** URL : https://www.fastcampus.co.kr/data_online_engineering

  • ‘Lambda를 이용한 데이터 파이프라이닝 구축’를 이어서 참고할 것

  • URL : https://minman2115.github.io/DE_TIL95

[학습내용]

  • spotify API로 부터 오디오 피쳐 데이터를 가져왔는데 이 오디오 피쳐를 가지고 아티스트들끼리의 유사도를 구하는 모델을 만드려고 한다.

  • 먼저 아래의 코드를 실행하여 s3에 파일을 떨군다.

import sys
import os
import logging
import boto3
import requests
import base64
import json
import pymysql
from datetime import datetime
import pandas as pd
import jsonpath

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxx"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

def main():
    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    headers = get_headers(client_id, client_secret)

    ## RDS에서 10명의 아티스트 ID data를 Load
    cursor.execute("SELECT id FROM artists LIMIT 10")

    ## jsonpath를 이용해서 해당 path안에 어떤 data를 insert를 했을때 그 안에서 key path를 찾는다.
    ## 그 path를 찾아서 그 value값을 사용자에게 줄 수도 있다. 이거를 아래와 같이 활용할 수 있다.
    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }

    ## Top Tracks 정보를 Spotify API로 부터 Load
    top_tracks = []
    for (id, ) in cursor.fetchall():
        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {'country': 'US'}
        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)

        for i in raw['tracks']:
            ## track 하나하마나다 딕셔너리 형태의 데이터가 떨어질 것이다.
            top_track = {}
            for k, v in top_track_keys.items():
                top_track.update({k: jsonpath.jsonpath(i, v)})
                ## k는 top_track_keys에서 key를 얘기하는 것이다.
                ## v는 top_track_keys에서 value를 얘기하는 것이다.
                ## jsonpath.jsonpath(i, v)는 i 딕셔너리 데이터에서
                ## v에 해당하는 path의 데이터를 가져오라는 것이다.
                ## 예를 들어 위와 같이 제이스 패스를 이용해서 id만
                ## 가져온다고 치면 아래와 같이
                ## "id" : "aksdnc20202192029" 처럼 들어올것이다.
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)

    ## track_ids 가져오기
    track_ids = [i['id'][0] for i in top_tracks]

    top_tracks = pd.DataFrame(top_tracks)
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'top-tracks/dt={}/top-tracks.parquet'.format(dt))
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)


    ## audio features 데이터도 파케이 파일로 만들어보자

    ## tracks_batch 정의하기
    tracks_batch = [track_ids[i:i+100] for i in range(0,len(track_ids),100)]

    audio_features = []
    for i in tracks_batch:
        ids = ','.join(i)
        URL = 'https://api.spotify.com/v1/audio-features/?ids={}'.format(ids)
        r = requests.get(URL, headers=headers)
        raw = json.loads(r.text)
        audio_features.extend(raw['audio_features'])

    audio_features = pd.DataFrame(audio_features)
    audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')

    s3 = boto3.resource('s3')
    object = s3.Object('pms-bucket-test', 'audio-features/dt={}/audio-features.parquet'.format(dt))
    data = open('audio-features.parquet', 'rb')
    object.put(Body=data)

    return None

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers

if __name__=='__main__':
    main()
  • 위에 코드를 실행하면 아래 그림과 같이 s3에 top_tracks와 audio_features 파케이 파일이 떨궈진다. 우리는 이 데이터를 활용할 것이다.

image

  • 그 다음에 글루에서 데이터베이스 하나 만들어주고, 아테나로 이동해서 아래와 같은 코드를 날려서 아테나 테이블 top_tracks와 audio_features를 만들어준다.
CREATE EXTERNAL TABLE IF NOT EXISTS top_tracks(
  id string,
  artist_id string,
  name string,
  popularity int,
  external_url string
) PARTITIONED BY (dt string) 
STORED AS PARQUET LOCATION 's3://pms-bucket-test/top-tracks/' tblproperties("parquet.compress"="SNAPPY");

MSCK REPAIR TABLE top_tracks;

CREATE EXTERNAL TABLE IF NOT EXISTS audio_features(
  id string,
  danceability DOUBLE,
  energy DOUBLE,
  key int,
  loudness DOUBLE,
  mode int,
  speechiness DOUBLE,
  accusticness DOUBLE,
  instrumentalness DOUBLE
) PARTITIONED BY (dt string) 
STORED AS PARQUET LOCATION 's3://pms-bucket-test/audio-features/' tblproperties("parquet.compress"="SNAPPY");

MSCK REPAIR TABLE audio_features;
  • 그리고 우리가 일전에 생성했던 RDS에 접속해서 아래와 같은 쿼리 명령으로 새로운 RDS 테이블을 하나 만들어준다.
CREATE TABLE related_artists (artist_id VARCHAR(255), y_artist VARCHAR(255), distance FLOAT, PRIMARY KEY(artist_id,y_artist)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 그리고 아래와 같이 유사도 모델링을 구현한 코드를 실행한다.
import sys
import os
import logging
import boto3
import pymysql
import time
import math

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxx"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

def main():

    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)

    athena = boto3.client('athena')

    query = """
        SELECT
         artist_id,
         AVG(danceability) AS danceability,
         AVG(energy) AS energy,
         AVG(loudness) AS loudness,
         AVG(speechiness) AS speechiness,
         AVG(instrumentalness) AS instrumentalness
        FROM
         top_tracks t1
        JOIN
         audio_features t2 ON t2.id = t1.id AND CAST(t1.dt AS DATE) = DATE('2020-06-24') AND CAST(t2.dt AS DATE) = DATE('2020-06-24')
        GROUP BY t1.artist_id
        LIMIT 10
    """

    r = query_athena(query, athena)
    results = get_query_result(r['QueryExecutionId'], athena)
    artists = process_data(results)

    query = """
        SELECT
         MIN(danceability) AS danceability_min,
         MAX(danceability) AS danceability_max,
         MIN(energy) AS energy_min,
         MAX(energy) AS energy_max,
         MIN(loudness) AS loudness_min,
         MAX(loudness) AS loudness_max,
         MIN(speechiness) AS speechiness_min,
         MAX(speechiness) AS speechiness_max,
         MIN(instrumentalness) AS instrumentalness_min,
         MAX(instrumentalness) AS instrumentalness_max
        FROM
         audio_features
    """
    r = query_athena(query, athena)
    results = get_query_result(r['QueryExecutionId'], athena)
    avgs = process_data(results)[0]

    metrics = ['danceability', 'energy', 'loudness', 'speechiness', 'instrumentalness']

    for i in artists:
        for j in artists:
            dist = 0
            for k in metrics:
                x = float(i[k])
                x_norm = normalize(x, float(avgs[k+'_min']), float(avgs[k+'_max']))
                y = float(j[k])
                y_norm = normalize(y, float(avgs[k+'_min']), float(avgs[k+'_max']))
                dist += (x_norm-y_norm)**2

            dist = math.sqrt(dist) ## euclidean distance

            data = {
                'artist_id': i['artist_id'],
                'y_artist': j['artist_id'],
                'distance': dist
            }

            insert_row(cursor, data, 'related_artists')

    conn.commit()
    cursor.close()

    return None

def normalize(x, x_min, x_max):
    normalized = (x-x_min) / (x_max-x_min)
    return normalized

def query_athena(query, athena):
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': 'pms_gluedb_test'},
        ResultConfiguration={
            'OutputLocation': "s3://pms-bucket-test/repair/",
            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
        }
    )
    return response

def get_query_result(query_id, athena):
    response = athena.get_query_execution(QueryExecutionId=str(query_id))
    while response['QueryExecution']['Status']['State'] != 'SUCCEEDED':
        if response['QueryExecution']['Status']['State'] == 'FAILED':
            logging.error('QUERY FAILED')
            break
        time.sleep(5)
        response = athena.get_query_execution(QueryExecutionId=str(query_id))
    response = athena.get_query_results(QueryExecutionId=str(query_id),MaxResults=1000)
    return response

def process_data(results):
    columns = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    listed_results = []
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            try:
                values.append(list(field.values())[0])
            except:
                values.append(list(' '))
        listed_results.append(dict(zip(columns, values)))
    return listed_results

def insert_row(cursor, data, table):
    placeholders = ', '.join(['%s'] * len(data))
    columns = ', '.join(data.keys())
    key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
    cursor.execute(sql, list(data.values())*2)
    return None

if __name__=='__main__':
    main()
  • 위의 코드가 실행이 완료되면 RDS로 돌아가서 related_artists 테이블에서 select * from related_artists 쿼리를 날리면 아래와 같이 유사도 정보 결과를 확인할 수 있다.

image

  • 아래 쿼리와 같이 날려서 가장 유사도가 비슷한 아티스트 정보를 뽑을 수 있다.
SELECT p1.name, p2.name, p1.url, p2.url, p2.distance FROM artists p1
JOIN (SELECT t1.name, t1.url, t2.y_artist, t2.distance FROM artists t1 JOIN related_artists t2 ON t2.artist_id = t1.id) p2 ON p2.y_artist = p1.id
WHERE distance != 0 ORDER BY p2.distance ASC LIMIT 3;

image