Setting up the CeleryExecutor for parallel task execution in Airflow

2020-10-10


Data_Engineering_TIL(20201010)

# References

1) Hands-on setup of an environment for parallel task processing in Airflow

Based on 'Airflow를 이용한 데이터 Workflow 관리' (Managing Data Workflows with Airflow) by 김영현

2) Blog post 'Airflow 구조와 execution_date 이해하기' (Understanding Airflow's architecture and execution_date)

url : https://bomwo.cc/posts/execution_date/

# Airflow executors

The executor, also referred to as the worker, is the component that actually runs tasks, and Airflow provides several executors to choose from depending on your requirements.

  • SequentialExecutor (default)

1) Runs tasks one at a time, sequentially

2) Uses SQLite as the backend

3) Recommended only for very simple testing

  • LocalExecutor

1) Can run tasks in parallel

2) Uses MySQL or PostgreSQL as the backend

3) Spawns a subprocess for each task

  • CeleryExecutor

1) Can distribute tasks across multiple servers (nodes), i.e. a cluster

2) Requires a Celery broker/backend (RabbitMQ, Redis, …)
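
For reference, which executor Airflow uses is selected by the executor key in the [core] section of airflow.cfg, e.g. (the full configuration is covered in the walkthrough below):

[core]
executor = CeleryExecutor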

# Requirements for running tasks in parallel with the CeleryExecutor in Airflow

1) The Airflow workers that actually execute tasks must use the CeleryExecutor, not the SequentialExecutor

2) The CeleryExecutor needs a message broker, which is why RabbitMQ or Redis is required

3) Airflow's meta store must be MySQL or PostgreSQL, not SQLite

# Hands-on walkthrough

** Environment: Amazon Linux 2 AMI

step 1) Install Airflow

  • First, install Airflow and the programs it depends on, as shown below
[ec2-user@ip-10-1-10-239 ~]$ sudo yum update -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install python3 -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install gcc python3-devel -y

# airflow 1.10.12 install
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install apache-airflow==1.10.12  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"

[ec2-user@ip-10-1-10-239 ~]$ sudo yum install mysql-devel -y
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install 'apache-airflow[mysql]'
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install 'apache-airflow[celery]'
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install boto3
[ec2-user@ip-10-1-10-239 ~]$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
Default region name [None]: ap-northeast-2
Default output format [None]: json
    
# Initialize the metadata DB
[ec2-user@ip-10-1-10-239 ~]$ airflow initdb
  • Caution: errors came up repeatedly on the first install, presumably because of Python library dependencies, so the Python libraries Airflow depends on need to be installed carefully. For example, when airflow initdb complained that the celery module was missing, I just ran pip3 install celery, hit an error, and lost more than a day to it.

If Airflow needs some extra module, try installing it as pip3 install 'apache-airflow[module-name]' first.

Example)

sudo pip3 install 'apache-airflow[mysql]'

sudo pip3 install 'apache-airflow[celery]'
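
Before moving on, it may be worth sanity-checking that the extras actually installed; a quick check like the one below should work (assuming apache-airflow[mysql] pulled in the mysqlclient package, which provides the MySQLdb module):

[ec2-user@ip-10-1-10-239 ~]$ airflow version
[ec2-user@ip-10-1-10-239 ~]$ python3 -c "import celery, MySQLdb; print('extras OK')"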

step 2) Install and configure MySQL

See https://minman2115.github.io/DE_TIL134/ for details

step 3) Configure Airflow to use MySQL

** Reference: the blog post '[data] airflow 설치(DB: mysql)' at https://m.blog.naver.com/varkiry05/222018641877

First, create a database and an account in MySQL.

Connect as the root account and create the database and user as follows (set the names and password yourself).

[ec2-user@ip-10-1-10-23 ~]$ mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 14
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database airflow;
Query OK, 1 row affected (0.01 sec)

mysql> create user 'airflow'@'localhost' identified by 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> ALTER USER 'airflow'@'localhost' IDENTIFIED WITH mysql_native_password BY 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on airflow.* to 'airflow'@'localhost';
Query OK, 0 rows affected (0.00 sec)

mysql> create user 'airflow'@'%' identified by 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.01 sec)

mysql> ALTER USER 'airflow'@'%' IDENTIFIED WITH mysql_native_password BY 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on airflow.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)

mysql> exit;
Bye
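
Before moving on, SHOW GRANTS is a quick way to confirm the privileges took effect (a suggested check, not part of the original session):

mysql> SHOW GRANTS FOR 'airflow'@'localhost';
mysql> SHOW GRANTS FOR 'airflow'@'%';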

You also need to edit MySQL's my.cnf file.

Otherwise, airflow initdb will fail with 'Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql'.

[ec2-user@ip-10-1-10-23 airflow]$ sudo vim /etc/my.cnf
# Append the following at the bottom
explicit_defaults_for_timestamp = 1
max_allowed_packet = 30M

[ec2-user@ip-10-1-10-23 airflow]$ sudo systemctl restart mysqld

[ec2-user@ip-10-1-10-23 airflow]$ sudo systemctl status mysqld
 mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-10-08 07:36:23 UTC; 22s ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
  Process: 10133 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
 Main PID: 10157 (mysqld)
   Status: "Server is operational"
   CGroup: /system.slice/mysqld.service
           └─10157 /usr/sbin/mysqld

Oct 08 07:36:22 ip-10-1-10-23.ap-northeast-2.compute.internal systemd[1]: Starting MySQL Server...
Oct 08 07:36:23 ip-10-1-10-23.ap-northeast-2.compute.internal systemd[1]: Started MySQL Server.
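
To confirm the variable actually took effect after the restart, it can be checked from the MySQL prompt (expected value: ON):

mysql> SHOW VARIABLES LIKE 'explicit_defaults_for_timestamp';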

Next, edit airflow.cfg in the Airflow home directory as follows.

[ec2-user@ip-10-1-10-23 ~]$ cd ~/airflow

[ec2-user@ip-10-1-10-23 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg

[ec2-user@ip-10-1-10-23 airflow]$ sudo vim airflow.cfg

# sql_alchemy_conn = mysql://[ID]:[PASSWORD]@[IP]:3306/airflow
sql_alchemy_conn = mysql://airflow:MyNewStrongP@ssw0d!@localhost:3306/airflow
            
# If you don't want the sample DAGs, also set the following option in airflow.cfg to False
load_examples = False
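
One caveat worth noting: special characters in the password portion of a SQLAlchemy URI, such as the @ in the example password above, can confuse URI parsing, so the safe form is to percent-encode them (@ becomes %40), roughly like this:

# password special characters percent-encoded to keep the URI unambiguous
sql_alchemy_conn = mysql://airflow:MyNewStrongP%40ssw0d!@localhost:3306/airflow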

Stop the Airflow webserver and scheduler, then run the following command.

[ec2-user@ip-10-1-10-224 airflow]$ airflow initdb
DB: mysql://airflow:***@localhost:3306/airflow
[2020-10-08 09:24:48,497] {db.py:378} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
Revision ID: xxxxxxxxxxxxxxxxxxxxxxxxx
Revises: xxxxxxxxxxxxxxxxxxxxxxx
Create Date: 2019-08-01 14:39:35.616417
INFO  [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO  [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
INFO  [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
INFO  [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
INFO  [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
WARNI [airflow.models.crypto] cryptography not found - values will not be stored encrypted.
Done.

To allow larger data to pass through XCom, widen the column type as follows.

[ec2-user@ip-10-1-10-17 airflow]$ mysql -uroot -p
Enter password:   ## log in with MyNewStrongP@ssw0d!
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> alter table airflow.xcom modify value LONGBLOB;
Query OK, 0 rows affected (0.05 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> exit
Bye
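
To double-check the change, inspecting the column should now report longblob:

mysql> SHOW COLUMNS FROM airflow.xcom LIKE 'value';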

step 4) Install and configure RabbitMQ

Install it by following https://minman2115.github.io/DE_TIL135/.

Once installation is complete, configure RabbitMQ for use by Airflow as follows.

# sudo rabbitmqctl add_user [username] [password]
[ec2-user@ip-10-1-10-40 ~]$ sudo rabbitmqctl add_user airflow airflow
Creating user "airflow"

[ec2-user@ip-10-1-10-40 ~]$ sudo rabbitmqctl add_vhost airflow_vhost
Creating vhost "airflow_vhost"

[ec2-user@ip-10-1-10-40 ~]$ sudo rabbitmqctl set_user_tags airflow airflow
Setting tags for user "airflow" to [airflow]

[ec2-user@ip-10-1-10-40 ~]$ sudo rabbitmqctl set_permissions -p airflow_vhost airflow ".*" ".*" ".*"
Setting permissions for user "airflow" in vhost "airflow_vhost"

[ec2-user@ip-10-1-10-33 ~]$ sudo vim ~/airflow/airflow.cfg

# Change the executor from the default SequentialExecutor to CeleryExecutor
executor = CeleryExecutor

# Change the DB connection from sqlite to mysql
sql_alchemy_conn = mysql://airflow:MyNewStrongP@ssw0d!@localhost:3306/airflow

# Point the Celery broker URL at RabbitMQ
broker_url = amqp://airflow:airflow@localhost:5672/airflow_vhost

# Result backend (DB metastore) where Celery stores the results of executed jobs
result_backend = db+mysql://airflow:MyNewStrongP@ssw0d!@localhost:3306/airflow
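
The same percent-encoding caveat from step 3 applies to the passwords embedded in sql_alchemy_conn and result_backend here. Also, how much parallelism you actually get is capped by a few more airflow.cfg settings; the values below are the Airflow 1.10 defaults, shown for reference:

# [core] max task instances that can run at once across the whole installation
parallelism = 32
# [core] max concurrent task instances per DAG
dag_concurrency = 16
# [celery] worker processes per `airflow worker` (matches the 'concurrency: 16' in the worker banner below)
worker_concurrency = 16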

step 5) Start Airflow

# Start the webserver
[ec2-user@ip-10-1-10-239 ~]$ airflow webserver -p 8080
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-09 08:57:18,603] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-10-09 08:57:18,604] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2020-10-09 08:57:19 +0000] [16042] [INFO] Starting gunicorn 20.0.4
[2020-10-09 08:57:19 +0000] [16042] [INFO] Listening at: http://0.0.0.0:8080 (16042)
[2020-10-09 08:57:19 +0000] [16042] [INFO] Using worker: sync
[2020-10-09 08:57:19 +0000] [16047] [INFO] Booting worker with pid: 16047
[2020-10-09 08:57:19 +0000] [16048] [INFO] Booting worker with pid: 16048
[2020-10-09 08:57:19 +0000] [16049] [INFO] Booting worker with pid: 16049
[2020-10-09 08:57:19 +0000] [16050] [INFO] Booting worker with pid: 16050
[2020-10-09 08:57:20,744] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-10-09 08:57:20,745] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-09 08:57:20,760] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-10-09 08:57:20,765] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-09 08:57:20,913] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-10-09 08:57:20,921] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-09 08:57:20,940] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-10-09 08:57:20,946] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags

# With the webserver running as above, open a web browser and go to [ec2 server public ip]:8080 to see the Airflow web UI.

# Open a new terminal window and enter the command below
# The CeleryExecutor requires running a separate worker
[ec2-user@ip-10-1-10-239 airflow]$ airflow worker

 -------------- celery@ip-10-1-10-210.ap-northeast-2.compute.internal v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-4.14.193-149.317.amzn2.x86_64-x86_64-with-glibc2.2.5 2020-10-10 08:16:42
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7fba4e143550
- ** ---------- .> transport:   amqp://airflow:**@localhost:5672/airflow_vhost
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . airflow.executors.celery_executor.execute_command

Starting flask
 * Serving Flask app "airflow.bin.cli" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[2020-10-10 08:16:44,044] {_internal.py:122} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2020-10-10 08:16:44,133: INFO/MainProcess] Connected to amqp://airflow:**@127.0.0.1:5672/airflow_vhost
[2020-10-10 08:16:44,142: INFO/MainProcess] mingle: searching for neighbors
[2020-10-10 08:16:45,166: INFO/MainProcess] mingle: all alone
[2020-10-10 08:16:45,192: INFO/MainProcess] celery@ip-10-1-10-210.ap-northeast-2.compute.internal ready.
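
Incidentally, the number of worker processes can be tuned at launch time; in Airflow 1.10 the worker subcommand accepts a concurrency option, e.g. (an illustrative command, not from the original run):

[ec2-user@ip-10-1-10-239 airflow]$ airflow worker -c 4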


# Open a new terminal window and enter the command below
# Start the scheduler, which is responsible for scheduling DAGs and triggering their runs
[ec2-user@ip-10-1-10-239 ~]$ airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-10 08:23:53,580] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-10-10 08:23:53,586] {scheduler_job.py:1346} INFO - Starting the scheduler
[2020-10-10 08:23:53,586] {scheduler_job.py:1354} INFO - Running execute loop for -1 seconds
[2020-10-10 08:23:53,586] {scheduler_job.py:1355} INFO - Processing each file at most -1 times
[2020-10-10 08:23:53,586] {scheduler_job.py:1358} INFO - Searching for files in /home/ec2-user/airflow/dags
[2020-10-10 08:23:53,613] {scheduler_job.py:1360} INFO - There are 24 files in /home/ec2-user/airflow/dags
[2020-10-10 08:23:53,613] {scheduler_job.py:1411} INFO - Resetting orphaned tasks for active dag runs
[2020-10-10 08:23:53,624] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 16107
[2020-10-10 08:23:53,627] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>
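
With the webserver, worker, and scheduler all running, parallel execution can be verified with a small test DAG. Below is a minimal sketch (the parallel_test DAG name and file name are illustrative, not from the original write-up); save it as /home/ec2-user/airflow/dags/parallel_test.py:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# manual-trigger DAG with three independent tasks
dag = DAG(
    dag_id='parallel_test',
    start_date=datetime(2020, 10, 1),
    schedule_interval=None,
)

# the tasks have no dependencies on each other, so the CeleryExecutor
# is free to run all three at the same time
for i in range(3):
    BashOperator(
        task_id='sleep_{}'.format(i),
        bash_command='sleep 30',
        dag=dag,
    )

Trigger it with airflow trigger_dag parallel_test (or from the web UI): under the CeleryExecutor all three sleep tasks should enter the running state at the same time, which would never happen with the SequentialExecutor.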