Confluent Kafka hands-on with the Python client
Data_Engineering_TIL(20220712)
- Lab architecture (message flow)
                                        schema registry
                                              |
Simple producer (local MacBook) --> Confluent Kafka broker --> Confluent GCS sink connector --> GCS bucket
** Message format: Avro
- Lab steps
STEP 1) Set up a Python development environment on the local MacBook
$ mkdir kafka_test
$ cd kafka_test
$ python3 -m venv kafka_env
$ cd kafka_env/bin
# Activate the Python virtual environment
$ source activate
# On an M1 (Apple Silicon) Mac, pip install confluent_kafka fails unless the library and environment variables below are set up first.
$ brew install librdkafka
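# The librdkafka/1.9.1 part of the paths below depends on the installed librdkafka version.
# Note: the Avro producer in STEP 5 also needs the Avro extras, which can be installed with: pip install "confluent_kafka[avro]"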
$ C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.9.1/include LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.9.1/lib pip install confluent_kafka
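The version-dependent paths in the command above can be derived instead of typed by hand. The following is a minimal sketch, assuming Homebrew's default Apple Silicon prefix /opt/homebrew; the helper name librdkafka_build_env is hypothetical and not part of any tool.

```python
from pathlib import PurePosixPath


def librdkafka_build_env(version, cellar="/opt/homebrew/Cellar/librdkafka"):
    """Return the env vars that point the C compiler at one librdkafka version."""
    root = PurePosixPath(cellar) / version
    return {
        "C_INCLUDE_PATH": str(root / "include"),
        "LIBRARY_PATH": str(root / "lib"),
    }


# 1.9.1 is the version used in this post; replace it with the installed one.
env = librdkafka_build_env("1.9.1")
command = " ".join(f"{k}={v}" for k, v in env.items()) + " pip install confluent_kafka"
print(command)
```

The installed version can be looked up with `brew list --versions librdkafka` before calling the helper.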
STEP 2) Issue Confluent Kafka API credentials
Credentials to issue: cluster API credentials and Schema Registry API credentials
step 2-1) Issue the cluster API credentials
Confluent web console --> Environments menu, open the target environment --> Clusters menu --> open the target cluster --> Data integration in the left menu --> issue the API credentials under API Keys
step 2-2) Issue the Schema Registry API credentials
Confluent web console --> Environments menu, open the target environment --> Schema Registry menu --> issue the API credentials under API credentials
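Both credential pairs end up in the client configuration used by the producer in STEP 5. Below is a minimal sketch of assembling that configuration from environment variables instead of hard-coding secrets. The environment variable names (KAFKA_BOOTSTRAP, KAFKA_API_KEY, and so on) and the helper build_avro_producer_config are assumptions made for illustration; the configuration keys are the ones the STEP 5 producer uses.

```python
import os


def build_avro_producer_config(env=os.environ):
    """Map environment variables (hypothetical names) onto client config keys."""
    return {
        "bootstrap.servers": env["KAFKA_BOOTSTRAP"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        # Cluster API credentials (step 2-1)
        "sasl.username": env["KAFKA_API_KEY"],
        "sasl.password": env["KAFKA_API_SECRET"],
        # Schema Registry API credentials (step 2-2), joined as key:secret
        "schema.registry.url": env["SR_URL"],
        "schema.registry.basic.auth.credentials.source": "USER_INFO",
        "schema.registry.basic.auth.user.info": f'{env["SR_API_KEY"]}:{env["SR_API_SECRET"]}',
    }


# Placeholder values only; real values come from the web console.
example_env = {
    "KAFKA_BOOTSTRAP": "xxx-xxxx.asia-northeast3.gcp.confluent.cloud:9092",
    "KAFKA_API_KEY": "cluster-key",
    "KAFKA_API_SECRET": "cluster-secret",
    "SR_URL": "https://xxxxx-xxxx.australia-southeast1.gcp.confluent.cloud",
    "SR_API_KEY": "sr-key",
    "SR_API_SECRET": "sr-secret",
}
config = build_avro_producer_config(example_env)
```

Keeping the two pairs separate matters because the cluster key does not authenticate against Schema Registry, and vice versa.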
STEP 3) Register a schema in Schema Registry
Confluent web console --> Environments menu, open the target environment --> Schema Registry menu --> View & manage schemas --> click Add schema and register the schema in Avro format
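The same registration can also be expressed as a call to the Schema Registry REST API instead of clicks in the web console. The sketch below only builds the HTTP request and does not send it. It assumes the default subject naming of "<topic>-value"; the helper name build_register_request and the placeholder URL and credentials are hypothetical.

```python
import base64
import json
import urllib.request

STUDENT_SCHEMA = {
    "namespace": "student.avro",
    "type": "record",
    "name": "Student",
    "fields": [
        {"name": "name", "type": ["null", "string"], "default": None},
        {"name": "class", "type": "int", "default": 1},
    ],
}


def build_register_request(sr_url, topic, schema, api_key, api_secret):
    """Build (but do not send) a POST that registers a value schema for a topic."""
    subject = f"{topic}-value"  # assumption: default topic-based subject naming
    # The schema itself travels as a JSON string inside the JSON body.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{sr_url}/subjects/{subject}/versions",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.schemaregistry.v1+json",
            "Authorization": f"Basic {token}",
        },
    )


req = build_register_request(
    "https://sr.example.invalid", "minsupark_test", STUDENT_SCHEMA, "sr-key", "sr-secret"
)
print(req.get_method(), req.full_url)
```

Sending it would be a matter of passing req to urllib.request.urlopen with real credentials.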
STEP 4) Create the GCS sink connector
Confluent web console --> Environments menu, open the target environment --> Clusters menu --> open the target cluster --> Data integration in the left menu --> under Connectors, click Add Connector and create a GCS connector.
To create it, configure the Confluent cluster connection, upload the GCP credential JSON file, and set the destination bucket.
STEP 5) Write a simple Avro producer
$ vim avro_producer.py
#!/usr/bin/env python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

if __name__ == '__main__':

    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    # Avro schema for the message value (must match the schema registered in STEP 3)
    value_schema_str = """
    {"namespace": "student.avro",
     "type": "record",
     "doc": "This is an example of Avro.",
     "name": "Student",
     "fields": [
         {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
         {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
     ]
    }
    """
    value_schema = avro.loads(value_schema_str)

    avroProducer = AvroProducer({
        'bootstrap.servers': 'xxx-xxxx.asia-northeast3.gcp.confluent.cloud:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': '{cluster api key}',
        'sasl.password': '{cluster api secret}',
        'schema.registry.url': 'https://xxxxx-xxxx.australia-southeast1.gcp.confluent.cloud',
        'schema.registry.basic.auth.credentials.source': 'USER_INFO',
        'schema.registry.basic.auth.user.info': '{schema registry api key}:{schema registry api secret}',
        'on_delivery': delivery_report,
    }, default_value_schema=value_schema)

    for class_num in range(0, 10000):
        value = {"name": "Peter", "class": class_num}  # message to send
        avroProducer.produce(topic='minsupark_test', value=value)
        avroProducer.poll(0)  # serve pending delivery callbacks without blocking

    # Block until every queued message has been delivered or has failed.
    avroProducer.flush()
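To make the message format concrete, here is a hand-rolled sketch of what one Student record looks like on the wire. It assumes the Confluent framing of one zero "magic" byte plus a 4-byte big-endian schema ID in front of the Avro binary payload. The encoder handles only this one schema and is not the library's implementation; the function names and the schema ID 100001 are made up for illustration.

```python
import struct


def zigzag_varint(n):
    """Encode a signed integer the way Avro encodes int and long values."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small unsigned numbers
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_student(name, class_num):
    """Avro binary encoding of {"name": ..., "class": ...} for the Student schema."""
    if name is None:
        name_part = zigzag_varint(0)  # union branch 0 is "null", no payload
    else:
        raw = name.encode("utf-8")
        # union branch 1 is "string": branch index, byte length, then the bytes
        name_part = zigzag_varint(1) + zigzag_varint(len(raw)) + raw
    return name_part + zigzag_varint(class_num)


def frame(schema_id, payload):
    """Prefix the payload with magic byte 0 and the big-endian schema ID."""
    return struct.pack(">bI", 0, schema_id) + payload


payload = encode_student("Peter", 1)
message = frame(100001, payload)  # 100001 is a made-up schema ID
print(payload.hex(), len(message))
```

Field names never appear in the payload, which is why the consumer side (here, the GCS sink connector) needs the schema ID to look up the schema before it can decode the record.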
STEP 6) Produce the messages
$ chmod +x avro_producer.py
$ python avro_producer.py
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
...
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
Message delivered to minsupark_test [0]
STEP 7) Verify that the GCS sink connector stores the messages as Avro files in the designated GCS bucket
$ gsutil ls gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000000008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000001008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000002008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000003008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000004008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000005008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000006008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000007008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000008008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000009008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000010008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000011008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000012008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000013008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000014008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000015008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000016008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000017008.avro
gs://minsu_park_kafka_test/topics/minsupark_test/year=2022/month=07/day=12/hour=00/minsupark_test+0+0000018008.avro
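The object names in the listing follow a visible pattern: a time-based directory, then topic, partition, and the zero-padded offset of the first record in the file. The sketch below rebuilds one name and extracts the start offsets, which is handy for checking a listing for gaps. The pattern is inferred from the listing above rather than taken from the connector's documentation, and both helper names are hypothetical.

```python
import re
from datetime import datetime


def gcs_object_name(bucket, topic, partition, start_offset, ts):
    """Rebuild an object name in the layout seen in the listing."""
    return (
        f"gs://{bucket}/topics/{topic}/"
        f"year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}/"
        f"{topic}+{partition}+{start_offset:010d}.avro"
    )


def start_offsets(listing):
    """Extract the 10-digit start offsets from gsutil ls output."""
    return [int(m.group(1)) for m in re.finditer(r"\+(\d{10})\.avro", listing)]


first = gcs_object_name("minsu_park_kafka_test", "minsupark_test", 0, 8, datetime(2022, 7, 12, 0))

sample_listing = "\n".join(
    gcs_object_name("minsu_park_kafka_test", "minsupark_test", 0, off, datetime(2022, 7, 12, 0))
    for off in (8, 1008, 2008)
)
offsets = start_offsets(sample_listing)
steps = {b - a for a, b in zip(offsets, offsets[1:])}
print(first)
print(offsets, steps)
```

A constant step of 1000 between consecutive files, as in the listing, suggests that the connector writes one file per 1,000 records of a partition.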
** Example of a simple consumer & producer for plain (non-Avro) messages
# Write the config file
$ vim getting_started.ini
[default]
bootstrap.servers=xxx-xxxx.asia-northeast3.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=xxxxxxxxxxxxxxxxxxxxxxx
sasl.password=yyyyyyyyyyyyyyyyyyyyyyyyyy
[consumer]
group.id=python_example_group_1
# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
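The scripts that follow read this file with configparser: the producer uses only the [default] section, while the consumer layers [consumer] on top of it. A minimal sketch of that merge, using a shortened inline copy of the file with placeholder values:

```python
from configparser import ConfigParser

INI_TEXT = """
[default]
bootstrap.servers=xxx-xxxx.asia-northeast3.gcp.confluent.cloud:9092
security.protocol=SASL_SSL

[consumer]
group.id=python_example_group_1
# comment lines like this one are skipped by the parser
auto.offset.reset=earliest
"""

parser = ConfigParser()
parser.read_string(INI_TEXT)

# Producer: connection settings only.
producer_config = dict(parser["default"])

# Consumer: the same connection settings plus the consumer-specific keys.
consumer_config = dict(parser["default"])
consumer_config.update(parser["consumer"])

print(sorted(producer_config))
print(sorted(consumer_config))
```

Splitting the file this way keeps consumer-only keys such as group.id out of the producer configuration.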
# Write a simple producer
$ vim producer.py
#!/usr/bin/env python
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "minsupark_test"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):
        user_id = choice(user_ids)
        product = choice(products)
        # Positional arguments are (topic, value, key): the product is the value, the user ID the key.
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()
$ chmod u+x producer.py
# Produce messages
$ ./producer.py getting_started.ini
Produced event to topic minsupark_test: key = sgarcia value = alarm clock
Produced event to topic minsupark_test: key = sgarcia value = batteries
Produced event to topic minsupark_test: key = jsmith value = book
Produced event to topic minsupark_test: key = htanaka value = gift card
Produced event to topic minsupark_test: key = sgarcia value = alarm clock
Produced event to topic minsupark_test: key = htanaka value = alarm clock
Produced event to topic minsupark_test: key = eabara value = book
Produced event to topic minsupark_test: key = awalther value = batteries
Produced event to topic minsupark_test: key = awalther value = batteries
Produced event to topic minsupark_test: key = eabara value = gift card
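The "Produced event" lines above are printed by the delivery callback, and the callback runs only while poll() or flush() is being called, not at the moment produce() returns. The toy model below illustrates that ordering with an in-memory queue. ToyProducer is a hypothetical stand-in written for this explanation; it is not the confluent_kafka API and sends nothing over the network.

```python
from collections import deque


class ToyProducer:
    """In-memory stand-in for illustration only; not the confluent_kafka API."""

    def __init__(self):
        self._pending = deque()

    def produce(self, topic, value, key, callback):
        # Only enqueue: nothing is reported back to the caller yet.
        self._pending.append((topic, key, value, callback))

    def poll(self, max_events=1):
        """Serve up to max_events delivery callbacks; return how many ran."""
        served = 0
        while self._pending and served < max_events:
            topic, key, value, callback = self._pending.popleft()
            callback(None, (topic, key, value))  # err=None means success
            served += 1
        return served

    def flush(self):
        """Serve every callback that is still pending."""
        return self.poll(len(self._pending))


delivered = []
p = ToyProducer()
for user in ["sgarcia", "jsmith", "htanaka"]:
    p.produce("minsupark_test", "book", user, lambda err, msg: delivered.append(msg))

before_flush = len(delivered)  # no callback has run yet
p.flush()
after_flush = len(delivered)   # all three callbacks have run
print(before_flush, after_flush)
```

This is why a script that exits without calling flush() can lose messages that were queued but never actually sent.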
# Write a simple consumer
$ vim consumer.py
#!/usr/bin/env python
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "minsupark_test"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
$ chmod u+x consumer.py
# Consume messages
$ ./consumer.py getting_started.ini
Consumed event from topic minsupark_test: key = sgarcia value = alarm clock
Consumed event from topic minsupark_test: key = sgarcia value = batteries
Consumed event from topic minsupark_test: key = jsmith value = book
Consumed event from topic minsupark_test: key = htanaka value = gift card
...
Waiting...
Waiting...
Waiting...
Waiting...
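Where the consumer starts reading depends on three things that appear in this example: the --reset flag, any offset already committed for the group, and auto.offset.reset. The function below is a simplified model of that decision for a single partition, written for illustration. starting_offset is a hypothetical helper, and it considers only the "earliest" and "latest" settings.

```python
def starting_offset(committed, reset_flag, auto_offset_reset, low, high):
    """Toy model of where a consumer starts reading one partition.

    committed: last committed offset for the group, or None if there is none
    low/high:  first available offset and the offset just past the last message
    """
    if reset_flag:
        # --reset: rewind to the beginning regardless of committed offsets
        return low
    if committed is not None:
        # Resume where the consumer group left off
        return committed
    # No committed offset: fall back to auto.offset.reset
    return low if auto_offset_reset == "earliest" else high


# New group, auto.offset.reset=earliest: reads the topic from the start.
print(starting_offset(None, False, "earliest", low=0, high=10))
# Group that already committed offset 10: only new messages are read.
print(starting_offset(10, False, "earliest", low=0, high=10))
# Same group with --reset: reads everything again.
print(starting_offset(10, True, "earliest", low=0, high=10))
```

This matches the run above: the first start of python_example_group_1 replays the earlier messages and then prints "Waiting..." once it has caught up.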