본문 바로가기

Data Engineering

(45)
[Kafka] Apache Kafka 설치, 서버 실행, Producer, Consumer 생성 https://kafka.apache.org/downloads Apache Kafka Apache Kafka: A Distributed Streaming Platform. kafka.apache.org 가장 안정적인 버전 설치하기 : 2.5.0 -> 자바 1.8과 잘 맞는다 Kafka heap 사이즈 조정 [root@localhost kafka]# export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" 서버 설정 [root@localhost kafka]# gedit config/server.properties listeners=PLAINTEXT://:9092 주석 해제 Zookeeper 실행 [root@localhost kafka]# bin/zookeeper-server-start...
[Hadoop] cctv 공공데이터 MapReduce로 분석하기 https://www.data.go.kr/data/15013094/standard.do 전국CCTV표준데이터 국가에서 보유하고 있는 다양한 데이터를『공공데이터의 제공 및 이용 활성화에 관한 법률(제11956호)』에 따라 개방하여 국민들이 보다 쉽고 용이하게 공유•활용할 수 있도록 공공데이터(Datase www.data.go.kr 1. Java로 MapReduce 프로그램 작성 CctvMapper.java package com.bigdata.cctv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException..
[Hadoop] MapReduce 실행 중 Error: org.apache.hadoop.hdfs.BlockMissingException MapReduce을 실행하려고 했으나 BlockMissingException.. 노드들이 죽었다 ㅜㅜ 해결 방법 : namenode format 밑의 명령어들을 차례로 실행해보자 sbin/stop-all.sh rm -rf /tmp/hadoop-* $HADOOP_HOME/bin/hadoop namenode -format sbin/start-all.sh jps를 실행해보고 datanode까지 올라왔는지 확인해보기! 포맷이 잘 안되서 VMware를 껐다 키니까 다시 정상작동 하였다. 안되면 껐다가 켜보는 것도 방법.. namenode를 포맷했으니 hadoop fs -ls로 확인해보면 모두 삭제되어있다. hadoop fs -mkdir -p /user/root/input hadoop fs -put LICENSE...
[Spark] 타이타닉 데이터로 생존 예측하기 - Pipeline, LR from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.sql.functions import col,stddev_samp train_df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('titanic/train.csv').cache() 남길 컬럼들 Sex : 범주형 -> label encoding : StringIndexe..
[Spark] Pipeline, Logistic Regression 데이터 전처리 준비 from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.sql.functions import col,stddev_samp df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('Default.csv').drop('_c0').cache() 연속형, 수치형 데이터 변환 데이터프레임에 들어갈 이름들(input,..
[Spark] csv 파일 로드, 전처리, parquet 저장 csv 파일 읽기 from pyspark.sql import SQLContext from pyspark.sql.functions import * # read csv file spark_df = spark.read.format('csv').option('header','true').option('inferSchema','true').load('doc_log.csv') spark_df.show() 임시테이블로 등록 spark_df.createOrReplaceTempView("spark_df") 검색 ismydoc == true from pyspark.sql.functions import col spark_df.where(col('ismydoc') == 'true').show() df1 = spark.sql(..
[Spark] map, reduce 활용 map, reduce 활용 예제 wordcount word_counts = text.flatMap(lambda line:line.split()).map(lambda word : (word, 1)).reduceByKey(lambda x,y : x+y) word_counts.collect() 모두 2 곱하기 numbers = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8,9]) numbers.map(lambda x : x*2).collect() 짝수만 필터링 numbers.filter(lambda x : x%2 == 0).collect() 모든 요소의 합 구하기 numbers.reduce(lambda x,y : x+y) 각 요소의 길이 # 각 요소의 길이 text = ..
[Spark] csv 파일 로드하고 RDD로 처리하기 csv 파일 로드하고 RDD로 처리 # csv 파일 로드 read_csv_ = "/root/spark/bydata/by-day/2011-12-09.csv" csv_lines = spark.sparkContext.textFile(read_csv_) header 없애기 header = csv_lines.first() # remove header data = csv_lines.filter(lambda line : line!=header) data.collect() data.map(lambda x: x.split(",")).take(3) x의 0,1,2번 인덱스만 추출 data.map(lambda x: x.split(",")).map(lambda x :(x[0],x[1],x[2])).take(3) # 데이터 읽..