본문 바로가기

Data Engineering/Spark

[Spark] Pipeline, Logistic Regression

 데이터 전처리 준비

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col,stddev_samp

df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('Default.csv').drop('_c0').cache()

연속형, 수치형 데이터 변환

데이터프레임에 들어갈 이름들(input, output) 설정

params = {
    'inputCol':'student',
    'outputCol':'studentIdx'
}
# unpacking
strIdx = StringIndexer(**params)

params = {
    'inputCol':'default',
    'outputCol':'label'
}
# unpacking
label_strIdx = StringIndexer(**params)

params = {
    'inputCol':'studentIdx',
    'outputCol':'studentClassVec'
}
# unpacking
encode = OneHotEncoder(**params)

stage = [strIdx,encode,label_strIdx]

StringIndexer

문자열 컬럼(또는 범주형 데이터)을 수치형으로 인덱싱하여 변환, 주로 범주형 데이터를 다룰 때 사용됨

  1. StringIndexer를 사용하여 'student' 열을 수치형으로 변환, 이 열은 inputCol로 지정되고 변환된 값은 outputCol에 저장
  2. StringIndexer를 사용하여 'default' 열을 변환하여 'label' 열에 저장
  3. OneHotEncoder를 사용하여 'studentIdx' 열을 인코딩하여 'studentClassVec' 열에 저장, 'student' 열의 각 범주를 이진 벡터로 인코딩합니다. 예를 들어, 'studentIdx' 열의 값이 1이면 'studentClassVec' 열에서 [0, 1, 0, ...]와 같은 형태로 변환

이러한 변환 단계를 stage 리스트에 담아 파이프라인을 구성한다.

파이프라인을 실행하면 이러한 변환들이 순차적으로 적용되어 데이터셋이 변환됨

train, test  데이터 준비

numCols = ['income','balance']
# scaling
for c in numCols:
    df = df.withColumn(c+'Scaled',col(c) / df.agg(stddev_samp(c)).first()[0])

파이프라인

inputs = ['studentClassVec','incomeScaled','balanceScaled']

assembler = VectorAssembler(inputCols=inputs, outputCol='features')
stage += [assembler]
stage

# 파이프라인 구축
pipeline = Pipeline(stages=stage)
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)
# 7 : 3 
(train, test) = dataset.randomSplit([0.7,0.3],seed=14)

Logistic Regression

# 적절한 모델을 준비
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)

lrModel = lr.fit(train) # 훈련
predictions = lrModel.transform(test) # 예측
predictions.select('default','prediction').show()

평가

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

1에 가까울수록 모델 성능이 좋음