scala> var sql_flight2015 = spark.sql("""
| select ORIGIN_COUNTRY_NAME, sum(count) from flight2015 group by ORIGIN_COUNTRY_NAME
| order by sum(count) desc""")
scala> flight2015.groupBy("ORIGIN_COUNTRY_NAME").sum("count")
.withColumnRenamed("sum(count)","origin_total").sort(desc("origin_total")).show()
Parquet 파일 읽기
C:\Users\Playdata>scp [파일이름].gz.parquet root@[linux ip주소]:/root/spark
Case Class
https://docs.scala-lang.org/ko/tour/case-classes.html
- 기본적으로 불변
- 패턴 매칭을 통해 분해가능
- 레퍼런스가 아닌 구조적인 동등성으로 비교됨
- 초기화와 운영이 간결함
scala> import spark.implicits._
scala> case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
[root@localhost spark]# mkdir -p flight_data/parquet
[root@localhost spark]# mv part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet flight_data/parquet/
scala> var flightDF = spark.read.parquet("./flight_data/parquet/")
scala> var flights = flightDF.as[Flight]
PySpark
>>> range_ = spark.range(1000).toDF("number")
>>> range_.show()
+------+
|number|
+------+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+------+
only showing top 20 rows
zip 파일 푸는 명령어 : unzip
>>> df = spark.read.format("csv")\
... .option("header","true").option("inferSchema","true")\
... .load("bydata/by-day/*.csv")
spark.read.format("csv") : csv 파일 읽기
option("header", "true") : 각 csv 파일의 첫 번째 줄이 헤더를 포함하고 있음
option("inferSchema", "true") : 스파크가 각 열의 데이터 유형을 자동으로 추론하도록 함
Temporary View
Spark의 Temporary View는 Session-Scope
View의 생명주기가 세션에 달려있다는 뜻이고, 세션이 종료되면 자동으로 View 테이블이 Drop 된다.
Temp View를 생성하는 메소드 : createOrReplaceTempView
>>> from pyspark.sql.functions import window,column,desc,col
>>> df.createOrReplaceTempView("retail_data")
>>> staticSchema = df.schema
>>> df.selectExpr("CustomerId","(UnitPrice*Quantity) as total_count","InvoiceDate")
.groupBy(col("CustomerId"), window(col("InvoiceDate"),"1 day")).sum("total_count").show(5)
- selectExpr : 데이터프레임에서 선택된 열을 반환 및 새로운 열 생성
- CustomerId, InvoiceDate 열을 선택
- UnitPrice*Quantity 값을 total_count 열로 생성
- groupBy : CustomerId, window(col("InvoiceDate"), "1 day")를 기준으로 그룹화
- window : InvoiceDate를 기준으로 1일 간격으로 창을 생성하여 그룹화, 특정 기간의 데이터를 그룹화
Spark Streaming
실시간 처리를 위해서 스트리밍 기술 사용
특정폴더의 데이터 수집
>>> streamingDf = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger",1).format("csv")\
... .option("header","true").load("./bydata/by-day/*.csv")
>>> purchaseByCustomerPerHour = streamingDf.selectExpr("CustomerID",
... "(Quantity*UnitPrice) as total_cost","InvoiceDate").groupBy(
... col("CustomerID"),window(col("InvoiceDate"),"1 day")).sum("total_cost")
- spark.readStream: Spark의 스트리밍 기능을 사용하여 DataFrame을 생성
- schema(staticSchema): 정적 스키마(staticSchema)를 지정하여 스트리밍 데이터의 스키마를 설정
- option("maxFilesPerTrigger", 1): 스트리밍 처리당 읽어올 파일의 최대 개수를 1로 설정
>>> purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchase")\
... .outputMode("complete").start()
실시간 처리를 위해서 reading 하고 있는 데이터를 메모리에 실시간으로 적재
스트림서버에 구축해야 하는데 일단 실행하고 종료시켰다.
PySpark로 데이터 전처리하기
>>> preparedDf = df.na.fill(0).withColumn('day_of_week', date_format(col('InvoiceDate'), "EEEE"))\
... .coalesce(5)
>>> train_df = preparedDf.where("InvoiceDate < '2011-07-01'")
>>> train_df.show()
- 결측치를 0으로 채움
- InvoiceDate를 기준으로 요일을 full name으로 추출해서 day_of_week 컬럼 생성
- 날짜와 같은 연속형 데이터를 범주형 데이터로 새롭게 추출
- train_df : 스파크에서 지원하는 머신러닝을 위한 훈련데이터 만듦
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] 기본 연산 - 2 (0) | 2024.03.21 |
---|---|
[Spark] 기본 연산 (0) | 2024.03.21 |
[Spark] Jupyterlab, Spark를 이용한 데이터 전처리 (0) | 2024.03.20 |
[Spark] Zeppelin 설치하기 (1) | 2024.03.20 |
[Spark] Spark 다운로드 및 실습 (0) | 2024.03.19 |