df = spark.read.format("json").load("2015-summary.json")
from pyspark.sql.functions import lit, expr
df.select(expr("*"),lit(1).alias("One")).show(2)
df.withColumn("numberOne", lit(1)).show(2)
df.withColumn("withinCountry", expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")).show(2)
컬럼명 변경
# 컬럼명 변경
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show(2)
# 컬럼명 변경 후 컬럼들 보여주기
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns
# 데이터 프임의 열 이름이 특수문자를 포함할때
dfWidthLongColName = df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))
dfWidthLongColName.show(2)
dfWidthLongColName.selectExpr("`This Long Column-Name`","`This Long Column-Name` as `new col`").show(2)
where
and
from pyspark.sql.functions import col
# count < 2 and ORIGIN_COUNTRY_NAME != Croatia
df.where(col('count') < 2).where(col('ORIGIN_COUNTRY_NAME') != 'Croatia').show()
or
# or 조건으로 변경하면
df.where((col('count') < 2) | (col('ORIGIN_COUNTRY_NAME') != 'Croatia')).show(2)
# DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME 중복을 제거한후 count
df.select('DEST_COUNTRY_NAME','ORIGIN_COUNTRY_NAME').distinct().count()
Random
# sampling
# random
seed = 5
withReplacement = False
fraction = 0.5 # 추출할 샘플의 비율
df.sample(withReplacement,fraction,seed).count()
dataFrame = df.randomSplit([0.25,0.75],seed)
dataFrame[0].count() > dataFrame[1].count()
# insert dataFrame
from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country","Other Country",5),
Row("New Country2","Other Country2",11)
]
# RDD parallelize
parall = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parall,schema)
newDF.show()
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] 날짜 연산, 기본 연산 -3 (0) | 2024.03.21 |
---|---|
[Spark] 기본 연산 - 2 (0) | 2024.03.21 |
[Spark] Jupyterlab, Spark를 이용한 데이터 전처리 (0) | 2024.03.20 |
[Spark] Zeppelin 설치하기 (1) | 2024.03.20 |
[Spark] PySpark (0) | 2024.03.20 |