Data Engineering/Kafka (5) 썸네일형 리스트형 [Kafka] Kafka에서 생성된 데이터를 일정한 시간 단위로 mysql 테이블에 저장하기 Topic 1 from kafka import KafkaProducer import numpy as np from time import time,sleep import os import json producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8')) count = 0 while True: producer.send('topic1',value=np.random.normal()) sleep(.5) count += 1 if count % 10 ==0: print("topic1 producer......") # for data in temp: # pr.. [Kafka] json 데이터 처리하기 1. json 파일 준비 data.json { "p1": [ {"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Charlie"} ], "p2": [ {"id": 4, "name": "David"}, {"id": 5, "name": "Emma"}, {"id": 6, "name": "Frank"} ], "p3": [ {"id": 7, "name": "Grace"}, {"id": 8, "name": "Henry"}, {"id": 9, "name": "Ivy"} ] } 이 json 데이터를 Producer가 토픽으로 전달하고 Consumer를 통해 받을 것이다. 2. Topic 생성 # KafkaAdminClient : 클러스터와 .. [Kafka] mysql과 연동하여 Producer, Consumer 실행 1. 사용할 module install !pip install kafka-python !pip install pymysql 2. Topic 생성 from kafka import KafkaProducer, KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic import json import pymysql # create topic def create_topics(topics, bootstrap_servers = 'localhost:9092'): admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) # 새로운 토픽 리스트 생성 topic_list = [ NewTopic(top.. [Kafka] python으로 Kafka 실행하기 1. Producer 생성 Kafka 라이브러리 설치 # 카프카 라이브러리 설치 !pip install kafka-python Topic이 읽을 파일 생성 1부터 1000까지 숫자들이 들어있는 파일을 생성한다. # 토픽이 읽을 파일을 생성 with open('new-topic.txt', 'w') as f: for i in range(1,1001): f.write(str(i) + '\n') Producer 생성 # 프로듀서 생성(토픽으로 데이터를 전송) from kafka import KafkaProducer import json import time from csv import reader class MessageProducer: def __init__(self,broker,topic): self.bro.. [Kafka] Apache Kafka 설치, 서버 실행, Producer, Consumer 생성 https://kafka.apache.org/downloads Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 가장 안정적인 버전 설치하기 : 2.5.0 -> 자바 1.8과 잘 맞는다 Kafka heap 사이즈 조정 [root@localhost kafka]# export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" 서버 설정 [root@localhost kafka]# gedit config/server.properties listeners=PLAINTEXT://:9092 주석 해제 Zookeeper 실행 [root@localhost kafka]# bin/zookeeper-server-start... 이전 1 다음