반응형

이전에 머신러닝, 딥러닝 프로덕트의 디자인 패턴에 대해 살펴본 적이 있습니다. (아래 링크 참고)

 

2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴

 

그 중 하나인 Web Single 패턴은 API 서버 코드에 모델을 포함시켜 배포하는 방식으로 서비스(모델의 예측과 추론)가 필요한 곳에서 요청에 따라 모델의 결과를 반환하는 방식입니다. 이를 직접 구현하기 위해서는 아래와 같은 요소들을 구현할 필요가 있습니다.

 

  • Config 구현
  • 데이터베이스 구현
  • FastAPI 서버 구현
    • 모델 불러오기
    • 예측 결과 반환 및 저장 
    • 데이터베이스 조회

Config 구현

웹이 동작할 수 있도록 필요한 요소들을 정리해둔 config 파일을 만듭니다. 여기서 pydantic을 통해 데이터 검증을 수행합니다. pydantic에 대한 사용방법은 이전 글(아래)을 참고하시기 바랍니다.

# config.py
from pydantic import Field
from pydantic_settings import BaseSettings


class Config(BaseSettings):
    db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL")
    model_path: str = Field(default="model.joblib", env="MODEL_PATH")
    app_env: str = Field(default="local", env="APP_ENV")


config = Config()

데이터베이스 구현

데이터를 저장하고 가져오기 위한 데이터베이스를 구현합니다. FastAPI에서 활용할 수 있는 SQLModel을 활용하여 config에서 정의한 db_url로 구동을 위한 엔진을 만듭니다. 

# database.py
import datetime
from sqlmodel import SQLModel, Field, create_engine
from config import config 


class PredictionResult(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    result: int
    created_at : str = Field(default_factory=datetime.datetime.now) # 동적으로 기본값 설정


engine = create_engine(config.db_url)

모델 불러오기

모델을 불러올 수 있도록 별도의 의존성 파이썬 파일을 만듭니다. 모델을 가져오는 등의 기능을 구현할 파일입니다. 

# dependency.py
model = None


def load_model(model_path: str):
    import joblib

    global model
    model = joblib.load(model_path)

 

그리고 FastAPI 서버의 메인 코드에 아래와 같이 구현해 최초 생성할 때 데이터베이스 테이블을 만들고 모델을 로드합니다. lifespan을 통해 애플리케이션이 구동될 때 작동할 수 있도록 합니다. 

# main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from loguru import logger
from sqlmodel import SQLModel

from config import config
from database import engine
from dependency import load_model 


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 데이터베이스 테이블 생성
    logger.info("Creating databas table")
    SQLModel.metadata.create_all(engine)

    # 모델 로드
    logger.info("Loading model")
    load_model(config.model_path)

    yield


app = FastAPI(lifespan=lifespan)

예측 결과 반환 및 저장

애플리케이션이 동작할 때 모델이 로드되었으므로 이 모델을 활용해서 예측을 요청받으면 예측 결과를 반환하고 저장해야 합니다. 하지만, 어떤 방식으로 요청을 받고 응답을 할지 정의되어 있지 않으므로 추가적인 정의가 필요한데, 이를 위해 schema라는 파일을 만듭니다. schema는 네트워크를 통해 데이터를 주고 받을 때 어떤 형태로 주고 받을지 정의하는 것을 말합니다.

 

아래 코드에서는 요청을 받을 때는 feature를 리스트로 받고, 예측 결과를 반환할 때는 id와 result를 정수형으로 반환하도록 했습니다.   

# schemas.py
from pydantic import BaseModel


class PredictionRequest(BaseModel):
    feature: list


class PredictionResponse(BaseModel):
    id: int
    result: int

 

그리고 조금 더 유연한 적용을 위해 모델을 가져올 수 있도록 dependency 파일에 get_model 함수를 추가합니다. 

# dependency.py

def get_model():
    global model
    return model

 

 

이제 api.py 파일을 작성합니다. api.py는 클라이언트에서 API를 호출하고, 학습 결과를 반환하는 역할을 수행합니다. 스크립트는 아래 같은 순서로 작성됩니다.

  1. APIRouter를 이용해 라우팅을 관리합니다. (앱이 작을 때는 없어도 무관하나, 앱이 커질 경우를 대비해 미래 관리하는 것이 좋을 것 같습니다)
  2. get_model 함수로 모델을 가져오고 예측을 수행하게끔 합니다. (이 부분은 예측 모델 함수의 I/O에 ㄸ라서 달라질 수 있습니다)
  3. 정의된 schema인 PredictionResult에 넣습니다.
  4. SQLModel의 Session을 통해 데이터베이스에 결과를 넣고 저장합니다. 
# api.py
from fastapi import APIRouter
from sqlmodel import Session

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


router = APIRouter()

@router.post("/predict")
def predict(request:PredictionRequest) -> PredictionResponse:
    model  = get_model()
    prediction = int(model.predict([request.feature])[0]) # 반환 결과에 따라 변경
    prediction_result = PredictionResult(result=prediction)

	# 데이터베이스 연결 관리 
    with Session(engine) as session: 
        session.add(prediction_result)      # 새로운 객체를 세션에 추가
        session.commit()                    # 세션 변경사항을 DB에 저장
        session.refresh(prediction_result)  # 세션에 있는 객체를 업데이트 

    return PredictionResponse(id=prediction_result.id, result=prediction)

 

이를 동작하게 하려면 main.py에 아래 코드를 추가하면 됩니다. 앞서 라우팅을 선언했으니 include_router로 충분합니다.

# main.py
...
from fastapi import router 

...
app = FastAPI(lifespan=lifespan)
app.include_router(router)  # router를 포함하도록 추가

데이터베이스 조회

저장한 데이터베이스를 GET 메서드로 조회하기 위한 코드입니다. 아래 코드는 Session 객체에서 데이터베이스의 전체를 선택하고 가져오는 방법입니다. SQLModel에서 select를 통해 SQL의 SELECT와 동일한 구문을 생성하고 결과를 조회해서 PredictionResponse schema에 가져옵니다. 

# api.py
from sqlmodel import Session, select

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


@router.get("/predict")
def get_predictions() -> list[PredictionResponse]:
    with Session(engine) as session:
        statement = select(PredictionResult)
        prediction_results = session.exec(statement).all()

        return [
            PredictionResponse(id=prediction_results.id, result=prediction_results.result)
            for prediction_result in prediction_results
        ]

이번에는 id를 필터링해서 데이터베이스에서 id로 필터링해서 조회하는 방법입니다. 예외 처리를 추가했고, id로 get 조회하는 것이 다릅니다. 

# api.py
from fastapi import HTTPException, status


def get_prediction(id: int) -> PredictionResponse:
    with Session(engine) as session:
        prediction_result = session.get(PredictionResult, id)
        if not prediction_result:
            raise HTTPException(
                detail="Not found", status_code=status.HTTP_404_NOT_FOUND
            )
        return PredictionResponse(
            id=prediction_result.id, result=prediction_result.result
        )

 

참고자료

[1] https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/tree/main/02-online-serving(fastapi)/projects/web_single

 

반응형