Facebook 메신저 app을 이용한 Lambda chatbot 구현
.
Data_Engineering_TIL(20200625)
[참고사항]
- study program : Fastcampus Data Engineering 온라인
** URL : https://www.fastcampus.co.kr/data_online_engineering
-
‘spotify 데이터 유사도 모델링’를 이어서 참고할 것
-
URL : https://minman2115.github.io/DE_TIL96
[학습내용]
- Facebook api 활용을 위해서 facebook developer에서 app을 만들어야 한다.
step 1) https://developers.facebook.com/ 접속
step 2) 로그인
step 3) ‘MyApps –> Create APP 클릭’ 또는 ‘시작하기’ 클릭
step 4) add a product에서 messenger의 set up을 클릭
step 5) 액세스 토큰에서 create a page 클릭
step 6) 페이지 이름과 카테고리를 지정하고 create 한다.
step 7) 액세스 토큰화면에서 생성한 페이지를 아래그림과 같이 추가해준다.
(만들때 권한에 관련한 warning 메세지가 뜨는데 일단 무시하고 만들어준다)
-
그리고 람다 하나를 만들어준다.
-
그런 다음에 API GATEWAY 콘솔로 이동해서 아래 그림과 같이 API GATEWAY를 하나 생성하고, GET&POST 리소스를 등록해준다.
- facebook developer에서 app으로 돌아와서 아래 그림과 같이 토큰을 발급받는다.
- 그리고 만들었던 람다에 아래와 같이 코드를 작성해서 저장한다.
import sys
import logging
PAGE_TOKEN = "위에서 복사한 페이스북 앱 토큰"
VERIFY_TOKEN = "verify_123"
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
return None
return None
- 그리고 아래 그림과 같이 API Gateway Endpoint를 facebook app에 등록하여 webhook을 연결해준다.
- 그런 다음에 Lambda 코드를 업데이트 하기 위해 로컬pc에 아래와 같은 파일구조를 만들어준다.
(폴더 안에 파이썬 라이브러리를 저장할 libs 폴더와 람다 코드인 lambda_function.py 그리고 필요한 라이브러리를 정의한 requirements.txt를 만들어준다, 그림에서는 lambda_handler.py로 되어 있는데 lambda_function.py로 만들어준다.)
- requirements.txt
requests
pymysql
-
그 폴더에서
pip install -r requirements.txt -t ./libs
명령어를 실행하여 libs 폴더 안에 필요한 라이브러리를 설치한다. -
그리고 Lambda update 배포를 위한 쉘스크립트 파일(deploy.sh)도 아래와 같이 하나 만들어준다.
#!/bin/bash
rm *.zip
zip facebook.zip -r *
aws s3 rm s3://pms-bucket-test/facebook.zip
aws s3 cp ./facebook.zip s3://pms-bucket-test/facebook.zip
aws lambda update-function-code --function-name pms-lambda-test --s3-bucket pms-bucket-test --s3-key facebook.zip
- 그리고 facebook.zip이 있는 폴더에서
./deploy.sh
를 실행하면 아래와 같은 결과를 얻을 수 있을 것이다. 그리고 해당 람다로 이동하면 람다코드가 잘 업데이트 된 것을 확인할 수 있다.
$ ./deploy.sh
delete: s3://pms-bucket-test/facebook.zip
upload: .\facebook.zip to s3://pms-bucket-test/facebook.zip
{
"FunctionName": "pms-lambda-test",
"FunctionArn": "arn:aws:lambda:ap-northeast-2:xxxxxxxxx:function:pms-lambda-test",
"Runtime": "python3.7",
"Role": "arn:aws:iam::xxxxxxxx:role/service-role/xxxxxxxxxxxxxxxxxxxxxxx",
"Handler": "lambda_function.lambda_handler",
"CodeSize": 1081627,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2020-06-25T08:00:56.818+0000",
"CodeSha256": "rxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"State": "Active",
"LastUpdateStatus": "Successful"
}
- 그리고 페이스북 앱으로 돌아가서 아래와 같이 생성한 페이지에 ‘받아보기’에 messages 기능을 추가해준다.
- 그 다음에 아래와 같이 코드를 수정하고 람다에 다시 배포해준다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
logger.info(event)
return None
- 그런 다음에 페이스 북 앱으로 돌아가서 아래 그림과 같이 메세지 테스트를 해본다. 클라우드와치 로그를 확인해보면 람다로 메세지가 잘 전송되는 것을 확인할 수 있다.
- 그 다음에 아래와 같이 facebook bot 코드를 만들어준다.
#!/usr/bin/env python
import sys
sys.path.append("./libs")
import os
import requests
import base64
import json
import logging
from enum import Enum
DEFAULT_API_VERSION = 4.0
## messaging types: "RESPONSE", "UPDATE", "MESSAGE_TAG"
class NotificationType(Enum):
regular = "REGULAR"
silent_push = "SILENT_PUSH"
no_push = "no_push"
class Bot:
## access_token은 facebook page 토큰이라고 생각하면된다.
def __init__(self, access_token, **kwargs):
self.access_token = access_token
self.api_version = kwargs.get('api_version') or DEFAULT_API_VERSION
self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)
@property
def auth_args(self):
if not hasattr(self, '_auth_args'):
auth = {
'access_token': self.access_token
}
self._auth_args = auth
return self._auth_args
def send_message(self, recipient_id, payload, notification_type, messaging_type, tag):
payload['recipient'] = {
'id': recipient_id
}
#payload['notification_type'] = notification_type
payload['messaging_type'] = messaging_type
if tag is not None:
payload['tag'] = tag
request_endpoint = '{0}/me/messages'.format(self.graph_url)
response = requests.post(
request_endpoint,
params = self.auth_args,
json = payload
)
logging.info(payload)
return response.json()
def send_text(self, recipient_id, text, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
return self.send_message(
recipient_id,
{
"message": {
"text": text
}
},
notification_type,
messaging_type,
tag
)
def send_quick_replies(self, recipient_id, text, quick_replies, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
return self.send_message(
recipient_id,
{
"message":{
"text": text,
"quick_replies": quick_replies
}
},
notification_type,
messaging_type,
tag
)
def send_attachment(self, recipient_id, attachment_type, payload, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
return self.send_message(
recipient_id,
{
"message": {
"attachment":{
"type": attachment_type,
"payload": payload
}
}
},
notification_type,
messaging_type,
tag
)
def send_action(self, recipient_id, action, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
return self.send_message(
recipient_id,
{
"sender_action": action
},
notification_type,
messaging_type,
tag
)
def whitelist_domain(self, domain_list, domain_action_type):
payload = {
"setting_type": "domain_whitelisting",
"whitelisted_domains": domain_list,
"domain_action_type": domain_action_type
}
request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)
response = requests.post(
request_endpoint,
params = self.auth_args,
json = payload
)
return response.json()
def set_greeting(self, template):
request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)
response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"setting_type": "greeting",
"greeting": {
"text": template
}
}
)
return response
def set_get_started(self, text):
request_endpoint = '{0}/me/messenger_profile'.format(self.graph_url)
response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"get_started":{
"payload": text
}
}
)
return response
def get_get_started(self):
request_endpoint = '{0}/me/messenger_profile?fields=get_started'.format(self.graph_url)
response = requests.get(
request_endpoint,
params = self.auth_args
)
return response
def get_messenger_profile(self, field):
request_endpoint = '{0}/me/messenger_profile?fields={1}'.format(self.graph_url, field)
response = requests.get(
request_endpoint,
params = self.auth_args
)
return response
def upload_attachment(self, url):
request_endpoint = '{0}/me/message_attachments'.format(self.graph_url)
response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"message":{
"attachment":{
"type": "image",
"payload": {
"is_reusable": True,
"url": url
}
}
}
}
)
return response
- 람다함수 코드도 아래와 같이 수정해서 bot코드까지 람다로 다시 배포해준다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"
host = "pms-rdstest-rxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "xxxxxxxxxxxxxxxxxxxxxx"
password = "xxxxxxxxxxxxxxxxxxxxx"
try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)
bot = fb_bot.Bot(PAGE_TOKEN)
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
messaging = event['entry'][0]['messaging'][0]
user_id = messaging['sender']['id']
logger.info(messaging)
artist_name = messaging['message']['text']
query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)
cursor.execute(query)
genres = []
for (genre, ) in cursor.fetchall():
genres.append(genre)
text = "Here are genres of {}".format(artist_name)
bot.send_text(user_id, text)
bot.send_text(user_id, ', '.join(genres))
return None
- 그런 다음에 메신저로 샘플로 Drake 라는 가수를 입력하면 다음과 같은 결과화면을 확인할 수 있다.
- 그 다음으로 람다함수 코드도 아래와 같이 수정해서 이미지와 이미지URL까지 결과화면에 출력할 수 있도록 해본다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"
host = "pms-rdstest-rxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "xxxxxxxxxxxxxxxxxxxxxx"
password = "xxxxxxxxxxxxxxxxxxxxx"
try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)
bot = fb_bot.Bot(PAGE_TOKEN)
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
messaging = event['entry'][0]['messaging'][0]
user_id = messaging['sender']['id']
logger.info(messaging)
artist_name = messaging['message']['text']
query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)
cursor.execute(query)
genres = []
for (genre, ) in cursor.fetchall():
genres.append(genre)
text = "Here are genres of {}".format(artist_name)
bot.send_text(user_id, text)
bot.send_text(user_id, ', '.join(genres))
query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
cursor.execute(query)
image_url, url = cursor.fetchall()[0]
payload = {
'template_type': 'generic',
'elements': [
{
'title': "Artist Info: '{}'".format(artist_name),
'image_url': image_url,
'subtitle': 'information',
'default_action': {
'type': 'web_url',
'url': url,
'webview_height_ratio': 'full'
}
}
]
}
bot.send_attachment(user_id, "template", payload)
return None
- 그런 다음에 메신저로 샘플로 Drake 라는 가수를 입력하면 다음과 같은 결과화면을 확인할 수 있다.
- 그러면 아티스트가 RDS에 없거나 오타를 입력했을때 처리하는 트리거 기능을 추가하는 코드를 람다 코드에 아래와 같이 수정해본다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"
client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)
bot = fb_bot.Bot(PAGE_TOKEN)
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
messaging = event['entry'][0]['messaging'][0]
user_id = messaging['sender']['id']
logger.info(messaging)
artist_name = messaging['message']['text']
query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
cursor.execute(query)
raw = cursor.fetchall()
if len(raw) == 0:
text = search_artist(cursor, artist_name)
bot.send_text(user_id, text)
sys.exit(0)
image_url, url = raw[0]
payload = {
'template_type': 'generic',
'elements': [
{
'title': "Artist Info: '{}'".format(artist_name),
'image_url': image_url,
'subtitle': 'information',
'default_action': {
'type': 'web_url',
'url': url,
'webview_height_ratio': 'full'
}
}
]
}
bot.send_attachment(user_id, "template", payload)
query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)
cursor.execute(query)
genres = []
for (genre, ) in cursor.fetchall():
genres.append(genre)
text = "Here are genres of {}".format(artist_name)
bot.send_text(user_id, text)
bot.send_text(user_id, ', '.join(genres))
## 만약에 아티스트가 없을때에는 아티스트 추가
## spotify API hit --> Artist Search
## One secend
## 오타 및 아티스트가 아닐경우
return None
def search_artist(cursor, artist_name):
headers = get_headers(client_id, client_secret)
params = {
"q": artist_name,
"type": "artist",
"limit": "1"
}
r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)
raw = json.loads(r.text)
if raw['artists']['items'] == []:
return "Could not find artist. Try again."
artist = {}
artist_raw = raw['artists']['items'][0]
if artist_raw['name'] == params['q']:
artist.update(
{
'id': artist_raw['id'],
'name': artist_raw['name'],
'followers': artist_raw['followers']['total'],
'popularity': artist_raw['popularity'],
'url': artist_raw['external_urls']['spotify'],
'image_url': artist_raw['images'][0]['url']
}
)
for i in artist_raw['genres']:
if len(artist_raw['genres']) != 0:
insert_row(cursor, {'artist_id':artist_raw['id'], 'genre' : i},'artist_genres')
insert_row(cursor, artist, 'pmstest.artists')
conn.commit()
return "we add artist. please try again in a second"
return "Could not find artist. Try again."
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {"Authorization": "Basic {}".format(encoded)}
payload = {"grant_type": "client_credentials"}
r = requests.post(endpoint, data=payload, headers=headers)
access_token = json.loads(r.text)['access_token']
headers = {"Authorization": "Bearer {}".format(access_token)}
return headers
def insert_row(cursor, data, table):
placeholders = ', '.join(['%s'] * len(data)) ## %s,%s,%s,%s,%s ...
columns = ', '.join(data.keys())
key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
## sql은 아래와 같은 형태가 될 것이다.
## INSERT INTO artists ( id, name, followers, popularity, url, image_url ) VALUES ( %s, %s, %s, %s, %s, %s )
## ON DUPLICATE KEY UPDATE id=%s, name=%s, followers=%s, popularity=%s, url=%s, image_url=%s
cursor.execute(sql, list(data.values())*2)
return None
- 그리고 아래 그림과 같이 현재 RDS에 없는 BTS를 검색하는 테스트를 하면 우리가 의도한대로 작동하는 것을 알 수 있다. 아티스트 목록에 없는 전혀 엉뚱한 이름으로 검색했을때도 아래와 같이 예외처리가 잘 된것을 확인할 수 있다.
-
다음에 추가할 것이 뭐냐면 Top track 정보를 우리가 다이나모 디비에 저장을 했고, 이 저장한 데이터를 가져오는 경우가 있는데 만약에 새롭게 아티스트가 추가되면 다이나모 디비에 저장을 해야한다. 이 추가하려고 하는 기능을 다른 람다로 만들어서 이 기능을 트리거링 하는 것을 추가해보자
-
그래서 아래 코드를 이용해서 pms-lambda-test,pms-lambda-test-02 람다를 만들어준다.
[pms-lambda-test]
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"
client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)
bot = fb_bot.Bot(PAGE_TOKEN)
def lambda_handler(event, context):
# event['params'] only exists for HTTPS GET
if 'params' in event.keys():
if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
messaging = event['entry'][0]['messaging'][0]
user_id = messaging['sender']['id']
logger.info(messaging)
artist_name = messaging['message']['text']
query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
cursor.execute(query)
raw = cursor.fetchall()
if len(raw) == 0:
text = search_artist(cursor, artist_name)
bot.send_text(user_id, text)
sys.exit(0)
image_url, url = raw[0]
payload = {
'template_type': 'generic',
'elements': [
{
'title': "Artist Info: '{}'".format(artist_name),
'image_url': image_url,
'subtitle': 'information',
'default_action': {
'type': 'web_url',
'url': url,
'webview_height_ratio': 'full'
}
}
]
}
bot.send_attachment(user_id, "template", payload)
query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)
cursor.execute(query)
genres = []
for (genre, ) in cursor.fetchall():
genres.append(genre)
text = "Here are genres of {}".format(artist_name)
bot.send_text(user_id, text)
bot.send_text(user_id, ', '.join(genres))
## 만약에 아티스트가 없을때에는 아티스트 추가
## spotify API hit --> Artist Search
## One secend
## 오타 및 아티스트가 아닐경우
return None
def invoke_lambda(fxn_name, payload, invocation_type='Event'):
lambda_client = boto3.client('lambda')
invoke_response = lambda_client.invoke(
FunctionName = fxn_name,
InvocationType = invocation_type,
Payload = json.dumps(payload)
)
if invoke_response['StatusCode'] not in [200, 202, 204]:
logging.error("ERROR: Invoking lmabda function: '{0}' failed".format(fxn_name))
return invoke_response
def search_artist(cursor, artist_name):
headers = get_headers(client_id, client_secret)
params = {
"q": artist_name,
"type": "artist",
"limit": "1"
}
r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)
raw = json.loads(r.text)
if raw['artists']['items'] == []:
return "Could not find artist. Try again."
artist = {}
artist_raw = raw['artists']['items'][0]
if artist_raw['name'] == params['q']:
artist.update(
{
'id': artist_raw['id'],
'name': artist_raw['name'],
'followers': artist_raw['followers']['total'],
'popularity': artist_raw['popularity'],
'url': artist_raw['external_urls']['spotify'],
'image_url': artist_raw['images'][0]['url']
}
)
for i in artist_raw['genres']:
if len(artist_raw['genres']) != 0:
insert_row(cursor, {'artist_id':artist_raw['id'], 'genre' : i},'artist_genres')
insert_row(cursor, artist, 'pmstest.artists')
conn.commit()
r = invoke_lambda('pms-lambda-test-02', payload={'artist_id': artist_raw['id']})
print(r)
return "we add artist. please try again in a second"
return "Could not find artist. Try again."
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {"Authorization": "Basic {}".format(encoded)}
payload = {"grant_type": "client_credentials"}
r = requests.post(endpoint, data=payload, headers=headers)
access_token = json.loads(r.text)['access_token']
headers = {"Authorization": "Bearer {}".format(access_token)}
return headers
def insert_row(cursor, data, table):
placeholders = ', '.join(['%s'] * len(data)) ## %s,%s,%s,%s,%s ...
columns = ', '.join(data.keys())
key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
## sql은 아래와 같은 형태가 될 것이다.
## INSERT INTO artists ( id, name, followers, popularity, url, image_url ) VALUES ( %s, %s, %s, %s, %s, %s )
## ON DUPLICATE KEY UPDATE id=%s, name=%s, followers=%s, popularity=%s, url=%s, image_url=%s
cursor.execute(sql, list(data.values())*2)
return None
[pms-lambda-test-02]
import sys
sys.path.append('./libs')
import os
import boto3
import requests
import base64
import json
import logging
import pymysql
client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)
def lambda_handler(event, context):
headers = get_headers(client_id, client_secret)
table = dynamodb.Table('pms-dynamodb-test')
artist_id = event['artist_id']
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
params = {'country': 'US'}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
for track in raw['tracks']:
data = {'artist_id': artist_id}
data.update(track)
table.put_item(Item=data)
return "SUCCESS"
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {"Authorization": "Basic {}".format(encoded)}
payload = {"grant_type": "client_credentials"}
r = requests.post(endpoint, data=payload, headers=headers)
access_token = json.loads(r.text)['access_token']
headers = {"Authorization": "Bearer {}".format(access_token)}
return headers
- 위와 같이 람다 두개를 만들어주고 아래와 같이 페이스북 앱에서 빅뱅을 검색해보는 테스트를 해본다. RDS에 들어가서 빅뱅 데이터가 잘 저장되어 있는지 확인하고 빅뱅의 artist_id를 따와서 다이나모 디비에도 잘 저장이 되어 있는지 확인해본다.