AWS RDS에 대한 confluent CDC connector 연결 실습

2022-08-16

.

Data_Engineering_TIL(20220816)

  • 관련 실습자료

Python client를 이용한 Confluent kafka 실습

URL : https://minman2115.github.io/DE_TIL364

confluent ksqldb를 이용한 실시간 데이터 파이프라인 구성 실습

URL : https://minman2115.github.io/DE_TIL366

  • 실습 아키텍처 (메세지 흐름 아키텍처)
       [AWS ENV (seoul)]                                    [GCP ENV (seoul)]

    AWS RDS (MySql)       -->     confluent MySql source connector   -->      confluent kafka broker
(table : kingsale.sale)   CDC    (minman_mysql_cdc_source_connector)      (topic : minmanrds.kingsale.sale) 
(DB name : minmanrds)                                                                   |
(인터넷 전자상거래 주문기록)                                                                   |
                                                                            confluent schema registry        
                                                                                 (topic 스키마 연동)
  • 실습내용 요약

STEP 1) AWS RDS 생성

STEP 2) DB&Table 생성 및 데이터 insert

STEP 3) Confluent 환경 구성

STEP 4) CDC 동작 확인

  • 실습 상세내용

STEP 1) AWS RDS 생성

step 1-1) RDS subnet group 생성

step 1-2) RDS Parameter group 생성

minmanmysql8 라는 이름으로 만들어준다.

parameter 그룹은 mysql 8.0 디폴트 파라미터를 사용하지 말고, mysql 8.0 파라미터 그룹을 새로 생성해준다. (일부 파라미터 수정이 필요한데 디포트 파라미터는 수정이 안되기 때문임)

RDS Parameter group 생성후 아래와 같이 파라미터 설정값을 변경해준다.

1 ) binlog_format 을 ROW로 변경(CDC 할때 바이너리 포맷이 row여야함)

2 ) log_bin_trust_function_creators 는 1로 변경해준다(admin 계정 권한을 root로 주기 위함임)

3 ) time_zone 은 Asia/Seoul로 설정.

step 1-3) 아래와 같은 스펙으로 RDS를 생성해준다.

- DB : MySQL 8.0.28
- instance name : minmanrds
- admin 계정 : admin
- password : xxxxxxxxx
- t3.micro (2vcpu, 1GB RAM)
- port : 3306
- network : public access 가능하도록 설정
- Automated backups 옵션은 반드시 On(사용가능)으로 설정해야함(CDC 할때 mysql binary logging이 가능해야하기 때문임)
- DB parapeter group : minmanmysql8 이라는 이름으로 파라미터를 반드시 선택해줘야 한다.

step 2) DB&Table 생성 및 데이터 insert

$ brew install mysql

$ brew install mysql-client

$ mysql --version
mysql  Ver 8.0.30 for macos12.4 on arm64 (Homebrew)

$ mysql -h minmanrds.xxxxxxxxxx.ap-northeast-2.rds.amazonaws.com -P 3306 -u admin -p
Enter password: xxxxxxxxxxxxx
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 12
Server version: 8.0.28 Source distribution

Copyright (c) 2000, 2022, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> CREATE DATABASE kingsale;
Query OK, 1 row affected (0.01 sec)

mysql> use kingsale;
Database changed

mysql>
create table sale(
    order_id BIGINT NOT NULL,
    customer_id VARCHAR(20) NULL,
    item VARCHAR(20) NULL,
    order_total_usd DECIMAL(10,2) NULL
);

Query OK, 0 rows affected (0.03 sec)

mysql> show tables;
+--------------------+
| Tables_in_kingsale |
+--------------------+
| sale               |
+--------------------+
1 row in set (0.01 sec)

mysql> DESC sale;
+-----------------+---------------+------+-----+---------+-------+
| Field           | Type          | Null | Key | Default | Extra |
+-----------------+---------------+------+-----+---------+-------+
| order_id        | bigint        | NO   |     | NULL    |       |
| customer_id     | varchar(20)   | YES  |     | NULL    |       |
| item            | varchar(20)   | YES  |     | NULL    |       |
| order_total_usd | decimal(10,2) | YES  |     | NULL    |       |
+-----------------+---------------+------+-----+---------+-------+
4 rows in set (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697328, 375, 'book', 29.99);
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697329, 375, 'guitar', 215.99);
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697330, 983, 'thermometer', 12.99);
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697331, 983, 'scarf', 32.99);
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697332, 375, 'doormat', 15.99);
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697444, 992, 'macbook', 265.99);
Query OK, 1 row affected (0.19 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697445, 789, 'galaxy_s22', 165.99);
Query OK, 1 row affected (0.19 sec)

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697446, 790, 'galaxy_s21', 160.99);
ERROR 2013 (HY000): Lost connection to MySQL server during query
No connection. Trying to reconnect...
Connection id:    258
Current database: kingsale

Query OK, 1 row affected (0.40 sec)

mysql> select * from kingsale.sale;
+----------+-------------+-------------+-----------------+
| order_id | customer_id | item        | order_total_usd |
+----------+-------------+-------------+-----------------+
| 44697328 | 375         | book        |           29.99 |
| 44697329 | 375         | guitar      |          215.99 |
| 44697330 | 983         | thermometer |           12.99 |
| 44697331 | 983         | scarf       |           32.99 |
| 44697332 | 375         | doormat     |           15.99 |
| 44697333 | 983         | clippers    |           65.99 |
| 44697444 | 992         | macbook     |          265.99 |
| 44697445 | 789         | galaxy_s22  |          165.99 |
| 44697446 | 790         | galaxy_s21  |          160.99 |
+----------+-------------+-------------+-----------------+
9 rows in set (0.03 sec)

STEP 3) Confluent 환경 구성

step 3-1) 아래의 자료에서 ‘confluent cli 설치 및 사용자 인증’ 자료를 참고해서 로컬 PCdp confluent cli 설치를 해준다.

되어 있으면 상관없이 다음 스텝으로 넘어간다.

** confluent ksqldb를 이용한 실시간 데이터 파이프라인 구성 실습 : https://minman2115.github.io/DE_TIL366

step 3-2) confluent cluster 생성

컨플루언트 웹콘솔에 접속한 다음에 클러스터를 생성하고 반드시 https://confluent.cloud/settings/org/assignments 에서 아래와 같이 권한부여를 잘 해준다.

1

STEP 3-3) confluent 웹콘솔에서 생성한 클러스터에 접속한 다음에 아래와 같이 토픽을 생성해준다.

토픽 이름 : minmanrds.kingsale.sale

다른 설정은 디폴트 설정

step 3-4) confluent 웹콘솔에서 생성한 클러스터에 schema registry 메뉴로 가서 API가 On 이 안되어 있다면 활성화를 시켜준다.

만약에 API 활성화가 되어 있다면 넘어간다.

STEP 3-5) MySQL mysql cdc source connector 생성

confluent 웹콘솔에서 connectors 메뉴에서 add connector 버튼을 누르고 MySQL mysql cdc source connector를 생성한다.

그런 다음에 아래와 같은 스펙으로 커넥터를 생성해준다.

{
  "name": "minman_mysql_cdc_source_connector",
  "config": {
    "connector.class": "MySqlCdcSource",
    "name": "minman_mysql_cdc_source_connector",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "****************************************************************",
    "database.hostname": "minmanrds.xxxxxxxxx.ap-northeast-2.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "admin",
    "database.password": "*********",
    "database.server.name": "minmanrds",
    "database.ssl.mode": "preferred",
    "database.connectionTimeZone": "Asia/Seoul",
    "table.include.list": "kingsale.sale",
    "snapshot.mode": "when_needed",
    "output.data.format": "JSON",
    "after.state.only": "true",
    "output.key.format": "JSON",
    "tasks.max": "1"
  }
}

STEP 4) CDC 동작 확인

그러면 Confluent 콘솔에서 minmanrds.kingsale.sale 토픽에 들어가서 message 메뉴에 가보면 아래 그림과 같이 메세지 들이 잘 들어오는 것을 확인할 수 있다.

아래와 같이 기존에 테이블에 저장되어 있던 데이터도 확인할 수 있고

2

아래와 같이 RDS에서 신규로 데이터를 집어 넣은 다음에

mysql> INSERT INTO kingsale.sale (order_id, customer_id, item, order_total_usd) VALUES (44697447, 791, 'galaxy_buds', 60.99);
Query OK, 1 row affected (0.03 sec)

mysql> select * from kingsale.sale;
+----------+-------------+-------------+-----------------+
| order_id | customer_id | item        | order_total_usd |
+----------+-------------+-------------+-----------------+
| 44697328 | 375         | book        |           29.99 |
| 44697329 | 375         | guitar      |          215.99 |
| 44697330 | 983         | thermometer |           12.99 |
| 44697331 | 983         | scarf       |           32.99 |
| 44697332 | 375         | doormat     |           15.99 |
| 44697333 | 983         | clippers    |           65.99 |
| 44697444 | 992         | macbook     |          265.99 |
| 44697445 | 789         | galaxy_s22  |          165.99 |
| 44697446 | 790         | galaxy_s21  |          160.99 |
| 44697447 | 791         | galaxy_buds |           60.99 |
+----------+-------------+-------------+-----------------+
10 rows in set (0.05 sec)

message 메뉴를 조회하면 새로 들어온 데이터도 확인할 수 있다.

3