Lambda 함수를 이용한 S3 bucket간 실시간 data sync 구현

2020-05-17

.

1. 개요

Source S3 bucket에 객체가 업로드될때 Lambda에서 이를 감지하고 해당객체를 Destination S3 bucket(타계정)에 copy해주는 아키텍처 구현

(source bucket의 prefix 라는 이름의 폴더 내에 어떤 객체가 업로드되면, Destination bucket의 bdp 라는 이름의 폴더경로로 해당 객체를 copy)

2. 구현순서

STEP 1) Destination s3 bucket(타계정)에 Source bucket에서 접근 할 수 있도록 권한부여

STEP 2) Lambda 함수 구현

2-1) Event trigger 설정

2-2) 로직구현

2-3) IAM Role 설정

STEP 3) 테스트 및 결과확인

STEP 1) Destination s3 bucket(타계정)에 Source bucket에서 접근 할 수 있도록 권한부여

Destination s3 bucket이 있는 계정의 콘솔로 접속해서 아래 그림과 같이 버킷정책을 적용해준다.

1

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DelegateS3Access",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::11111111111111111:user/ididid" # source bucket에서 접근하는 사용자계정 arn
            },
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::[Destination_bucket_name]",
                "arn:aws:s3:::[Destination_bucket_name]/*"
            ]
        }
    ]
}

STEP 2) Lambda 함수 구현

source bucket이 있는 계정으로 접속해서 람다함수를 하나 만들어준다.

2-1) Event trigger 설정

아래와 같이 Source bucket의 prefix 경로로 객체가 추가되는 경우 trigger가 발생하도록 Event trigger를 설정해준다.

2

2-2) 로직구현

람다함수의 로직은 아래와 같다. 파이썬 3.7 환경에서 구현하였다.

access_key 및 secret_key는 람다함수의 환경변수를 이용하였다.

그 외에 특별한 설정은 적용하지 않았는데, 상황에 따라 메모리나 이런것들을 수정해주면 된다.

import json
import subprocess
import datetime
import time
import boto3
import math
import os
import urllib

access_key = os.environ['access_key']
secret_key = os.environ['secret_key']

def lambda_handler(event,context):

    ## Source bucket name load
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    
    ## 깨진 글자 urllib으로 복원 = 한글 및 특수문자로 된 이름의 객체도 처리가 가능하도록 하는 과정
    asis_object_key = urllib.parse.unquote_plus(str(event['Records'][0]['s3']['object']))
    
    ## string형태에서 dict 형태로 변환  
    asis_object_key = asis_object_key.replace("'","\"")
    asis_object_key = json.loads(asis_object_key)
    
    ## s3 copy to Destination bucket
    asis_object_key = asis_object_key['key']
    object_name = asis_object_key.split('/')[-1]
    
    print(event)
    print(asis_object_key)
    
    s3 = boto3.resource(
        's3',
        region_name='ap-northeast-2',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key
    )
    
    copy_source = {
        'Bucket': bucket_name,
        'Key': asis_object_key
    }
    
    extra_args = {
        'ACL': 'bucket-owner-full-control'
    }
    
    s3.meta.client.copy(copy_source, '[Destination_bucket_name]', '[prefix_folder_name]/{}'.format(object_name), extra_args)
    
    return None

참고로 Source bucket에 데이터가 업로드되면 람다함수로 리턴되는 event 데이터 형태는 아래와 같다.

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "ap-northeast-2",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "example-bucket",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::example-bucket"
        },
        "object": {
          "key": "test/key",
          "size": 1024,
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}

2-3) IAM Role 설정

아래와 같이 람다함수의 ‘권한’ 메뉴로 가서 해당 람다함수에 적용된 IAM Role을 먼저 클릭해서 접속한다.

3

그 다음에 아래그림과 같이 Source bucket과 Destination bucket의 권한을 부여해주는 인라인 정책을 추가해준다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::[Source_bucket_name]",
                "arn:aws:s3:::[Source_bucket_name]/*",
                "arn:aws:s3:::[Destination_bucket_name]",
                "arn:aws:s3:::[Destination_bucket_name]/*"
            ]
        }
    ]
}

4

추가적으로 혹시몰라서 위와 같이 람다함수의 풀권한이 있는 인라인 정책도 추가해줬다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "lambda:*",
            "Resource": "*"
        }
    ]
}

STEP 3) 테스트 및 결과확인

Source bucket의 prefix 폴더내에 bjcbqwje.tb_enqwke_info-2029.txt 라는 객체를 업로드하였다.

(한글 및 특수문자로 된 이름의 객체도 처리 가능)

cloudformation으로 콘솔로 가보니 아래와 같이 로그 이벤트가 기록이 되는 것을 확인할 수 있다.

5

또한 Destination bucket의 bdp 폴더로 가보면 위에 그림과 같이 정상적으로 파일이 복사된 것을 확인할 수 있다.