Lambda를 이용한 데이터 파이프라이닝 구축
.
Data_Engineering_TIL(20200623)
[참고사항]
- study program : Fastcampus Data Engineering 온라인
** URL : https://www.fastcampus.co.kr/data_online_engineering
-
‘Athena에서 S3 데이터 테이블 생성&쿼리’를 이어서 참고할 것
-
URL : https://minman2115.github.io/DE_TIL94
[학습내용]
-
아티스트가 새로 추가되었을때 다이나모디비에 탑트랙도 데이터를 확보하는 것을 람다로 구현하고자 한다.
-
아티스트 데이터는 mysql 디비에 저장이 되어있고, 해당 아티스트에 대한 탑트랙 데이터만 다이나모 디비에 저장하려고 하는 것이다.
-
챗봇을 통해서 새로운 아티스트 유저가 input을 했을때 그 유저가 input한 아티스트를 가지고 람다 안에서 search를 하게 된다. search해서 spotify artist가 매칭이 되면 그 lambda가 다른 lambda를 요청하는 그런 구조로 갈 것이다.
-
Lambda function을 하나 만들어준다. 제한시간은 1분, RAM은 1기가로 잡아준다.
-
그리고 아래와 같이 top_tracks 폴더를 만든 다음에 pytion script를 하나 열어준다.
- top_tracks 폴더 안에 아래 내용과 같이 lambda_function.py를 만들어준다.
import sys
sys.path.append('./libs')
import os
import boto3
import requests
import base64
import json
import logging
client_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)
def lambda_handler(event, context):
headers = get_headers(client_id, client_secret)
table = dynamodb.Table('pms-dynamodb-test')
artist_id = event['artist_id']
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
params = {'country': 'US'}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
for track in raw['tracks']:
data = {'artist_id': artist_id}
data.update(track)
table.put_item(Item=data)
return "SUCCESS"
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {"Authorization": "Basic {}".format(encoded)}
payload = {"grant_type": "client_credentials"}
r = requests.post(endpoint, data=payload, headers=headers)
access_token = json.loads(r.text)['access_token']
headers = {"Authorization": "Bearer {}".format(access_token)}
return headers
- 그리고 requirements.txt를 만들어준다. requirement.txt 내용은 아래와 같다.
requests
- 그런 다음에 setup.cfg 파일을 하나 더 생성하고 내용은 아래와 같이 작성해서 저장한다.
(pip install시 홈이냐 프리픽스냐 하는 포인터 이슈로 오류가 발생하는 경우가 있어서 setup.cfg 파일을 만든것이다.)
[install]
prefix=
-
pip3 install -r requirements.txt -t ./libs
명령으로 requirements.txt를 참조하여 top_tracks/libs/ 안에 request 파이썬 라이브러리를 설치한다. -
그러면 top_tracks 폴더 안에 lambda_function.py,libs/,requirements.txt가 있고, libs 폴더 안에는 아래와 같이 라이브러리 파일들이 인스톨된다.
$ ll
total 44
drwxr-xr-x 1 user 197121 0 6월 23 14:54 bin/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 certifi/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 certifi-2020.6.20.dist-info/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 chardet/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 chardet-3.0.4.dist-info/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 idna/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 idna-2.9.dist-info/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 requests/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 requests-2.24.0.dist-info/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 urllib3/
drwxr-xr-x 1 user 197121 0 6월 23 14:54 urllib3-1.25.9.dist-info/
- 추가로 deploy.sh 파일을 만들어준다. 내용은 아래와 같다.
#!/bin/bash
rm -rf ./libs
pip3 install -r requirements.txt -t ./libs
rm *.zip
zip top_tracks.zip -r *
aws s3 rm s3://pms-bucket-test/top_tracks.zip
aws s3 cp ./top_tracks.zip s3://pms-bucket-test/top_tracks.zip
aws lambda update-function-code --function-name pms-lambda-test --s3-bucket pms-bucket-test --s3-key top_tracks.zip
-
그 다음에
chmod +x deploy.sh
명령으로 실행권한을 부여한다. -
그런 다음에
./deploy.sh
를 실행하면, 해당 s3버킷에 top_tracks.zip 파일이 업로드 된 것을 확인할 수 있고, 아래와 같은 결과를 얻을 수 있다.
{
"FunctionName": "pms-lambda-test",
"FunctionArn": "arn:aws:lambda:ap-northeast-2:xxxxxxxxxxxxxxx:function:pms-lambda-test",
"Runtime": "python3.7",
"Role": "arn:aws:iam::xxxxxxxxxxxxxxx:role/service-role/pms-lambda-test-role-tlsk1p93",
"Handler": "lambda_function.lambda_handler",
"CodeSize": 971993,
"Description": "",
"Timeout": 60,
"MemorySize": 1088,
"LastModified": "2020-06-23T06:24:21.793+0000",
"CodeSha256": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"State": "Active",
"LastUpdateStatus": "Successful"
}
- Lambda function으로 돌아가면 아래와 같이 코드가 업데이트 된 것을 확인할 수 있다.
- 그리고 아래와 같이 마룬5 아티스트 아이디로 테스트를 진행해본다.
- 테스트를 실시하면 Error가 날것이다. 그래서 클라우드와치로 들어가서 해당 Error 로그를 확인하면 다음과 같을것이다.
–> Error의 결론은 다이나모디비에 접근권한이 없다는 것이다.
[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the PutItem operation: User: arn:aws:sts::xxxxxxxxxx:assumed-role/pms-lambda-test-role-tlsk1p93/pms-lambda-test is not authorized to perform: dynamodb:PutItem on resource: arn:aws:dynamodb:ap-northeast-2:xxxxxxxxxxxxx:table/pms-dynamodb-test
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 30, in lambda_handler
table.put_item(Item=data)
File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
response = action(self, *args, **kwargs)
File "/var/runtime/boto3/resources/action.py", line 83, in __call__
response = getattr(parent.meta.client, operation_name)(*args, **params)
File "/var/runtime/botocore/client.py", line 316, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/var/runtime/botocore/client.py", line 626, in _make_api_call
raise error_class(parsed_response, operation_name)
- 그래서 아래 그림과 같이 Lambda function의 해당 IAM 역할에서 dynamodb access가 가능한 정책을 추가해준다.
-
그리고 다시 테스트하면 success 가 뜨면서 잘 돌아가는 것을 확인할 수 있다.
-
추가로 아래 그림과 같이 특정시간에 람다가 돌아갈 수 있도록 트리거를 설정해줄 수 있는데, 이번 실습에서는 실제로 적용하지는 않는다.