~/.bashrc
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root'
pip3 install jupyterlab
설치 후 pyspark 실행하면 jupyterlab 열림
spark 실행해보기
Spark001.ipynb
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./bydata/by-day/*.csv")
# 임시테이블 생성
staticDataFrame.createOrReplaceTempView("retail_data")
# 데이터프레임의 스키마구조 복사
staticSchema = staticDataFrame.schema
.options(header='true', inferSchema='true') 로도 옵션 설정 가능
# 정적인 데이터 쿼리
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)
실시간 처리를 위해서 스트리밍 기술 사용
# 특정폴더의 데이터 수집
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("./bydata/by-day/*.csv")
# 수집한 데이터를 출력
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")
# 실시간 처리를 위해서 reading하고 있는 데이터를 메모리에 실시간으로 적재
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()
# 메모리에 적재된 데이터 읽어오기
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)
스파크를 이용한 전처리 방법
여기서부터 기존에 shell에서 수행한 코드에서 새로 추가된 것
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)
preppedDataFrame.show()
- na.fill(0) : 결측치 0으로 채우기
- withColumn() : 새로운 열 추가
- date_format : "InvoiceDate" 열에서 요일을 추출, 날짜에서 요일을 전체 이름 형식("월요일", "화요일" 등)으로 추출
Spark에서 지원하는 머신러닝을 위한 훈련, 검증데이터 만들기
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")
trainDataFrame.show(3)
# 범주형 변수를 숫자형으로 바꾸는 작업 - 높낮이 또는 크기가 존재하는 범주형 데이터
# label encoding
# 2XL,XL,L,M,S ---> 1,2,3,4,5
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")
# 숫자형 데이터를 가지고 변경
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol("day_of_week_encoded")
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice","day_of_week_encoded"]).setOutputCol("features")
# 파이프라인을 구축해서 각 단계별로 구성한 기능을 순차적으로 실행(적용)하기
from pyspark.ml import Pipeline
transformationPipeLine = Pipeline().setStages([indexer,encoder,vectorAssembler])
# 파이프라인 실행 -- fit
fittedPipeLine = transformationPipeLine.fit(trainDataFrame)
# 변환
trainsformedTraning = fittedPipeLine.transform(trainDataFrame)
Spark002.ipynb
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
Spark Dataframe 만들기
df = spark.createDataFrame([
("lee",32),("kim",20),("hong",30),("cho",50)
],["name","age"])
df.show()
column 추가
from pyspark.sql.functions import col, when
# add column life_stage
df1 = df.withColumn(
'life_stage',
when(col('age')<30,'young')
.when(col('age').between(20,30),"middle")
.otherwise("old")
)
df1.show()
Filtering - where 조건
# life_stage -> old
df1.where(col('life_stage').isin(['old'])).show()
집계함수
from pyspark.sql.functions import sum,count,avg,expr
df1.select(avg("age")).show()
Temp View 만들어서 집계하는 방법
df1.createOrReplaceTempView("view_df1")
sql_df1 = spark.sql("""
select avg(age) from view_df1
""")
sql_df1.show()
# Temp View 만들지 않고 sql 작성
spark.sql("select avg(age) from {df1}",df1=df1).show()
groupBy
df1.groupBy("life_stage").avg().show()
spark.sql("select life_stage, avg(age) from {df1} group by life_stage",df1=df1).show()
spark.sql("select * from some_people where life_stage = 'old'").show()
Spark Structured Streaming
Parquet : Apache Parquet 형식의 테이블, 대규모데이터 세트를 저장하고 처리하는데 사용되는 오픈소스 파일형식
Kafka에서 데이터를 읽어와서 Parquet 테이블에 시간별로 쓰는 방법
Kafka 스트림은 지속적으로 데이터가 채워지는 구조
구조적 API
df = spark.range(500).toDF("number")
df.select(df['number'] + 10).show()
json 데이터 읽어오기
df = spark.read.format("json").load("2015-summary.json")
df.show(3)
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME",StringType(), True),
StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
StructField("count",LongType(), True)
])
df = spark.read.format("json").schema(myManualSchema).load("2015-summary.json")
from pyspark.sql.functions import col,column
df.select(column("DEST_COUNTRY_NAME")).show(5)
# 문자열 표현식 사용해서 컬럼간 논리 연산을 수행 expr()
example_df = spark.createDataFrame(
[(1,10),(2,20),(3,30)],['A','B']
)
example_df.show()
from pyspark.sql.functions import expr
expr('A+5')
example_df.filter(expr('A < B')).show()
컬럼간의 연산 가능, 조건에 맞는 행만 출력
Row
# Row class 는 새로운 행을 만들어 준다
from pyspark.sql import Row
myRow = Row("hello","None","1","False")
myRow[0]
spark.createDataFrame([myRow],['a','b','c','d']).show()
from pyspark.sql.types import StructField,StructType,StringType,LongType,IntegerType,BooleanType
myRow = Row("hello",None,1,False)
myManualSchema = StructType([
StructField('a', StringType(), True),
StructField('b', IntegerType(), True),
StructField('c', IntegerType(), True),
StructField('d', BooleanType(), True),
])
spark.createDataFrame([myRow],myManualSchema).show()
컬럼명과 타입 지정해주면서 생성
df.select(
expr('DEST_COUNTRY_NAME as aaa'),
col('DEST_COUNTRY_NAME'),
column('DEST_COUNTRY_NAME')
).show()
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show()
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] 기본 연산 - 2 (0) | 2024.03.21 |
---|---|
[Spark] 기본 연산 (0) | 2024.03.21 |
[Spark] Zeppelin 설치하기 (1) | 2024.03.20 |
[Spark] PySpark (0) | 2024.03.20 |
[Spark] Spark 다운로드 및 실습 (0) | 2024.03.19 |