from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col,stddev_samp
train_df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('titanic/train.csv').cache()
남길 컬럼들
Sex : 범주형 -> label encoding : StringIndexer
Age, Fare : 연속형 -> scaling
train_df = train_df.select('Survived','Sex','Pclass','Age','Fare')
결측치 확인하기
from pyspark.sql.functions import *
train_df.select([count(when(isnull(c),c)).alias(c) for c in train_df.columns]).show()
결측치 처리
성별, 객실에 따른 평균값으로 처리
# 성별 및 객실별로 그룹화하여 평균 나이 계산
mean_age_by_sex_pclass = train_df.groupBy("Sex", "Pclass").agg(avg("Age").alias("MeanAge"))
# 결측치를 해당 그룹의 평균 나이로 채우기
filled_df = train_df.join(mean_age_by_sex_pclass, ["Sex", "Pclass"], "left") \
.withColumn("AgeFilled", when(col("Age").isNull(), col("MeanAge")).otherwise(col("Age"))) \
.drop("MeanAge")
# 결과 확인
filled_df.show()
params = {
'inputCol':'Sex',
'outputCol':'SexIdx'
}
# unpacking
strIdx = StringIndexer(**params)
# 정답
params = {
'inputCol':'Survived',
'outputCol':'label'
}
# unpacking
label_strIdx = StringIndexer(**params)
params = {
'inputCol':'SexIdx',
'outputCol':'SexClassVec'
}
# unpacking
encode = OneHotEncoder(**params)
stage = [strIdx,encode,label_strIdx]
연속형, 수치형 데이터 처리
numCols = ['AgeFilled','Fare']
# scaling
for c in numCols:
filled_df = filled_df.withColumn(c+'Scaled',col(c) / filled_df.agg(stddev_samp(c)).first()[0])
filled_df.select('AgeFilledScaled','FareScaled').show(2)
inputs = ['SexClassVec','AgeFilledScaled','FareScaled']
assembler = VectorAssembler(inputCols=inputs, outputCol='features')
stage += [assembler]
stage
파이프라인
# 파이프라인 구축
pipeline = Pipeline(stages=stage)
pipelineModel = pipeline.fit(filled_df)
dataset = pipelineModel.transform(filled_df)
train, test split
filled_df = filled_df.drop('Age')
# 결측치 처리되기 전 Age 제거
# 7 : 3
(train, test) = dataset.randomSplit([0.7,0.3],seed=14)
LR
# 적절한 모델을 준비
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lrModel = lr.fit(train) # 훈련
predictions = lrModel.transform(test) # 예측
predictions.select('Survived','prediction').show()
평가
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)
0.82로 마무리...
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] Pipeline, Logistic Regression (0) | 2024.03.26 |
---|---|
[Spark] csv 파일 로드, 전처리, parquet 저장 (0) | 2024.03.25 |
[Spark] map, reduce 활용 (0) | 2024.03.25 |
[Spark] csv 파일 로드하고 RDD로 처리하기 (0) | 2024.03.25 |
[Spark] Spark RDD - parallelize, collect, map, flatMap, filter, sortBy, mapPartitions, glom (0) | 2024.03.25 |