Celery 기초실습

2020-10-12

.

Data_engineering_TIL(20201012)

실습시 참고한 자료 : 패스트캠퍼스 데이터사이언스 스쿨 강의자료

** URL : https://www.fastcampus.co.kr/data_school_online

[Celery 개요]

  • Python 기반 비동기 작업큐

  • 장시간이 걸리는 작업, 대량의 배치작업, 일정한 스케쥴에 따라 반복되는 작업에 유용한 툴

  • broker : 파이썬 프로세스를 연결해서 컨트롤해주는 역할

  • task : 작업 단위 (함수)

  • queue : task 저장하는 공간 (redis를 활용)

  • worker : broker, task, queue등을 관리하는 서버

[Celery 활용실습]

  • 사전준비사항

먼저 AWS에 접속해서 amazon linux ami 2로 ec2를 띄운다.

그리고 보안그룹에서 6379포트를 해당ec2 퍼블릭 아이피에 대해 열어줘야 한다.

그런 다음에 Redis를 구동시켜야 한다. 해당 내용은 https://minman2115.github.io/DE_TIL139/ 를 참고하면 된다.

그런 후에 아래와 같이 celery를 설치한다.

[ec2-user@ip-10-1-10-184 ~]$ sudo pip3 install celery
  • 실습 상세내용

step 1) task.py 작성

특정 숫자까지 소수갯수 구하는 프로그램을 다음과 같이 코딩하여 작성한다.

[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_task.py
from celery import Celery

# 'redis://[ec2 public ip]:6379/0'
BROKER_URL = 'redis://3.34.253.94:6379/0'
RESULT_BACKEND = 'redis://3.34.253.94:6379/0'

app = Celery('minman_app', broker=BROKER_URL, backend=RESULT_BACKEND)

@app.task
def prime_number(n):
    prime_count = 0
    for num1 in range(1, n + 1):
        is_prime = True
        for num2 in range(2, num1):
            if num1 % num2 == 0:
                is_prime = False
        if is_prime:
            prime_count += 1
    return prime_count

# 참고로 prime_number(20000) 으로 함수를 실행하면 return 값으로 2263 이 나올 것이다.

step 2) worker 실행 (celery_test)

[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_task worker

 -------------- celery@ip-10-1-10-184.ap-northeast-2.compute.internal v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-4.14.193-149.317.amzn2.x86_64-x86_64-with-glibc2.2.5 2020-10-12 04:52:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         minman_app:0x7f2627df0490
- ** ---------- .> transport:   redis://3.34.253.94:6379/0
- ** ---------- .> results:     redis://3.34.253.94:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

step 3) python 코드에서 celery_task를 worker가 실행하도록 요청 및 상태, 결과 확인

새로운 터미널 창을 하나 열고 아래와 같이 실행해본다.

[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_run.py
import celery_task

# delay - 함수를 실행
task_obj = celery_task.prime_number.delay(20000)
print(task_obj)

# get - 결과를 확인
print(task_obj.get())

# ready - task의 종료 여부를 확인
print(task_obj.ready())

[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run.py
6e1ce609-e5aa-4cf0-bbcb-5346709e1c5e
2263
True

[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run2.py
import time
import celery_task

done1, done2 = False, False
result1, result2 = 0, 0
sec = 0

task1 = celery_task.prime_number.delay(10000)
task2 = celery_task.prime_number.delay(500)

while not (done1 and done2):
    time.sleep(1)
    # ready - task의 종료 여부를 확인
    done1 = task1.ready()
    done2 = task2.ready()
    sec +=1

    if done1:
        result1 = task1.get()

    if done2:
        result2 = task2.get()

    print("{} sec : done1-{}, done2-{}".format(sec, done1, done2))

print(result1, result2)

[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run2.py
1 sec : done1-False, done2-False
2 sec : done1-False, done2-False
3 sec : done1-False, done2-False
4 sec : done1-True, done2-True
1230 96

step 4) celery pakage 실습

[ec2-user@ip-10-1-10-184 ~]$ mkdir celery_pakages

[ec2-user@ip-10-1-10-184 ~]$ touch celery_pakages/__init__.py

# __init__.py 용도는 간단하다. 
# 이 파일이 존재하는 디렉터리는 패키지의 일부임을 알려주는 역할을 한다. 
# 따라서 __init__.py라는 파일이 없는 디렉터리는 패키지로 인식되지 않는다.

[ec2-user@ip-10-1-10-184 ~]$ cd celery_pakages/

[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim celery.py
from celery import Celery

BROKER_URL = 'redis://3.34.253.94:6379/0'
RESULT_BACKEND = 'redis://3.34.253.94:6379/0'

app = Celery("celery_pakages",
             broker=BROKER_URL,
             backend=RESULT_BACKEND,
             include=["celery_pakages.task1", "celery_pakages.task2"]
            )

app.conf.update(
    task_serializer='pickle',
    accept_content=['pickle'],
    result_serializer='pickle',
)

[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim task1.py
import time
from .celery import app

@app.task
def add(x, y):
    time.sleep(5)
    return x + y

[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim task2.py
import time
import numpy as np
from .celery import app

@app.task
def return_ndarray(n):
    time.sleep(5)
    return np.arange(n)

[ec2-user@ip-10-1-10-184 celery_pakages]$ cd ..

[ec2-user@ip-10-1-10-184 ~]$ tree celery_pakages
celery_pakages
├── celery.py
├── __init__.py
├── task1.py
└── task2.py

0 directories, 4 files

[ec2-user@ip-10-1-10-184 ~]$ celery --app=celery_pakages worker --loglevel=DEBUG
[2020-10-12 08:29:05,000: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-10-12 08:29:05,002: DEBUG/MainProcess] | Worker: Building graph...
[2020-10-12 08:29:05,002: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}
[2020-10-12 08:29:05,012: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-10-12 08:29:05,012: DEBUG/MainProcess] | Consumer: Building graph...
[2020-10-12 08:29:05,030: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Gossip, Heart, event loop}

 -------------- celery@ip-10-1-10-184.ap-northeast-2.compute.internal v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-4.14.193-149.317.amzn2.x86_64-x86_64-with-glibc2.2.5 2020-10-12 08:29:05
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_pakages:0x7f545cbe8790
- ** ---------- .> transport:   redis://3.34.253.94:6379/0
- ** ---------- .> results:     redis://3.34.253.94:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . celery_pakages.task1.add
  . celery_pakages.task2.return_ndarray

[2020-10-12 08:29:05,090: DEBUG/MainProcess] | Worker: Starting Hub
[2020-10-12 08:29:05,090: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,090: DEBUG/MainProcess] | Worker: Starting Pool
[2020-10-12 08:29:05,146: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,146: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-10-12 08:29:05,147: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-10-12 08:29:05,161: INFO/MainProcess] Connected to redis://3.34.253.94:6379/0
[2020-10-12 08:29:05,161: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,161: DEBUG/MainProcess] | Consumer: Starting Events
[2020-10-12 08:29:05,169: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,169: DEBUG/MainProcess] | Consumer: Starting Mingle
[2020-10-12 08:29:05,169: INFO/MainProcess] mingle: searching for neighbors
[2020-10-12 08:29:06,189: INFO/MainProcess] mingle: all alone
[2020-10-12 08:29:06,189: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,189: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-10-12 08:29:06,193: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,193: DEBUG/MainProcess] | Consumer: Starting Control
[2020-10-12 08:29:06,196: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,196: DEBUG/MainProcess] | Consumer: Starting Gossip
[2020-10-12 08:29:06,199: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,199: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-10-12 08:29:06,201: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,201: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-10-12 08:29:06,201: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2020-10-12 08:29:06,202: INFO/MainProcess] celery@ip-10-1-10-184.ap-northeast-2.compute.internal ready.
[2020-10-12 08:29:06,202: DEBUG/MainProcess] basic.qos: prefetch_count->8

        
## celery -A celery_pakages worker 로 하는 방법도 있고 아니면 아래와 같이 백그라운드로 실행하는 방법도 있다.
## ----------------------------------------------------------------------------------------------------- ##

[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_pakages multi start
celery multi v5.0.0 (singularity)
> Starting nodes...

# 백그라운드 실행 중지 : ps -e | grep celery
[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_pakages multi stopwait
celery multi v5.0.0 (singularity)


## ------------------------------------------------------------------------------------------------ ##


## 새로 터미널 창 하나를 띄워서 다음과 같이 실행
[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_run3.py
from celery_pakages import task1, task2
from celery.result import AsyncResult

task_obj = task1.add.delay(2, 3)
print(task_obj)

task_id = task_obj.id
print(task_id)

ar = AsyncResult(task_id)
print(ar.ready())

print(ar.get())

task_obj = task2.return_ndarray.apply_async((100,), serializer="pickle")
print(task_obj)

print(task_obj.get())

print(task_obj.ready())
    
[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run3.py
2495a965-9688-4325-9c5c-4edbd24852ea
2495a965-9688-4325-9c5c-4edbd24852ea
False
5
2ce083d4-e19e-46f9-977a-8195f32a3add
[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
 96 97 98 99]
True

## celery를 띄운 터미널 화면으로 돌아가면 다음과 같이 로그가 남아있는 것을 확인 할 수 있다.
[2020-10-12 08:51:21,780: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f545cc69b90> (args:('celery_pakages.task2.return_ndarray', 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', {'lang': 'py', 'task': 'celery_pakages.task2.return_ndarray', 'id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'parent_id': None, 'argsrepr': '(100,)', 'kwargsrepr': '{}', 'origin': 'gen10974@ip-10-1-10-184.ap-northeast-2.compute.internal', 'reply_to': 'd1b58171-f988-36b6-a260-26dae5e04fdb', 'correlation_id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'hostname': 'celery@ip-10-1-10-184.ap-northeast-2.compute.internal', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': (100,), 'kwargs': {}}, b'\x80\x02Kd\x85q\x00}q\x01}q\x02(X\t\x00\x00\x00callbacksq\x03NX\x08\x00\x00\x00errbacksq\x04NX\x05\x00\x00\x00chainq\x05NX\x05\x00\x00\x00chordq\x06Nu\x87q\x07.', 'application/x-python-serialize', 'binary') kwargs:{})
[2020-10-12 08:51:21,781: DEBUG/MainProcess] Task accepted: celery_pakages.task2.return_ndarray[de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c] pid:10587
[2020-10-12 08:51:26,787: INFO/ForkPoolWorker-2] Task celery_pakages.task2.return_ndarray[de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c] succeeded in 5.006750341002771s: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
       51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
       68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
       85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99])