본문 바로가기

Data Engineering/Kafka

[Kafka] python으로 Kafka 실행하기

1. Producer 생성

Kafka 라이브러리 설치

# 카프카 라이브러리 설치
!pip install kafka-python

Topic이 읽을 파일 생성

1부터 1000까지 숫자들이 들어있는 파일을 생성한다.

# 토픽이 읽을 파일을 생성
with open('new-topic.txt', 'w') as f:
    for i in range(1,1001):
        f.write(str(i) + '\n')

Producer 생성

# 프로듀서 생성(토픽으로 데이터를 전송)
from kafka import KafkaProducer
import json
import time
from csv import reader

class MessageProducer:
    def __init__(self,broker,topic):
        self.broker = broker
        self.topic = topic
        params = {
            'bootstrap_servers' : self.broker,
            'value_serializer' : lambda x : json.dumps(x).encode('utf-8'),
            'acks' : 0, # 클라이언트의 응답을 기다리지 않고 다음 메세지 전송(높은 전송속도 보장, 메세지 전송을 보장하지 않을 수 있음)
            'api_version' : (2,5,0),
            'retries' : 3 # 재전송 횟수
        }
        self.producer = KafkaProducer(**params)
    def send_message(self, msg):
        try:
            future = self.producer.send(self.topic, msg)
            self.producer.flush() # 메세지 버퍼를 비워줌
            future.get(timeout=60) # 60초간 대기
            return {'status_code':200, 'error':None}
        except Exception as e:
            print('error:',e)
            return e

acks 옵션

acks : 1 (default) -> 속도는 보통, 유실가능성 있음

acks : all or -1 -> 속도는 가장 느림, 메세지 전달 손실 가능성은 없음

브로커와 토픽명 지정, 프로듀서 실행

# 브로커와 토픽명을 지정
broker = 'localhost:9092'
topic = 'test'
message_producer = MessageProducer(broker,topic)
with open('new-topic.txt','r',encoding='utf-8') as f:
    for data in f:
        print('send-data:',data)
        res = message_producer.send_message(data)

Consumer가 실행되는 창에서 1~1000까지 숫자들이 출력되는 것을 확인할 수 있다.

2. Consumer 생성

# 컨슈머 생성(토픽의 데이터를 소비)
from kafka import KafkaConsumer
import json
import time
from csv import reader

class MessageConsumer:
    def __init__(self,broker,topic,group_id):
        self.broker = broker
        self.topic = topic
        self.group_id = group_id
    def active_listener(self):
        params = {
            'bootstrap_servers' : self.broker,
            'group_id' : self.group_id,
            'consumer_timeout_ms' : 1000 * 5,
            # 토픽을 처음 구독할때 또는 컨슈머의 오프셋이
            # 브로커에 의해 처음 삭제될 때 시작 오프셋을 설정
            'auto_offset_reset' : 'latest', 
            'enable_auto_commit' : False,
            # 역직렬화
            # ASCII decoding, JSON parsing
            'value_deserializer' : lambda x : json.loads(x.decode('ascii'))
        }
        consumer = KafkaConsumer(**params);
        consumer.subscribe(self.topic)  # 구독, 지정한 토픽의 데이터 감지되면 읽는다(소비한다)
        
        # 데이터 출력 - 데이터를 읽었을때 처리할 로직
        for data in consumer:
            print(data.value)

실행

# 브로커와 토픽명을 지정
broker = 'localhost:9092'
topic = 'test'
group_id = 'consumer-1'

consumer = MessageConsumer(broker,topic,group_id)
consumer.active_listener()

파일로 출력하기

start_time = time.time()
        # 데이터 출력 - 데이터를 읽었을때 처리할 로직
        try:
            # for data in consumer:
            #     print(data.value)
            with open(f"result_{self.topic}.txt",'a', encoding='utf-8') as f:
                for data in consumer:
                    f.write(data.value)
            end_time = time.time()
            print(f"총 처리시간 : {end_time - start_time}")
        except KeyboardInterrupt:
            print("KeyboardInterrupt")

result_test.txt에 저장된 것 확인

총 처리시간 확인

 

3. 3개의 Producer, Consumer 생성하기

Zookeeper, Kafka를 실행하고 Topic p1, p2, p3를 먼저 생성해준다.

Producer

# 프로듀서 3개 생성
broker = 'localhost:9092'

message_producer1 = MessageProducer(broker,"p1")
message_producer2 = MessageProducer(broker,"p2")
message_producer3 = MessageProducer(broker,"p3")

with open('new-topic.txt','r',encoding='utf-8') as f:
    for data in f:
        message_producer1.send_message(data)
        message_producer2.send_message(data)
        message_producer3.send_message(data)

Consumer

# 브로커와 토픽명을 지정
broker = 'localhost:9092'
group_id = 'consumer-1'

consumer1 = MessageConsumer(broker,"p1",group_id)
consumer2 = MessageConsumer(broker,"p2",group_id)
consumer3 = MessageConsumer(broker,"p3",group_id)

consumer1.active_listener()
consumer2.active_listener()
consumer3.active_listener()

result_p1.txt에만 데이터가 저장되었다.

Consumer 클래스에서 auto_offset_reset을 earlist로 주면 3개의 파일에 데이터가 모두 쌓인다.

auto_offset_reset

  • latest : 마지막으로 구독한 다음 메세지 부터 구독한다. (가장 최신)
  • earliest : 처음부터 메세지를 구독한다. (가장 오래된)
  • none : 구독하고자 하는 topic의 offset 정보가 없으면 exception을 발생한다.

콘솔에서 확인하기

# p1 producer
bin/kafka-console-producer.sh --topic p1 --bootstrap-server localhost:9092

# p1 consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic p1 --from-beginning

# p2 consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic p2 --from-beginning

topic을 p1으로 하는 producer를 생성하고,  왼쪽 하단이 p1 consumer, 오른쪽이 p2 consumer이다.

p1 consumer가 'this is producer p1'이라고 메세지를 보내자 p1 consumer만 메세지를 받는 것을 확인할 수 있다.