
Data Engineering/Spark

(16)
[Spark] Aggregations, corr, Window from pyspark.sql.functions import * Counting StockCode values # StockCode - count df.select(count('StockCode').alias('cnt')).show() approx_count_distinct df.select(approx_count_distinct('StockCode',0.1)).show() 0.1: maximum allowed relative standard deviation (rsd) of the estimate first, last df.select(first('StockCode'), last('StockCode')).show() min, max df.select(min('Quantity'), max('Quantity')).show() sum df.select(sum('Quantity')).show() df.select(sum_d..
[Spark] Date Operations, Basic Operations - 3 Dates current_date, current_timestamp # dates from pyspark.sql.functions import current_date, current_timestamp datedf = spark.range(10).withColumn('today',current_date())\ .withColumn("now",current_timestamp()) datedf.show(truncate=False) datedf.createOrReplaceTempView('datedf') date_add, date_sub from pyspark.sql.functions import date_add, date_sub datedf.select(date_sub('today',5), date_add('today'..
[Spark] Basic Operations - 2 filepath = 'bydata/by-day/2010-12-01.csv' df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(filepath) df.createOrReplaceTempView("dfTable") from pyspark.sql.functions import lit df.select(lit(5), lit('five'), lit(2.0)).show(2) Example 1: show only 5 rows of the InvoiceNo and Description columns where InvoiceNo != 536365 from pyspark.sql.functions import col df.where(col("InvoiceNo") != 53636..
[Spark] Basic Operations df = spark.read.format("json").load("2015-summary.json") from pyspark.sql.functions import lit, expr df.select(expr("*"),lit(1).alias("One")).show(2) df.withColumn("numberOne", lit(1)).show(2) df.withColumn("withinCountry", expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2) Renaming columns # rename a column df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show(2) # show the columns after renaming df.withColumnRenamed("D..
[Spark] JupyterLab, Data Preprocessing with Spark ~/.bashrc export PYSPARK_PYTHON=python3 export PYSPARK_DRIVER_PYTHON=jupyter export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root' pip3 install jupyterlab After installation, running pyspark opens JupyterLab. Trying out Spark Spark001.ipynb staticDataFrame = spark.read.format("csv")\ .option("header", "true")\ .option("inferSchema", "true")\ .load("./bydata/by-day/*.csv") # create a temporary view staticDataFrame.createOrReplaceTempView(..
[Spark] Installing Zeppelin [root@localhost ~]# wget https://dlcdn.apache.org/zeppelin/zeppelin-0.11.0/zeppelin-0.11.0-bin-all.tgz [root@localhost ~]# tar zxvf zeppelin-0.11.0-bin-all.tgz [root@localhost ~]# mv zeppelin-0.11.0-bin-all/ apache-zeppelin [root@localhost ~]# cd apache-zeppelin/conf [root@localhost conf]# cp zeppelin-env.sh.template zeppelin-env.sh [root@localhost conf]# cp zeppelin-site.xml.template zeppelin-s..
[Spark] PySpark scala> var sql_flight2015 = spark.sql(""" | select ORIGIN_COUNTRY_NAME, sum(count) from flight2015 group by ORIGIN_COUNTRY_NAME | order by sum(count) desc""") scala> flight2015.groupBy("ORIGIN_COUNTRY_NAME").sum("count") .withColumnRenamed("sum(count)","origin_total").sort(desc("origin_total")).show() Reading Parquet files C:\Users\Playdata>scp [filename].gz.parquet root@[linux IP address]:/root/spark Case Class ..
[Spark] Downloading Spark and Hands-on Practice https://www.apache.org/dyn/closer.lua/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz