본문 바로가기

Data Engineering/Spark

[Spark] 타이타닉 데이터로 생존 예측하기 - Pipeline, LR

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col,stddev_samp

# Load the Titanic training data; column types are inferred from the CSV,
# and the DataFrame is cached because it is reused several times below.
train_df = spark.read.csv('titanic/train.csv', header=True, inferSchema=True).cache()

남길 컬럼들

Sex : 범주형 -> label encoding : StringIndexer
Age, Fare : 연속형 -> scaling

# Keep only the label column plus the features we will model on.
cols_to_keep = ['Survived', 'Sex', 'Pclass', 'Age', 'Fare']
train_df = train_df.select(*cols_to_keep)

결측치 확인하기

# Explicit imports instead of the original wildcard `import *`
# (also brings in `avg` and `when`, which are used further below).
from pyspark.sql.functions import count, when, isnull, avg
# Per-column null counts: when(isnull(c), c) is non-null only for null rows,
# so count() tallies exactly the missing values of each column.
train_df.select([count(when(isnull(c), c)).alias(c) for c in train_df.columns]).show()

결측치 처리

성별, 객실에 따른 평균값으로 처리

# Mean age per (Sex, Pclass) group, used to impute missing ages.
group_means = train_df.groupBy("Sex", "Pclass").agg(avg("Age").alias("MeanAge"))

# Left-join the group means back onto the rows, fill null ages from them,
# then discard the helper column.
filled_df = (
    train_df
    .join(group_means, ["Sex", "Pclass"], "left")
    .withColumn(
        "AgeFilled",
        when(col("Age").isNull(), col("MeanAge")).otherwise(col("Age")),
    )
    .drop("MeanAge")
)

# Inspect the imputed result.
filled_df.show()

# Categorical feature: Sex -> numeric index.
strIdx = StringIndexer(inputCol='Sex', outputCol='SexIdx')

# Target: Survived -> the 'label' column expected by the classifier.
label_strIdx = StringIndexer(inputCol='Survived', outputCol='label')

# One-hot encode the sex index into a sparse vector column.
encode = OneHotEncoder(inputCol='SexIdx', outputCol='SexClassVec')

# Pipeline stages so far.
stage = [strIdx, encode, label_strIdx]

연속형, 수치형 데이터 처리

numCols = ['AgeFilled','Fare']
# Scale each numeric column by its sample standard deviation.
# Compute all stddevs in ONE aggregation (a single Spark job) instead of
# the original loop, which triggered a separate action per column.
stds = filled_df.agg(*[stddev_samp(c).alias(c) for c in numCols]).first()
for c in numCols:
    filled_df = filled_df.withColumn(c + 'Scaled', col(c) / stds[c])

filled_df.select('AgeFilledScaled','FareScaled').show(2)

# Assemble the encoded and scaled columns into one 'features' vector.
inputs = ['SexClassVec', 'AgeFilledScaled', 'FareScaled']
assembler = VectorAssembler(inputCols=inputs, outputCol='features')
stage.append(assembler)
stage

파이프라인

# Build the pipeline, fit every stage on the data, and transform it to
# produce the 'features' and 'label' columns in a single pass.
# (The fitted model is not reused, so fit/transform are chained.)
pipeline = Pipeline(stages=stage)
dataset = pipeline.fit(filled_df).transform(filled_df)

train, test split

# BUG FIX: the original dropped 'Age' from filled_df AFTER `dataset` had
# already been produced from it, so the drop never affected the data being
# split. Drop the raw (pre-imputation) column from `dataset` instead.
dataset = dataset.drop('Age')
# 70 / 30 train-test split, seeded for reproducibility.
(train, test) = dataset.randomSplit([0.7, 0.3], seed=14)

LR

# Logistic regression over the assembled feature vector.
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)

# Fit on the training split, then score the held-out split.
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.select('Survived', 'prediction').show()

평가

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the predictions with the binary classification evaluator,
# reading from the raw prediction scores. (Evaluator is not reused,
# so it is constructed and invoked inline.)
BinaryClassificationEvaluator(rawPredictionCol='rawPrediction').evaluate(predictions)

0.82로 마무리...