본문 바로가기

Data Engineering/Spark

[Spark] csv 파일 로드, 전처리, parquet 저장

csv 파일 읽기

from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# read csv file
spark_df = spark.read.format('csv').option('header','true').option('inferSchema','true').load('doc_log.csv')
spark_df.show()

임시테이블로 등록

spark_df.createOrReplaceTempView("spark_df")

검색

ismydoc == true

from pyspark.sql.functions import col
spark_df.where(col('ismydoc') == 'true').show()
df1 = spark.sql("select * from spark_df where ismydoc = true")
df1.show(5, False)

sessionid, ext 컬럼 중에 ext = PDF 또는 ext = DOC인 데이터들 중에서 중복을 제거하고 캐쉬

# 내 풀이
df2 = spark.sql("select distinct * from spark_df where ext in ('PDF','DOC')")
df2.show(5, False)

# 풀이
df2 = spark.sql("select * from spark_df")
df2.select('sessionid','ext').filter("ext=='PDF' or ext == 'DOC'").dropDuplicates().cache()
df2.count()

sessionid 별로 datetime의 최솟값 구하기

df2_min_date = df2.groupBy(['sessionid']).agg(min('datetime').alias('min_date'))
df2_min_date.show()
# df2_pdf 를 마스터로 해 df2_min_date 병합
# df2_pdf 목록은 전부 나오게 하고 매칭 안되는 데이터는 na
df2_join = df2_pdf.join(df2_min_date, 'sessionid','left')
df2_join.show()

결측치 조사

df2_join.select(sum(col('sessionid').isNull().cast('int'))).show()

sessionid가 null인지 여부를 검사 -> true/false -> int로 변환 -> sum -> 결측치 개수 확인

모든 열에 대해서 결측치 여부 조사

for c in df2_join.columns:
    df2_join.select(sum(col(c).isNull().cast('int')).alias(c)).show()

이렇게 돌아가면서 출력되는 것을 확인할 수 있음