Setting up LocalExecutor for parallel task execution in Airflow

2020-10-13

Data_Engineering_TIL(20201013)

# References

1) Hands-on exercise: configuring Airflow for parallel task processing

Based on the material ‘Managing data workflows with Airflow’ by 김영현

2) Blog post ‘Understanding Airflow's architecture and execution_date’

url : https://bomwo.cc/posts/execution_date/

3) Setting up CeleryExecutor for parallel task execution in Airflow

url : https://minman2115.github.io/DE_TIL137/

4) Blog post ‘[data] Installing Airflow (DB: mysql)’

url : https://m.blog.naver.com/varkiry05/222018641877

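# Airflow executors

The executor is the component that actually runs tasks (it is sometimes loosely called a worker). Airflow provides several executors to suit different conditions.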

SequentialExecutor (default)

1) Runs tasks one at a time, in sequence

2) Uses SQLite as the backend

3) Recommended only for very simple testing

LocalExecutor

1) Can run tasks in parallel

2) Uses MySQL or PostgreSQL as the backend

3) Spawns a subprocess for each task

CeleryExecutor

1) Can distribute tasks across multiple servers (nodes) in a cluster

2) Requires a Celery backend (RabbitMQ, Redis, …) to be configured
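To make the sequential vs. subprocess-per-task distinction above concrete, here is a minimal sketch in plain Python. It is only an analogy, not how Airflow implements its executors; the helper names (`make_task`, `run_sequentially`, `run_in_parallel`) are made up for illustration.

```python
import subprocess
import sys


def make_task(name, seconds):
    """Build a command that sleeps for `seconds` and then reports completion."""
    code = "import time; time.sleep({}); print('{} done')".format(seconds, name)
    return [sys.executable, "-c", code]


def run_sequentially(commands):
    """SequentialExecutor-like: wait for each task before starting the next."""
    outputs = []
    for cmd in commands:
        done = subprocess.run(cmd, stdout=subprocess.PIPE, universal_newlines=True)
        outputs.append(done.stdout.strip())
    return outputs


def run_in_parallel(commands):
    """LocalExecutor-like: start one subprocess per task, then collect results."""
    procs = [
        subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
        for cmd in commands
    ]
    return [proc.communicate()[0].strip() for proc in procs]
```

With three tasks that each sleep for a few seconds, `run_sequentially` takes roughly the sum of the sleep times, while `run_in_parallel` takes roughly the longest single sleep.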

# Hands-on walkthrough

** Environment: Amazon Linux 2 AMI

step 1) Install Airflow

First, install Airflow and the packages it depends on, as shown below.

[ec2-user@ip-10-1-10-239 ~]$ sudo yum update -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install python3 -y
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install gcc python3-devel -y
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install apache-airflow
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install mysql-devel -y
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install 'apache-airflow[mysql]'
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install 'apache-airflow[celery]'
[ec2-user@ip-10-1-10-239 ~]$ sudo pip3 install boto3
[ec2-user@ip-10-1-10-239 ~]$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
Default region name [None]: ap-northeast-2
Default output format [None]: json
    
# Initialize the metadata DB
[ec2-user@ip-10-1-10-239 ~]$ airflow initdb

step 2) Install MySQL to use as Airflow's metadata DB

[ec2-user@ip-10-1-10-239 ~]$ sudo yum install https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
    
[ec2-user@ip-10-1-10-239 ~]$ sudo yum install mysql-community-server -y

[ec2-user@ip-10-1-10-239 ~]$ sudo systemctl enable --now mysqld

[ec2-user@ip-10-1-10-239 ~]$ systemctl status mysqld
 mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since Wed 2020-10-07 07:28:55 UTC; 8s ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
  Process: 11832 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
 Main PID: 11906 (mysqld)
   Status: "Server is operational"
   CGroup: /system.slice/mysqld.service
           └─11906 /usr/sbin/mysqld

Oct 07 07:28:50 ip-10-1-10-81.ap-northeast-2.compute.internal systemd[1]: Starting MySQL Server...
Oct 07 07:28:55 ip-10-1-10-81.ap-northeast-2.compute.internal systemd[1]: Started MySQL Server.

A few configuration steps are needed so that Airflow can use the MySQL server just installed.

MySQL generates an initial temporary password for the superuser root@localhost and writes it to the error log file. Run the following command to look it up.

[ec2-user@ip-10-1-10-239 ~]$ sudo grep 'temporary password' /var/log/mysqld.log
2020-10-07T07:28:52.228023Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: F=u3q.9hf!>d

Using the temporary root password found above, run the initial secure-installation setup on the MySQL server.

[ec2-user@ip-10-1-10-239 ~]$ sudo mysql_secure_installation -p'F=u3q.9hf!>d'
mysql_secure_installation: [Warning] Using a password on the command line interface can be insecure.

Securing the MySQL server deployment.

The existing password for the user account root has expired. Please set a new password.

New password:

Re-enter new password:
The 'validate_password' component is installed on the server.
The subsequent steps will run with the existing configuration
of the component.
Using existing password for root.

Estimated strength of the password: 100
Change the password for root ? ((Press y|Y for Yes, any other key for No) :

 ... skipping.
By default, a MySQL installation has an anonymous user,
allowing anyone to log into MySQL without having to have
a user account created for them. This is intended only for
testing, and to make the installation go a bit smoother.
You should remove them before moving into a production
environment.

Remove anonymous users? (Press y|Y for Yes, any other key for No) : y
Success.


Normally, root should only be allowed to connect from
'localhost'. This ensures that someone cannot guess at
the root password from the network.

Disallow root login remotely? (Press y|Y for Yes, any other key for No) :

 ... skipping.
By default, MySQL comes with a database named 'test' that
anyone can access. This is also intended only for testing,
and should be removed before moving into a production
environment.


Remove test database and access to it? (Press y|Y for Yes, any other key for No) : y
 - Dropping test database...
Success.

 - Removing privileges on test database...
Success.

Reloading the privilege tables will ensure that all changes
made so far will take effect immediately.

Reload privilege tables now? (Press y|Y for Yes, any other key for No) : y
Success.

All done!

Next, connect to MySQL and create the database and account that Airflow will use.

Log in as root and create them as shown below (choose your own names and passwords).

[ec2-user@ip-10-1-10-239 ~]$ mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 14
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database airflow;
Query OK, 1 row affected (0.01 sec)

mysql> create user 'airflow'@'localhost' identified by 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> ALTER USER 'airflow'@'localhost' IDENTIFIED WITH mysql_native_password BY 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on airflow.* to 'airflow'@'localhost';
Query OK, 0 rows affected (0.00 sec)

mysql> create user 'airflow'@'%' identified by 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.01 sec)

mysql> ALTER USER 'airflow'@'%' IDENTIFIED WITH mysql_native_password BY 'MyNewStrongP@ssw0d!';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on airflow.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)

mysql> exit;
Bye

After that, MySQL's my.cnf file needs to be edited.

Otherwise, airflow initdb will fail with the error ‘Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql’.

[ec2-user@ip-10-1-10-23 airflow]$ sudo vim /etc/my.cnf
# Append the following at the bottom of the file (under the [mysqld] section)
explicit_defaults_for_timestamp = 1
max_allowed_packet = 30M

[ec2-user@ip-10-1-10-23 airflow]$ sudo systemctl restart mysqld

[ec2-user@ip-10-1-10-23 airflow]$ sudo systemctl status mysqld
 mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-10-08 07:36:23 UTC; 22s ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
  Process: 10133 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
 Main PID: 10157 (mysqld)
   Status: "Server is operational"
   CGroup: /system.slice/mysqld.service
           └─10157 /usr/sbin/mysqld

Oct 08 07:36:22 ip-10-1-10-23.ap-northeast-2.compute.internal systemd[1]: Starting MySQL Server...
Oct 08 07:36:23 ip-10-1-10-23.ap-northeast-2.compute.internal systemd[1]: Started MySQL Server.
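The my.cnf settings only take effect if they sit under the [mysqld] section, so a quick check of the file can save a failed initdb. The following is a rough sketch of such a check; the tiny parser and the function names are my own and handle only simple `key = value` lines, which is an assumption about how the file is laid out.

```python
def mysqld_settings(cnf_text):
    """Collect key/value pairs that fall under the [mysqld] section."""
    settings = {}
    section = None
    for raw in cnf_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and !include / !includedir directives.
        if not line or line.startswith(("#", ";", "!")):
            continue
        if line.startswith("[") and line.endswith("]"):
            section = line[1:-1].strip().lower()
            continue
        if section == "mysqld":
            key, _, value = line.partition("=")
            # MySQL accepts '-' and '_' interchangeably in option names.
            settings[key.strip().replace("-", "_")] = value.strip()
    return settings


def missing_airflow_settings(cnf_text):
    """Return the names of settings this walkthrough needs but the file lacks."""
    found = mysqld_settings(cnf_text)
    problems = []
    value = found.get("explicit_defaults_for_timestamp", "").lower()
    if value not in ("1", "on", "true"):
        problems.append("explicit_defaults_for_timestamp")
    return problems
```

For example, `missing_airflow_settings(open("/etc/my.cnf").read())` should return an empty list once the edit above is in place.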

Next, edit Airflow's airflow.cfg as follows.

[ec2-user@ip-10-1-10-23 ~]$ cd ~/airflow

[ec2-user@ip-10-1-10-23 airflow]$ ls
airflow.cfg  airflow.db  airflow-webserver.pid  logs  unittests.cfg

[ec2-user@ip-10-1-10-23 airflow]$ sudo vim airflow.cfg

# Change the executor to LocalExecutor as shown below.
executor = LocalExecutor

# sql_alchemy_conn = mysql://[ID]:[PASSWORD]@[IP]:3306/airflow
sql_alchemy_conn = mysql://airflow:MyNewStrongP@ssw0d!@localhost:3306/airflow
            
# If you do not want the example DAGs loaded, also set the following option in airflow.cfg to False
load_examples = False
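The password used in this walkthrough contains `@` and `!`, which are also URL delimiters. The literal connection string above worked in this environment, but depending on the SQLAlchemy version a raw `@` in the password may be parsed as the start of the host. A safer habit is to percent-encode the credentials. The sketch below shows one way to build the string; `build_sql_alchemy_conn` is a hypothetical helper, not part of Airflow.

```python
from urllib.parse import quote_plus


def build_sql_alchemy_conn(user, password, host, port, database):
    """Build a mysql:// URL with percent-encoded credentials."""
    return "mysql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )


conn = build_sql_alchemy_conn(
    "airflow", "MyNewStrongP@ssw0d!", "localhost", 3306, "airflow"
)
print(conn)  # mysql://airflow:MyNewStrongP%40ssw0d%21@localhost:3306/airflow
```

One caveat, as far as I know: airflow.cfg treats `%` as an interpolation character, so an encoded value pasted into the file may need each `%` doubled (`%%`). Setting the value through the `AIRFLOW__CORE__SQL_ALCHEMY_CONN` environment variable avoids that.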

Re-initialize the Airflow metadata DB as follows, this time against MySQL.

[ec2-user@ip-10-1-10-224 airflow]$ airflow initdb
DB: mysql://airflow:***@localhost:3306/airflow
[2020-10-08 09:24:48,497] {db.py:378} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
Revision ID: xxxxxxxxxxxxxxxxxxxxxxxxx
Revises: xxxxxxxxxxxxxxxxxxxxxxx
Create Date: 2019-08-01 14:39:35.616417
INFO  [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO  [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
INFO  [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
INFO  [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
INFO  [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
WARNI [airflow.models.crypto] cryptography not found - values will not be stored encrypted.
Done.

To allow larger payloads to be passed through XCom, connect to the Airflow metadata DB and change the column type as shown below.

[ec2-user@ip-10-1-10-17 airflow]$ mysql -uroot -p
Enter password: ## log in with MyNewStrongP@ssw0d!
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> alter table airflow.xcom modify value LONGBLOB;
Query OK, 0 rows affected (0.05 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> exit
Bye
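For a sense of what the column change buys, the sketch below maps a payload size to the smallest MySQL BLOB type that can hold it. The size limits are MySQL's documented maximums; the helper name is made up, and measuring the payload with `pickle` is only an assumption about how a value might be serialized.

```python
import pickle

# Maximum byte lengths of MySQL's BLOB family, smallest first.
BLOB_LIMITS = [
    ("TINYBLOB", 2**8 - 1),
    ("BLOB", 2**16 - 1),
    ("MEDIUMBLOB", 2**24 - 1),
    ("LONGBLOB", 2**32 - 1),
]


def smallest_blob_type(num_bytes):
    """Return the smallest BLOB type whose limit covers num_bytes."""
    for name, limit in BLOB_LIMITS:
        if num_bytes <= limit:
            return name
    raise ValueError("payload too large for any MySQL BLOB type")


# Example: size of a pickled Python object (serialization format is assumed).
payload = pickle.dumps(list(range(100000)))
print(len(payload), smallest_blob_type(len(payload)))
```

Note that the `max_allowed_packet = 30M` setting added to my.cnf earlier also caps how much data a single statement can carry, so the practical limit is the smaller of the two.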

Then start the Airflow webserver and the scheduler.

[ec2-user@ip-10-1-10-4 airflow]$ airflow webserver -p 8080
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-13 00:33:29,928] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:29,928] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2020-10-13 00:33:30 +0000] [12247] [INFO] Starting gunicorn 20.0.4
[2020-10-13 00:33:30 +0000] [12247] [INFO] Listening at: http://0.0.0.0:8080 (12247)
[2020-10-13 00:33:30 +0000] [12247] [INFO] Using worker: sync
[2020-10-13 00:33:30 +0000] [12250] [INFO] Booting worker with pid: 12250
[2020-10-13 00:33:30 +0000] [12251] [INFO] Booting worker with pid: 12251
[2020-10-13 00:33:30 +0000] [12252] [INFO] Booting worker with pid: 12252
[2020-10-13 00:33:30 +0000] [12253] [INFO] Booting worker with pid: 12253
[2020-10-13 00:33:31,606] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:31,616] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-13 00:33:31,653] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:31,660] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-13 00:33:31,804] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:31,809] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-13 00:33:31,913] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:31,913] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-13 00:34:00 +0000] [12247] [INFO] Handling signal: ttin
[2020-10-13 00:34:00 +0000] [12509] [INFO] Booting worker with pid: 12509
[2020-10-13 00:34:00,752] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:34:00,752] {dagbag.py:417} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
[2020-10-13 00:34:01 +0000] [12247] [INFO] Handling signal: ttou
[2020-10-13 00:34:01 +0000] [12250] [INFO] Worker exiting (pid: 12250)

# Open a new terminal window and start the scheduler as follows.
[ec2-user@ip-10-1-10-4 ~]$ airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-13 00:33:51,618] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-10-13 00:33:51,626] {scheduler_job.py:1367} INFO - Starting the scheduler
[2020-10-13 00:33:51,626] {scheduler_job.py:1375} INFO - Running execute loop for -1 seconds
[2020-10-13 00:33:51,626] {scheduler_job.py:1376} INFO - Processing each file at most -1 times
[2020-10-13 00:33:51,626] {scheduler_job.py:1379} INFO - Searching for files in /home/ec2-user/airflow/dags
[2020-10-13 00:33:51,627] {scheduler_job.py:1381} INFO - There are 0 files in /home/ec2-user/airflow/dags
[2020-10-13 00:33:51,733] {scheduler_job.py:1438} INFO - Resetting orphaned tasks for active dag runs
[2020-10-13 00:33:51,751] {dag_processing.py:562} INFO - Launched DagFileProcessorManager with pid: 12435
[2020-10-13 00:33:51,791] {settings.py:55} INFO - Configured default timezone <Timezone [UTC]>
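The scheduler log above reports 0 files in the dags folder, so there is nothing yet to show tasks running in parallel. As a rough sketch, the helper below writes a small test DAG with three independent `sleep` tasks. The DAG id, task ids, and file name are all hypothetical, and the import paths assume the Airflow 1.10 series that `airflow initdb` belongs to.

```python
import os
import textwrap

# Source of a hypothetical test DAG; every id here is illustrative only.
DAG_SOURCE = textwrap.dedent('''\
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG(
        dag_id="parallel_sleep_test",
        start_date=datetime(2020, 10, 1),
        schedule_interval=None,  # no schedule; trigger it manually
    )

    start = DummyOperator(task_id="start", dag=dag)
    end = DummyOperator(task_id="end", dag=dag)

    # Three independent branches between start and end.
    for i in range(3):
        sleep = BashOperator(
            task_id="sleep_{}".format(i),
            bash_command="sleep 10",
            dag=dag,
        )
        start >> sleep >> end
''')


def write_test_dag(dags_dir):
    """Write the test DAG into dags_dir and return the file path."""
    os.makedirs(dags_dir, exist_ok=True)
    path = os.path.join(dags_dir, "parallel_sleep_test.py")
    with open(path, "w") as handle:
        handle.write(DAG_SOURCE)
    return path


# Usage (the folder is the one shown in the scheduler log):
# write_test_dag(os.path.expanduser("~/airflow/dags"))
```

After triggering the DAG from the web UI, the three `sleep_*` tasks should show as running at the same time under LocalExecutor, whereas SequentialExecutor would run them one after another.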