데이터 전처리 준비
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col,stddev_samp
df = spark.read.format('csv').options(header = 'true', inferSchema = 'true').load('Default.csv').drop('_c0').cache()
연속형, 수치형 데이터 변환
데이터프레임에 들어갈 이름들(input, output) 설정
params = {
'inputCol':'student',
'outputCol':'studentIdx'
}
# unpacking
strIdx = StringIndexer(**params)
params = {
'inputCol':'default',
'outputCol':'label'
}
# unpacking
label_strIdx = StringIndexer(**params)
params = {
'inputCol':'studentIdx',
'outputCol':'studentClassVec'
}
# unpacking
encode = OneHotEncoder(**params)
stage = [strIdx,encode,label_strIdx]
StringIndexer
문자열 컬럼(또는 범주형 데이터)을 수치형으로 인덱싱하여 변환, 주로 범주형 데이터를 다룰 때 사용됨
- StringIndexer를 사용하여 'student' 열을 수치형으로 변환, 이 열은 inputCol로 지정되고 변환된 값은 outputCol에 저장
- StringIndexer를 사용하여 'default' 열을 변환하여 'label' 열에 저장
- OneHotEncoder를 사용하여 'studentIdx' 열을 인코딩하여 'studentClassVec' 열에 저장, 'student' 열의 각 범주를 이진 벡터로 인코딩합니다. 예를 들어, 'studentIdx' 열의 값이 1이면 'studentClassVec' 열에서 [0, 1, 0, ...]와 같은 형태로 변환
이러한 변환 단계를 stage 리스트에 담아 파이프라인을 구성한다.
파이프라인을 실행하면 이러한 변환들이 순차적으로 적용되어 데이터셋이 변환됨
train, test 데이터 준비
numCols = ['income','balance']
# scaling
for c in numCols:
df = df.withColumn(c+'Scaled',col(c) / df.agg(stddev_samp(c)).first()[0])
파이프라인
inputs = ['studentClassVec','incomeScaled','balanceScaled']
assembler = VectorAssembler(inputCols=inputs, outputCol='features')
stage += [assembler]
stage
# 파이프라인 구축
pipeline = Pipeline(stages=stage)
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)
# 7 : 3
(train, test) = dataset.randomSplit([0.7,0.3],seed=14)
Logistic Regression
# 적절한 모델을 준비
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lrModel = lr.fit(train) # 훈련
predictions = lrModel.transform(test) # 예측
predictions.select('default','prediction').show()
평가
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)
1에 가까울수록 모델 성능이 좋음
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] 타이타닉 데이터로 생존 예측하기 - Pipeline, LR (0) | 2024.03.26 |
---|---|
[Spark] csv 파일 로드, 전처리, parquet 저장 (0) | 2024.03.25 |
[Spark] map, reduce 활용 (0) | 2024.03.25 |
[Spark] csv 파일 로드하고 RDD로 처리하기 (0) | 2024.03.25 |
[Spark] Spark RDD - parallelize, collect, map, flatMap, filter, sortBy, mapPartitions, glom (0) | 2024.03.25 |