본문 바로가기

Data Engineering

(45)
[Spark] Spark RDD - parallelize, collect, map, flatMap, filter, sortBy, mapPartitions, glom RDD 객체 생성 # DataFrame -> RDD spark.range(100).rdd # 0~9 꺄지 담긴 rdd 객체 생성 spark.range(10).toDF("id").rdd.map(lambda row: row[0]) # RDD를 다시 dataframe으로 변환 spark.range(10).rdd.toDF() rdd 호출 후 toDF는 일반적이지 않음 myCollection = "Spark The Definitive Guide: Big Data Processing Made Simple".split() myCollection parallelize # 2개의 파티션으로 나눔 words = spark.sparkContext.parallelize(myCollection,2) map 하나의 인자를 받는 ..
[Spark] MySQL 연결 Session 새로 생성하는 방법 mysql_spark = SparkSession.builder.config('spark.jar',"mysql-connector-java-5.1.46.jar")\ .master('local').appName('pySpark_MySql').getOrCreate() [root@localhost spark]# mv mysql-connector-java-5.1.46.jar jars/ mysql-connect-java 파일을 spark가 인식할 수 있도록 spark/jars 밑으로 옮김 이 mysql-connector-java-~~~~.jar 파일은 root 밑에 있는 mysql-connector-java 폴더 밑에서 복사해왔음 실행권한 바꾸기 [root@localhost ~]# ..
[Spark] join 연산, csv, json, Parquet, ORC join person = spark.createDataFrame([ (0, "Bill Chambers", 0, [100]), (1, "Matei Zaharia", 1, [500, 250, 100]), (2, "Michael Armbrust", 1, [250, 100])])\ .toDF("id", "name", "graduate_program", "spark_status") graduateProgram = spark.createDataFrame([ (0, "Masters", "School of Information", "UC Berkeley"), (2, "Masters", "EECS", "UC Berkeley"), (1, "Ph.D.", "EECS", "UC Berkeley")])\ .toDF("id", ..
[Spark] 집계연산, corr, Window from pyspark.sql.functions import * StockCode 개수 count 하기 # StockCode - count df.select(count('StockCode').alias('cnt')).show() approx_count_distinct df.select(approx_count_distinct('StockCode',0.1)).show() 0.1 : 임계값 first, last df.select(first('StockCode'), last('StockCode')).show() min, max df.select(min('Quantity'), max('Quantity')).show() sum df.select(sum('Quantity')).show() df.select(sum_d..
[Spark] 날짜 연산, 기본 연산 -3 날짜 current_date, current_timestamp # 날짜 from pyspark.sql.functions import current_date, current_timestamp datedf = spark.range(10).withColumn('today',current_date())\ .withColumn("now",current_timestamp()) datedf.show(truncate=False) datedf.createOrReplaceTempView('datedf') date_add, date_sub from pyspark.sql.functions import date_add, date_sub datedf.select(date_sub('today',5), date_add('today'..
[Spark] 기본 연산 - 2 filepath = 'bydata/by-day/2010-12-01.csv' df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(filepath) df.createOrReplaceTempView("dfTable") from pyspark.sql.functions import lit df.select(lit(5), lit('five'), lit(2.0)).show(2) 예제1 InvoiceNo != 536365 인 컬럼 InvoiceNo, Description 5개만 출력하기 from pyspark.sql.functions import col df.where(col("InvoiceNo") != 53636..
[Spark] 기본 연산 df = spark.read.format("json").load("2015-summary.json") from pyspark.sql.functions import lit, expr df.select(expr("*"),lit(1).alias("One")).show(2) df.withColumn("numberOne", lit(1)).show(2) df.withColumn("withinCountry", expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2) 컬럼명 변경 # 컬럼명 변경 df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show(2) # 컬럼명 변경 후 컬럼들 보여주기 df.withColumnRenamed("D..
[Spark] Jupyterlab, Spark를 이용한 데이터 전처리 ~/.bashrc export PYSPARK_PYTHON=python3 export PYSPARK_DRIVER_PYTHON=jupyter export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root' pip3 install jupyterlab 설치 후 pyspark 실행하면 jupyterlab 열림 spark 실행해보기 Spark001.ipynb staticDataFrame = spark.read.format("csv")\ .option("header", "true")\ .option("inferSchema", "true")\ .load("./bydata/by-day/*.csv") # 임시테이블 생성 staticDataFrame.createOrReplaceTempView(..