본문 바로가기

Data Engineering/Spark

[Spark] PySpark

scala> var sql_flight2015 = spark.sql("""
     | select ORIGIN_COUNTRY_NAME, sum(count) from flight2015 group by ORIGIN_COUNTRY_NAME
     | order by sum(count) desc""")

scala> flight2015.groupBy("ORIGIN_COUNTRY_NAME").sum("count")
.withColumnRenamed("sum(count)","origin_total").sort(desc("origin_total")).show()

Parquet 파일 읽기

C:\Users\Playdata>scp [파일이름].gz.parquet root@[linux ip주소]:/root/spark

Case Class

https://docs.scala-lang.org/ko/tour/case-classes.html

  • 기본적으로 불변
  • 패턴 매칭을 통해 분해가능
  • 레퍼런스가 아닌 구조적인 동등성으로 비교됨
  • 초기화와 운영이 간결함
scala> import spark.implicits._
scala> case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
[root@localhost spark]# mkdir -p flight_data/parquet
[root@localhost spark]# mv part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet flight_data/parquet/
scala> var flightDF = spark.read.parquet("./flight_data/parquet/")
scala> var flights = flightDF.as[Flight]

PySpark

>>> range_ = spark.range(1000).toDF("number")
>>> range_.show()
+------+                                                                        
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows

zip 파일 푸는 명령어 : unzip

>>> df = spark.read.format("csv")\
... .option("header","true").option("inferSchema","true")\
... .load("bydata/by-day/*.csv")

spark.read.format("csv") : csv 파일 읽기

option("header", "true") : 각 csv 파일의 첫 번째 줄이 헤더를 포함하고 있음

option("inferSchema", "true") : 스파크가 각 열의 데이터 유형을 자동으로 추론하도록 함

Temporary View

Spark의 Temporary View는 Session-Scope

View의 생명주기가 세션에 달려있다는 뜻이고,  세션이 종료되면 자동으로 View 테이블이 Drop 된다.

Temp View를 생성하는 메소드 : createOrReplaceTempView

>>> from pyspark.sql.functions import window,column,desc,col
>>> df.createOrReplaceTempView("retail_data")
>>> staticSchema = df.schema
>>> df.selectExpr("CustomerId","(UnitPrice*Quantity) as total_count","InvoiceDate")
.groupBy(col("CustomerId"), window(col("InvoiceDate"),"1 day")).sum("total_count").show(5)
  • selectExpr : 데이터프레임에서 선택된 열을 반환 및 새로운 열 생성
    • CustomerId, InvoiceDate 열을 선택
    • UnitPrice*Quantity 값을 total_count 열로 생성
  • groupBy : CustomerId, window(col("InvoiceDate"), "1 day")를 기준으로 그룹화
    • window : InvoiceDate를 기준으로 1일 간격으로 창을 생성하여 그룹화, 특정 기간의 데이터를 그룹화

Spark Streaming

실시간 처리를 위해서 스트리밍 기술 사용

특정폴더의 데이터 수집

>>> streamingDf = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger",1).format("csv")\
... .option("header","true").load("./bydata/by-day/*.csv")
>>> purchaseByCustomerPerHour = streamingDf.selectExpr("CustomerID",
... "(Quantity*UnitPrice) as total_cost","InvoiceDate").groupBy(
... col("CustomerID"),window(col("InvoiceDate"),"1 day")).sum("total_cost")
  • spark.readStream: Spark의 스트리밍 기능을 사용하여 DataFrame을 생성
  • schema(staticSchema): 정적 스키마(staticSchema)를 지정하여 스트리밍 데이터의 스키마를 설정
  • option("maxFilesPerTrigger", 1): 스트리밍 처리당 읽어올 파일의 최대 개수를 1로 설정

 

>>> purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchase")\
... .outputMode("complete").start()

실시간 처리를 위해서 reading 하고 있는 데이터를 메모리에 실시간으로 적재

스트림서버에 구축해야 하는데 일단 실행하고 종료시켰다.

PySpark로 데이터 전처리하기

>>> preparedDf = df.na.fill(0).withColumn('day_of_week', date_format(col('InvoiceDate'), "EEEE"))\
... .coalesce(5)
>>> train_df = preparedDf.where("InvoiceDate < '2011-07-01'")
>>> train_df.show()
  • 결측치를 0으로 채움
  • InvoiceDate를 기준으로 요일을 full name으로 추출해서 day_of_week 컬럼 생성
    • 날짜와 같은 연속형 데이터를 범주형 데이터로 새롭게 추출
  • train_df : 스파크에서 지원하는 머신러닝을 위한 훈련데이터 만듦

'Data Engineering > Spark' 카테고리의 다른 글

[Spark] 기본 연산 - 2  (0) 2024.03.21
[Spark] 기본 연산  (0) 2024.03.21
[Spark] Jupyterlab, Spark를 이용한 데이터 전처리  (0) 2024.03.20
[Spark] Zeppelin 설치하기  (0) 2024.03.20
[Spark] Spark 다운로드 및 실습  (0) 2024.03.19