1. Slack 패키지 설치하기
Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.
여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다.
pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.0
2. Slack API Key 발급
Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정하고 설치한 Slack Workspace을 지정합니다. 그 다음 Basic Information에서 Incoming Webhooks를 클릭하고, 활성화 및 워크 스페이스에 Webhook를 추가합니다.
이제 Airflow 알림과 Slack workspace 사이 연결을 할 준비가 되었는데, 메시지를 전송할 채널을 지정하고 URL을 복사합니다. 여기서 Webhook URL의 sevices 이후의 내용은 외부에 노출되지 않도록 잘 보관해야 합니다.
3. Airflow 웹서버 연결
Airflow UI의 웹서버 Admin - Connections - Add a new channels ('+' 표시) 순으로 연결을 지정합니다. 앞서 복사했던 webhook url을 활용해 아래와 같이 저장해줍니다.
- Connection ID : 식별자로 airflow 코드와 동일하게 맞춰주기
- Connection Type : HTTP
- Host : https://hooks.slack.com/services
- Password : webhook url에서 services/ 이후 전부
4. 코드
지금까지 airflow와 slack을 연결하고, 활용할 수 있는 기초 작업이 마무리됐습니다. 이제 필요한 코드를 작성합니다. 먼저 dag을 만들기 전에 필요한 함수를 같은 디렉토리에 utils 폴더를 만들고 아래 코드를 작성합니다.(파일명은 slack_notifier.py로 작성) 아래 코드에서는 webhook를 연결하고 슬랙에 알림을 보내는 함수를 정의합니다.
from airflow.providers.slack_operators.slack_webhook import SlackWebhookOperator
SLACK_DAG_CON_ID = "connection_id"
def send_message(slack_msg):
return SlackWebhookOperator(
task_id="send_message",
http_conn_id=SLACK_DAG_CON_ID,
message=slack_msg,
username="airflow"
)
def task_fail_slack_alert(context):
slack_msg = f"""
:red_circle: Task Failed.
*Task*: {context.get("task_instance").task_id}
*Dag*: {context.get("task_instance").dag_id}
*Execution Time*: {context.get("execution_date")}
"""
alert = send_message(slack_msg)
return alert.execute(context=context)
def task_success_slack_alert(context):
slack_msg = f"""
:large_green_circle: Task Succeeded.
*Task*: {context.get("task_instance").task_id}
*Dag*: {context.get("task_instance").dag_id}
*Execution Time*: {context.get("execution_date")}
"""
alert = send_message(slack_msg)
return alert.execute(context=context)
위 util 함수가 만들어지면 airflow를 움직이는 DAG을 작성합니다. DAG 작성에 필요한 기초 개념은 이전 글에 있으니 참고 바랍니다.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exception import AirflowFailException
from utils.slack_notifier import task_fail_slack_alert, task_success_slack_alert
default_args = {
"owner": "your_name",
"depends_on_past": False, # False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1)
}
def _handle_job_error() -> None:
raise AirflowFailException
with DAG(
dag_id="slack_webhook_dag",
default_args=default_args,
schedule_interval="@daily",
catchup=False,
on_failure_callback=task_fail_slack_alert,
on_success_callback=task_success_slack_alert,
) as dag:
execution_date = "{{ ds }}"
task_notification = PythonOperator(
task_id="slack_alert",
python_callable=_handle_job_error,
op_args=[execution_date],
)
task_notification
참고자료
[1] 변성윤. "[Product Serving] Batch Serving과 Airflow"
'Python > etc' 카테고리의 다른 글
PyTorch로 Distributed Data Parallel 구현하기 (0) | 2025.01.03 |
---|---|
PyTorch로 Pruning 구현하기 (0) | 2025.01.02 |
[MLflow] MLOps를 위한 MLflow 튜토리얼 (2) | 2024.12.20 |
[Airflow] Airflow 기초 개념 공부 및 Hello World! | DAG, Operator (1) | 2024.12.20 |
[FastAPI] Web Single 패턴 구현하기 (1) | 2024.12.18 |