Lambda를 이용한 데이터 파이프라이닝 구축

2020-06-24

.

Data_Engineering_TIL(20200623)

[참고사항]

  • study program : Fastcampus Data Engineering 온라인

** URL : https://www.fastcampus.co.kr/data_online_engineering

  • ‘Athena에서 S3 데이터 테이블 생성&쿼리’를 이어서 참고할 것

  • URL : https://minman2115.github.io/DE_TIL94

[학습내용]

  • 아티스트가 새로 추가되었을때 다이나모디비에 탑트랙도 데이터를 확보하는 것을 람다로 구현하고자 한다.

  • 아티스트 데이터는 mysql 디비에 저장이 되어있고, 해당 아티스트에 대한 탑트랙 데이터만 다이나모 디비에 저장하려고 하는 것이다.

  • 챗봇을 통해서 새로운 아티스트 유저가 input을 했을때 그 유저가 input한 아티스트를 가지고 람다 안에서 search를 하게 된다. search해서 spotify artist가 매칭이 되면 그 lambda가 다른 lambda를 요청하는 그런 구조로 갈 것이다.

  • Lambda function을 하나 만들어준다. 제한시간은 1분, RAM은 1기가로 잡아준다.

  • 그리고 아래와 같이 top_tracks 폴더를 만든 다음에 pytion script를 하나 열어준다.

image

  • top_tracks 폴더 안에 아래 내용과 같이 lambda_function.py를 만들어준다.
import sys
sys.path.append('./libs')
import os
import boto3
import requests
import base64
import json
import logging

client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

try:
    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
    logging.error('could not connect to dynamodb')
    sys.exit(1)

def lambda_handler(event, context):
    headers = get_headers(client_id, client_secret)
    table = dynamodb.Table('pms-dynamodb-test')
    artist_id = event['artist_id']
    URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
    params = {'country': 'US'}
    r = requests.get(URL, params=params, headers=headers)
    raw = json.loads(r.text)
    for track in raw['tracks']:
        data = {'artist_id': artist_id}
        data.update(track)
        table.put_item(Item=data)
    return "SUCCESS"

def get_headers(client_id, client_secret):
    endpoint = "https://accounts.spotify.com/api/token"
    encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
    headers = {"Authorization": "Basic {}".format(encoded)}
    payload = {"grant_type": "client_credentials"}
    r = requests.post(endpoint, data=payload, headers=headers)
    access_token = json.loads(r.text)['access_token']
    headers = {"Authorization": "Bearer {}".format(access_token)}
    return headers
  • 그리고 requirements.txt를 만들어준다. requirement.txt 내용은 아래와 같다.
requests
  • 그런 다음에 setup.cfg 파일을 하나 더 생성하고 내용은 아래와 같이 작성해서 저장한다.

(pip install시 홈이냐 프리픽스냐 하는 포인터 이슈로 오류가 발생하는 경우가 있어서 setup.cfg 파일을 만든것이다.)

[install]
prefix= 
  • pip3 install -r requirements.txt -t ./libs 명령으로 requirements.txt를 참조하여 top_tracks/libs/ 안에 request 파이썬 라이브러리를 설치한다.

  • 그러면 top_tracks 폴더 안에 lambda_function.py,libs/,requirements.txt가 있고, libs 폴더 안에는 아래와 같이 라이브러리 파일들이 인스톨된다.

$ ll
total 44
drwxr-xr-x 1 user 197121 0  6 23 14:54 bin/
drwxr-xr-x 1 user 197121 0  6 23 14:54 certifi/
drwxr-xr-x 1 user 197121 0  6 23 14:54 certifi-2020.6.20.dist-info/
drwxr-xr-x 1 user 197121 0  6 23 14:54 chardet/
drwxr-xr-x 1 user 197121 0  6 23 14:54 chardet-3.0.4.dist-info/
drwxr-xr-x 1 user 197121 0  6 23 14:54 idna/
drwxr-xr-x 1 user 197121 0  6 23 14:54 idna-2.9.dist-info/
drwxr-xr-x 1 user 197121 0  6 23 14:54 requests/
drwxr-xr-x 1 user 197121 0  6 23 14:54 requests-2.24.0.dist-info/
drwxr-xr-x 1 user 197121 0  6 23 14:54 urllib3/
drwxr-xr-x 1 user 197121 0  6 23 14:54 urllib3-1.25.9.dist-info/
  • 추가로 deploy.sh 파일을 만들어준다. 내용은 아래와 같다.
#!/bin/bash

rm -rf ./libs
pip3 install -r requirements.txt -t ./libs

rm *.zip
zip top_tracks.zip -r *

aws s3 rm s3://pms-bucket-test/top_tracks.zip
aws s3 cp ./top_tracks.zip s3://pms-bucket-test/top_tracks.zip
aws lambda update-function-code --function-name pms-lambda-test --s3-bucket pms-bucket-test --s3-key top_tracks.zip
  • 그 다음에 chmod +x deploy.sh 명령으로 실행권한을 부여한다.

  • 그런 다음에 ./deploy.sh를 실행하면, 해당 s3버킷에 top_tracks.zip 파일이 업로드 된 것을 확인할 수 있고, 아래와 같은 결과를 얻을 수 있다.

{
    "FunctionName": "pms-lambda-test",
    "FunctionArn": "arn:aws:lambda:ap-northeast-2:xxxxxxxxxxxxxxx:function:pms-lambda-test",
    "Runtime": "python3.7",
    "Role": "arn:aws:iam::xxxxxxxxxxxxxxx:role/service-role/pms-lambda-test-role-tlsk1p93",
    "Handler": "lambda_function.lambda_handler",
    "CodeSize": 971993,
    "Description": "",
    "Timeout": 60,
    "MemorySize": 1088,
    "LastModified": "2020-06-23T06:24:21.793+0000",
    "CodeSha256": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "State": "Active",
    "LastUpdateStatus": "Successful"
}
  • Lambda function으로 돌아가면 아래와 같이 코드가 업데이트 된 것을 확인할 수 있다.

image

  • 그리고 아래와 같이 마룬5 아티스트 아이디로 테스트를 진행해본다.

image

  • 테스트를 실시하면 Error가 날것이다. 그래서 클라우드와치로 들어가서 해당 Error 로그를 확인하면 다음과 같을것이다.

–> Error의 결론은 다이나모디비에 접근권한이 없다는 것이다.

[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the PutItem operation: User: arn:aws:sts::xxxxxxxxxx:assumed-role/pms-lambda-test-role-tlsk1p93/pms-lambda-test is not authorized to perform: dynamodb:PutItem on resource: arn:aws:dynamodb:ap-northeast-2:xxxxxxxxxxxxx:table/pms-dynamodb-test
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 30, in lambda_handler
    table.put_item(Item=data)
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/var/runtime/botocore/client.py", line 316, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 626, in _make_api_call
    raise error_class(parsed_response, operation_name)
  • 그래서 아래 그림과 같이 Lambda function의 해당 IAM 역할에서 dynamodb access가 가능한 정책을 추가해준다.

image

  • 그리고 다시 테스트하면 success 가 뜨면서 잘 돌아가는 것을 확인할 수 있다.

  • 추가로 아래 그림과 같이 특정시간에 람다가 돌아갈 수 있도록 트리거를 설정해줄 수 있는데, 이번 실습에서는 실제로 적용하지는 않는다.

image