1. json 파일 준비
data.json
{
"p1": [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
],
"p2": [
{"id": 4, "name": "David"},
{"id": 5, "name": "Emma"},
{"id": 6, "name": "Frank"}
],
"p3": [
{"id": 7, "name": "Grace"},
{"id": 8, "name": "Henry"},
{"id": 9, "name": "Ivy"}
]
}
이 json 데이터를 Producer가 토픽으로 전달하고 Consumer를 통해 받을 것이다.
2. Topic 생성
# KafkaAdminClient : 클러스터와 관련된 관리작업을 수행, 토픽 생성 수정 삭제, 파티션 추가 삭제
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
# read data function
def read_data_from_file(file_path):
with open(file_path, 'r') as file:
data = json.load(file)
return data
# create topic
def create_topics(topics, bootstrap_servers = 'localhost:9092'):
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic_list = [ NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
3. Producer
def producer_to_topics(data, topics, bootstrap_servers='localhost:9092'):
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
value_serializer = lambda v : json.dumps(v).encode('utf-8'))
for topic in topics:
producer.send(topic, value=data.get(topic))
producer.flush()
value_serializer = lambda v : json.dumps(v).encode('utf-8')
Kafka가 각 값 (v)을 Kafka 토픽으로 전송하기 전에 이 함수를 사용하여 직렬화하도록 지정한다.
이렇게 하면 각 값이 전송되기 전에 JSON 형식의 문자열로 변환되고 바이트로 인코딩되어 전송됩니다.
4. Consumer
def consumer_from_topics(topics, bootstrap_servers='localhost:9092', auto_offset_reset = 'earliest'):
try:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset = auto_offset_reset,
consumer_timeout_ms = 1000*10, # 10 seconds
value_deserializer = lambda v : json.loads(v.decode('utf-8')))
consumer.subscribe(topics)
for message in consumer:
print(f"Topic: {message.topic}, Values: {message.value}")
except KeyboardInterrupt:
print("##### User Interrupt #####")
Consumer에서는 deserializer를 이용하여 json을 역직렬화한다.
UTF-8로 디코딩 -> 그 후에 JSON으로 로드 -> 파이썬 객체로 변환
5. 실행
file_path = 'data.json'
topics = ['p1','p2','p3']
data = read_data_from_file(file_path)
# create topic
create_topics(topics)
# product data to topic
producer_to_topics(data, topics)
# consume data from topic
consumer_from_topics(topics, auto_offset_reset='earliest')
'Data Engineering > Kafka' 카테고리의 다른 글
[Kafka] Kafka에서 생성된 데이터를 일정한 시간 단위로 mysql 테이블에 저장하기 (1) | 2024.04.03 |
---|---|
[Kafka] mysql과 연동하여 Producer, Consumer 실행 (0) | 2024.04.01 |
[Kafka] python으로 Kafka 실행하기 (0) | 2024.03.29 |
[Kafka] Apache Kafka 설치, 서버 실행, Producer, Consumer 생성 (0) | 2024.03.29 |