본문 바로가기

Data Engineering/Spark

[Spark] 기본 연산

df = spark.read.format("json").load("2015-summary.json")

from pyspark.sql.functions import lit, expr
df.select(expr("*"),lit(1).alias("One")).show(2)

df.withColumn("numberOne", lit(1)).show(2)

df.withColumn("withinCountry", expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2)

컬럼명 변경

# 컬럼명 변경
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show(2)
# 컬럼명 변경 후 컬럼들 보여주기
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns
# 데이터 프임의 열 이름이 특수문자를 포함할때
dfWidthLongColName = df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))
dfWidthLongColName.show(2)

dfWidthLongColName.selectExpr("`This Long Column-Name`","`This Long Column-Name` as `new col`").show(2)

where

and

from pyspark.sql.functions import col
# count < 2 and ORIGIN_COUNTRY_NAME != Croatia
df.where(col('count') < 2).where(col('ORIGIN_COUNTRY_NAME') != 'Croatia').show()

or

# or 조건으로 변경하면
df.where((col('count') < 2) | (col('ORIGIN_COUNTRY_NAME') != 'Croatia')).show(2)

# DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME 중복을 제거한후 count
df.select('DEST_COUNTRY_NAME','ORIGIN_COUNTRY_NAME').distinct().count()

Random

# sampling
# random
seed = 5
withReplacement = False
fraction = 0.5 # 추출할 샘플의 비율
df.sample(withReplacement,fraction,seed).count()
dataFrame = df.randomSplit([0.25,0.75],seed)
dataFrame[0].count() > dataFrame[1].count()
# insert dataFrame
from pyspark.sql import Row
schema = df.schema
newRows = [
    Row("New Country","Other Country",5),
    Row("New Country2","Other Country2",11)
]
# RDD parallelize
parall = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parall,schema)
newDF.show()

 

'Data Engineering > Spark' 카테고리의 다른 글

[Spark] 날짜 연산, 기본 연산 -3  (0) 2024.03.21
[Spark] 기본 연산 - 2  (0) 2024.03.21
[Spark] Jupyterlab, Spark를 이용한 데이터 전처리  (0) 2024.03.20
[Spark] Zeppelin 설치하기  (0) 2024.03.20
[Spark] PySpark  (0) 2024.03.20