본문 바로가기

Data Engineering/Spark

[Spark] join 연산, csv, json, Parquet, ORC

join

person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")
# join
joinExpression = person['id'] == graduateProgram['id']

joinType = 'inner'
person.join(graduateProgram,joinExpression,joinType).show()
graduateProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, 'Master', 'Duplicated Row','Duplicated School')
]))
graduateProgram2.createOrReplaceTempView('graduateProgram2')
# graduateProgram2 내용 확인하기
graduateProgram2.show()

spark.sql("""
select * from graduateProgram2
""").show()

from pyspark.sql.functions import expr
person.withColumnRenamed('id','personid').join(sparkStatus,expr("array_contains(spark_status,id)")).show()

csv 파일로 저장

# 파일 저장하기
# option : \t으로 나누기
csvFile.write.format('csv').mode('overwrite').option('sep','\t').save('my-file.tsv')

# \t로 나뉜 csv 파일 읽어오기
csvFile2 = spark.read.format('csv').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').option('sep','\t').load('my-file.tsv')

json 파일 읽기

# \t로 나뉜 csv 파일 읽어오기
csvFile2 = spark.read.format('csv').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').option('sep','\t').load('my-file.tsv')

json 파일로 저장

# json 파일 읽어오기
jsonFile = spark.read.format('json').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').load('2015-summary.json')

Parquet

parquet : 분석용 데이터를 저장하는데 사용되는 파일 형식
압축 / 분할 저장 / 스키마 저장 / 서드파티 지원 / 열 지향 저장

파일 쓰기

from pyspark.sql import SparkSession
spark2 = SparkSession.builder.appName('Create Parquet File').getOrCreate()

data = [
    ('a',10),('b',20),('c',30)
]
df = spark2.createDataFrame(data,['name','age'])

df.write.parquet('output.parquet')

spark2.stop()

로컬에서 저장된 것 확인가능

파일 읽기

spark.read.format('parquet').load('flight_data/parquet').show()

csv 파일을 parquet 형식으로 저장

csvFile.write.format('parquet').mode('overwrite').save('csvFile-parquet')

저장된 것 확인가능

ORC file

ORC(Optimized Row Columnar) file : 대규모 데이터 저장 및 처리

hive에서 분산처리할때 사용

파일 읽기

spark.read.format('orc').load('2010-summary.orc').show(3)

파일 쓰기

csvFile.write.format('orc').mode('overwrite').save('my-orc-file.orc')