본문 바로가기

Data Engineering/Kafka

[Kafka] json 데이터 처리하기

1. json 파일 준비

data.json

{
  "p1": [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Charlie"}
  ],
  "p2": [
    {"id": 4, "name": "David"},
    {"id": 5, "name": "Emma"},
    {"id": 6, "name": "Frank"}
  ],
  "p3": [
    {"id": 7, "name": "Grace"},
    {"id": 8, "name": "Henry"},
    {"id": 9, "name": "Ivy"}
  ]
}

이 json 데이터를 Producer가 토픽으로 전달하고 Consumer를 통해 받을 것이다.

2. Topic 생성

# KafkaAdminClient : 클러스터와 관련된 관리작업을 수행, 토픽 생성 수정 삭제, 파티션 추가 삭제
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json

# read data function
def read_data_from_file(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)
    return data
    
# create topic
def create_topics(topics, bootstrap_servers = 'localhost:9092'):
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topic_list = [ NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

3. Producer

def producer_to_topics(data, topics, bootstrap_servers='localhost:9092'):
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
    value_serializer = lambda v : json.dumps(v).encode('utf-8'))
    for topic in topics:
        producer.send(topic, value=data.get(topic))
    producer.flush()

value_serializer = lambda v : json.dumps(v).encode('utf-8')

Kafka가 각 값 (v)을 Kafka 토픽으로 전송하기 전에 이 함수를 사용하여 직렬화하도록 지정한다.

이렇게 하면 각 값이 전송되기 전에 JSON 형식의 문자열로 변환되고 바이트로 인코딩되어 전송됩니다.

4. Consumer

def consumer_from_topics(topics, bootstrap_servers='localhost:9092', auto_offset_reset = 'earliest'):
    try:
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset = auto_offset_reset, 
                                 consumer_timeout_ms = 1000*10, # 10 seconds
                      value_deserializer = lambda v : json.loads(v.decode('utf-8')))
        consumer.subscribe(topics)
        for message in consumer:
            print(f"Topic: {message.topic}, Values: {message.value}")
    except KeyboardInterrupt:
        print("##### User Interrupt #####")

Consumer에서는 deserializer를 이용하여 json을 역직렬화한다.

UTF-8로 디코딩 -> 그 후에 JSON으로 로드 -> 파이썬 객체로 변환

5. 실행

file_path = 'data.json'
topics = ['p1','p2','p3']
data = read_data_from_file(file_path)

# create topic
create_topics(topics)
# product data to topic
producer_to_topics(data, topics)
# consume data from topic
consumer_from_topics(topics, auto_offset_reset='earliest')