Data Engineering/Kafka

[Kafka] Kafka에서 생성된 데이터를 일정한 시간 단위로 mysql 테이블에 저장하기

snoony 2024. 4. 3. 17:38

Topic 1

from kafka import KafkaProducer
import numpy as np
from time import time,sleep
import os
import json

producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count = 0
while True:
    producer.send('topic1',value=np.random.normal())
    sleep(.5)
    count += 1
    if count % 10 ==0:
        print("topic1 producer......")
# for data in temp:
#     producer.send('random_data',value=data)
# producer.flush()

np.random.normal()을 이용하여 정규분포에 있는 값을 랜덤으로 추출한 값을 전달한다.

Topic 2

from kafka import KafkaProducer
import numpy as np
from time import time,sleep
import os
import json
import random

# data.txt 파일 읽어오기
with open('data.txt','r', encoding='utf-8') as f:
    words = f.read().splitlines()
    
producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count = 0
while True:
    index = random.randint(0,len(words)-1)
    producer.send('topic2',value=words[index])
    sleep(1)
    count += 1
    if count % 5 ==0:
        print("topic2 producer......")

data.txt의 내용을 읽어와서 words에 저장하고, index 값을 무작위 생성하여 words에서 무작위 값을 뽑아내 토픽으로 전달

Spark에서 Kafka Streaming

from pyspark.sql import SparkSession, streaming
from pyspark.sql.functions import col, expr

# create session
spark = SparkSession.builder.appName('kafka_spark').getOrCreate()

# kafka read by streaming
kafka_options = {
    'kafka.bootstrap.servers':'localhost:9092',
    'subscribe':'topic1,topic2'
}
df = spark.readStream.format("kafka")\
                        .options(**kafka_options).load()
rawDF = df.selectExpr("topic","CAST(key as STRING)","CAST(value as STRING)")
rawDF.writeStream.format('console').outputMode('Append').start()

readStream : Kafka로부터 데이터를 읽어오기 위해 kafka 포맷을 사용하는 것을 설정하고, kafka_options에 설정된 옵션들을 이용하여 데이터를 로드합니다.

selectExpr : 데이터프레임에서 특정 컬럼을 선택하여 새로운 데이터프레임을 생성하는데, 여기서는 topic, key, value 컬럼을 선택하고, 이를 각각 문자열로 형변환하여 저장합니다.

writeSream : 이렇게 가공된 데이터를 콘솔에 출력하는 스트리밍 프로세스를 설정합니다. 여기서는 Append 모드로 설정되어 있어 새로운 데이터가 도착할 때마다 출력됩니다.

table로 저장

# DB 저장하기
def foreach_batch_function(batch_df, batch_id):
    url = "jdbc:mysql://localhost:3306/hadoopguide?useSSL=false"
    driver='com.mysql.jdbc.Driver'
    user='root'
    password='password'
    tablename = 'tbl_kafka_spark'
    # mode = 'overwrite'
    mode = 'append'
    props = {'driver':driver,'user':user,'password':password}
    params = {
        'url': url,
        'table' :  tablename,
        'properties' : props,
        'mode' : mode
    }
    batch_df.write.jdbc(**params)
query = rawDF.writeStream \
    .format("console")\
    .trigger(processingTime='5 seconds')\
    .foreachBatch(foreach_batch_function)\
    .start()
query.awaitTermination()