Facebook 메신저 app을 이용한 Lambda chatbot 구현

2020-06-27

.

Data_Engineering_TIL(20200625)

[참고사항]

  • study program : Fastcampus Data Engineering 온라인

** URL : https://www.fastcampus.co.kr/data_online_engineering

  • ‘spotify 데이터 유사도 모델링’를 이어서 참고할 것

  • URL : https://minman2115.github.io/DE_TIL96

[학습내용]

  • Facebook api 활용을 위해서 facebook developer에서 app을 만들어야 한다.

step 1) https://developers.facebook.com/ 접속

step 2) 로그인

step 3) ‘MyApps –> Create APP 클릭’ 또는 ‘시작하기’ 클릭

step 4) add a product에서 messenger의 set up을 클릭

step 5) 액세스 토큰에서 create a page 클릭

step 6) 페이지 이름과 카테고리를 지정하고 create 한다.

step 7) 액세스 토큰화면에서 생성한 페이지를 아래그림과 같이 추가해준다.

(만들때 권한에 관련한 warning 메세지가 뜨는데 일단 무시하고 만들어준다)

image

  • 그리고 람다 하나를 만들어준다.

  • 그런 다음에 API GATEWAY 콘솔로 이동해서 아래 그림과 같이 API GATEWAY를 하나 생성하고, GET&POST 리소스를 등록해준다.

making apigateway1

making apigateway2

  • facebook developer에서 app으로 돌아와서 아래 그림과 같이 토큰을 발급받는다.

image

  • 그리고 만들었던 람다에 아래와 같이 코드를 작성해서 저장한다.
import sys
import logging

PAGE_TOKEN = "위에서 복사한 페이스북 앱 토큰"
VERIFY_TOKEN = "verify_123"

def lambda_handler(event, context):
    # event['params'] only exists for HTTPS GET
    if 'params' in event.keys():
        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        return None
    
    return None
  • 그리고 아래 그림과 같이 API Gateway Endpoint를 facebook app에 등록하여 webhook을 연결해준다.

image

  • 그런 다음에 Lambda 코드를 업데이트 하기 위해 로컬pc에 아래와 같은 파일구조를 만들어준다.

(폴더 안에 파이썬 라이브러리를 저장할 libs 폴더와 람다 코드인 lambda_function.py 그리고 필요한 라이브러리를 정의한 requirements.txt를 만들어준다, 그림에서는 lambda_handler.py로 되어 있는데 lambda_function.py로 만들어준다.)

image

  • requirements.txt
requests
pymysql
  • 그 폴더에서 pip install -r requirements.txt -t ./libs 명령어를 실행하여 libs 폴더 안에 필요한 라이브러리를 설치한다.

  • 그리고 Lambda update 배포를 위한 쉘스크립트 파일(deploy.sh)도 아래와 같이 하나 만들어준다.

#!/bin/bash

rm *.zip
zip facebook.zip -r *
aws s3 rm s3://pms-bucket-test/facebook.zip
aws s3 cp ./facebook.zip s3://pms-bucket-test/facebook.zip
aws lambda update-function-code --function-name pms-lambda-test --s3-bucket pms-bucket-test --s3-key facebook.zip
  • 그리고 facebook.zip이 있는 폴더에서 ./deploy.sh를 실행하면 아래와 같은 결과를 얻을 수 있을 것이다. 그리고 해당 람다로 이동하면 람다코드가 잘 업데이트 된 것을 확인할 수 있다.
$ ./deploy.sh
delete: s3://pms-bucket-test/facebook.zip
upload: .\facebook.zip to s3://pms-bucket-test/facebook.zip
{
    "FunctionName": "pms-lambda-test",
    "FunctionArn": "arn:aws:lambda:ap-northeast-2:xxxxxxxxx:function:pms-lambda-test",
    "Runtime": "python3.7",
    "Role": "arn:aws:iam::xxxxxxxx:role/service-role/xxxxxxxxxxxxxxxxxxxxxxx",
    "Handler": "lambda_function.lambda_handler",
    "CodeSize": 1081627,
    "Description": "",
    "Timeout": 3,
    "MemorySize": 128,
    "LastModified": "2020-06-25T08:00:56.818+0000",
    "CodeSha256": "rxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "State": "Active",
    "LastUpdateStatus": "Successful"
}
  • 그리고 페이스북 앱으로 돌아가서 아래와 같이 생성한 페이지에 ‘받아보기’에 messages 기능을 추가해준다.

making apigateway4

  • 그 다음에 아래와 같이 코드를 수정하고 람다에 다시 배포해준다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"

def lambda_handler(event, context):

    # event['params'] only exists for HTTPS GET

    if 'params' in event.keys():

        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        logger.info(event)

    return None
  • 그런 다음에 페이스 북 앱으로 돌아가서 아래 그림과 같이 메세지 테스트를 해본다. 클라우드와치 로그를 확인해보면 람다로 메세지가 잘 전송되는 것을 확인할 수 있다.

making apigateway5

  • 그 다음에 아래와 같이 facebook bot 코드를 만들어준다.
#!/usr/bin/env python

import sys
sys.path.append("./libs")
import os
import requests
import base64
import json
import logging
from enum import Enum

DEFAULT_API_VERSION = 4.0

## messaging types: "RESPONSE", "UPDATE", "MESSAGE_TAG"

class NotificationType(Enum):
    regular = "REGULAR"
    silent_push = "SILENT_PUSH"
    no_push = "no_push"

class Bot:
    ## access_token은 facebook page 토큰이라고 생각하면된다.
    def __init__(self, access_token, **kwargs):

        self.access_token = access_token
        self.api_version = kwargs.get('api_version') or DEFAULT_API_VERSION
        self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)

    @property
    def auth_args(self):
        if not hasattr(self, '_auth_args'):
            auth = {
                'access_token': self.access_token
            }
            self._auth_args = auth
        return self._auth_args

    def send_message(self, recipient_id, payload, notification_type, messaging_type, tag):

        payload['recipient'] = {
            'id': recipient_id
        }

        #payload['notification_type'] = notification_type
        payload['messaging_type'] = messaging_type

        if tag is not None:
            payload['tag'] = tag

        request_endpoint = '{0}/me/messages'.format(self.graph_url)

        response = requests.post(
            request_endpoint,
            params = self.auth_args,
            json = payload
        )

        logging.info(payload)
        return response.json()

    def send_text(self, recipient_id, text, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

        return self.send_message(
            recipient_id,
            {
                "message": {
                    "text": text
                }
            },
            notification_type,
            messaging_type,
            tag
        )

    def send_quick_replies(self, recipient_id, text, quick_replies, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

        return self.send_message(
            recipient_id,
            {
                "message":{
                    "text": text,
                    "quick_replies": quick_replies
                }
            },
            notification_type,
            messaging_type,
            tag
        )

    def send_attachment(self, recipient_id, attachment_type, payload, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

        return self.send_message(
            recipient_id,
            {
                "message": {
                    "attachment":{
                        "type": attachment_type,
                        "payload": payload
                    }
                }
            },
            notification_type,
            messaging_type,
            tag
        )

    def send_action(self, recipient_id, action, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

        return self.send_message(
            recipient_id,
            {
                "sender_action": action
            },
            notification_type,
            messaging_type,
            tag
        )

    def whitelist_domain(self, domain_list, domain_action_type):

        payload = {
            "setting_type": "domain_whitelisting",
            "whitelisted_domains": domain_list,
            "domain_action_type": domain_action_type
        }

        request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)

        response = requests.post(
            request_endpoint,
            params = self.auth_args,
            json = payload
        )

        return response.json()

    def set_greeting(self, template):

        request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)

        response = requests.post(
            request_endpoint,
            params = self.auth_args,
            json = {
                "setting_type": "greeting",
                "greeting": {
                    "text": template
                }
            }
        )

        return response

    def set_get_started(self, text):

        request_endpoint = '{0}/me/messenger_profile'.format(self.graph_url)

        response = requests.post(
            request_endpoint,
            params = self.auth_args,
            json = {
                "get_started":{
                    "payload": text
                }
            }
        )

        return response

    def get_get_started(self):

        request_endpoint = '{0}/me/messenger_profile?fields=get_started'.format(self.graph_url)

        response = requests.get(
            request_endpoint,
            params = self.auth_args
        )

        return response

    def get_messenger_profile(self, field):

        request_endpoint = '{0}/me/messenger_profile?fields={1}'.format(self.graph_url, field)

        response = requests.get(
            request_endpoint,
            params = self.auth_args
        )

        return response


    def upload_attachment(self, url):

        request_endpoint = '{0}/me/message_attachments'.format(self.graph_url)

        response = requests.post(
            request_endpoint,
            params = self.auth_args,
            json = {
                "message":{
                    "attachment":{
                        "type": "image",
                        "payload": {
                            "is_reusable": True,
                            "url": url
                        }
                    }
                }
            }
        )

        return response
  • 람다함수 코드도 아래와 같이 수정해서 bot코드까지 람다로 다시 배포해준다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"

host = "pms-rdstest-rxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "xxxxxxxxxxxxxxxxxxxxxx"
password = "xxxxxxxxxxxxxxxxxxxxx"

try:
    conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
    cursor = conn.cursor()
except:
    logging.error("could not connect to rds")
    sys.exit(1)

bot = fb_bot.Bot(PAGE_TOKEN)

def lambda_handler(event, context):

    # event['params'] only exists for HTTPS GET

    if 'params' in event.keys():
        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        messaging = event['entry'][0]['messaging'][0]
        user_id = messaging['sender']['id']

        logger.info(messaging)
        artist_name = messaging['message']['text']

        query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)

        cursor.execute(query)
        genres = []
        for (genre, ) in cursor.fetchall():
            genres.append(genre)

        text = "Here are genres of {}".format(artist_name)
        bot.send_text(user_id, text)
        bot.send_text(user_id, ', '.join(genres))

    return None
  • 그런 다음에 메신저로 샘플로 Drake 라는 가수를 입력하면 다음과 같은 결과화면을 확인할 수 있다.

image

  • 그 다음으로 람다함수 코드도 아래와 같이 수정해서 이미지와 이미지URL까지 결과화면에 출력할 수 있도록 해본다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"

host = "pms-rdstest-rxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "xxxxxxxxxxxxxxxxxxxxxx"
password = "xxxxxxxxxxxxxxxxxxxxx"

try:
    conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
    cursor = conn.cursor()
except:
    logging.error("could not connect to rds")
    sys.exit(1)

bot = fb_bot.Bot(PAGE_TOKEN)

def lambda_handler(event, context):

    # event['params'] only exists for HTTPS GET

    if 'params' in event.keys():
        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        messaging = event['entry'][0]['messaging'][0]
        user_id = messaging['sender']['id']

        logger.info(messaging)
        artist_name = messaging['message']['text']

        query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)

        cursor.execute(query)
        genres = []
        for (genre, ) in cursor.fetchall():
            genres.append(genre)

        text = "Here are genres of {}".format(artist_name)
        bot.send_text(user_id, text)
        bot.send_text(user_id, ', '.join(genres))

        query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
        cursor.execute(query)
        image_url, url = cursor.fetchall()[0]

        payload = {
            'template_type': 'generic',
            'elements': [
                {
                    'title': "Artist Info: '{}'".format(artist_name),
                    'image_url': image_url,
                    'subtitle': 'information',
                    'default_action': {
                        'type': 'web_url',
                        'url': url,
                        'webview_height_ratio': 'full'
                    }
                }
            ]
        }

        bot.send_attachment(user_id, "template", payload)


    return None
  • 그런 다음에 메신저로 샘플로 Drake 라는 가수를 입력하면 다음과 같은 결과화면을 확인할 수 있다.

image

  • 그러면 아티스트가 RDS에 없거나 오타를 입력했을때 처리하는 트리거 기능을 추가하는 코드를 람다 코드에 아래와 같이 수정해본다.
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

try:
    conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
    cursor = conn.cursor()
except:
    logging.error("could not connect to rds")
    sys.exit(1)

bot = fb_bot.Bot(PAGE_TOKEN)

def lambda_handler(event, context):

    # event['params'] only exists for HTTPS GET

    if 'params' in event.keys():
        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        messaging = event['entry'][0]['messaging'][0]
        user_id = messaging['sender']['id']

        logger.info(messaging)
        artist_name = messaging['message']['text']

        query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
        cursor.execute(query)
        raw = cursor.fetchall()
        if len(raw) == 0:
            text = search_artist(cursor, artist_name)
            bot.send_text(user_id, text)
            sys.exit(0)

        image_url, url = raw[0]

        payload = {
            'template_type': 'generic',
            'elements': [
                {
                    'title': "Artist Info: '{}'".format(artist_name),
                    'image_url': image_url,
                    'subtitle': 'information',
                    'default_action': {
                        'type': 'web_url',
                        'url': url,
                        'webview_height_ratio': 'full'
                    }
                }
            ]
        }

        bot.send_attachment(user_id, "template", payload)

        query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)

        cursor.execute(query)
        genres = []
        for (genre, ) in cursor.fetchall():
            genres.append(genre)

        text = "Here are genres of {}".format(artist_name)
        bot.send_text(user_id, text)
        bot.send_text(user_id, ', '.join(genres))

        ## 만약에 아티스트가 없을때에는 아티스트 추가

        ## spotify API hit --> Artist Search

        ## One secend

        ## 오타 및 아티스트가 아닐경우

    return None

def search_artist(cursor, artist_name):
    headers = get_headers(client_id, client_secret)

    params = {
        "q": artist_name,
        "type": "artist",
        "limit": "1"
    }

    r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)

    raw = json.loads(r.text)

    if raw['artists']['items'] == []:
        return "Could not find artist. Try again."

    artist = {}
    artist_raw = raw['artists']['items'][0]
    if artist_raw['name'] == params['q']:
        artist.update(
            {
                'id': artist_raw['id'],
                'name': artist_raw['name'],
                'followers': artist_raw['followers']['total'],
                'popularity': artist_raw['popularity'],
                'url': artist_raw['external_urls']['spotify'],
                'image_url': artist_raw['images'][0]['url']
            }
        )

        for i in artist_raw['genres']:
            if len(artist_raw['genres']) != 0:
                insert_row(cursor, {'artist_id':artist_raw['id'], 'genre' : i},'artist_genres')

        insert_row(cursor, artist, 'pmstest.artists')
        conn.commit()

        return "we add artist. please try again in a second"

    return "Could not find artist. Try again."

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers

def insert_row(cursor, data, table):
    placeholders = ', '.join(['%s'] * len(data)) ## %s,%s,%s,%s,%s ...
    columns = ', '.join(data.keys())
    key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
    ## sql은 아래와 같은 형태가 될 것이다.
    ## INSERT INTO artists ( id, name, followers, popularity, url, image_url ) VALUES ( %s, %s, %s, %s, %s, %s )
    ## ON DUPLICATE KEY UPDATE id=%s, name=%s, followers=%s, popularity=%s, url=%s, image_url=%s
    cursor.execute(sql, list(data.values())*2)
    return None
  • 그리고 아래 그림과 같이 현재 RDS에 없는 BTS를 검색하는 테스트를 하면 우리가 의도한대로 작동하는 것을 알 수 있다. 아티스트 목록에 없는 전혀 엉뚱한 이름으로 검색했을때도 아래와 같이 예외처리가 잘 된것을 확인할 수 있다.

image

  • 다음에 추가할 것이 뭐냐면 Top track 정보를 우리가 다이나모 디비에 저장을 했고, 이 저장한 데이터를 가져오는 경우가 있는데 만약에 새롭게 아티스트가 추가되면 다이나모 디비에 저장을 해야한다. 이 추가하려고 하는 기능을 다른 람다로 만들어서 이 기능을 트리거링 하는 것을 추가해보자

  • 그래서 아래 코드를 이용해서 pms-lambda-test,pms-lambda-test-02 람다를 만들어준다.

[pms-lambda-test]

import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
VERIFY_TOKEN = "verify_123"

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

try:
    conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
    cursor = conn.cursor()
except:
    logging.error("could not connect to rds")
    sys.exit(1)

bot = fb_bot.Bot(PAGE_TOKEN)

def lambda_handler(event, context):

    # event['params'] only exists for HTTPS GET

    if 'params' in event.keys():
        if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
            return int(event['params']['querystring']['hub.challenge'])
        else:
            logging.error('wrong validation token')
            raise SystemExit
    else:
        messaging = event['entry'][0]['messaging'][0]
        user_id = messaging['sender']['id']

        logger.info(messaging)
        artist_name = messaging['message']['text']

        query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
        cursor.execute(query)
        raw = cursor.fetchall()
        if len(raw) == 0:
            text = search_artist(cursor, artist_name)
            bot.send_text(user_id, text)
            sys.exit(0)

        image_url, url = raw[0]

        payload = {
            'template_type': 'generic',
            'elements': [
                {
                    'title': "Artist Info: '{}'".format(artist_name),
                    'image_url': image_url,
                    'subtitle': 'information',
                    'default_action': {
                        'type': 'web_url',
                        'url': url,
                        'webview_height_ratio': 'full'
                    }
                }
            ]
        }

        bot.send_attachment(user_id, "template", payload)

        query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)

        cursor.execute(query)
        genres = []
        for (genre, ) in cursor.fetchall():
            genres.append(genre)

        text = "Here are genres of {}".format(artist_name)
        bot.send_text(user_id, text)
        bot.send_text(user_id, ', '.join(genres))

        ## 만약에 아티스트가 없을때에는 아티스트 추가

        ## spotify API hit --> Artist Search

        ## One secend

        ## 오타 및 아티스트가 아닐경우

    return None


def invoke_lambda(fxn_name, payload, invocation_type='Event'):

    lambda_client = boto3.client('lambda')

    invoke_response = lambda_client.invoke(
        FunctionName = fxn_name,
        InvocationType = invocation_type,
        Payload = json.dumps(payload)
    )

    if invoke_response['StatusCode'] not in [200, 202, 204]:
        logging.error("ERROR: Invoking lmabda function: '{0}' failed".format(fxn_name))


    return invoke_response


def search_artist(cursor, artist_name):
    headers = get_headers(client_id, client_secret)

    params = {
        "q": artist_name,
        "type": "artist",
        "limit": "1"
    }

    r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)

    raw = json.loads(r.text)

    if raw['artists']['items'] == []:
        return "Could not find artist. Try again."

    artist = {}
    artist_raw = raw['artists']['items'][0]
    if artist_raw['name'] == params['q']:
        artist.update(
            {
                'id': artist_raw['id'],
                'name': artist_raw['name'],
                'followers': artist_raw['followers']['total'],
                'popularity': artist_raw['popularity'],
                'url': artist_raw['external_urls']['spotify'],
                'image_url': artist_raw['images'][0]['url']
            }
        )

        for i in artist_raw['genres']:
            if len(artist_raw['genres']) != 0:
                insert_row(cursor, {'artist_id':artist_raw['id'], 'genre' : i},'artist_genres')

        insert_row(cursor, artist, 'pmstest.artists')
        conn.commit()
        r = invoke_lambda('pms-lambda-test-02', payload={'artist_id': artist_raw['id']})
        print(r)

        return "we add artist. please try again in a second"

    return "Could not find artist. Try again."


def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers


def insert_row(cursor, data, table):
    placeholders = ', '.join(['%s'] * len(data)) ## %s,%s,%s,%s,%s ...
    columns = ', '.join(data.keys())
    key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
    ## sql은 아래와 같은 형태가 될 것이다.
    ## INSERT INTO artists ( id, name, followers, popularity, url, image_url ) VALUES ( %s, %s, %s, %s, %s, %s )
    ## ON DUPLICATE KEY UPDATE id=%s, name=%s, followers=%s, popularity=%s, url=%s, image_url=%s
    cursor.execute(sql, list(data.values())*2)
    return None

[pms-lambda-test-02]

import sys
sys.path.append('./libs')
import os
import boto3
import requests
import base64
import json
import logging
import pymysql

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

host = "pms-rdstest-rds.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-2.rds.amazonaws.com"
port = 3306
username = "admin"
database = "pmstest"
password = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

try:
    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
    logging.error('could not connect to dynamodb')
    sys.exit(1)

def lambda_handler(event, context):
    headers = get_headers(client_id, client_secret)
    table = dynamodb.Table('pms-dynamodb-test')
    artist_id = event['artist_id']
    URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
    params = {'country': 'US'}
    r = requests.get(URL, params=params, headers=headers)
    raw = json.loads(r.text)
    for track in raw['tracks']:
        data = {'artist_id': artist_id}
        data.update(track)
        table.put_item(Item=data)
    return "SUCCESS"

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers
  • 위와 같이 람다 두개를 만들어주고 아래와 같이 페이스북 앱에서 빅뱅을 검색해보는 테스트를 해본다. RDS에 들어가서 빅뱅 데이터가 잘 저장되어 있는지 확인하고 빅뱅의 artist_id를 따와서 다이나모 디비에도 잘 저장이 되어 있는지 확인해본다.

image