Automating s3 sync jobs with Crontab

2020-05-24


Data_Engineering_TIL(20200524)

[Goal]


STEP 1) Write a python script (test.py) that runs the s3 sync command between specific s3 buckets, and register it in crontab

** To check the test results quickly, the crontab is set to run the script every minute

STEP 2) The python test.py command runs at the scheduled time

STEP 3) The s3 sync runs as a result of executing python test.py

[Implementation Steps]

STEP 1) Write the python script (test.py) and register it in Crontab

step 1-1) Install python3.7 and boto3

sudo yum update -y
sudo yum install python37 python37-pip -y
sudo pip3 install boto3

# change the default python version = AS-IS: 2.7 -> TO-BE: 3.7
sudo alternatives --install /usr/bin/python python /usr/bin/python2.7 1
sudo alternatives --install /usr/bin/python python /usr/bin/python3.7 2
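
A quick optional check that the install and the alternatives switch worked:

python --version    # should now report Python 3.7.x
pip3 show boto3     # should list the installed boto3 package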

step 1-2) Register the access keys with the aws configure command
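
As an optional check that the registered keys are picked up, the AWS CLI can report the active credentials and the identity behind them:

aws configure list            # shows which access key/profile the CLI is using
aws sts get-caller-identity   # returns the account ID and ARN for that access key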

step 1-3) Write and save test.py

import subprocess
from datetime import datetime
import time
import boto3

start = time.time()

# timestamp used inside the log and, with separators replaced, in the log file name
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')+'\n'
start_timestamp = timestamp[0:4]+'_'+timestamp[5:7]+'_'+timestamp[8:10]+'_'+timestamp[11:13]+'_'+timestamp[14:16]+'_'+timestamp[17:19]

# replace the [bracketed] values with the actual bucket names and prefixes
source_bucket = "[source_bucket]"
source_prefix = "[prefix]"
source_s3_location = source_bucket+'/'+source_prefix

destination_bucket = '[destination_bucket]'
destination_prefix = '[prefix2]'
destination_s3_location = destination_bucket +'/'+ destination_prefix
task_name = "[task_name]"

# --delete removes destination objects that no longer exist in the source,
# --acl bucket-owner-full-control gives the destination bucket owner full control of the copied objects
sync_command = f"aws s3 sync s3://{source_s3_location}/ s3://{destination_s3_location}/ --delete --acl bucket-owner-full-control"

file_name = start_timestamp+" "+task_name

# one log file per run, named "<start_timestamp> <task_name>.txt"
logfile = open("{}.txt".format(file_name), 'wb')
start_time = "start time : {}".format(timestamp).encode()
logfile.write(start_time)

try :
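    # run the sync through the aws CLI and capture stdout+stderr for the log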
    log_record = subprocess.check_output(sync_command, stderr=subprocess.STDOUT, shell=True)
    print("job success")
    logfile.write(log_record)
    runtime = "Runtime: {} sec \n".format(time.time()-start).encode()
    logfile.write(runtime)
    print(runtime)

    # check data integrity: compare object name/size pairs between source and destination
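    # list_objects_v2 returns at most 1,000 keys per call, so a paginator is used to walk every page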
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects_v2')

    operation_parameters = {'Bucket': source_bucket, 'Prefix': source_prefix + '/'}
    response_iterator = paginator.paginate(**operation_parameters)
    source_object_list = []
    source_folder_list = []

    for page in response_iterator:
        # pages with no matching keys have no 'Contents' entry
        for content in page.get('Contents', []):
            # zero-byte keys ending in '/' are folder placeholders, not real objects
            if (content['Key']+','+str(content['Size']))[-3:] == '/,0':
                source_folder_list.append(content['Key']+','+str(content['Size']))
            else:
                source_object_list.append(content['Key'].split('/').pop()+','+str(content['Size']))


    operation_parameters = {'Bucket': destination_bucket, 'Prefix': destination_prefix + '/'}
    response_iterator = paginator.paginate(**operation_parameters)
    destination_object_list = []
    destination_folder_list = []

    for page in response_iterator:
        for content in page.get('Contents', []):
            if (content['Key']+','+str(content['Size']))[-3:] == '/,0':
                destination_folder_list.append(content['Key']+','+str(content['Size']))
            else:
                destination_object_list.append(content['Key'].split('/').pop()+','+str(content['Size']))

    # write the comparison result plus the full listings and object counts to the log
    if source_object_list == destination_object_list:
        logfile.write("data integrity check success \n".encode())
        logfile.write("{} to {} complete \n".format(source_s3_location, destination_s3_location).encode())
        print("data integrity check success \n")
    else:
        logfile.write("data integrity check fail \n".encode())
        print("data integrity check fail")

    logfile.write(("source_folder_list:" + str(source_folder_list) + '\n').encode())
    logfile.write(("source_object_list:" + str(source_object_list) + '\n').encode())
    logfile.write(("destination_folder_list:" + str(destination_folder_list) + '\n').encode())
    logfile.write(("destination_object_list:" + str(destination_object_list) + '\n').encode())
    logfile.write(("number of source_object_list:" + str(len(source_object_list)) + '\n').encode())
    logfile.write(("number of destination_object_list:" + str(len(destination_object_list)) + '\n').encode())

except subprocess.CalledProcessError as e:
    # the aws CLI exited non-zero; keep its combined output in the log
    print("job fail")
    logfile.write(e.output)
    runtime = "Runtime: {} sec".format(time.time()-start)
    logfile.write(runtime.encode())
    print(runtime)

logfile.close()
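
Before handing the script to cron, it can help to run it once by hand from the ec2-user home directory (the same working directory cron will use) and confirm that the sync runs and the log file appears:

cd /home/ec2-user && python test.py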

step 1-4) Create a folder and log file for storing the crontab output

mkdir /home/ec2-user/log
touch /home/ec2-user/log/test.log

step 1-5) Register the crontab entry

  • Run the crontab -e command, add the line below, and save it.
* * * * * python /home/ec2-user/test.py > /home/ec2-user/log/test.log 2>&1

After saving and exiting with wq!, you will see the crontab: installing new crontab message.

After one minute passes, you can confirm that STEP 2) and STEP 3) have completed, as described below.

(The s3 sync from pms-bucket-test to pms-bucket-test2 is complete; if you open the destination path in pms-bucket-test2, you can also see that the objects have been synced.)
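
The every-minute schedule above is only for verifying the setup quickly. Once it works, the crontab entry can be changed to whatever interval the job actually needs; for example, a daily run at 03:00 could look like the line below (using >> instead of > also keeps the output of earlier runs in the log):

0 3 * * * python /home/ec2-user/test.py >> /home/ec2-user/log/test.log 2>&1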
