Data Engineering/Spark

[Spark] MySQL 연결

snoony 2024. 3. 22. 12:15

Session 새로 생성하는 방법

mysql_spark = SparkSession.builder.config('spark.jar',"mysql-connector-java-5.1.46.jar")\
.master('local').appName('pySpark_MySql').getOrCreate()

[root@localhost spark]# mv mysql-connector-java-5.1.46.jar jars/

mysql-connect-java 파일을 spark가 인식할 수 있도록 spark/jars 밑으로 옮김

이 mysql-connector-java-~~~~.jar 파일은 root 밑에 있는 mysql-connector-java 폴더 밑에서 복사해왔음

실행권한 바꾸기

[root@localhost ~]# chmod +x spark/jars/mysql-connector-java-5.1.46.jar 
[root@localhost ~]# ll spark/jars/mysql-connector-java-5.1.46.jar 
-rwxr-xr-x 1 root root 1004838  3월 22 11:08 spark/jars/mysql-connector-java-5.1.46.jar

다시 pyspark 실행, 이때 mysql도 실행되고 있어야 제대로 실행됨

df = (
    mysql_spark.read.format('jdbc')
    .option("url",'jdbc:mysql://localhost:3306/hadoopguide?useSSL=false')
    .option('driver','com.mysql.jdbc.Driver')
    .option('dbtable','tbl_target')
    .option('user','root').option('password','password')
    .load()
)

접속 성공

기존 Session 사용

driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/hadoopguide?useSSL=false"
tablename = "tbl_target"
user = 'root'
password = 'password'
dbDataFrame = spark.read.format("jdbc")\
    .option("url", url)\
    .option("dbtable", tablename)\
    .option("driver",  driver)\
    .option("user",user)\
    .option('password',password)\
    .load()

내 테이블은 반대로 들어가있음 .. ㅋ...

mysql> delete from tbl_target;
Query OK, 5 rows affected (0.01 sec)

mysql> select * from tbl_target;
Empty set (0.00 sec)

mysql> insert into tbl_target (testno, testname) values
    -> (1, 11111), (2, 22222), (3, 33333), (4, 44444), (5, 55555);
Query OK, 5 rows affected (0.00 sec)
Records: 5  Duplicates: 0  Warnings: 0

mysql> select * from tbl_target;

다시 집어넣었다.

.option('numPartitions',10) : 파티션 수 설정해서 대용량 데이터를 빠르게 읽어올 수 있다.