Data Engineering/Spark

[Spark] csv 파일 로드하고 RDD로 처리하기

snoony 2024. 3. 25. 14:41

csv 파일 로드하고 RDD로 처리

# csv 파일 로드
read_csv_ = "/root/spark/bydata/by-day/2011-12-09.csv"
csv_lines = spark.sparkContext.textFile(read_csv_)

header 없애기

header = csv_lines.first()
# remove header
data = csv_lines.filter(lambda line : line!=header)
data.collect()
data.map(lambda x: x.split(",")).take(3)

x의 0,1,2번 인덱스만 추출

data.map(lambda x: x.split(",")).map(lambda x :(x[0],x[1],x[2])).take(3)

 

# 데이터 읽을때 빈 줄을 제거하고 읽기
csv_lines = spark.sparkContext.textFile(read_csv_)
non_empty_lines = csv_lines.filter(lambda line : line.split() != "")
non_empty_lines.take(2)
# 중복된 행을 제거
data = non_empty_lines.distinct()
data.take(2)

특정 폴더 하위에 있는 모든 csv 파일들을 RDD로 합치기

# 특정폴더 하위에 있는 모든 csv파일 읽어서 RDD로 합치기
from glob import glob
filepaths = "/root/spark/bydata/by-day/*.csv"
files = glob(filepaths)
# 한 개의 csv 파일을 읽어들여서 RDD로 변경하는 함수
def read_csv_to_rdd(filepath):
    return spark.sparkContext.textFile(filepath)
# 합치는 함수
def merge_csv_files_folders(files):
    return spark.sparkContext.union([ read_csv_to_rdd(file) for file in files])
all_csv_rdd = merge_csv_files_folders(files)
all_csv_rdd.collect()