본문 바로가기

전체 글

(113)
[Kafka] Kafka에서 생성된 데이터를 일정한 시간 단위로 mysql 테이블에 저장하기 Topic 1 from kafka import KafkaProducer import numpy as np from time import time,sleep import os import json producer = KafkaProducer(bootstrap_servers = 'localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8')) count = 0 while True: producer.send('topic1',value=np.random.normal()) sleep(.5) count += 1 if count % 10 ==0: print("topic1 producer......") # for data in temp: # pr..
[Kafka] json 데이터 처리하기 1. json 파일 준비 data.json { "p1": [ {"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Charlie"} ], "p2": [ {"id": 4, "name": "David"}, {"id": 5, "name": "Emma"}, {"id": 6, "name": "Frank"} ], "p3": [ {"id": 7, "name": "Grace"}, {"id": 8, "name": "Henry"}, {"id": 9, "name": "Ivy"} ] } 이 json 데이터를 Producer가 토픽으로 전달하고 Consumer를 통해 받을 것이다. 2. Topic 생성 # KafkaAdminClient : 클러스터와 ..
[Kafka] mysql과 연동하여 Producer, Consumer 실행 1. 사용할 module install !pip install kafka-python !pip install pymysql 2. Topic 생성 from kafka import KafkaProducer, KafkaConsumer from kafka.admin import KafkaAdminClient, NewTopic import json import pymysql # create topic def create_topics(topics, bootstrap_servers = 'localhost:9092'): admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) # 새로운 토픽 리스트 생성 topic_list = [ NewTopic(top..
[Kafka] python으로 Kafka 실행하기 1. Producer 생성 Kafka 라이브러리 설치 # 카프카 라이브러리 설치 !pip install kafka-python Topic이 읽을 파일 생성 1부터 1000까지 숫자들이 들어있는 파일을 생성한다. # 토픽이 읽을 파일을 생성 with open('new-topic.txt', 'w') as f: for i in range(1,1001): f.write(str(i) + '\n') Producer 생성 # 프로듀서 생성(토픽으로 데이터를 전송) from kafka import KafkaProducer import json import time from csv import reader class MessageProducer: def __init__(self,broker,topic): self.bro..
[Kafka] Apache Kafka 설치, 서버 실행, Producer, Consumer 생성 https://kafka.apache.org/downloads Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 가장 안정적인 버전 설치하기 : 2.5.0 -> 자바 1.8과 잘 맞는다 Kafka heap 사이즈 조정 [root@localhost kafka]# export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" 서버 설정 [root@localhost kafka]# gedit config/server.properties listeners=PLAINTEXT://:9092 주석 해제 Zookeeper 실행 [root@localhost kafka]# bin/zookeeper-server-start...
[Hadoop] cctv 공공데이터 MapReduce로 분석하기 https://www.data.go.kr/data/15013094/standard.do 전국CCTV표준데이터 국가에서 보유하고 있는 다양한 데이터를『공공데이터의 제공 및 이용 활성화에 관한 법률(제11956호)』에 따라 개방하여 국민들이 보다 쉽고 용이하게 공유•활용할 수 있도록 공공데이터(Datase www.data.go.kr 1. Java로 MapReduce 프로그램 작성 CctvMapper.java package com.bigdata.cctv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException..
[Hadoop] MapReduce 실행 중 Error: org.apache.hadoop.hdfs.BlockMissingException MapReduce을 실행하려고 했으나 BlockMissingException.. 노드들이 죽었다 ㅜㅜ 해결 방법 : namenode format 밑의 명령어들을 차례로 실행해보자 sbin/stop-all.sh rm -rf /tmp/hadoop-* $HADOOP_HOME/bin/hadoop namenode -format sbin/start-all.sh jps를 실행해보고 datanode까지 올라왔는지 확인해보기! 포맷이 잘 안되서 VMware를 껐다 키니까 다시 정상작동 하였다. 안되면 껐다가 켜보는 것도 방법.. namenode를 포맷했으니 hadoop fs -ls로 확인해보면 모두 삭제되어있다. hadoop fs -mkdir -p /user/root/input hadoop fs -put LICENSE...
[Spark] 타이타닉 데이터로 생존 예측하기 - Pipeline, LR from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.sql.functions import col,stddev_samp train_df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('titanic/train.csv').cache() 남길 컬럼들 Sex : 범주형 -> label encoding : StringIndexe..