Data Engineering/Kafka

[Kafka] mysql과 연동하여 Producer, Consumer 실행

snoony 2024. 4. 1. 17:32

1. 사용할 module install

!pip install kafka-python
!pip install pymysql

2. Topic 생성

from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import pymysql

# create topic
def create_topics(topics, bootstrap_servers = 'localhost:9092'):
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    # 새로운 토픽 리스트 생성
    topic_list = [ NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

KafkaAdminClient

카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회하기 위해 KafkaAdminClient 클래스를 제공

Kafka 클러스터를 관리하기 위한 클래스이다.

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

 

KafkaAdminClient — kafka-python 2.0.2-dev documentation

broker_ids – A list of broker node_ids to query for consumer groups. If set to None, will query all brokers in the cluster. Explicitly specifying broker(s) can be useful for determining which consumer groups are coordinated by those broker(s). Default: N

kafka-python.readthedocs.io

3. Producer 생성

# create producer
def create_producer(topic, bootstrap_servers = 'localhost:9092', value_serializer= lambda v : v):
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer= value_serializer)
    # 쿼리를 만들어서 토픽에 실어서 보낸다
    sql_queries = [
        '''CREATE TABLE IF NOT EXISTS tbl_kafka_data ( name varchar(10), email varchar(50), phone varchar(50));
        ''',
        "insert into tbl_kafka_data values('lee','lee@gmail.com','010-1325-1234');",
        "insert into tbl_kafka_data values('asd','asd@gmail.com','010-4356-9243');",
        "insert into tbl_kafka_data values('qwv','qwv@gmail.com','010-2881-3579');"
    ]

    for query in sql_queries:
        producer.send(topic, value=query.encode('utf-8'))
    producer.flush()

KafkaProducer

Kafka 클러스터에 레코드를 게시하는 Kafka Client

  • bootstrap_servers – 생산자가 초기 클러스터 메타데이터를 부트스트랩하기 위해 연결해야 하는 'host[:port]' 문자열(또는 'host[:port]' 문자열 목록), 서버가 지정되지 않은 경우 기본값은 localhost:9092
  • value_serializer ( callable ) – 사용자가 제공한 메시지 값을 바이트로 변환하는 데 사용됨(직렬화) , None이 아닌 경우 f(value)라고 하며 바이트를 반환해야 한다. 

send()는 즉시 broker에게 메세지를 전송하지 않는다.

send()를 통해 메세지를 내부 버퍼에 쌓아두고 flush()를 통해 broker로 전달한다.

프로듀서가 메세지를 보내는 과정

4. Consumer 생성

# create consumer
def create_consumer(topic, bootstrap_servers = 'localhost:9092', auto_offset_reset = 'earliest',
                    value_deserializer = lambda v : v, consumer_timeout_ms=1000):
    
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset=auto_offset_reset,
                            value_deserializer=value_deserializer, consumer_timeout_ms=consumer_timeout_ms)
    # mysql connect
    mysql_con = pymysql.connect(
        host='localhost',
        user='root',
        password= [your password],
        db=[your database],
        charset='utf8'
    )
    # 데이터베이스의 쿼리를 실행하기 위한 커서
    cursor = mysql_con.cursor()
    
    for message in consumer:
        sql = message.value
        cursor.execute(sql) # message --> db
    
    mysql_con.commit()
    mysql_con.close()

KafkaConsumer

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

KafkaConsumer — kafka-python 2.0.2-dev documentation

  bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that wil

kafka-python.readthedocs.io

5. 실행

topic = ['kafka_sql']
create_topics(topic)
create_producer(topic[0])
create_consumer(topic[0])

create_topics 메소드가 topic들의 리스트를 받아서 생성하는 원리로 설계되었으므로 

create_producer, create_consumer에는 topic[0]을 지정해주었다.

6. MySQL 확인

tbl_kafka_data 테이블이 생성되었고, insert into를 통해 값들이 들어간 것을 확인할 수 있다.