Celery basics: hands-on practice
Data_engineering_TIL(20201012)
Reference material for this exercise : Fast Campus Data Science School lecture materials
** URL : https://www.fastcampus.co.kr/data_school_online
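[Celery overview]
- A Python-based asynchronous task queue.
- Useful for long-running jobs, large batch jobs, and jobs that repeat on a fixed schedule.
- broker : the message transport that passes task messages between the Python process requesting the work and the workers (Redis in this exercise)
- task : a unit of work (a function)
- queue : the place where tasks wait until a worker picks them up (held in Redis here)
- worker : the process that takes tasks from the queue and executes them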
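To make the four terms above concrete, here is a toy model using only the standard library. It is not how Celery is implemented: `task_queue`, `results`, `registry`, `send_task` and `worker` are hypothetical names made up for this sketch, with an in-process queue standing in for the broker and a dict standing in for the result backend.

```python
import queue
import threading
import uuid

# Toy stand-ins (hypothetical names): the queue plays the broker,
# the dict plays the result backend, the registry maps names to tasks.
task_queue = queue.Queue()
results = {}
registry = {}


def task(func):
    """Register a function as a task, loosely like @app.task."""
    registry[func.__name__] = func
    return func


def send_task(name, *args):
    """Client side: put a message on the queue and return its id."""
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, name, args))
    return task_id


def worker():
    """Worker side: take messages off the queue and run the matching task."""
    while True:
        message = task_queue.get()
        if message is None:  # sentinel: shut the worker down
            break
        task_id, name, args = message
        results[task_id] = registry[name](*args)


@task
def add(x, y):
    return x + y


def run_demo():
    """Start one worker, send one task, and return its stored result."""
    thread = threading.Thread(target=worker)
    thread.start()
    task_id = send_task("add", 2, 3)
    task_queue.put(None)
    thread.join()
    return results[task_id]
```

Calling `run_demo()` sends `add(2, 3)` through the queue and reads the result back by task id, which is the same round trip the exercise below performs through Redis.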
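[Celery hands-on exercise]
- Prerequisites
First, log in to AWS and launch an EC2 instance from the Amazon Linux 2 AMI.
Then, in the security group, open port 6379 for that EC2 instance's public IP.
Next, Redis has to be up and running. See https://minman2115.github.io/DE_TIL139/ for how to do that.
After that, install celery as follows.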
[ec2-user@ip-10-1-10-184 ~]$ sudo pip3 install celery
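Before starting a worker it can save time to confirm that the Redis port is reachable at all. The following is a minimal sketch using only the standard library; `can_connect` is a hypothetical helper name, and it only checks that a TCP connection opens, not that Redis itself answers.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For this exercise the call would be something like `can_connect("<ec2 public ip>", 6379)`. A `False` result usually points at the security group rule or at Redis not running.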
- Exercise details
step 1) Write celery_task.py
Write a program that counts the primes up to a given number, as follows.
[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_task.py
from celery import Celery
# Format: 'redis://[EC2 public IP]:6379/0'
BROKER_URL = 'redis://3.34.253.94:6379/0'
RESULT_BACKEND = 'redis://3.34.253.94:6379/0'
app = Celery('minman_app', broker=BROKER_URL, backend=RESULT_BACKEND)
@app.task
def prime_number(n):
    prime_count = 0
    for num1 in range(1, n + 1):
        is_prime = True
        for num2 in range(2, num1):
            if num1 % num2 == 0:
                is_prime = False
        if is_prime:
            prime_count += 1
    return prime_count
# For reference, prime_number(20000) returns 2263.
# The outer loop starts at 1, so 1 is counted as well; there are 2262 primes up to 20000.
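The trial-division loop above is deliberately slow, which suits a task-queue demo, and because its outer loop starts at 1 it counts 1 as a prime. As a cross-check only, here is a sketch of a different technique, the Sieve of Eratosthenes. `count_primes` is a hypothetical name and is not part of the original exercise.

```python
def count_primes(n):
    """Count primes p with 2 <= p <= n using the Sieve of Eratosthenes."""
    if n < 2:
        return 0
    is_prime = [True] * (n + 1)
    is_prime[0] = is_prime[1] = False  # 0 and 1 are not prime
    p = 2
    while p * p <= n:
        if is_prime[p]:
            # Every multiple of p from p*p upward is composite.
            for multiple in range(p * p, n + 1, p):
                is_prime[multiple] = False
        p += 1
    return sum(is_prime)
```

`count_primes(20000)` gives 2262, one less than the 2263 returned by `prime_number(20000)`; the difference is the extra count for 1.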
step 2) Run the worker (celery_test)
[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_task worker
-------------- celery@ip-10-1-10-184.ap-northeast-2.compute.internal v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-4.14.193-149.317.amzn2.x86_64-x86_64-with-glibc2.2.5 2020-10-12 04:52:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: minman_app:0x7f2627df0490
- ** ---------- .> transport: redis://3.34.253.94:6379/0
- ** ---------- .> results: redis://3.34.253.94:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
step 3) From Python code, ask the worker to run the celery_task task, then check its status and result
Open a new terminal window and run the following.
[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_run.py
import celery_task
# delay - send the task to the worker for execution
task_obj = celery_task.prime_number.delay(20000)
print(task_obj)
# get - wait for the result and return it
print(task_obj.get())
# ready - check whether the task has finished
print(task_obj.ready())
[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run.py
6e1ce609-e5aa-4cf0-bbcb-5346709e1c5e
2263
True
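In the script above `ready()` is called after `get()`, and `get()` blocks until the result arrives, so the last line always prints True. The difference between the non-blocking check and the blocking fetch can be seen with a standard-library analogy. This is only an analogy: `concurrent.futures` is not Celery, and `slow_square` and `demo` are hypothetical names for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Stand-in for a slow task."""
    time.sleep(0.5)
    return x * x


def demo():
    """Return (done before waiting, result, done after waiting)."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_square, 7)  # like delay(): returns at once
        before = future.done()                # like ready(): does not block
        value = future.result(timeout=5)      # like get(): blocks until done
        after = future.done()
    return before, value, after
```

Moving the `ready()` call above `get()` in celery_run.py would show the same before/after pattern. Celery's `get()` also accepts a `timeout` argument, which avoids waiting forever if no worker is running.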
[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_run2.py
import time
import celery_task
done1, done2 = False, False
result1, result2 = 0, 0
sec = 0
task1 = celery_task.prime_number.delay(10000)
task2 = celery_task.prime_number.delay(500)
while not (done1 and done2):
    time.sleep(1)
    # ready - check whether the task has finished
    done1 = task1.ready()
    done2 = task2.ready()
    sec += 1
    if done1:
        result1 = task1.get()
    if done2:
        result2 = task2.get()
    print("{} sec : done1-{}, done2-{}".format(sec, done1, done2))
print(result1, result2)
[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run2.py
1 sec : done1-False, done2-False
2 sec : done1-False, done2-False
3 sec : done1-False, done2-False
4 sec : done1-True, done2-True
1230 96
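The polling loop in celery_run2.py never gives up, so it would spin forever if the worker were down. Below is a sketch of the same idea with a deadline. It relies only on the objects having `ready()` and `get()` methods, as the results used above do; `wait_all` and `FakeResult` are hypothetical names, and `FakeResult` exists only so the sketch can run without a broker.

```python
import time


def wait_all(tasks, timeout=30.0, interval=1.0):
    """Poll objects exposing ready()/get() until all finish or timeout expires.

    Returns the results in the same order as tasks.
    Raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while not all(t.ready() for t in tasks):
        if time.monotonic() >= deadline:
            raise TimeoutError("tasks did not finish in time")
        time.sleep(interval)
    return [t.get() for t in tasks]


class FakeResult:
    """Hypothetical stand-in for a task result; ready after a few polls."""

    def __init__(self, value, polls_needed):
        self.value = value
        self.polls_left = polls_needed

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0

    def get(self):
        return self.value
```

With the real tasks this would read `wait_all([task1, task2], timeout=60)`. Using `time.monotonic()` keeps the deadline unaffected by system clock adjustments.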
step 4) Celery package exercise
[ec2-user@ip-10-1-10-184 ~]$ mkdir celery_pakages
[ec2-user@ip-10-1-10-184 ~]$ touch celery_pakages/__init__.py
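# The purpose of __init__.py is simple.
# Its presence marks the directory as a regular Python package.
# Without __init__.py, Python 3.3+ can still import the directory, but only as a namespace package, so keeping the file makes the package layout explicit.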
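The effect of `__init__.py` can be observed directly. The sketch below builds two throwaway directories in a temporary folder, one with `__init__.py` and one without, imports both, and checks whether each module has a `__file__`. The names `package_kinds`, `demo_regular_pkg` and `demo_namespace_pkg` are hypothetical and exist only for this illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def package_kinds():
    """Import a directory with __init__.py and one without.

    Returns (regular has __file__, namespace has __file__).
    """
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "demo_regular_pkg").mkdir()
        (root / "demo_regular_pkg" / "__init__.py").write_text("")
        (root / "demo_namespace_pkg").mkdir()  # no __init__.py here

        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            regular = importlib.import_module("demo_regular_pkg")
            namespace = importlib.import_module("demo_namespace_pkg")
            return (
                getattr(regular, "__file__", None) is not None,
                getattr(namespace, "__file__", None) is not None,
            )
        finally:
            # Leave the interpreter state as it was found.
            sys.path.remove(tmp)
            sys.modules.pop("demo_regular_pkg", None)
            sys.modules.pop("demo_namespace_pkg", None)
```

Both imports succeed, but only the regular package is backed by an `__init__.py` file; the other one is a namespace package with no file of its own.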
[ec2-user@ip-10-1-10-184 ~]$ cd celery_pakages/
[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim celery.py
from celery import Celery
BROKER_URL = 'redis://3.34.253.94:6379/0'
RESULT_BACKEND = 'redis://3.34.253.94:6379/0'
app = Celery("celery_pakages",
             broker=BROKER_URL,
             backend=RESULT_BACKEND,
             include=["celery_pakages.task1", "celery_pakages.task2"]
)
app.conf.update(
    task_serializer='pickle',
    accept_content=['pickle'],
    result_serializer='pickle',
)
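Celery's default serializer is JSON, and JSON cannot encode every Python object, which is presumably why this configuration switches to pickle before returning a NumPy array from task2. The sketch below shows the difference using only the standard library, with a `set` standing in for the array. `round_trip_report` is a hypothetical helper name.

```python
import json
import pickle


def round_trip_report(value):
    """Try to round-trip value through JSON and through pickle.

    Returns a dict mapping serializer name to True if the value
    came back equal to the original.
    """
    report = {}
    try:
        report["json"] = json.loads(json.dumps(value)) == value
    except TypeError:
        report["json"] = False  # json.dumps rejects unsupported types
    report["pickle"] = pickle.loads(pickle.dumps(value)) == value
    return report
```

`round_trip_report({1, 2, 3})` reports that JSON fails while pickle succeeds. The trade-off is that unpickling can execute arbitrary code, so accepting pickle content is only reasonable when everything that can write to the broker is trusted.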
[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim task1.py
import time
from .celery import app
@app.task
def add(x, y):
    time.sleep(5)
    return x + y
[ec2-user@ip-10-1-10-184 celery_pakages]$ sudo vim task2.py
import time
import numpy as np
from .celery import app
@app.task
def return_ndarray(n):
    time.sleep(5)
    return np.arange(n)
[ec2-user@ip-10-1-10-184 celery_pakages]$ cd ..
[ec2-user@ip-10-1-10-184 ~]$ tree celery_pakages
celery_pakages
├── celery.py
├── __init__.py
├── task1.py
└── task2.py
0 directories, 4 files
[ec2-user@ip-10-1-10-184 ~]$ celery --app=celery_pakages worker --loglevel=DEBUG
[2020-10-12 08:29:05,000: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-10-12 08:29:05,002: DEBUG/MainProcess] | Worker: Building graph...
[2020-10-12 08:29:05,002: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}
[2020-10-12 08:29:05,012: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-10-12 08:29:05,012: DEBUG/MainProcess] | Consumer: Building graph...
[2020-10-12 08:29:05,030: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Gossip, Heart, event loop}
-------------- celery@ip-10-1-10-184.ap-northeast-2.compute.internal v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-4.14.193-149.317.amzn2.x86_64-x86_64-with-glibc2.2.5 2020-10-12 08:29:05
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_pakages:0x7f545cbe8790
- ** ---------- .> transport: redis://3.34.253.94:6379/0
- ** ---------- .> results: redis://3.34.253.94:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. celery_pakages.task1.add
. celery_pakages.task2.return_ndarray
[2020-10-12 08:29:05,090: DEBUG/MainProcess] | Worker: Starting Hub
[2020-10-12 08:29:05,090: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,090: DEBUG/MainProcess] | Worker: Starting Pool
[2020-10-12 08:29:05,146: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,146: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-10-12 08:29:05,147: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-10-12 08:29:05,161: INFO/MainProcess] Connected to redis://3.34.253.94:6379/0
[2020-10-12 08:29:05,161: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,161: DEBUG/MainProcess] | Consumer: Starting Events
[2020-10-12 08:29:05,169: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:05,169: DEBUG/MainProcess] | Consumer: Starting Mingle
[2020-10-12 08:29:05,169: INFO/MainProcess] mingle: searching for neighbors
[2020-10-12 08:29:06,189: INFO/MainProcess] mingle: all alone
[2020-10-12 08:29:06,189: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,189: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-10-12 08:29:06,193: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,193: DEBUG/MainProcess] | Consumer: Starting Control
[2020-10-12 08:29:06,196: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,196: DEBUG/MainProcess] | Consumer: Starting Gossip
[2020-10-12 08:29:06,199: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,199: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-10-12 08:29:06,201: DEBUG/MainProcess] ^-- substep ok
[2020-10-12 08:29:06,201: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-10-12 08:29:06,201: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2020-10-12 08:29:06,202: INFO/MainProcess] celery@ip-10-1-10-184.ap-northeast-2.compute.internal ready.
[2020-10-12 08:29:06,202: DEBUG/MainProcess] basic.qos: prefetch_count->8
## You can also start the worker with celery -A celery_pakages worker, or run it in the background as shown below.
## ----------------------------------------------------------------------------------------------------- ##
[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_pakages multi start
celery multi v5.0.0 (singularity)
> Starting nodes...
# Stopping the background workers (ps -e | grep celery lists any celery processes that are still running)
[ec2-user@ip-10-1-10-184 ~]$ celery -A celery_pakages multi stopwait
celery multi v5.0.0 (singularity)
## ------------------------------------------------------------------------------------------------ ##
## Open a new terminal window and run the following
[ec2-user@ip-10-1-10-184 ~]$ sudo vim celery_run3.py
from celery_pakages import task1, task2
from celery.result import AsyncResult
task_obj = task1.add.delay(2, 3)
print(task_obj)
task_id = task_obj.id
print(task_id)
ar = AsyncResult(task_id)
print(ar.ready())
print(ar.get())
task_obj = task2.return_ndarray.apply_async((100,), serializer="pickle")
print(task_obj)
print(task_obj.get())
print(task_obj.ready())
[ec2-user@ip-10-1-10-184 ~]$ python3 celery_run3.py
2495a965-9688-4325-9c5c-4edbd24852ea
2495a965-9688-4325-9c5c-4edbd24852ea
False
5
2ce083d4-e19e-46f9-977a-8195f32a3add
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
96 97 98 99]
True
## Back in the terminal where the celery worker is running, you can see that the following log entries were written.
[2020-10-12 08:51:21,780: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f545cc69b90> (args:('celery_pakages.task2.return_ndarray', 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', {'lang': 'py', 'task': 'celery_pakages.task2.return_ndarray', 'id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'parent_id': None, 'argsrepr': '(100,)', 'kwargsrepr': '{}', 'origin': 'gen10974@ip-10-1-10-184.ap-northeast-2.compute.internal', 'reply_to': 'd1b58171-f988-36b6-a260-26dae5e04fdb', 'correlation_id': 'de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c', 'hostname': 'celery@ip-10-1-10-184.ap-northeast-2.compute.internal', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': (100,), 'kwargs': {}}, b'\x80\x02Kd\x85q\x00}q\x01}q\x02(X\t\x00\x00\x00callbacksq\x03NX\x08\x00\x00\x00errbacksq\x04NX\x05\x00\x00\x00chainq\x05NX\x05\x00\x00\x00chordq\x06Nu\x87q\x07.', 'application/x-python-serialize', 'binary') kwargs:{})
[2020-10-12 08:51:21,781: DEBUG/MainProcess] Task accepted: celery_pakages.task2.return_ndarray[de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c] pid:10587
[2020-10-12 08:51:26,787: INFO/ForkPoolWorker-2] Task celery_pakages.task2.return_ndarray[de53ad29-1af8-4c4a-9b7b-c98ae7ad4e5c] succeeded in 5.006750341002771s: array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99])