이론 (ETL 과 DAG)
이 글을 클릭한 당신...! 😀
어디선가 들어보았을 이 Airflow 라는 단어가 호기심을 자극했을 것이다!
이번 기회에 airflow 에 대해 배우고 지식을 든든하게 채워보자
Airflow 공식 페이지의 소개하는 글은 이렇다. (https://airflow.apache.org/)
Airflow는 workflow를 만들고(author), 스케쥴링하고(schedule), 감독한다(monitor) 🤔
알 것 같지만 와닿지는 않는다! (적어도 나는 그랬다... 😥)
그래서! 우리는 실습을 통하여 이 Airflow에 대해 더 알아볼 것이다!
그 전에, 한가지 알아야 할 단어가 있다 -> DAG
Directed Acyclic Graph (DAG) 는 우리가 data engineering에서 흔히 말하는 Extract Transform Load (ETL)을 뜻한다.
DAG는 방향은 있지만 cyclic을 이루지 않는 그래프. 즉 A -> B, A-> C 와 같이
A-->B
ㄴㅡ>C
'A 라는 task가 끝나고 B 와 C의 task가 실행된다'와 같은 workflow 들을 표현하기 딱! 좋은 graph 형태이다.
무튼 Airflow에서는 데이터를 뽑고 처리하고 저장하는 이 일련의 테스크들이 모여 이루는 workflow를 DAG라 부른다 🧐
실습
어느 무더운 여름날, 나에게 업무가 주어졌다.
💀: "A 대리! 우리 회사 데이터 베이스에 애플 주식 데이터좀 쌓아줄 수 있나?"
나: "네 알게쯥니다" (난 실력있는 개발자니까! 호다닥 만들어야지!)
ETL 퀘스트이다. 어딘가에서 주식데이터를 뽑아오고 (Extract), 필요한 데이터를 잘 정리하고 (Transform), 그리고 우리 회사 DB에 업로드(Load) 하면 된다!
사전준비
alpha API key 받기
그래, 우선 우리 회사 돈이 없으니까 무료 api 써야겠어!
https://www.alphavantage.co/support/#api-key
대충 정확하고 명확하게 입력하니 (대충하시면 됩니다) API Key 가 발급이 됐다.
database 준비하기 (posgres with docker)
그렇다, 우리 회사에 DB도 없다!🤣 따라서 간단하게 docker를 사용하여 postgres를 띄워보자
ETL 작성하기
Extract 함수 작성하기
import requests
apikey = "??" # 여기에 apikey를 넣으시면 됩니다!
def extract(symbol, apikey, interval):
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol={symbol}&interval={interval}&apikey={apikey}'
r = requests.get(url)
data = r.json()
return data[f"Time Series ({interval})"]
Transform 함수 작성하기
def transform(data):
transformed_data = list()
for key, val in data.items():
date, time = key.split()
op, hi, lo, cl, vo = val["1. open"], val["2. high"], val["3. low"], val["4. close"], val["5. volume"]
transformed_data.append((date, time, op, hi, lo, cl, vo))
return transformed_data
Load 함수 작성하기
def load(data, user="airflow", password="airflow", host="localhost", port="3306"):
try:
connection = mysql.connector.connect(
user=user,
password=password,
host=host,
port=port,
)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
# raise (요건 나중에)
else:
cur = connection.cursor()
cur.execute(f"DROP TABLE IF EXISTS stock.apple;")
query = f"""
CREATE TABLE IF NOT EXISTS stock.apple (
upload_date DATE DEFAULT CURRENT_DATE,
upload_time TIME DEFAULT CURRENT_TIME,
date DATE,
time TIME,
open FLOAT,
high FLOAT,
low FLOAT,
close FLOAT,
volume INT
);
"""
cur.execute(query)
for row in data:
query = f"""
INSERT INTO stock.apple (date, time, open, high, low, close, volume)
VALUES ('{row[0]}', '{row[1]}', {row[2]}, {row[3]}, {row[4]}, {row[5]}, {row[6]})
"""
print(query)
cur.execute(query)
connection.commit()
connection.close()
Airflow 가 왜 필요할까
앞서 작성한 ETL을 운영한다면 어떠한 단점이 있을까?
- 매일 출근하여 구동을 해주어야 한다.
- 물론 한가지 방법으로는 cron job 을 설정하여 몇시 몇분에 자동으로 구동하게는 구축할 수 있을 것이다.
- Error가 났는지 안났는지 log를 분석해야한다. (
Terminal 창을 뚫어지게 쳐다보아야한다.)- 이 또한 cron job 의 기능 중에 이메일 알림을 설정할 수는 있다. (
물론 귀찮음은 개발자의 몫이다.)
- 이 또한 cron job 의 기능 중에 이메일 알림을 설정할 수는 있다. (
- 만일 하루 깜박하고 돌리지 않았다면, 빠진 데이터를 채우기 위해 소스 코드를 고치고 재구동을 해야하는 번거로움이 있을 것이다.
Airflow 를 도입하면 뭐가 그렇게 좋은건가요?
- 구동을 직접하지 않아도 airflow 가 자동으로 실행해준다. (cron job에 비해 Web UI 가 직관적이고 편리하다)
- Error 가 났을 시 Web UI를 통하여 빠르게 확인이 가능하며, 슬랙 이메일 등 다양한 알림기능을 설정할 수 있다.
- (Backfill) 중간 중간 오류나 실수로 인하여 데이터가 비어있을 시 이를 보다 간편하게 해결할 수 있다.
다음 챕터에서는,
위와 동일한 ETL을 다음 시간에는 Airflow를 통한 DAG 작업으로 바꾸어 얼마나 편해지는지 살펴볼겁니다 :)
전체 코드는 아래를 통해 받으실 수 있습니다 🥹
https://github.com/ampersandor/stock-airflow
'Data Engineer > airflow' 카테고리의 다른 글
[Airflow] k8s 에서 Airflow 설치 (helm, on-premise) (0) | 2023.08.19 |
---|