본문 바로가기

Data Engineering/Spark

[Spark] Jupyterlab, Spark를 이용한 데이터 전처리

~/.bashrc

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root'
pip3 install jupyterlab

설치 후 pyspark 실행하면 jupyterlab 열림

spark 실행해보기

Spark001.ipynb

staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("./bydata/by-day/*.csv")
 
 # 임시테이블 생성
 staticDataFrame.createOrReplaceTempView("retail_data")
 
 # 데이터프레임의 스키마구조 복사
 staticSchema = staticDataFrame.schema

.options(header='true', inferSchema='true') 로도 옵션 설정 가능

# 정적인 데이터 쿼리
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)

실시간 처리를 위해서 스트리밍 기술 사용

# 특정폴더의 데이터 수집
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("./bydata/by-day/*.csv")
    
 # 수집한 데이터를 출력
 purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")
  
 # 실시간 처리를 위해서 reading하고 있는 데이터를 메모리에 실시간으로 적재
  purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()
 
 # 메모리에 적재된 데이터 읽어오기
 spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

스파크를 이용한 전처리 방법

여기서부터 기존에 shell에서 수행한 코드에서 새로 추가된 것

from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)
 preppedDataFrame.show()
  • na.fill(0) : 결측치 0으로 채우기
  • withColumn() : 새로운 열 추가
  • date_format : "InvoiceDate" 열에서 요일을 추출, 날짜에서 요일을 전체 이름 형식("월요일", "화요일" 등)으로 추출

Spark에서 지원하는 머신러닝을 위한 훈련, 검증데이터 만들기

trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")
 trainDataFrame.show(3)
# 범주형 변수를 숫자형으로 바꾸는 작업 - 높낮이 또는 크기가 존재하는 범주형 데이터
# label encoding
# 2XL,XL,L,M,S  ---> 1,2,3,4,5
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")
# 숫자형 데이터를 가지고 변경
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol("day_of_week_encoded")
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice","day_of_week_encoded"]).setOutputCol("features")
# 파이프라인을 구축해서 각 단계별로 구성한 기능을 순차적으로 실행(적용)하기
from pyspark.ml import Pipeline
transformationPipeLine =  Pipeline().setStages([indexer,encoder,vectorAssembler])

# 파이프라인 실행 -- fit
fittedPipeLine = transformationPipeLine.fit(trainDataFrame)

# 변환
trainsformedTraning =  fittedPipeLine.transform(trainDataFrame)

Spark002.ipynb

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()

Spark Dataframe 만들기

df = spark.createDataFrame([
    ("lee",32),("kim",20),("hong",30),("cho",50)
],["name","age"])
df.show()

column 추가

from pyspark.sql.functions import col, when
# add column life_stage
df1 = df.withColumn(
    'life_stage',
    when(col('age')<30,'young')
    .when(col('age').between(20,30),"middle")
    .otherwise("old")
)
df1.show()

Filtering - where 조건

# life_stage -> old
df1.where(col('life_stage').isin(['old'])).show()

집계함수

from pyspark.sql.functions import sum,count,avg,expr
df1.select(avg("age")).show()

Temp View 만들어서 집계하는 방법

df1.createOrReplaceTempView("view_df1")
sql_df1 = spark.sql("""
select avg(age) from view_df1
""")
sql_df1.show()
# Temp View 만들지 않고 sql 작성
spark.sql("select avg(age) from {df1}",df1=df1).show()

groupBy

df1.groupBy("life_stage").avg().show()

spark.sql("select life_stage, avg(age) from {df1} group by life_stage",df1=df1).show()
spark.sql("select * from some_people where life_stage = 'old'").show()

Spark Structured Streaming

Parquet : Apache Parquet 형식의 테이블, 대규모데이터 세트를 저장하고 처리하는데 사용되는 오픈소스 파일형식
Kafka에서 데이터를 읽어와서 Parquet 테이블에 시간별로 쓰는 방법
Kafka 스트림은 지속적으로 데이터가 채워지는 구조

구조적 API

df = spark.range(500).toDF("number")
df.select(df['number'] + 10).show()

json 데이터 읽어오기

df = spark.read.format("json").load("2015-summary.json")
df.show(3)

from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME",StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
    StructField("count",LongType(), True)
])
df = spark.read.format("json").schema(myManualSchema).load("2015-summary.json")
from pyspark.sql.functions import col,column
df.select(column("DEST_COUNTRY_NAME")).show(5)

# 문자열 표현식 사용해서 컬럼간 논리 연산을 수행 expr()
example_df = spark.createDataFrame(
    [(1,10),(2,20),(3,30)],['A','B']
)
example_df.show()

from pyspark.sql.functions import expr
expr('A+5')
example_df.filter(expr('A < B')).show()

컬럼간의 연산 가능, 조건에 맞는 행만 출력

Row

# Row class 는 새로운 행을 만들어 준다
from pyspark.sql import Row
myRow = Row("hello","None","1","False")
myRow[0]
spark.createDataFrame([myRow],['a','b','c','d']).show()

from pyspark.sql.types import StructField,StructType,StringType,LongType,IntegerType,BooleanType
myRow = Row("hello",None,1,False)
myManualSchema = StructType([
    StructField('a', StringType(), True),
    StructField('b', IntegerType(), True),
    StructField('c', IntegerType(), True),
    StructField('d', BooleanType(), True),
])
spark.createDataFrame([myRow],myManualSchema).show()

컬럼명과 타입 지정해주면서 생성

df.select(
    expr('DEST_COUNTRY_NAME as aaa'),
    col('DEST_COUNTRY_NAME'),
    column('DEST_COUNTRY_NAME')
).show()

df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show()

 

'Data Engineering > Spark' 카테고리의 다른 글

[Spark] 기본 연산 - 2  (0) 2024.03.21
[Spark] 기본 연산  (0) 2024.03.21
[Spark] Zeppelin 설치하기  (0) 2024.03.20
[Spark] PySpark  (0) 2024.03.20
[Spark] Spark 다운로드 및 실습  (0) 2024.03.19