Airflow와 DAG
이전 글에서 일정 기간 단위로 Product Serving 하는 것을 Batch Serving이라고 합니다. 이와는 다르지만, 일정 주기를 기준으로 소프트웨어 프로그램을 자동으로 실행(스케줄링)하는 것을 Batch Processing 입니다. 이러한 스케줄링을 진행하는 방법 중 하나로 Airflow가 대표적으로 사용되고 있습니다.
2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴
Airflow는 Airbnb에서 개발한 스케줄링 및 워크플로우 도구입니다. Airflow의 작업들의 흐름과 순서를 정의하는 요소를 DAG(Directed Acylic Graph)라고 합니다. DAG는 Airflow의 가장 중요한 요소로 동작 방식, 실행 주기 등을 설정할 수 있습니다. DAG를 생성할 때 DAG() 클래스에 전달되는 주요 인자들은 다음과 같습니다. [1]
- dag_id : DAG의 고유 식별자로 알파벳, 언더바 등 ASCII로 구성.
- schedule : DAG의 실행주기를 설정. cron 표현식, timedelta 객체 등 사용. (2.4 버전 이전에는 schedule_interval)
- start_date : DAG의 첫 실행 날짜를 지정. datetime.datetime을 사용.
- end_date : DAG의 마지막 실행 날짜를 지정. datetime.datetime을 사용.
- catchup : 과거에 지나간 일자에 대해 DAG 실행 여부
- default_args : DAG 내 operator에게 공통적으로 적용할 인자를 딕셔너리로 전달.
- tags : DAG에 태그를 지정해 Airflow UI에서 필터링하고 관리하는데 사용.
Cron 표현
* * * * * 에서 순서대로 분(0~59), 시(0~23), 일(1~31), 월(1~12), 요일(0~6)
Operator
Airflow에서 Operator는 또 다른 핵심 구성 요소로 작업을 정의하고 실행하는데 사용됩니다. 각각의 Operator는 고유한 작업을 수행하도록 설계되어 있으며 DAG 내에서 작업(task)로 사용됩니다. 주요하게 사용되는 Operator는 다음과 같습니다.
PythonOperator은 파이썬 함수와 같은 Callable 객체를 넘겨서 실행이 가능한 Operator입니다. 특정 데이터를 처리하거나 외부 API를 호출하는 등 작업을 수행하는데 사용할 수 있습니다. 사용 예제는 다음과 같습니다.
from airflow.operators.python import PythonOperator
def my_function():
print("Hello, Airflow!")
my_task = PythonOperator(
task_id='my_task',
python_callable=my_function,
)
특정 조건에 따라 다른 작업으로 넘어갈 때는 BranchPythonOperator를 사용할 수 있습니다. 사용 예제는 다음과 같습니다.
from airflow.operators.python import BranchPythonOperator
def choose_branch():
return 'task_a' if some_condition else 'task_b'
branching_task = BranchPythonOperator(
task_id='branching_task',
python_callable=choose_branch,
)
BashOperator는 Bash 스크립트를 실행해야할 때 사용합니다. 쉘 스크립트를 실행하거나 시스템 명령어를 호출하는데 사용합니다. 사용 예제는 다음과 같습니다.
from airflow.operators.bash import BashOperator
my_task = BashOperator(
task_id='my_task',
bash_command='echo "Hello, Airflow!"',
)
이외에도 아무것도 실행하지 않는 DummyOperator (DAG 의 여러 태스크 중 기다려야 하는 상황에서 사용), 특정 호스트로 HTTP 요청을 보내고 응답을 반환하는데 사용하는 SimmpleHttpOperator 등 개발 작업을 하는데 필요한 많은 오퍼레이터들이 정의되어 있습니다.
그리고 앞서 DAG에서 default_args를 통해 operator에 전달하는데, 작업에 적용되는 아래와 같은 인자들을 공통적으로 적용할 수 있습니다. 아래 언급한 인자 이외에 더 많은 것들도 있으니 공식 문서를 통해 확인하시면 좋습니다.[2]
- owner : 작업의 소유자를 지정
- depends_on_past : 특정 task가 이전 DAG 실행 결과에 의존할지 여부
- start_date : 작업 시작 날짜
- end_date : 작업 종료 날짜
- retries : 작업 실패 시 재시도 횟수
- retry_delay : 재시도 간 대기 시간을 timedelta 객체로 지정
그럼 이 내용까지 포함해서 DAG와 함께 사용하는 예제를 살펴보겠습니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email': ['test@example.com'],
'email_on_failure': True,
'email_on_retry': True,
}
with DAG(
dag_id='example_dag',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
default_args=default_args,
description='An example DAG',
tags=['example'],
) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello, Airflow!"',
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "This is task 2."',
)
task1 >> task2 # task1 하고 task2 순서
Airflow의 전체 구조
Airflow는 크게 다음 그림과 같은 구조로 이뤄져 있습니다. 각 요소들의 역할은 아래 처럼 정리할 수 있습니다.
- Scheduler : 각종 메타 정보를 기록하는 역할을 수행합니다. DAG 디렉토리의 .py 파일들을 파싱해서 DB에 저장하고, DAG들의 스케줄링과 Executor를 통해 실행 시점이 되면 DAG을 실행하고 결과를 저장합니다.
- Executor : 태스크를 실행하는 역할을 수행하나, Executor의 유형에 따라 외부 프로세스로 실행하는 Remote Executor를 사용하는 경우 외부 Worker로 전달합니다.
- Worker : DAG의 작업을 수행하고, 생성되는 로그를 저장합니다.
- Metadaa Database : Scheduler에 의해 메타데이터를 저장합니다. MySQL이나 PostgreSQL 등을 사용합니다.
- Webserver : Web UI를 담당하고, 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화하는 역할을 수행합니다.
참고자료
[2] https://airflow.apache.org/docs/apache-airflow/1.10.8/tutorial.html#default-arguments
[3] Architecture Overview — Airflow Documentation
[4] 변성윤. "[Product Serving] Batch Serving과 Airflow"
'Python > etc' 카테고리의 다른 글
[Airflow] Slack 연동해 Task 실패 메시지 보내기 (0) | 2024.12.22 |
---|---|
[MLflow] MLOps를 위한 MLflow 튜토리얼 (2) | 2024.12.20 |
[FastAPI] Web Single 패턴 구현하기 (1) | 2024.12.18 |
[FastAPI] 파이썬으로 웹 구현하기 (2) | FastAPI 확장 기능 : Lifespan, APIRouter, HTTPException, Background Task (0) | 2024.12.16 |
[Fast API] 파이썬으로 웹 구현하기 (1) | Parameter, Form, File, Request & Response Body (0) | 2024.12.13 |