no image
PyTorch로 지식 증류(Knowledge Distillation) 구현하기
지식 증류(Knowledge Distillation)는 보다 가벼운 모델을 사용하기 위해 큰 모델에서 작은 모델로 지식을 전달하여 학습의 효율성을 높여주는 방법입니다. (참고) 여기서는 Response-based 기법을 수행하는 것을 구현하려고 합니다.큰 모델(Teacher)과 작은 모델(Student)는 별도로 구현이 완료되어 있고, 여기서 우리가 구현해야 하는 것은 기존의 손실함수 외에 새로운 손실함수를 통합하는 과정입니다.    참고자료PyTorch Knowledge Distillation Tutorial
2025.01.03
PyTorch로 Distributed Data Parallel 구현하기
분산 데이터 병렬화(Distributed Data Parallel, DDP)은 효율적인 분산 및 병렬 처리를 위한 데이터 병렬화 방법론입니다. DDP는 PyTorch의 강력한 모듈로 여러 GPU에서 모델이 구동될 수 있도록 도와줍니다. DDP는 각 프로세스에서 모델의 복사본을 유지하며, 데이터를 병렬로 처리하고 각 GPU의 계산 결과를 자동으로 동기화하여 효율성을 극대화 합니다.DDP 프로세스환경 설정DDP를 실행하기 위해 분산 학습을 위한 기본 환경 변수를 설정합니다. dist.init_process_group로 DDP 모듈을 초기화하는데, 이는 서로 다른 GPU 간 소통 및 동기화를 위해 필요합니다. 초기화에서 사용하는 값들의 주요의미는 다음과 같습니다. PyTorch에서 지원하는 분산처리를 위한 백..
2025.01.03
PyTorch로 Pruning 구현하기
1. Pruning 준비하기Pruning(가지치기)는 모델 경량화 방법 중 하나로 파라미터를 줄여 모델의 경량화를 목표로 한다. 이전 글을 통해 이론적 배경에 대해 살펴본 바 있었는데, 이번에는 PyTorch에서 제공하고 있는 도구를 활용해 Pruning을 적용하는 것을 연습해보고자 한다.우선, model이라는 변수에 딥러닝 모델 인스턴스가 있음을 가정하고, 필요한 가지치기 클래스를 불러온다. 그리고 module이라는 이름으로 적용 대상인 하나의 층을 가져온다. import torch.nn.utils.prune as prunemodel = Model()module = model.conv12. Pruning 적용하기지정한 module에 대해서 torch.nn.utils.prune에서 제공하는 기법 중 하나..
2025.01.02
no image
[Airflow] Slack 연동해 Task 실패 메시지 보내기
1. Slack 패키지 설치하기Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다. pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.02. Slack API Key 발급Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정..
2024.12.22
[MLflow] MLOps를 위한 MLflow 튜토리얼
MLflow는 머신러닝 관련 실험들을 관리하고 내용들을 기록할 수 있는 오픈소스 솔루션입니다. 실험의 소스 코드, 하이퍼 파라미터, 지표, 기타 부산물들을 저장하고, 해당 모델들을 협업하는 동료들과 공유할 필요가 있는데 하나의 MLflow 서버 위에서 각자 자기 실험을 만들고 공유할 수 있습니다. 비슷한 기능의 Weights & Biases가 있기는 하지만, 이는 별도 솔루션을 구매해야 하기 때문에 비용 측면에 있어서 강점이 있습니다.  1. MLflow 설치와 실행우선, MLflow가 없는 경우 설치가 필요합니다. pip install mlflow 설치가 완료되었다면, MLflow를 실행하기 위해서는 아래와 같은 코드를 통해 UI를 불러올 수 있습니다.# UI 실행$ mlflow ui# 호스트 및 포트..
2024.12.20
no image
[Airflow] Airflow 기초 개념 공부 및 Hello World! | DAG, Operator
Airflow와 DAG이전 글에서 일정 기간 단위로 Product Serving 하는 것을 Batch Serving이라고 합니다. 이와는 다르지만, 일정 주기를 기준으로 소프트웨어 프로그램을 자동으로 실행(스케줄링)하는 것을 Batch Processing 입니다. 이러한 스케줄링을 진행하는 방법 중 하나로 Airflow가 대표적으로 사용되고 있습니다.  2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴 Airflow는 Airbnb에서 개발한 스케줄링 및 워크플로우 도구입니다. Airflow의 작업들의 흐름과 순서를 정의하는 요소를 DAG(Directed Acylic Graph)라고 합니다. DAG는 Airflow의 가장 중요한 요소로 동작 ..
2024.12.20
[FastAPI] Web Single 패턴 구현하기
이전에 머신러닝, 딥러닝 프로덕트의 디자인 패턴에 대해 살펴본 적이 있습니다. (아래 링크 참고) 2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴 그 중 하나인 Web Single 패턴은 API 서버 코드에 모델을 포함시켜 배포하는 방식으로 서비스(모델의 예측과 추론)가 필요한 곳에서 요청에 따라 모델의 결과를 반환하는 방식입니다. 이를 직접 구현하기 위해서는 아래와 같은 요소들을 구현할 필요가 있습니다. Config 구현데이터베이스 구현FastAPI 서버 구현모델 불러오기예측 결과 반환 및 저장 데이터베이스 조회Config 구현웹이 동작할 수 있도록 필요한 요소들을 정리해둔 config 파일을 만듭니다. 여기서 pydantic을 통해 데..
2024.12.18
no image
[FastAPI] 파이썬으로 웹 구현하기 (2) | FastAPI 확장 기능 : Lifespan, APIRouter, HTTPException, Background Task
Lifesapn functionFastAPI 앱을 실행하거나 종료할 때 로직을 넣고 싶을 경우 사용합니다. 예를 들어, 앱이 시작할 때 머신런이 모델을 로드하고 앱을 종료하면 db 연결을 정리하면서 각각 상황마다 출력하도록 구현할 수 있습니다. 아래 코드는 예제를 구현한 것으로 기본적인 FastAPI 구조는 아래 글을 참고해보시기 바랍니다. 2024.12.12 - [Python/etc] - [Fast API] 파이썬으로 웹 구현하기 (1) | Parameter, Form, File, Request & Response Body from contextlib import asynccontextmanagerdef fake_answer_to_everything_ml_model(x: float): return..
2024.12.16
no image
[Fast API] 파이썬으로 웹 구현하기 (1) | Parameter, Form, File, Request & Response Body
Fast API 기본 구조Fast API를 통해 웹을 구축할 때 기본적인 구조는 아래 코드와 같이 구축하면 됩니다. 기본적으로 FastAPI를 불러오고, 관련된 코드를 작성합니다. from fastapi import FastAPIimport uvicornapp = FastAPI()##### Code ######@app.get("/")def read_root(): return {"Hello": "World"}##### Code ######if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)그리고 uvicorn을 통해 파이썬 파일을 실행시킵니다. uvicorn은 Asynchronous Server Gateway Interf..
2024.12.13
반응형

지식 증류(Knowledge Distillation)는 보다 가벼운 모델을 사용하기 위해 큰 모델에서 작은 모델로 지식을 전달하여 학습의 효율성을 높여주는 방법입니다. (참고) 여기서는 Response-based 기법을 수행하는 것을 구현하려고 합니다.



큰 모델(Teacher)과 작은 모델(Student)는 별도로 구현이 완료되어 있고, 여기서 우리가 구현해야 하는 것은 기존의 손실함수 외에 새로운 손실함수를 통합하는 과정입니다.



 

 

 

 

참고자료

  1. PyTorch Knowledge Distillation Tutorial
반응형
반응형

분산 데이터 병렬화(Distributed Data Parallel, DDP)은 효율적인 분산 및 병렬 처리를 위한 데이터 병렬화 방법론입니다. DDP는 PyTorch의 강력한 모듈로 여러 GPU에서 모델이 구동될 수 있도록 도와줍니다. DDP는 각 프로세스에서 모델의 복사본을 유지하며, 데이터를 병렬로 처리하고 각 GPU의 계산 결과를 자동으로 동기화하여 효율성을 극대화 합니다.


DDP 프로세스

환경 설정

DDP를 실행하기 위해 분산 학습을 위한 기본 환경 변수를 설정합니다. dist.init_process_group로 DDP 모듈을 초기화하는데, 이는 서로 다른 GPU 간 소통 및 동기화를 위해 필요합니다. 초기화에서 사용하는 값들의 주요의미는 다음과 같습니다. PyTorch에서 지원하는 분산처리를 위한 백엔드 종류는 여기에서 확인 가능합니다. 여기서는 gloo를 사용했습니다.

 

  • MASTER_ADDR: 마스터 프로세스의 IP 주소.
  • MASTER_PORT: 마스터 프로세스의 통신 포트.
  • RANK: 현재 프로세스의 순위(고유 ID).
  • WORLD_SIZE: 총 프로세스 수.
import os
import torch
import torch.distributed as dist



def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

모델 및 데이터 준비

모델을 준비한 뒤 DDP로 감싸줍니다. 또한 DistributedSampler를 사용해 데이터셋을 분산 학습에 맞게 나눕니다.

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler


model = MyModel().to(device)
model = DDP(model, device_ids=[rank])

data_sampler = DistributedSampler(dataset)
data_loader = DataLoader(dataset, sampler=data_sampler, batch_size=batch_size)

학습 루프 작성

DDP를 사용할 때는 일반적인 학습 루프와 유사하지만, 각 프로세스가 독립적으로 실행되며 DistributedSampler를 재설정해야 합니다:

for epoch in range(num_epochs):
    data_sampler.set_epoch(epoch)  # 각 epoch마다 샘플링 초기화
    for data, target in data_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

프로세스 종료

모든 학습이 끝난 후에는 분산 프로세스를 정리합니다.

dist.destroy_process_group()

최종 코드

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

# 모델 정의
class MyModel(torch.nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.linear = torch.nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# 분산 학습 함수
def main(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

    # 데이터 준비
    dataset = torch.utils.data.TensorDataset(torch.randn(100, 10), torch.randn(100, 1))
    sampler = DistributedSampler(dataset)
    data_loader = DataLoader(dataset, sampler=sampler, batch_size=32)

    # 모델 준비
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = torch.nn.MSELoss()

    # 학습 루프
    for epoch in range(10):
        sampler.set_epoch(epoch)
        for data, target in data_loader:
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = 2  # GPU 개수
    torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size)

참고자료

  1. PyTorch Distributed Data Parallel Tutorial
반응형
반응형

1. Pruning 준비하기

Pruning(가지치기)는 모델 경량화 방법 중 하나로 파라미터를 줄여 모델의 경량화를 목표로 한다. 이전 을 통해 이론적 배경에 대해 살펴본 바 있었는데, 이번에는 PyTorch에서 제공하고 있는 도구를 활용해 Pruning을 적용하는 것을 연습해보고자 한다.



우선, model이라는 변수에 딥러닝 모델 인스턴스가 있음을 가정하고, 필요한 가지치기 클래스를 불러온다. 그리고 module이라는 이름으로 적용 대상인 하나의 층을 가져온다.

import torch.nn.utils.prune as prune

model = Model()
module = model.conv1

2. Pruning 적용하기

지정한 module에 대해서 torch.nn.utils.prune에서 제공하는 기법 중 하나를 선택하고, 모듈과 파라미터를 지정한다. 아래 예제는 module의 가중치 50%를 랜덤으로 가지치기 하는 방법을 선택했다.

prune.random_unstructured(module, name="weight", amount=0.5)

만약 가중치가 아니라 편향값에 Pruning을 수행하려면 아래와 같이 수행할 수 있다. 여기서는 L1 Norm 값이 가장 작은 편향값 3개를 가지치기를 시도하였다.

prune.l1_unstructured(module, name="bias", amount=3)

잘 적용되었는지 확인하려면, 아래 코드 중 하나로 결과값을 출력할 수 있다.

print(module.weight)
print(list(module.named_parameters()))

3.정리

Pruning 기법이 영구적으로 적용되게 하고 싶을 때 prune을 적용하면서 생성된 것을 제거하는 과정이 필요하다. 이를 위해서는 prune에 있는 remove를 사용한다.

prune.remove(module, 'weight')

참고자료

  1. PyTorch Pruning Tutorial
반응형
반응형

1. Slack 패키지 설치하기

Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.


여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다.

pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.0

2. Slack API Key 발급

Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정하고 설치한 Slack Workspace을 지정합니다. 그 다음 Basic Information에서 Incoming Webhooks를 클릭하고, 활성화 및 워크 스페이스에 Webhook를 추가합니다.

이제 Airflow 알림과 Slack workspace 사이 연결을 할 준비가 되었는데, 메시지를 전송할 채널을 지정하고 URL을 복사합니다. 여기서 Webhook URL의 sevices 이후의 내용은 외부에 노출되지 않도록 잘 보관해야 합니다.

3. Airflow 웹서버 연결

Airflow UI의 웹서버 Admin - Connections - Add a new channels ('+' 표시) 순으로 연결을 지정합니다. 앞서 복사했던 webhook url을 활용해 아래와 같이 저장해줍니다.

  • Connection ID : 식별자로 airflow 코드와 동일하게 맞춰주기
  • Connection Type : HTTP
  • Host : https://hooks.slack.com/services
  • Password : webhook url에서 services/ 이후 전부

4. 코드

지금까지 airflow와 slack을 연결하고, 활용할 수 있는 기초 작업이 마무리됐습니다. 이제 필요한 코드를 작성합니다. 먼저 dag을 만들기 전에 필요한 함수를 같은 디렉토리에 utils 폴더를 만들고 아래 코드를 작성합니다.(파일명은 slack_notifier.py로 작성) 아래 코드에서는 webhook를 연결하고 슬랙에 알림을 보내는 함수를 정의합니다.

from airflow.providers.slack_operators.slack_webhook import SlackWebhookOperator


SLACK_DAG_CON_ID = "connection_id"


def send_message(slack_msg):
    return SlackWebhookOperator(
        task_id="send_message",
        http_conn_id=SLACK_DAG_CON_ID,
        message=slack_msg,
        username="airflow"
    )


def task_fail_slack_alert(context):
    slack_msg = f"""
                :red_circle: Task Failed.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)


def task_success_slack_alert(context):
    slack_msg = f"""
                :large_green_circle: Task Succeeded.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)

위 util 함수가 만들어지면 airflow를 움직이는 DAG을 작성합니다. DAG 작성에 필요한 기초 개념은 이전 글에 있으니 참고 바랍니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exception import AirflowFailException

from utils.slack_notifier import task_fail_slack_alert, task_success_slack_alert


default_args = {
    "owner": "your_name",
    "depends_on_past": False,           # False는 과거 Task의 성공 여부와 상관없이 실행
    "start_date": datetime(2024, 1, 1)
}


def _handle_job_error() -> None:
    raise AirflowFailException

with DAG(
    dag_id="slack_webhook_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    on_failure_callback=task_fail_slack_alert,
    on_success_callback=task_success_slack_alert,
) as dag:
    execution_date = "{{ ds }}"

    task_notification = PythonOperator(
        task_id="slack_alert",
        python_callable=_handle_job_error,
        op_args=[execution_date],
    )

    task_notification

참고자료

[1] 변성윤. "[Product Serving] Batch Serving과 Airflow"

반응형
반응형

MLflow는 머신러닝 관련 실험들을 관리하고 내용들을 기록할 수 있는 오픈소스 솔루션입니다. 실험의 소스 코드, 하이퍼 파라미터, 지표, 기타 부산물들을 저장하고, 해당 모델들을 협업하는 동료들과 공유할 필요가 있는데 하나의 MLflow 서버 위에서 각자 자기 실험을 만들고 공유할 수 있습니다.

 

비슷한 기능의 Weights & Biases가 있기는 하지만, 이는 별도 솔루션을 구매해야 하기 때문에 비용 측면에 있어서 강점이 있습니다. 

 

1. MLflow 설치와 실행

우선, MLflow가 없는 경우 설치가 필요합니다. 

pip install mlflow

 

설치가 완료되었다면, MLflow를 실행하기 위해서는 아래와 같은 코드를 통해 UI를 불러올 수 있습니다.

# UI 실행
$ mlflow ui

# 호스트 및 포트 지정
$ mlflow server --host 127.0.0.1 --port 8080

 

2. 훈련 기록

기록을 위해 모델을 훈련시키고 필요한 메타 데이터 저장을 아래와 같이 수행한다고 가정하겠습니다.

import mlflow
from mlflow.models import infer_signature


# Define the model hyperparameters
params = {
    "solver": "lbfgs",
    "max_iter": 1000,
    "multi_class": "auto",
    "random_state": 8888,
}

# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

# Predict on the test set
y_pred = lr.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)

 

위 모델에서 나오는 데이터들을 기록할 수 있도록 아래 코드를 추가할 수 있습니다.

# Set our tracking server uri for logging
mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")

# Create a new MLflow Experiment
mlflow.set_experiment("MLflow Quickstart")

# Start an MLflow run
with mlflow.start_run():
    # Log the hyperparameters
    mlflow.log_params(params)

    # Log the loss metric
    mlflow.log_metric("accuracy", accuracy)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic LR model for iris data")

    # Infer the model signature
    signature = infer_signature(X_train, lr.predict(X_train))

    # Log the model
    model_info = mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="iris_model",
        signature=signature,
        input_example=X_train,
        registered_model_name="tracking-quickstart",
    )

 

3. 모델 로드 및 추론

모델 기록이 끝난 후에 모델을 로드하고, 추론을 수행할 수 있습니다. 아래 코드는  mlflow의 pyfunc을 사용해 로드하는 예시 코드입니다. 

# Load the model back for predictions as a generic Python Function model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

predictions = loaded_model.predict(X_test)

iris_feature_names = datasets.load_iris().feature_names

result = pd.DataFrame(X_test, columns=iris_feature_names)
result["actual_class"] = y_test
result["predicted_class"] = predictions

 

4. MLflow 환경 관리 

MLflow에서는 실험은 하나의 프로젝트 단위로 생각할 수 있고, 그 아래에서 실행되는 여러 Run으로 구성됩니다. 실험은 아래와 같은 코드로 실행 가능합니다.

mlflow experiments create --experiment-name [실험명]

 

프로젝트를 어떤 환경에서 실행시킬지 정의하는 MLProject.yaml 파일을 작성합니다. 이 파일은 패키지 모듈의 상단에 위치해서 프로젝트의 메타 정보를 저장합니다.

name: My Project

python_env: python_env.yaml
# or
# conda_env: my_env.yaml
# or
# docker_env:
#    image:  mlflow-docker-example

entry_points:
  main:
    parameters:
      data_file: path
      regularization: {type: float, default: 0.1}
    command: "python train.py -r {regularization} {data_file}"
  validate:
    parameters:
      data_file: path
    command: "python validate.py {data_file}"

 

그리고 연결된 python_env.yaml 파일은 파이썬의 버전과 관련된 정보를 저장합니다.

# Python version required to run the project.
python: "3.8.15"
# Dependencies required to build packages. This field is optional.
build_dependencies:
  - pip
  - setuptools
  - wheel==0.37.1
# Dependencies required to run the project.
dependencies:
  - mlflow==2.3
  - scikit-learn==1.0.2

 

CLI로 mlflow run을 통해 프로젝트를 실행할 수 있습니다. 

mlflow run <projrect> --experiment-name <실험명>

참고자료

[1] 변성윤. "[Product Serving] 모델 관리와 모델 평가". boostcamp AI Tech. 

[2] MLflow 공식 문서 Tutorial :  https://mlflow.org/docs/latest/getting-started/intro-quickstart/index.html

반응형
반응형

Airflow와 DAG

이전 글에서 일정 기간 단위로 Product Serving 하는 것을 Batch Serving이라고 합니다. 이와는 다르지만, 일정 주기를 기준으로 소프트웨어 프로그램을 자동으로 실행(스케줄링)하는 것을 Batch Processing 입니다. 이러한 스케줄링을 진행하는 방법 중 하나로 Airflow가 대표적으로 사용되고 있습니다. 

 

2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴

 

Airflow는 Airbnb에서 개발한 스케줄링 및 워크플로우 도구입니다. Airflow의 작업들의 흐름과 순서를 정의하는 요소를 DAG(Directed Acylic Graph)라고 합니다. DAG는 Airflow의 가장 중요한 요소로 동작 방식, 실행 주기 등을 설정할 수 있습니다. DAG를 생성할 때 DAG() 클래스에 전달되는 주요 인자들은 다음과 같습니다. [1]

  • dag_id : DAG의 고유 식별자로 알파벳, 언더바 등 ASCII로 구성.
  • schedule : DAG의 실행주기를 설정. cron 표현식, timedelta 객체 등 사용. (2.4 버전 이전에는 schedule_interval)
  • start_date : DAG의 첫 실행 날짜를 지정. datetime.datetime을 사용.
  • end_date : DAG의 마지막 실행 날짜를 지정. datetime.datetime을 사용.
  • catchup : 과거에 지나간 일자에 대해 DAG 실행 여부
  • default_args : DAG 내 operator에게 공통적으로 적용할 인자를 딕셔너리로 전달. 
  • tags : DAG에 태그를 지정해 Airflow UI에서 필터링하고 관리하는데 사용. 
더보기

Cron 표현

* * * * * 에서 순서대로 분(0~59), 시(0~23), 일(1~31), 월(1~12), 요일(0~6)

Operator

Airflow에서 Operator는 또 다른 핵심 구성 요소로 작업을 정의하고 실행하는데 사용됩니다. 각각의 Operator는 고유한 작업을 수행하도록 설계되어 있으며 DAG 내에서 작업(task)로 사용됩니다. 주요하게 사용되는 Operator는 다음과 같습니다.

 

PythonOperator은 파이썬 함수와 같은 Callable 객체를 넘겨서 실행이 가능한 Operator입니다. 특정 데이터를 처리하거나 외부 API를 호출하는 등 작업을 수행하는데 사용할 수 있습니다. 사용 예제는 다음과 같습니다. 

from airflow.operators.python import PythonOperator

def my_function():
    print("Hello, Airflow!")

my_task = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
)

 

특정 조건에 따라 다른 작업으로 넘어갈 때는 BranchPythonOperator를 사용할 수 있습니다. 사용 예제는 다음과 같습니다.

from airflow.operators.python import BranchPythonOperator

def choose_branch():
    return 'task_a' if some_condition else 'task_b'

branching_task = BranchPythonOperator(
    task_id='branching_task',
    python_callable=choose_branch,
)

 

BashOperator는 Bash 스크립트를 실행해야할 때 사용합니다. 쉘 스크립트를 실행하거나 시스템 명령어를 호출하는데 사용합니다. 사용 예제는 다음과 같습니다.

from airflow.operators.bash import BashOperator

my_task = BashOperator(
    task_id='my_task',
    bash_command='echo "Hello, Airflow!"',
)

 

이외에도 아무것도 실행하지 않는 DummyOperator (DAG 의 여러 태스크 중 기다려야 하는 상황에서 사용), 특정 호스트로 HTTP 요청을 보내고 응답을 반환하는데 사용하는 SimmpleHttpOperator 등 개발 작업을 하는데 필요한 많은 오퍼레이터들이 정의되어 있습니다.   


그리고 앞서 DAG에서 default_args를 통해 operator에 전달하는데, 작업에 적용되는 아래와 같은 인자들을 공통적으로 적용할 수 있습니다. 아래 언급한 인자 이외에 더 많은 것들도 있으니 공식 문서를 통해 확인하시면 좋습니다.[2]

  • owner : 작업의 소유자를 지정
  • depends_on_past : 특정 task가 이전 DAG 실행 결과에 의존할지 여부
  • start_date : 작업 시작 날짜
  • end_date : 작업 종료 날짜
  • retries : 작업 실패 시 재시도 횟수
  • retry_delay : 재시도 간 대기 시간을 timedelta 객체로 지정

그럼 이 내용까지 포함해서 DAG와 함께 사용하는 예제를 살펴보겠습니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email': ['test@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

with DAG(
    dag_id='example_dag',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    description='An example DAG',
    tags=['example'],
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "Hello, Airflow!"',
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command='echo "This is task 2."',
    )

    task1 >> task2   # task1 하고 task2 순서

 

Airflow의 전체 구조

Airflow는 크게 다음 그림과 같은 구조로 이뤄져 있습니다. 각 요소들의 역할은 아래 처럼 정리할 수 있습니다. 

  • Scheduler : 각종 메타 정보를 기록하는 역할을 수행합니다. DAG 디렉토리의 .py 파일들을 파싱해서 DB에 저장하고, DAG들의 스케줄링과 Executor를 통해 실행 시점이 되면 DAG을 실행하고 결과를 저장합니다.
  • Executor : 태스크를 실행하는 역할을 수행하나, Executor의 유형에 따라 외부 프로세스로 실행하는 Remote Executor를 사용하는 경우 외부 Worker로 전달합니다.
  • Worker : DAG의 작업을 수행하고, 생성되는 로그를 저장합니다.
  • Metadaa Database : Scheduler에 의해 메타데이터를 저장합니다. MySQL이나 PostgreSQL 등을 사용합니다.
  • Webserver : Web UI를 담당하고, 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화하는 역할을 수행합니다.

Airflow의 전체 구조 [3]

 

참고자료

[1] https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG

[2] https://airflow.apache.org/docs/apache-airflow/1.10.8/tutorial.html#default-arguments

[3] Architecture Overview — Airflow Documentation

[4] 변성윤. "[Product Serving] Batch Serving과 Airflow" 

반응형
반응형

이전에 머신러닝, 딥러닝 프로덕트의 디자인 패턴에 대해 살펴본 적이 있습니다. (아래 링크 참고)

 

2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴

 

그 중 하나인 Web Single 패턴은 API 서버 코드에 모델을 포함시켜 배포하는 방식으로 서비스(모델의 예측과 추론)가 필요한 곳에서 요청에 따라 모델의 결과를 반환하는 방식입니다. 이를 직접 구현하기 위해서는 아래와 같은 요소들을 구현할 필요가 있습니다.

 

  • Config 구현
  • 데이터베이스 구현
  • FastAPI 서버 구현
    • 모델 불러오기
    • 예측 결과 반환 및 저장 
    • 데이터베이스 조회

Config 구현

웹이 동작할 수 있도록 필요한 요소들을 정리해둔 config 파일을 만듭니다. 여기서 pydantic을 통해 데이터 검증을 수행합니다. pydantic에 대한 사용방법은 이전 글(아래)을 참고하시기 바랍니다.

# config.py
from pydantic import Field
from pydantic_settings import BaseSettings


class Config(BaseSettings):
    db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL")
    model_path: str = Field(default="model.joblib", env="MODEL_PATH")
    app_env: str = Field(default="local", env="APP_ENV")


config = Config()

데이터베이스 구현

데이터를 저장하고 가져오기 위한 데이터베이스를 구현합니다. FastAPI에서 활용할 수 있는 SQLModel을 활용하여 config에서 정의한 db_url로 구동을 위한 엔진을 만듭니다. 

# database.py
import datetime
from sqlmodel import SQLModel, Field, create_engine
from config import config 


class PredictionResult(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    result: int
    created_at : str = Field(default_factory=datetime.datetime.now) # 동적으로 기본값 설정


engine = create_engine(config.db_url)

모델 불러오기

모델을 불러올 수 있도록 별도의 의존성 파이썬 파일을 만듭니다. 모델을 가져오는 등의 기능을 구현할 파일입니다. 

# dependency.py
model = None


def load_model(model_path: str):
    import joblib

    global model
    model = joblib.load(model_path)

 

그리고 FastAPI 서버의 메인 코드에 아래와 같이 구현해 최초 생성할 때 데이터베이스 테이블을 만들고 모델을 로드합니다. lifespan을 통해 애플리케이션이 구동될 때 작동할 수 있도록 합니다. 

# main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from loguru import logger
from sqlmodel import SQLModel

from config import config
from database import engine
from dependency import load_model 


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 데이터베이스 테이블 생성
    logger.info("Creating databas table")
    SQLModel.metadata.create_all(engine)

    # 모델 로드
    logger.info("Loading model")
    load_model(config.model_path)

    yield


app = FastAPI(lifespan=lifespan)

예측 결과 반환 및 저장

애플리케이션이 동작할 때 모델이 로드되었으므로 이 모델을 활용해서 예측을 요청받으면 예측 결과를 반환하고 저장해야 합니다. 하지만, 어떤 방식으로 요청을 받고 응답을 할지 정의되어 있지 않으므로 추가적인 정의가 필요한데, 이를 위해 schema라는 파일을 만듭니다. schema는 네트워크를 통해 데이터를 주고 받을 때 어떤 형태로 주고 받을지 정의하는 것을 말합니다.

 

아래 코드에서는 요청을 받을 때는 feature를 리스트로 받고, 예측 결과를 반환할 때는 id와 result를 정수형으로 반환하도록 했습니다.   

# schemas.py
from pydantic import BaseModel


class PredictionRequest(BaseModel):
    feature: list


class PredictionResponse(BaseModel):
    id: int
    result: int

 

그리고 조금 더 유연한 적용을 위해 모델을 가져올 수 있도록 dependency 파일에 get_model 함수를 추가합니다. 

# dependency.py

def get_model():
    global model
    return model

 

 

이제 api.py 파일을 작성합니다. api.py는 클라이언트에서 API를 호출하고, 학습 결과를 반환하는 역할을 수행합니다. 스크립트는 아래 같은 순서로 작성됩니다.

  1. APIRouter를 이용해 라우팅을 관리합니다. (앱이 작을 때는 없어도 무관하나, 앱이 커질 경우를 대비해 미래 관리하는 것이 좋을 것 같습니다)
  2. get_model 함수로 모델을 가져오고 예측을 수행하게끔 합니다. (이 부분은 예측 모델 함수의 I/O에 ㄸ라서 달라질 수 있습니다)
  3. 정의된 schema인 PredictionResult에 넣습니다.
  4. SQLModel의 Session을 통해 데이터베이스에 결과를 넣고 저장합니다. 
# api.py
from fastapi import APIRouter
from sqlmodel import Session

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


router = APIRouter()

@router.post("/predict")
def predict(request:PredictionRequest) -> PredictionResponse:
    model  = get_model()
    prediction = int(model.predict([request.feature])[0]) # 반환 결과에 따라 변경
    prediction_result = PredictionResult(result=prediction)

	# 데이터베이스 연결 관리 
    with Session(engine) as session: 
        session.add(prediction_result)      # 새로운 객체를 세션에 추가
        session.commit()                    # 세션 변경사항을 DB에 저장
        session.refresh(prediction_result)  # 세션에 있는 객체를 업데이트 

    return PredictionResponse(id=prediction_result.id, result=prediction)

 

이를 동작하게 하려면 main.py에 아래 코드를 추가하면 됩니다. 앞서 라우팅을 선언했으니 include_router로 충분합니다.

# main.py
...
from fastapi import router 

...
app = FastAPI(lifespan=lifespan)
app.include_router(router)  # router를 포함하도록 추가

데이터베이스 조회

저장한 데이터베이스를 GET 메서드로 조회하기 위한 코드입니다. 아래 코드는 Session 객체에서 데이터베이스의 전체를 선택하고 가져오는 방법입니다. SQLModel에서 select를 통해 SQL의 SELECT와 동일한 구문을 생성하고 결과를 조회해서 PredictionResponse schema에 가져옵니다. 

# api.py
from sqlmodel import Session, select

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


@router.get("/predict")
def get_predictions() -> list[PredictionResponse]:
    with Session(engine) as session:
        statement = select(PredictionResult)
        prediction_results = session.exec(statement).all()

        return [
            PredictionResponse(id=prediction_results.id, result=prediction_results.result)
            for prediction_result in prediction_results
        ]

이번에는 id를 필터링해서 데이터베이스에서 id로 필터링해서 조회하는 방법입니다. 예외 처리를 추가했고, id로 get 조회하는 것이 다릅니다. 

# api.py
from fastapi import HTTPException, status


def get_prediction(id: int) -> PredictionResponse:
    with Session(engine) as session:
        prediction_result = session.get(PredictionResult, id)
        if not prediction_result:
            raise HTTPException(
                detail="Not found", status_code=status.HTTP_404_NOT_FOUND
            )
        return PredictionResponse(
            id=prediction_result.id, result=prediction_result.result
        )

 

참고자료

[1] https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/tree/main/02-online-serving(fastapi)/projects/web_single

 

반응형
반응형

Lifesapn function

FastAPI 앱을 실행하거나 종료할 때 로직을 넣고 싶을 경우 사용합니다. 예를 들어, 앱이 시작할 때 머신런이 모델을 로드하고 앱을 종료하면 db 연결을 정리하면서 각각 상황마다 출력하도록 구현할 수 있습니다. 아래 코드는 예제를 구현한 것으로 기본적인 FastAPI 구조는 아래 글을 참고해보시기 바랍니다.

 

2024.12.12 - [Python/etc] - [Fast API] 파이썬으로 웹 구현하기 (1) | Parameter, Form, File, Request & Response Body

 

from contextlib import asynccontextmanager


def fake_answer_to_everything_ml_model(x: float):
    return x * 42

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Start Up Event")
    ml_models['answer_to_everything'] = fake_answer_to_everything_ml_model

    yield # 앱이 기동되는 시점

    print("Shutdown Event!")
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
def predict(x: float):
    result = ml_models['answer_to_everything'](x)
    return {"result" : result}

 

아래 이미지와 같이 잘 잘동하는 것처럼 보입니다. 

 

API Router

앱이 커짐에 따라 get, post가 많아질텐데 이를 관리하기 위해 API Router를 활용할 수 있습니다. @app.get, @app.post을 사용하지 않고 별도의 router 파일을 따로 설정하고 app에 가져와서 사용하는 방식입니다. 다음 예제는 user라는 라우터를 별도로 만들어 app에 연결하는 코드입니다. 우선은 user.py로 만드는 코드입니다.

from fastapi import APIRouter


user_router = APIRouter(prefix="/users")


@user_router.get("/", tags=['users'])
def read_users():
    return [{'username':'Rick'}, {"username": "Morty"}]


@user_router.get("/me", tags=['users'])
def read_user_me():
    return {"username": "fakecurrentuser"}


@user_router.get("/{username}", tags=['users'])
def read_user(username: str):
    return {"username": username}

 

다음으로 저희가 만든 앱에 해당 라우터를 불러옵니다. 같은 디렉토리에 user.py가 있다고 가정하고 import해온 후에 이를 FastAPI 앱에 include_router를 통해 추가해줍니다. 역서 참고로 include_router는 스크립트가 실행되는 __name__ == '__main__'에 들어가게 되면 app에 라우터가 포함되지 않은 상태로 정의되기 때문에 제대로 처리하지 못합니다.

from fastapi import FastAPI
import uvicorn
import user

app = FastAPI()
app.include_router(user.user_router)


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

 

아래와 같이 잘 작동하고 있는 것을 확인할 수 있습니다.

Error Handler

에러가 발생한 경우 관련 기록을 남기고, 메시지를 클라이언트에 보내는 것은 애플리케이션을 운영하는 관점에서 필요한 일입니다. FastAPI에서는 이러한 에러 처리를 위해 HTTP Exception을 통해 가능하도록 하고 있습니다. 아래는 그 예시 코드입니다. 

 

from fastapi import HTTPException


@app.get("/v/{item_id}")
async def find_by_id(item_id: int):
    try :
        item = items[items_id]
    except KeyError:
        raise HTTPException(status_code=404, 
                            detail=f"아이템을 찾을 수 없습니다. id : {item_id}")
    return item

 

아래 이미지를 보면 제대로 입력된 경우(왼쪽) 원하는 값을 반환하지만, 사전에 정의되지 않은 오류 발생의 경우(오른쪽) 에러 메시지를 띄우는 것을 확인할 수 있습니다.

Background Task

여러 작업 중 시간이 오래 소요되는 것들에 대해서는 백그라운드로 실행할 수 있습니다. 다음은 주어진 대기 시간(wait_time) 동안 작업을 수행하고 생성한 작업 결과를 저장해 필요할 때 조회하는 코드입니다. 

from time import sleep
from fastapi import BackgroundTasks
from uuid import UUID, uuid4
from pydantic import BaseModel, Field


# 입력 데이터 모델 
class TaskInput(BaseModel):
    id_: UUID = Field(default_factory=uuid4)
    wait_time: int

# 대기 및 작업 저장
def cpu_bound_task(id_ : UUID, wait_time: int):
    sleep(wait_time)
    result = f"task done after {wait_time}"
    task_repo[id_] = result

# 비동기 작업
@app.post("/task", status_code=202) # 비동기 작업에 대해서 status code 202를 반환
async def create_task_in_background(task_input: TaskInput, background_tasks: BackgroundTasks):
    background_tasks.add_task(cpu_bound_task, id_=task_input.id_, wait_time=task_input.wait_time)
    return "ok"

# 작업 결과 조회
@app.get("/task/{task_id}")
def get_task_result(task_id: UUID):
    try:
        return task_repo[task_id]
    except KeyError:
        return None

참고자료

[1] 변성윤. "[Product Serving] Fast API (2)". boostcamp AI Tech. 

[2] https://fastapi.tiangolo.com/advanced/events/

[3] https://fastapi.tiangolo.com/reference/apirouter/

[4] https://fastapi.tiangolo.com/tutorial/handling-errors/

[5] https://fastapi.tiangolo.com/tutorial/background-tasks/

반응형
반응형

Fast API 기본 구조

Fast API를 통해 웹을 구축할 때 기본적인 구조는 아래 코드와 같이 구축하면 됩니다. 기본적으로 FastAPI를 불러오고, 관련된 코드를 작성합니다.

from fastapi import FastAPI
import uvicorn

app = FastAPI()

##### Code ######
@app.get("/")
def read_root():
    return {"Hello": "World"}
##### Code ######

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

그리고 uvicorn을 통해 파이썬 파일을 실행시킵니다. uvicorn은 Asynchronous Server Gateway Interface(ASGI)라 불리는 비동기 코드를 처리할 수 있는 파이썬 웹 서버인 프레임워크 간 표준 인터페이스 입니다. 실행하는 방법은 아래와 같이 CLI 입력하면 됩니다. 만약 아래처럼 입력하지 않고 코드 하단에 uvicorn.run을 추가하면 됩니다.

$uvicorn [.py 확장자 없는 파일명]:app --reload

Path Parameter

패스 파라미터를 구현하기 위해서는 코드 부분에 아래 내용을 추가하면 됩니다.

@app.get("/users/{user_id}")
def get_user(user_id):
    return {"user_id": user_id}

Query Parameter

쿼리 파라미터를 구현하기 위해서는 코드 부분에 아래 내용을 추가하면 됩니다.

test_db = [
    {
        'item_name' : "Foo"
    },
    {
        'item_name' : "Bar"
    }
]

@app.get("/items/")
def read_item(skip:int = 0, limit: int = 10):
    return test_db[skip:skip+limit]

앞서 언급한 패스 파라미터와 쿼리 파라미터에 대한 간단한 내용은 아래에서 살펴볼 수 있습니다.

[온라인 서빙을 위한 웹에 대한 기본 지식 정리 | REST API

온라인 서빙고려사항온라인 서빙은 실시간으로 클라이언트 요청에 대한 서비스를 제공해야 하는 구조 입니다. 따라서 고객 경험을 좋게 하기 위해서는 지연 시간(Latency)를 최소화할 필요가 있

seanpark11.tistory.com](https://seanpark11.tistory.com/173#url-%ED%8C%8C%EB%9D%BC%EB%AF%B8%ED%84%B0)

Optional Parameter

특정 파라미터에 대해서 선택적으로 하고 싶은 경우 아래 코드와 같이 구현할 수 있습니다. 아래 코드에서는 q가 있냐 없느냐에 따라 다르게 반환합니다. 이를 구현하기 위해선 typing 라이브러리의 Optional을 활용합니다.

from typing import Optional 

@app.get("/items/{item_id}")
def read_item(item_id:str, q: Optional[str]=None):
    if q:
        return {"item_id": item_id, "q": q}
    return {"item_id": item_id}

Request Body

클라이언트에서 API에 데이터를 보낼 때 사용하는 Payload로 데이터를 보내고 싶다면, POST 메서드를 사용합니다. 아래 예제는 Request Body에 타입 힌트를 생성한 클래스를 상속받아 데이터를 검증할 수 있도록 했습니다.

from typing import Optional
from pydantic import BaseModel


class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None


@app.post("/items/")
def create_item(item: Item):
    return item

Schemas에서 정의한 내용을 살펴봤습니다. 정의된 것처럼 잘 나오는 것을 확인할 수 있습니다.

Response Body

Response Body는 Request Body의 반대로 API의 응답이 클라이언트에게로 전달되는 것입니다. 아래 코드는 입력되는 값(ItemIn)에 대해 응답하는 값(ItemOut)을 POST 메서드를 적용합니다.

class ItemIn(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None


class ItemOut(BaseModel):
    name: str
    price: float
    tax: Optional[float] = None


@app.post("/items/", response_model=ItemOut)
def create_item(item: ItemIn):
    return item

Form

구글 폼과 같이 데이터를 입력받기 위해 Form 클래스를 사용하면, 요청하는 형식의 함수인 Request의 데이터를 가져올 수 있습니다. GET을 통해 해당 페이지를 조회하고 입력받은 데이터를 보내는 POST 메서드를 각각 구현합니다.

또한, 프론트의 구현을 위해 Jinja2를 활용했는데 템플릿을 활용해 로그인 폼을 구현합니다. Jinja2Templates에서는 {{}}에 들어있는 데이터를 사용할 수 있게 됩니다. (여기서는 login_form.html은 있다고 가정) 추가적으로 덧붙인다면, 아래와 같은 코드 구현을 위해서는 사전에 Jinja2와 python-multipart 설치가 필요합니다.

from fastapi import Form, Request
from fastapi.templating import Jinja2Templates


template = Jinja2Templates(directory='./')

@app.get("/login/")
def get_login_form(request: Request):
    return template.TemplateResponse('login_form.html', context={'request': request})

@app.post("/login/")
def login(username: str = Form(...), password: str = Form(...)):
    return {"username" : username}

File

파일을 업로드하고 싶다면 File과 UploadFile을 이용할 수 있습니다. main 함수에서는 입력받을 수 있는 요소를 넣기 위해 HTML을 입력해 HTMLResponse를 통해 구현했습니다. 그리고 파일을 업로드하면 각각 사이즈와 파일명을 반환할 수 있는 함수들입니다.

from typing import List
from fastapi import File, UploadFile
from fastapi.responses import HTMLResponse


@app.post("/files/")
def create_files(files: List[bytes] = File(...)):
    return {"file_sizes": [len(file) for file in files]}

@app.post("/uploadfiles/")
def create_upload_files(files: List[UploadFile] = File(...)):
    return {"filename": [file.filename for file in files]}

@app.get("/")
def main():
    content = """
<body>
<form action="/files/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
<form action="/uploadfiles/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>
</body>
    """
    return HTMLResponse(content=content)

참고자료

[1] 변성윤. "[Product Serving] Fast API (1)". boostcamp AI Tech.

반응형