반응형

1. Slack 패키지 설치하기

Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.


여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다.

pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.0

2. Slack API Key 발급

Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정하고 설치한 Slack Workspace을 지정합니다. 그 다음 Basic Information에서 Incoming Webhooks를 클릭하고, 활성화 및 워크 스페이스에 Webhook를 추가합니다.

이제 Airflow 알림과 Slack workspace 사이 연결을 할 준비가 되었는데, 메시지를 전송할 채널을 지정하고 URL을 복사합니다. 여기서 Webhook URL의 sevices 이후의 내용은 외부에 노출되지 않도록 잘 보관해야 합니다.

3. Airflow 웹서버 연결

Airflow UI의 웹서버 Admin - Connections - Add a new channels ('+' 표시) 순으로 연결을 지정합니다. 앞서 복사했던 webhook url을 활용해 아래와 같이 저장해줍니다.

  • Connection ID : 식별자로 airflow 코드와 동일하게 맞춰주기
  • Connection Type : HTTP
  • Host : https://hooks.slack.com/services
  • Password : webhook url에서 services/ 이후 전부

4. 코드

지금까지 airflow와 slack을 연결하고, 활용할 수 있는 기초 작업이 마무리됐습니다. 이제 필요한 코드를 작성합니다. 먼저 dag을 만들기 전에 필요한 함수를 같은 디렉토리에 utils 폴더를 만들고 아래 코드를 작성합니다.(파일명은 slack_notifier.py로 작성) 아래 코드에서는 webhook를 연결하고 슬랙에 알림을 보내는 함수를 정의합니다.

from airflow.providers.slack_operators.slack_webhook import SlackWebhookOperator


SLACK_DAG_CON_ID = "connection_id"


def send_message(slack_msg):
    return SlackWebhookOperator(
        task_id="send_message",
        http_conn_id=SLACK_DAG_CON_ID,
        message=slack_msg,
        username="airflow"
    )


def task_fail_slack_alert(context):
    slack_msg = f"""
                :red_circle: Task Failed.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)


def task_success_slack_alert(context):
    slack_msg = f"""
                :large_green_circle: Task Succeeded.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)

위 util 함수가 만들어지면 airflow를 움직이는 DAG을 작성합니다. DAG 작성에 필요한 기초 개념은 이전 글에 있으니 참고 바랍니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exception import AirflowFailException

from utils.slack_notifier import task_fail_slack_alert, task_success_slack_alert


default_args = {
    "owner": "your_name",
    "depends_on_past": False,           # False는 과거 Task의 성공 여부와 상관없이 실행
    "start_date": datetime(2024, 1, 1)
}


def _handle_job_error() -> None:
    raise AirflowFailException

with DAG(
    dag_id="slack_webhook_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    on_failure_callback=task_fail_slack_alert,
    on_success_callback=task_success_slack_alert,
) as dag:
    execution_date = "{{ ds }}"

    task_notification = PythonOperator(
        task_id="slack_alert",
        python_callable=_handle_job_error,
        op_args=[execution_date],
    )

    task_notification

참고자료

[1] 변성윤. "[Product Serving] Batch Serving과 Airflow"

반응형