Data Engineering/Kafka
[Kafka] Kafka에서 생성된 데이터를 일정한 시간 단위로 mysql 테이블에 저장하기
snoony
2024. 4. 3. 17:38
Topic 1
from kafka import KafkaProducer
import numpy as np
from time import time,sleep
import os
import json
producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count = 0
while True:
producer.send('topic1',value=np.random.normal())
sleep(.5)
count += 1
if count % 10 ==0:
print("topic1 producer......")
# for data in temp:
# producer.send('random_data',value=data)
# producer.flush()
np.random.normal()을 이용하여 정규분포에 있는 값을 랜덤으로 추출한 값을 전달한다.
Topic 2
from kafka import KafkaProducer
import numpy as np
from time import time,sleep
import os
import json
import random
# data.txt 파일 읽어오기
with open('data.txt','r', encoding='utf-8') as f:
words = f.read().splitlines()
producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count = 0
while True:
index = random.randint(0,len(words)-1)
producer.send('topic2',value=words[index])
sleep(1)
count += 1
if count % 5 ==0:
print("topic2 producer......")
data.txt의 내용을 읽어와서 words에 저장하고, index 값을 무작위 생성하여 words에서 무작위 값을 뽑아내 토픽으로 전달
Spark에서 Kafka Streaming
from pyspark.sql import SparkSession, streaming
from pyspark.sql.functions import col, expr
# create session
spark = SparkSession.builder.appName('kafka_spark').getOrCreate()
# kafka read by streaming
kafka_options = {
'kafka.bootstrap.servers':'localhost:9092',
'subscribe':'topic1,topic2'
}
df = spark.readStream.format("kafka")\
.options(**kafka_options).load()
rawDF = df.selectExpr("topic","CAST(key as STRING)","CAST(value as STRING)")
rawDF.writeStream.format('console').outputMode('Append').start()
readStream : Kafka로부터 데이터를 읽어오기 위해 kafka 포맷을 사용하는 것을 설정하고, kafka_options에 설정된 옵션들을 이용하여 데이터를 로드합니다.
selectExpr : 데이터프레임에서 특정 컬럼을 선택하여 새로운 데이터프레임을 생성하는데, 여기서는 topic, key, value 컬럼을 선택하고, 이를 각각 문자열로 형변환하여 저장합니다.
writeSream : 이렇게 가공된 데이터를 콘솔에 출력하는 스트리밍 프로세스를 설정합니다. 여기서는 Append 모드로 설정되어 있어 새로운 데이터가 도착할 때마다 출력됩니다.
table로 저장
# DB 저장하기
def foreach_batch_function(batch_df, batch_id):
url = "jdbc:mysql://localhost:3306/hadoopguide?useSSL=false"
driver='com.mysql.jdbc.Driver'
user='root'
password='password'
tablename = 'tbl_kafka_spark'
# mode = 'overwrite'
mode = 'append'
props = {'driver':driver,'user':user,'password':password}
params = {
'url': url,
'table' : tablename,
'properties' : props,
'mode' : mode
}
batch_df.write.jdbc(**params)
query = rawDF.writeStream \
.format("console")\
.trigger(processingTime='5 seconds')\
.foreachBatch(foreach_batch_function)\
.start()
query.awaitTermination()