Data Engineering/Airflow

[Airflow] Airflow와 S3, MySQL 연결

snoony 2024. 7. 19. 14:06

파이프라인

공공데이터포털에서 날씨 데이터를 30분 간격으로 받아오기 위해 Airflow를 이용한 배치 처리를 하였다.

내가 구현한 부분은 중기 날씨 데이터이다.

Task 1 ) Airflow dag에서 30분 간격으로 중기 날씨 데이터 api를 호출하고, 날씨 데이터를 전처리하여 csv 파일로 저장하여 s3에 업로드한다.

Task 2 ) s3에 업로드한 csv 파일을 받아와 mysql에 insert한다. 추후에 rds에 연결할 예정이다.

Airflow 초기 설정

내가 작성한 airflow 초기 설정 내용이다. Docker를 이용하였다.

https://nymagicshop16.tistory.com/115

 

[Airflow] Docker compose로 Airflow 실행하기

초기 설정airflow-docker 디렉터리 만들어주고(base) nayoungkim@nayoungkim airflow-docker % curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'(base) nayoungkim@nayoungkim airflow-docker % echo -e "AIRFLOW_UID=$(i

nymagicshop16.tistory.com

아직 airflow를 완전히 이해하지 못해서 잘 모르겠지만,

(base) nayoungkim@nayoungkim airflow-docker % docker compose up -d --build
(base) nayoungkim@nayoungkim airflow-docker % docker exec -it airflow-docker-airflow-scheduler-1 /bin/bash
default@ed0b3b603614:/opt/airflow$ airflow scheduler
default@ed0b3b603614:/opt/airflow$ airflow celery worker

이렇게 띄운 후 dag들을 실행하였다.

Dag test

airflow tasks test <dag-id> <task-id> <실행 날짜>

이런 식으로 각 dag의 task들을 test 해볼 수 있다.

 

Airflow와 AWS S3 연동하기

먼저, AWS에서 S3 Bucket과 IAM 사용자 access key를 발급받는다. 이 부분은 내 블로그에 있기도 해서 생략하겠다.

Airflow webserver에서 Admin>Connections에 들어간다.

그러면 이와 같이 새로운 connection을 생성할 수 있다.

aws에서 발급한 access key id, secret access key, extra에 region name을 적어주고

save 하면 된다.

나는 test connection이 불가능하게 설정되어 있었는데, airflow.cfg 파일에서 이를 수정할 수 있다고 하니 찾아보시길

Dag 작성

S3Hook을 이용해 S3와 연결한다. 

bucket 에는 내가 만든 connection 이름 = 버킷 이름 을 써준다.

from airflow.hooks.S3_hook import S3Hook

# S3 버킷 연결
bucket = 'middleforecast'
s3_hook = S3Hook(bucket)

def fetch_middle_forecast(datetime_date, **kwargs):
    ti = kwargs['ti']

    merged_result = pd.merge(middle_temp(datetime_date), middle_weather(datetime_date))
    print(merged_result)

    # 현재 날짜와 시간을 문자열로 포맷팅
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")


    # CSV 파일로 저장
    file_name = f'merged_middle_forecast_{datetime_date}_{current_time}.csv'
    merged_result.to_csv(file_name, index=False, encoding='utf-8')
    print(f"Saved merged forecast data to {file_name}")
    
    # S3로 csv 업로드
    s3_key = f'merged_middle_forecast_{datetime_date}_{current_time}.csv'
    s3_hook.load_file(filename=file_name, key=s3_key, bucket_name=bucket)

    # S3 키를 XCom으로 전달
    ti.xcom_push(key='s3_key', value=s3_key)
    ti.xcom_push(key='datetime', value=datetime_date)
    
    return merged_result

def load_csv_from_s3(s3_hook, bucket_name, key):
    csv_data = s3_hook.read_key(key, bucket_name)
    
    # 문자열 데이터를 StringIO 객체로 변환
    csv_stringio = StringIO(csv_data)
    
    # Pandas를 사용하여 CSV 데이터를 DataFrame으로 변환
    df = pd.read_csv(csv_stringio)
    
    return df

내 dag의 일부만 첨부하였다.

위의 dag는 csv 파일을 s3로 업로드 하는 함수와, s3에서 csv 파일을 로드하는 함수로 이루어져 있다.

구현 결과

 

MySQL 연결

Airflow의 webserver에서 MySQL connection을 설정해준다.

Dockerfile 수정

# mysql
RUN pip install --no-cache-dir PyMySQL==1.0.2

Dag 작성

import pymysql
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.hooks.S3_hook import S3Hook

warnings.filterwarnings('ignore')
pymysql.install_as_MySQLdb()

# Connection to Mysql
mysql_hook = MySqlHook(mysql_conn_id="mysql_db")
conn = mysql_hook.get_conn()
cur = conn.cursor()

# MySQL에 데이터 삽입하는 task 정의
def insert_to_mysql(**kwargs):
    ti = kwargs['task_instance']
    # execution_date = kwargs['execution_date'].strftime("%Y%m%d")
    datetime_date = ti.xcom_pull(task_ids='fetch_middle_forecast', key='datetime')
    
    # XCom에서 데이터 pull
    s3_key = ti.xcom_pull(task_ids='fetch_middle_forecast', key='s3_key')


    # S3에서 csv 읽기
    merged_result = load_csv_from_s3(s3_hook, bucket, s3_key)

    
    for index, row in merged_result.iterrows():
        sql = """
            """
        data = []
        print(data)
        cur.executemany(sql, data)
    
    conn.commit()
    cur.close()
    conn.close()
    print("데이터가 성공적으로 삽입되었습니다.")

    return merged_result

# Airflow task 정의
fetch_data_task = PythonOperator(
    task_id='fetch_middle_forecast',
    python_callable=fetch_middle_forecast,
    op_args=[datetime.now(seoul_timezone).strftime("%Y%m%d")],  # 오늘 날짜를 가져옵니다.
    dag=dag,
)

insert_to_mysql_task = PythonOperator(
    task_id='insert_to_mysql',
    python_callable=insert_to_mysql,
    dag=dag,
)

fetch_data_task >> insert_to_mysql_task

MySqlHook을 이용하여 MySQL을 연결하였다.

위의 태스크에서 s3에 csv를 업로드한 뒤, 다음 태스크에서 csv 파일을 로드하여 mysql에 테이블에 저장한다.

mysql을 연결한 후 data들을 처리하여 sql 쿼리를 날리는 코드이다. data와 sql문은 생략하였다.

두 태스크는 PythonOperator를 사용하여 정의하였다.

구현 결과

 

마무리

서버를 배포한 후 MySQL을 RDS로 대체하고, Airflow도 배포할 예정이다.

log가 잘 나타나지 않아 MySQL 연결이 순탄치 않았는데, csv 파일로도 저장해서 xcom에 전달도 해보고

따로 dag들을 만들어서 내 데이터베이스와 잘 연결이 되는지 계속 테스트해보고, dag의 task들도 각각 테스트해보며 디버깅하면서 찾아낼 수 있었다.

내가 만든 dataframe과 데이터베이스의 테이블 구조가 잘 맞지 않아 오류가 나타났었다.

이렇게 log를 하나씩 뜯어보며 하나하나씩 헤쳐나가다 보면 결국 해내게 되는 것 같다.

 

참고한 글

https://velog.io/@tfj0531/Airflow-%EC%99%80-AWS-S3-%EC%97%B0%EA%B2%B0%ED%95%98%EA%B8%B0-S3-DAG-example

https://velog.io/@me529/%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%97%94%EC%A7%80%EB%8B%88%EC%96%B4%EB%A7%81-Airflow%EC%99%80-S3-%EC%97%B0%EA%B2%B0%ED%95%98%EA%B8%B0