본문 바로가기

분류 전체보기

(135)
[Spark] map, reduce 활용 map, reduce 활용 예제 wordcount word_counts = text.flatMap(lambda line:line.split()).map(lambda word : (word, 1)).reduceByKey(lambda x,y : x+y) word_counts.collect() 모두 2 곱하기 numbers = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8,9]) numbers.map(lambda x : x*2).collect() 짝수만 필터링 numbers.filter(lambda x : x%2 == 0).collect() 모든 요소의 합 구하기 numbers.reduce(lambda x,y : x+y) 각 요소의 길이 # 각 요소의 길이 text = ..
[Spark] csv 파일 로드하고 RDD로 처리하기 csv 파일 로드하고 RDD로 처리 # csv 파일 로드 read_csv_ = "/root/spark/bydata/by-day/2011-12-09.csv" csv_lines = spark.sparkContext.textFile(read_csv_) header 없애기 header = csv_lines.first() # remove header data = csv_lines.filter(lambda line : line!=header) data.collect() data.map(lambda x: x.split(",")).take(3) x의 0,1,2번 인덱스만 추출 data.map(lambda x: x.split(",")).map(lambda x :(x[0],x[1],x[2])).take(3) # 데이터 읽..
[Spark] Spark RDD - parallelize, collect, map, flatMap, filter, sortBy, mapPartitions, glom RDD 객체 생성 # DataFrame -> RDD spark.range(100).rdd # 0~9 꺄지 담긴 rdd 객체 생성 spark.range(10).toDF("id").rdd.map(lambda row: row[0]) # RDD를 다시 dataframe으로 변환 spark.range(10).rdd.toDF() rdd 호출 후 toDF는 일반적이지 않음 myCollection = "Spark The Definitive Guide: Big Data Processing Made Simple".split() myCollection parallelize # 2개의 파티션으로 나눔 words = spark.sparkContext.parallelize(myCollection,2) map 하나의 인자를 받는 ..
[Spring MVC] HttpServletResponse, Http 응답 데이터 HttpServletResponse 역할 Http 응답 메시지 생성 : HTTP 응답코드 지정, 헤더, 바디 생성 편의 기능 제공 : Content-Type, 쿠키, Redirect Content 편의 메서드 private void content(HttpServletResponse response) { //Content-Type: text/plain;charset=utf-8 //Content-Length: 2 //response.setHeader("Content-Type", "text/plain;charset=utf-8"); response.setContentType("text/plain"); response.setCharacterEncoding("utf-8"); //response.setContentL..
[Spark] MySQL 연결 Session 새로 생성하는 방법 mysql_spark = SparkSession.builder.config('spark.jar',"mysql-connector-java-5.1.46.jar")\ .master('local').appName('pySpark_MySql').getOrCreate() [root@localhost spark]# mv mysql-connector-java-5.1.46.jar jars/ mysql-connect-java 파일을 spark가 인식할 수 있도록 spark/jars 밑으로 옮김 이 mysql-connector-java-~~~~.jar 파일은 root 밑에 있는 mysql-connector-java 폴더 밑에서 복사해왔음 실행권한 바꾸기 [root@localhost ~]# ..
[Spark] join 연산, csv, json, Parquet, ORC join person = spark.createDataFrame([ (0, "Bill Chambers", 0, [100]), (1, "Matei Zaharia", 1, [500, 250, 100]), (2, "Michael Armbrust", 1, [250, 100])])\ .toDF("id", "name", "graduate_program", "spark_status") graduateProgram = spark.createDataFrame([ (0, "Masters", "School of Information", "UC Berkeley"), (2, "Masters", "EECS", "UC Berkeley"), (1, "Ph.D.", "EECS", "UC Berkeley")])\ .toDF("id", ..
[Spark] 집계연산, corr, Window from pyspark.sql.functions import * StockCode 개수 count 하기 # StockCode - count df.select(count('StockCode').alias('cnt')).show() approx_count_distinct df.select(approx_count_distinct('StockCode',0.1)).show() 0.1 : 임계값 first, last df.select(first('StockCode'), last('StockCode')).show() min, max df.select(min('Quantity'), max('Quantity')).show() sum df.select(sum('Quantity')).show() df.select(sum_d..
[Spark] 날짜 연산, 기본 연산 -3 날짜 current_date, current_timestamp # 날짜 from pyspark.sql.functions import current_date, current_timestamp datedf = spark.range(10).withColumn('today',current_date())\ .withColumn("now",current_timestamp()) datedf.show(truncate=False) datedf.createOrReplaceTempView('datedf') date_add, date_sub from pyspark.sql.functions import date_add, date_sub datedf.select(date_sub('today',5), date_add('today'..