파이프라인
공공데이터포털에서 날씨 데이터를 30분 간격으로 받아오기 위해 Airflow를 이용한 배치 처리를 하였다.
내가 구현한 부분은 중기 날씨 데이터이다.
Task 1 ) Airflow dag에서 30분 간격으로 중기 날씨 데이터 api를 호출하고, 날씨 데이터를 전처리하여 csv 파일로 저장하여 s3에 업로드한다.
Task 2 ) s3에 업로드한 csv 파일을 받아와 mysql에 insert한다. 추후에 rds에 연결할 예정이다.
Airflow 초기 설정
내가 작성한 airflow 초기 설정 내용이다. Docker를 이용하였다.
https://nymagicshop16.tistory.com/115
[Airflow] Docker compose로 Airflow 실행하기
초기 설정airflow-docker 디렉터리 만들어주고(base) nayoungkim@nayoungkim airflow-docker % curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'(base) nayoungkim@nayoungkim airflow-docker % echo -e "AIRFLOW_UID=$(i
nymagicshop16.tistory.com
아직 airflow를 완전히 이해하지 못해서 잘 모르겠지만,
(base) nayoungkim@nayoungkim airflow-docker % docker compose up -d --build
(base) nayoungkim@nayoungkim airflow-docker % docker exec -it airflow-docker-airflow-scheduler-1 /bin/bash
default@ed0b3b603614:/opt/airflow$ airflow scheduler
default@ed0b3b603614:/opt/airflow$ airflow celery worker
이렇게 띄운 후 dag들을 실행하였다.
Dag test
airflow tasks test <dag-id> <task-id> <실행 날짜>
이런 식으로 각 dag의 task들을 test 해볼 수 있다.
Airflow와 AWS S3 연동하기
먼저, AWS에서 S3 Bucket과 IAM 사용자 access key를 발급받는다. 이 부분은 내 블로그에 있기도 해서 생략하겠다.
Airflow webserver에서 Admin>Connections에 들어간다.
그러면 이와 같이 새로운 connection을 생성할 수 있다.
aws에서 발급한 access key id, secret access key, extra에 region name을 적어주고
save 하면 된다.
나는 test connection이 불가능하게 설정되어 있었는데, airflow.cfg 파일에서 이를 수정할 수 있다고 하니 찾아보시길
Dag 작성
S3Hook을 이용해 S3와 연결한다.
bucket 에는 내가 만든 connection 이름 = 버킷 이름 을 써준다.
from airflow.hooks.S3_hook import S3Hook
# S3 버킷 연결
bucket = 'middleforecast'
s3_hook = S3Hook(bucket)
def fetch_middle_forecast(datetime_date, **kwargs):
ti = kwargs['ti']
merged_result = pd.merge(middle_temp(datetime_date), middle_weather(datetime_date))
print(merged_result)
# 현재 날짜와 시간을 문자열로 포맷팅
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# CSV 파일로 저장
file_name = f'merged_middle_forecast_{datetime_date}_{current_time}.csv'
merged_result.to_csv(file_name, index=False, encoding='utf-8')
print(f"Saved merged forecast data to {file_name}")
# S3로 csv 업로드
s3_key = f'merged_middle_forecast_{datetime_date}_{current_time}.csv'
s3_hook.load_file(filename=file_name, key=s3_key, bucket_name=bucket)
# S3 키를 XCom으로 전달
ti.xcom_push(key='s3_key', value=s3_key)
ti.xcom_push(key='datetime', value=datetime_date)
return merged_result
def load_csv_from_s3(s3_hook, bucket_name, key):
csv_data = s3_hook.read_key(key, bucket_name)
# 문자열 데이터를 StringIO 객체로 변환
csv_stringio = StringIO(csv_data)
# Pandas를 사용하여 CSV 데이터를 DataFrame으로 변환
df = pd.read_csv(csv_stringio)
return df
내 dag의 일부만 첨부하였다.
위의 dag는 csv 파일을 s3로 업로드 하는 함수와, s3에서 csv 파일을 로드하는 함수로 이루어져 있다.
구현 결과
MySQL 연결
Airflow의 webserver에서 MySQL connection을 설정해준다.
Dockerfile 수정
# mysql
RUN pip install --no-cache-dir PyMySQL==1.0.2
Dag 작성
import pymysql
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.hooks.S3_hook import S3Hook
warnings.filterwarnings('ignore')
pymysql.install_as_MySQLdb()
# Connection to Mysql
mysql_hook = MySqlHook(mysql_conn_id="mysql_db")
conn = mysql_hook.get_conn()
cur = conn.cursor()
# MySQL에 데이터 삽입하는 task 정의
def insert_to_mysql(**kwargs):
ti = kwargs['task_instance']
# execution_date = kwargs['execution_date'].strftime("%Y%m%d")
datetime_date = ti.xcom_pull(task_ids='fetch_middle_forecast', key='datetime')
# XCom에서 데이터 pull
s3_key = ti.xcom_pull(task_ids='fetch_middle_forecast', key='s3_key')
# S3에서 csv 읽기
merged_result = load_csv_from_s3(s3_hook, bucket, s3_key)
for index, row in merged_result.iterrows():
sql = """
"""
data = []
print(data)
cur.executemany(sql, data)
conn.commit()
cur.close()
conn.close()
print("데이터가 성공적으로 삽입되었습니다.")
return merged_result
# Airflow task 정의
fetch_data_task = PythonOperator(
task_id='fetch_middle_forecast',
python_callable=fetch_middle_forecast,
op_args=[datetime.now(seoul_timezone).strftime("%Y%m%d")], # 오늘 날짜를 가져옵니다.
dag=dag,
)
insert_to_mysql_task = PythonOperator(
task_id='insert_to_mysql',
python_callable=insert_to_mysql,
dag=dag,
)
fetch_data_task >> insert_to_mysql_task
MySqlHook을 이용하여 MySQL을 연결하였다.
위의 태스크에서 s3에 csv를 업로드한 뒤, 다음 태스크에서 csv 파일을 로드하여 mysql에 테이블에 저장한다.
mysql을 연결한 후 data들을 처리하여 sql 쿼리를 날리는 코드이다. data와 sql문은 생략하였다.
두 태스크는 PythonOperator를 사용하여 정의하였다.
구현 결과
마무리
서버를 배포한 후 MySQL을 RDS로 대체하고, Airflow도 배포할 예정이다.
log가 잘 나타나지 않아 MySQL 연결이 순탄치 않았는데, csv 파일로도 저장해서 xcom에 전달도 해보고
따로 dag들을 만들어서 내 데이터베이스와 잘 연결이 되는지 계속 테스트해보고, dag의 task들도 각각 테스트해보며 디버깅하면서 찾아낼 수 있었다.
내가 만든 dataframe과 데이터베이스의 테이블 구조가 잘 맞지 않아 오류가 나타났었다.
이렇게 log를 하나씩 뜯어보며 하나하나씩 헤쳐나가다 보면 결국 해내게 되는 것 같다.
참고한 글
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Docker compose로 Airflow 실행하기 (0) | 2024.07.16 |
---|