Data Engineering/Spark
[Spark] join 연산, csv, json, Parquet, ORC
snoony
2024. 3. 22. 10:48
join
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")
# join
joinExpression = person['id'] == graduateProgram['id']
joinType = 'inner'
person.join(graduateProgram,joinExpression,joinType).show()
graduateProgram2 = graduateProgram.union(spark.createDataFrame([
(0, 'Master', 'Duplicated Row','Duplicated School')
]))
graduateProgram2.createOrReplaceTempView('graduateProgram2')
# graduateProgram2 내용 확인하기
graduateProgram2.show()
spark.sql("""
select * from graduateProgram2
""").show()
from pyspark.sql.functions import expr
person.withColumnRenamed('id','personid').join(sparkStatus,expr("array_contains(spark_status,id)")).show()
csv 파일로 저장
# 파일 저장하기
# option : \t으로 나누기
csvFile.write.format('csv').mode('overwrite').option('sep','\t').save('my-file.tsv')
# \t로 나뉜 csv 파일 읽어오기
csvFile2 = spark.read.format('csv').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').option('sep','\t').load('my-file.tsv')
json 파일 읽기
# \t로 나뉜 csv 파일 읽어오기
csvFile2 = spark.read.format('csv').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').option('sep','\t').load('my-file.tsv')
json 파일로 저장
# json 파일 읽어오기
jsonFile = spark.read.format('json').option('header','true')\
.option('inferSchema','true').option('mode','FAILFAST').load('2015-summary.json')
Parquet
parquet : 분석용 데이터를 저장하는데 사용되는 파일 형식
압축 / 분할 저장 / 스키마 저장 / 서드파티 지원 / 열 지향 저장
파일 쓰기
from pyspark.sql import SparkSession
spark2 = SparkSession.builder.appName('Create Parquet File').getOrCreate()
data = [
('a',10),('b',20),('c',30)
]
df = spark2.createDataFrame(data,['name','age'])
df.write.parquet('output.parquet')
spark2.stop()
로컬에서 저장된 것 확인가능
파일 읽기
spark.read.format('parquet').load('flight_data/parquet').show()
csv 파일을 parquet 형식으로 저장
csvFile.write.format('parquet').mode('overwrite').save('csvFile-parquet')
저장된 것 확인가능
ORC file
ORC(Optimized Row Columnar) file : 대규모 데이터 저장 및 처리
hive에서 분산처리할때 사용
파일 읽기
spark.read.format('orc').load('2010-summary.orc').show(3)
파일 쓰기
csvFile.write.format('orc').mode('overwrite').save('my-orc-file.orc')