Data Engineering/Spark

[Spark] 집계연산, corr, Window

snoony 2024. 3. 21. 17:51

from pyspark.sql.functions import *

StockCode 개수 count 하기

# StockCode - count
df.select(count('StockCode').alias('cnt')).show()

approx_count_distinct

df.select(approx_count_distinct('StockCode',0.1)).show()

0.1 : 임계값

first, last

df.select(first('StockCode'), last('StockCode')).show()

min, max

df.select(min('Quantity'), max('Quantity')).show()

sum

df.select(sum('Quantity')).show()
df.select(sum_distinct('Quantity')).show()

sum_distinct : 중복된 것 배제하고 sum

sum, count, avg, mean 모두 출력

# Quantity sum, count, avg, mean
df.select(sum('Quantity').alias('sum'), count('Quantity').alias('count'),
         avg('Quantity').alias('avg'), mean('Quantity').alias('mean')).show()

corr

변수들 간의 상관관계 지수를 나타냄 , -1 ~ 1 사이의 범위를 가짐

-1 또는 1과 가까울 수록 높은 상관관계를 나타냄

주의할 점 ) 이것이 인과관계를 나타내지 않는다.

# corr
df.select(corr('InvoiceNO','Quantity')).show()

covar_pop, covar_samp

공분산을 나타냄

  • 양수 : 두 변수가 함께 증가 또는 감소
  • 음수 : 한개가 증가할때 나머지가 감소

collect_set, collect_list

df.agg(collect_set('Country'), collect_list('Country')).show()

리스트로 보여줌

groupBy

# group by InvoiceNo
# Quantity count
df.groupBy('InvoiceNo').agg(count('Quantity')).show()

expr('count(Quantity)') 로도 사용 가능

년월일 반환하기

df.select(to_date(('InvoiceDate'),'yyyy-MM-dd')).show()

Window 파티션

# 시계열성 데이터중에서 파티션별로 정렬
from pyspark.sql.window import Window
windowSpec = Window.partitionBy('CustomerID','date').orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

규칙 적용시키기

dense_rank_data = dense_rank().over(windowSpec)
rank_data = rank().over(windowSpec)

순위 적용

dfWithDate.where('CustomerID IS NOT NULL').orderBy('CustomerID')\
.select('CustomerID','date','Quantity',rank_data.alias('rank_data'), 
dense_rank_data.alias('dense_rank_data'))\
.show(10)

CustomerID 별로 등수를 매겨서 출력