[알고리즘] 파이썬으로 이진 탐색(Binary Search) 구현하기
이진 탐색(혹은 이분 탐색)은 정렬된 배열에서 특정한 값을 찾는 알고리즘이다. 탐색 범위를 절반씩 줄여가며 빠르게 원하는 값을 찾아낼 수 있어 효율적이다. 이진 탐색의 기본적인 원리는 다음과 같다.이분 탐색을 위해 오름차순 또는 내림차순으로 정렬된 배열을 준비탐색 범위의 중간에 있는 값을 찾아서, 찾고자 하는 값과 비교비교에서 범위를 좁힌다. (오름차순 기준) 3-1. 중간값이 찾고자 하는 값보다 크면 탐색 범위를 왼쪽 절반으로 좁힌다. 3-2. 중간값이 찾고자 하는 값보다 작으면 탐색 범위를 오른쪽 절반으로 좁힌다.3의 과정을 원하는 값을 찾을 때까지 반복 위 과정을 하나의 함수로 정리하면 다음과 같다.def binary_search(arr, target): left = 0 right = l..
2025.01.21
no image
PyTorch로 지식 증류(Knowledge Distillation) 구현하기
지식 증류(Knowledge Distillation)는 보다 가벼운 모델을 사용하기 위해 큰 모델에서 작은 모델로 지식을 전달하여 학습의 효율성을 높여주는 방법입니다. (참고) 여기서는 Response-based 기법을 수행하는 것을 구현하려고 합니다.큰 모델(Teacher)과 작은 모델(Student)는 별도로 구현이 완료되어 있고, 여기서 우리가 구현해야 하는 것은 기존의 손실함수 외에 새로운 손실함수를 통합하는 과정입니다.    참고자료PyTorch Knowledge Distillation Tutorial
2025.01.03
PyTorch로 Distributed Data Parallel 구현하기
분산 데이터 병렬화(Distributed Data Parallel, DDP)은 효율적인 분산 및 병렬 처리를 위한 데이터 병렬화 방법론입니다. DDP는 PyTorch의 강력한 모듈로 여러 GPU에서 모델이 구동될 수 있도록 도와줍니다. DDP는 각 프로세스에서 모델의 복사본을 유지하며, 데이터를 병렬로 처리하고 각 GPU의 계산 결과를 자동으로 동기화하여 효율성을 극대화 합니다.DDP 프로세스환경 설정DDP를 실행하기 위해 분산 학습을 위한 기본 환경 변수를 설정합니다. dist.init_process_group로 DDP 모듈을 초기화하는데, 이는 서로 다른 GPU 간 소통 및 동기화를 위해 필요합니다. 초기화에서 사용하는 값들의 주요의미는 다음과 같습니다. PyTorch에서 지원하는 분산처리를 위한 백..
2025.01.03
PyTorch로 Pruning 구현하기
1. Pruning 준비하기Pruning(가지치기)는 모델 경량화 방법 중 하나로 파라미터를 줄여 모델의 경량화를 목표로 한다. 이전 글을 통해 이론적 배경에 대해 살펴본 바 있었는데, 이번에는 PyTorch에서 제공하고 있는 도구를 활용해 Pruning을 적용하는 것을 연습해보고자 한다.우선, model이라는 변수에 딥러닝 모델 인스턴스가 있음을 가정하고, 필요한 가지치기 클래스를 불러온다. 그리고 module이라는 이름으로 적용 대상인 하나의 층을 가져온다. import torch.nn.utils.prune as prunemodel = Model()module = model.conv12. Pruning 적용하기지정한 module에 대해서 torch.nn.utils.prune에서 제공하는 기법 중 하나..
2025.01.02
파이썬으로 백트래킹 구현하기 | backtracking
기본 개념백트래킹(backtracking) 알고리즘은 문제 해결을 위한 모든 가능한 해를 체계적으로 탐색하는 방법입니다. 가능한 해를 구성하면서 조건을 만족하지 않는 경우 그 해를 포기하고 이전 단계로 돌아가는 방식 (backtrack)으로 동작합니다. 이 알고리즘은 DFS(깊이 우선 탐색) 방식으로 트리나 그래프 구조에서 해를 찾아가며 불필요한 탐색을 줄여줍니다.알고리즘 구조백트래킹 알고리즘은 깊이 우선 탐색(DFS) 방식으로 동작하며, 재귀적으로 호출됩니다. 조건에 부합하지 않은 선택에 대해서는 가지치기(pruning)를 통해 더 이상 탐색하지 않습니다.종료 조건 확인먼저, 현재 상태가 문제의 정답이라면 저장하고 종료합니다. 재귀적으로 호출되기에 종료 조건을 확인해서 종료해야 하는 경우 함수 동작을 ..
2024.12.31
no image
[Airflow] Slack 연동해 Task 실패 메시지 보내기
1. Slack 패키지 설치하기Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다. pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.02. Slack API Key 발급Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정..
2024.12.22
[MLflow] MLOps를 위한 MLflow 튜토리얼
MLflow는 머신러닝 관련 실험들을 관리하고 내용들을 기록할 수 있는 오픈소스 솔루션입니다. 실험의 소스 코드, 하이퍼 파라미터, 지표, 기타 부산물들을 저장하고, 해당 모델들을 협업하는 동료들과 공유할 필요가 있는데 하나의 MLflow 서버 위에서 각자 자기 실험을 만들고 공유할 수 있습니다. 비슷한 기능의 Weights & Biases가 있기는 하지만, 이는 별도 솔루션을 구매해야 하기 때문에 비용 측면에 있어서 강점이 있습니다.  1. MLflow 설치와 실행우선, MLflow가 없는 경우 설치가 필요합니다. pip install mlflow 설치가 완료되었다면, MLflow를 실행하기 위해서는 아래와 같은 코드를 통해 UI를 불러올 수 있습니다.# UI 실행$ mlflow ui# 호스트 및 포트..
2024.12.20
no image
[Airflow] Airflow 기초 개념 공부 및 Hello World! | DAG, Operator
Airflow와 DAG이전 글에서 일정 기간 단위로 Product Serving 하는 것을 Batch Serving이라고 합니다. 이와는 다르지만, 일정 주기를 기준으로 소프트웨어 프로그램을 자동으로 실행(스케줄링)하는 것을 Batch Processing 입니다. 이러한 스케줄링을 진행하는 방법 중 하나로 Airflow가 대표적으로 사용되고 있습니다.  2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴 Airflow는 Airbnb에서 개발한 스케줄링 및 워크플로우 도구입니다. Airflow의 작업들의 흐름과 순서를 정의하는 요소를 DAG(Directed Acylic Graph)라고 합니다. DAG는 Airflow의 가장 중요한 요소로 동작 ..
2024.12.20
[FastAPI] Web Single 패턴 구현하기
이전에 머신러닝, 딥러닝 프로덕트의 디자인 패턴에 대해 살펴본 적이 있습니다. (아래 링크 참고) 2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴 그 중 하나인 Web Single 패턴은 API 서버 코드에 모델을 포함시켜 배포하는 방식으로 서비스(모델의 예측과 추론)가 필요한 곳에서 요청에 따라 모델의 결과를 반환하는 방식입니다. 이를 직접 구현하기 위해서는 아래와 같은 요소들을 구현할 필요가 있습니다. Config 구현데이터베이스 구현FastAPI 서버 구현모델 불러오기예측 결과 반환 및 저장 데이터베이스 조회Config 구현웹이 동작할 수 있도록 필요한 요소들을 정리해둔 config 파일을 만듭니다. 여기서 pydantic을 통해 데..
2024.12.18
반응형

이진 탐색(혹은 이분 탐색)은 정렬된 배열에서 특정한 값을 찾는 알고리즘이다. 탐색 범위를 절반씩 줄여가며 빠르게 원하는 값을 찾아낼 수 있어 효율적이다. 이진 탐색의 기본적인 원리는 다음과 같다.


  1. 이분 탐색을 위해 오름차순 또는 내림차순으로 정렬된 배열을 준비
  2. 탐색 범위의 중간에 있는 값을 찾아서, 찾고자 하는 값과 비교
  3. 비교에서 범위를 좁힌다. (오름차순 기준)
    3-1. 중간값이 찾고자 하는 값보다 크면 탐색 범위를 왼쪽 절반으로 좁힌다.
    3-2. 중간값이 찾고자 하는 값보다 작으면 탐색 범위를 오른쪽 절반으로 좁힌다.
  4. 3의 과정을 원하는 값을 찾을 때까지 반복

위 과정을 하나의 함수로 정리하면 다음과 같다.

def binary_search(arr, target):
    left = 0
    right = len(arr) - 1

    while left <= right:
        mid = (left + right) // 2

        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1

    return -1  # 존재하지 않는 경우
반응형
반응형

지식 증류(Knowledge Distillation)는 보다 가벼운 모델을 사용하기 위해 큰 모델에서 작은 모델로 지식을 전달하여 학습의 효율성을 높여주는 방법입니다. (참고) 여기서는 Response-based 기법을 수행하는 것을 구현하려고 합니다.



큰 모델(Teacher)과 작은 모델(Student)는 별도로 구현이 완료되어 있고, 여기서 우리가 구현해야 하는 것은 기존의 손실함수 외에 새로운 손실함수를 통합하는 과정입니다.



 

 

 

 

참고자료

  1. PyTorch Knowledge Distillation Tutorial
반응형
반응형

분산 데이터 병렬화(Distributed Data Parallel, DDP)은 효율적인 분산 및 병렬 처리를 위한 데이터 병렬화 방법론입니다. DDP는 PyTorch의 강력한 모듈로 여러 GPU에서 모델이 구동될 수 있도록 도와줍니다. DDP는 각 프로세스에서 모델의 복사본을 유지하며, 데이터를 병렬로 처리하고 각 GPU의 계산 결과를 자동으로 동기화하여 효율성을 극대화 합니다.


DDP 프로세스

환경 설정

DDP를 실행하기 위해 분산 학습을 위한 기본 환경 변수를 설정합니다. dist.init_process_group로 DDP 모듈을 초기화하는데, 이는 서로 다른 GPU 간 소통 및 동기화를 위해 필요합니다. 초기화에서 사용하는 값들의 주요의미는 다음과 같습니다. PyTorch에서 지원하는 분산처리를 위한 백엔드 종류는 여기에서 확인 가능합니다. 여기서는 gloo를 사용했습니다.

 

  • MASTER_ADDR: 마스터 프로세스의 IP 주소.
  • MASTER_PORT: 마스터 프로세스의 통신 포트.
  • RANK: 현재 프로세스의 순위(고유 ID).
  • WORLD_SIZE: 총 프로세스 수.
import os
import torch
import torch.distributed as dist



def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

모델 및 데이터 준비

모델을 준비한 뒤 DDP로 감싸줍니다. 또한 DistributedSampler를 사용해 데이터셋을 분산 학습에 맞게 나눕니다.

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler


model = MyModel().to(device)
model = DDP(model, device_ids=[rank])

data_sampler = DistributedSampler(dataset)
data_loader = DataLoader(dataset, sampler=data_sampler, batch_size=batch_size)

학습 루프 작성

DDP를 사용할 때는 일반적인 학습 루프와 유사하지만, 각 프로세스가 독립적으로 실행되며 DistributedSampler를 재설정해야 합니다:

for epoch in range(num_epochs):
    data_sampler.set_epoch(epoch)  # 각 epoch마다 샘플링 초기화
    for data, target in data_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

프로세스 종료

모든 학습이 끝난 후에는 분산 프로세스를 정리합니다.

dist.destroy_process_group()

최종 코드

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

# 모델 정의
class MyModel(torch.nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.linear = torch.nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# 분산 학습 함수
def main(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

    # 데이터 준비
    dataset = torch.utils.data.TensorDataset(torch.randn(100, 10), torch.randn(100, 1))
    sampler = DistributedSampler(dataset)
    data_loader = DataLoader(dataset, sampler=sampler, batch_size=32)

    # 모델 준비
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = torch.nn.MSELoss()

    # 학습 루프
    for epoch in range(10):
        sampler.set_epoch(epoch)
        for data, target in data_loader:
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = 2  # GPU 개수
    torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size)

참고자료

  1. PyTorch Distributed Data Parallel Tutorial
반응형
반응형

1. Pruning 준비하기

Pruning(가지치기)는 모델 경량화 방법 중 하나로 파라미터를 줄여 모델의 경량화를 목표로 한다. 이전 을 통해 이론적 배경에 대해 살펴본 바 있었는데, 이번에는 PyTorch에서 제공하고 있는 도구를 활용해 Pruning을 적용하는 것을 연습해보고자 한다.



우선, model이라는 변수에 딥러닝 모델 인스턴스가 있음을 가정하고, 필요한 가지치기 클래스를 불러온다. 그리고 module이라는 이름으로 적용 대상인 하나의 층을 가져온다.

import torch.nn.utils.prune as prune

model = Model()
module = model.conv1

2. Pruning 적용하기

지정한 module에 대해서 torch.nn.utils.prune에서 제공하는 기법 중 하나를 선택하고, 모듈과 파라미터를 지정한다. 아래 예제는 module의 가중치 50%를 랜덤으로 가지치기 하는 방법을 선택했다.

prune.random_unstructured(module, name="weight", amount=0.5)

만약 가중치가 아니라 편향값에 Pruning을 수행하려면 아래와 같이 수행할 수 있다. 여기서는 L1 Norm 값이 가장 작은 편향값 3개를 가지치기를 시도하였다.

prune.l1_unstructured(module, name="bias", amount=3)

잘 적용되었는지 확인하려면, 아래 코드 중 하나로 결과값을 출력할 수 있다.

print(module.weight)
print(list(module.named_parameters()))

3.정리

Pruning 기법이 영구적으로 적용되게 하고 싶을 때 prune을 적용하면서 생성된 것을 제거하는 과정이 필요하다. 이를 위해서는 prune에 있는 remove를 사용한다.

prune.remove(module, 'weight')

참고자료

  1. PyTorch Pruning Tutorial
반응형
반응형

기본 개념

백트래킹(backtracking) 알고리즘은 문제 해결을 위한 모든 가능한 해를 체계적으로 탐색하는 방법입니다. 가능한 해를 구성하면서 조건을 만족하지 않는 경우 그 해를 포기하고 이전 단계로 돌아가는 방식 (backtrack)으로 동작합니다. 이 알고리즘은 DFS(깊이 우선 탐색) 방식으로 트리나 그래프 구조에서 해를 찾아가며 불필요한 탐색을 줄여줍니다.

알고리즘 구조

백트래킹 알고리즘은 깊이 우선 탐색(DFS) 방식으로 동작하며, 재귀적으로 호출됩니다. 조건에 부합하지 않은 선택에 대해서는 가지치기(pruning)를 통해 더 이상 탐색하지 않습니다.

종료 조건 확인

먼저, 현재 상태가 문제의 정답이라면 저장하고 종료합니다. 재귀적으로 호출되기에 종료 조건을 확인해서 종료해야 하는 경우 함수 동작을 멈추도록 합니다. 이를 의사코드로 나타내면 다음과 같습니다.

def backtrack(현재_상태):
    if 정답인지(현재_상태):  # 종료 조건
        해답_저장(현재_상태)
        return

선택 탐색

종료 조건이 아니라면, 현재 상태에서 선택할 수 있는 모든 경우를 살펴보도록 for문으로 탐색합니다. 백트래킹은 탐색 도중 특정 조건을 만족하는 경우에 대해서만 선택을 시도하고, 탐색이 끝나면 선택을 취소하여 이전 상태로 복구합니다. 파이썬 자료구조에서 리스트에 추가(append)를 통해 선택과 pop을 통해 취소를 구현할 수 있습니다. 기본적인 의사코드는 아래와 같습니다.

for 가능한_선택 in 현재_상태의_모든_선택:
    if 유망한_선택(가능한_선택):  # 가지치기 조건
        선택(가능한_선택)
        backtrack(새로운_상태)
        선택_취소(가능한_선택)  # 원래 상태로 복구

전체 구조

이들을 조합하면 아래와 같은 의사코드 전체를 만들 수 있습니다.

def backtrack(현재_상태):
    if 정답인지(현재_상태):  # 종료 조건
        해답_저장(현재_상태)
        return

    for 가능한_선택 in 현재_상태의_모든_선택:
        if 유망한_선택(가능한_선택):  # 가지치기 조건
            선택(가능한_선택)
            backtrack(새로운_상태)
            선택_취소(가능한_선택)  # 원래 상태로 복구

백트레킹 예제

아래는 백트래킹 대표 예제 입니다.

조합 생성 문제

주어진 배열에서 길이가 k인 모든 조합을 구하는 문제 입니다.

def generate_combinations(nums, k):
    def backtrack(start, combination):
        if len(combination) == k:  # 종료 조건
            result.append(combination[:])
            return

        for i in range(start, len(nums)):
            combination.append(nums[i])  # 선택
            backtrack(i + 1, combination)  # 다음 선택으로 이동
            combination.pop()  # 선택 취소

    result = []
    backtrack(0, [])
    return result

# 실행 예시
nums = [1, 2, 3]
k = 2
print(generate_combinations(nums, k))  # 출력: [[1, 2], [1, 3], [2, 3]]

N-Queens 문제

체스판 위에 N개의 퀸을 배치하여 서로 공격하지 않도록 만드는 문제 입니다.

def solve_n_queens(n):
    def is_valid(queen_positions, row, col):
        # 기존 퀸들과 같은 열 또는 대각선 상에 있는지 확인
        for r, c in enumerate(queen_positions):
            if c == col or abs(c - col) == abs(r - row):  # 같은 열 또는 대각선
                return False
        return True

    def backtrack(queen_positions):
        # 종료 조건: 모든 퀸을 배치한 경우
        if len(queen_positions) == n:
            solutions.append(queen_positions[:])  # 현재 배치를 결과에 저장
            return

        for col in range(n):  # 현재 행에서 모든 열을 탐색
            if is_valid(queen_positions, len(queen_positions), col):
                queen_positions.append(col)  # 선택
                backtrack(queen_positions)  # 다음 행으로 이동
                queen_positions.pop()  # 선택 취소 (백트래킹)

    solutions = []
    backtrack([])  # 초기 상태: 퀸을 아무 곳에도 두지 않음
    return solutions

# 실행 예시
n = 4
result = solve_n_queens(n)
print(result)  # 출력: [[1, 3, 0, 2], [2, 0, 3, 1]]

반응형
반응형

1. Slack 패키지 설치하기

Airflow에는 다른 앱들과 연동해서 사용도 가능합니다. 다만, Airflow 자체 라이브러리가 아닌 별도의 패키지를 설치해서 기능을 확장해야 하는데, provider라고 합니다. provider package는 첨부의 링크에서 확인할 수 있습니다.


여기서 slack에서 제공하는 provide package를 사용하기 위해서는 아래와 같은 명령을 실행하여 패키지를 설치합니다.

pip3 install 'apache-airflow-providers-slack\[http\]'==8.6.0

2. Slack API Key 발급

Slack API: Applications 에서 "Create New App"을 클릭합니다. 클릭한 후에 "From scratch" > "App Name"을 설정하고 설치한 Slack Workspace을 지정합니다. 그 다음 Basic Information에서 Incoming Webhooks를 클릭하고, 활성화 및 워크 스페이스에 Webhook를 추가합니다.

이제 Airflow 알림과 Slack workspace 사이 연결을 할 준비가 되었는데, 메시지를 전송할 채널을 지정하고 URL을 복사합니다. 여기서 Webhook URL의 sevices 이후의 내용은 외부에 노출되지 않도록 잘 보관해야 합니다.

3. Airflow 웹서버 연결

Airflow UI의 웹서버 Admin - Connections - Add a new channels ('+' 표시) 순으로 연결을 지정합니다. 앞서 복사했던 webhook url을 활용해 아래와 같이 저장해줍니다.

  • Connection ID : 식별자로 airflow 코드와 동일하게 맞춰주기
  • Connection Type : HTTP
  • Host : https://hooks.slack.com/services
  • Password : webhook url에서 services/ 이후 전부

4. 코드

지금까지 airflow와 slack을 연결하고, 활용할 수 있는 기초 작업이 마무리됐습니다. 이제 필요한 코드를 작성합니다. 먼저 dag을 만들기 전에 필요한 함수를 같은 디렉토리에 utils 폴더를 만들고 아래 코드를 작성합니다.(파일명은 slack_notifier.py로 작성) 아래 코드에서는 webhook를 연결하고 슬랙에 알림을 보내는 함수를 정의합니다.

from airflow.providers.slack_operators.slack_webhook import SlackWebhookOperator


SLACK_DAG_CON_ID = "connection_id"


def send_message(slack_msg):
    return SlackWebhookOperator(
        task_id="send_message",
        http_conn_id=SLACK_DAG_CON_ID,
        message=slack_msg,
        username="airflow"
    )


def task_fail_slack_alert(context):
    slack_msg = f"""
                :red_circle: Task Failed.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)


def task_success_slack_alert(context):
    slack_msg = f"""
                :large_green_circle: Task Succeeded.
                *Task*: {context.get("task_instance").task_id}
                *Dag*: {context.get("task_instance").dag_id}
                *Execution Time*: {context.get("execution_date")}
                """
    alert = send_message(slack_msg)

    return alert.execute(context=context)

위 util 함수가 만들어지면 airflow를 움직이는 DAG을 작성합니다. DAG 작성에 필요한 기초 개념은 이전 글에 있으니 참고 바랍니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exception import AirflowFailException

from utils.slack_notifier import task_fail_slack_alert, task_success_slack_alert


default_args = {
    "owner": "your_name",
    "depends_on_past": False,           # False는 과거 Task의 성공 여부와 상관없이 실행
    "start_date": datetime(2024, 1, 1)
}


def _handle_job_error() -> None:
    raise AirflowFailException

with DAG(
    dag_id="slack_webhook_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    on_failure_callback=task_fail_slack_alert,
    on_success_callback=task_success_slack_alert,
) as dag:
    execution_date = "{{ ds }}"

    task_notification = PythonOperator(
        task_id="slack_alert",
        python_callable=_handle_job_error,
        op_args=[execution_date],
    )

    task_notification

참고자료

[1] 변성윤. "[Product Serving] Batch Serving과 Airflow"

반응형
반응형

MLflow는 머신러닝 관련 실험들을 관리하고 내용들을 기록할 수 있는 오픈소스 솔루션입니다. 실험의 소스 코드, 하이퍼 파라미터, 지표, 기타 부산물들을 저장하고, 해당 모델들을 협업하는 동료들과 공유할 필요가 있는데 하나의 MLflow 서버 위에서 각자 자기 실험을 만들고 공유할 수 있습니다.

 

비슷한 기능의 Weights & Biases가 있기는 하지만, 이는 별도 솔루션을 구매해야 하기 때문에 비용 측면에 있어서 강점이 있습니다. 

 

1. MLflow 설치와 실행

우선, MLflow가 없는 경우 설치가 필요합니다. 

pip install mlflow

 

설치가 완료되었다면, MLflow를 실행하기 위해서는 아래와 같은 코드를 통해 UI를 불러올 수 있습니다.

# UI 실행
$ mlflow ui

# 호스트 및 포트 지정
$ mlflow server --host 127.0.0.1 --port 8080

 

2. 훈련 기록

기록을 위해 모델을 훈련시키고 필요한 메타 데이터 저장을 아래와 같이 수행한다고 가정하겠습니다.

import mlflow
from mlflow.models import infer_signature


# Define the model hyperparameters
params = {
    "solver": "lbfgs",
    "max_iter": 1000,
    "multi_class": "auto",
    "random_state": 8888,
}

# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

# Predict on the test set
y_pred = lr.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)

 

위 모델에서 나오는 데이터들을 기록할 수 있도록 아래 코드를 추가할 수 있습니다.

# Set our tracking server uri for logging
mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")

# Create a new MLflow Experiment
mlflow.set_experiment("MLflow Quickstart")

# Start an MLflow run
with mlflow.start_run():
    # Log the hyperparameters
    mlflow.log_params(params)

    # Log the loss metric
    mlflow.log_metric("accuracy", accuracy)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic LR model for iris data")

    # Infer the model signature
    signature = infer_signature(X_train, lr.predict(X_train))

    # Log the model
    model_info = mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="iris_model",
        signature=signature,
        input_example=X_train,
        registered_model_name="tracking-quickstart",
    )

 

3. 모델 로드 및 추론

모델 기록이 끝난 후에 모델을 로드하고, 추론을 수행할 수 있습니다. 아래 코드는  mlflow의 pyfunc을 사용해 로드하는 예시 코드입니다. 

# Load the model back for predictions as a generic Python Function model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

predictions = loaded_model.predict(X_test)

iris_feature_names = datasets.load_iris().feature_names

result = pd.DataFrame(X_test, columns=iris_feature_names)
result["actual_class"] = y_test
result["predicted_class"] = predictions

 

4. MLflow 환경 관리 

MLflow에서는 실험은 하나의 프로젝트 단위로 생각할 수 있고, 그 아래에서 실행되는 여러 Run으로 구성됩니다. 실험은 아래와 같은 코드로 실행 가능합니다.

mlflow experiments create --experiment-name [실험명]

 

프로젝트를 어떤 환경에서 실행시킬지 정의하는 MLProject.yaml 파일을 작성합니다. 이 파일은 패키지 모듈의 상단에 위치해서 프로젝트의 메타 정보를 저장합니다.

name: My Project

python_env: python_env.yaml
# or
# conda_env: my_env.yaml
# or
# docker_env:
#    image:  mlflow-docker-example

entry_points:
  main:
    parameters:
      data_file: path
      regularization: {type: float, default: 0.1}
    command: "python train.py -r {regularization} {data_file}"
  validate:
    parameters:
      data_file: path
    command: "python validate.py {data_file}"

 

그리고 연결된 python_env.yaml 파일은 파이썬의 버전과 관련된 정보를 저장합니다.

# Python version required to run the project.
python: "3.8.15"
# Dependencies required to build packages. This field is optional.
build_dependencies:
  - pip
  - setuptools
  - wheel==0.37.1
# Dependencies required to run the project.
dependencies:
  - mlflow==2.3
  - scikit-learn==1.0.2

 

CLI로 mlflow run을 통해 프로젝트를 실행할 수 있습니다. 

mlflow run <projrect> --experiment-name <실험명>

참고자료

[1] 변성윤. "[Product Serving] 모델 관리와 모델 평가". boostcamp AI Tech. 

[2] MLflow 공식 문서 Tutorial :  https://mlflow.org/docs/latest/getting-started/intro-quickstart/index.html

반응형
반응형

Airflow와 DAG

이전 글에서 일정 기간 단위로 Product Serving 하는 것을 Batch Serving이라고 합니다. 이와는 다르지만, 일정 주기를 기준으로 소프트웨어 프로그램을 자동으로 실행(스케줄링)하는 것을 Batch Processing 입니다. 이러한 스케줄링을 진행하는 방법 중 하나로 Airflow가 대표적으로 사용되고 있습니다. 

 

2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴

 

Airflow는 Airbnb에서 개발한 스케줄링 및 워크플로우 도구입니다. Airflow의 작업들의 흐름과 순서를 정의하는 요소를 DAG(Directed Acylic Graph)라고 합니다. DAG는 Airflow의 가장 중요한 요소로 동작 방식, 실행 주기 등을 설정할 수 있습니다. DAG를 생성할 때 DAG() 클래스에 전달되는 주요 인자들은 다음과 같습니다. [1]

  • dag_id : DAG의 고유 식별자로 알파벳, 언더바 등 ASCII로 구성.
  • schedule : DAG의 실행주기를 설정. cron 표현식, timedelta 객체 등 사용. (2.4 버전 이전에는 schedule_interval)
  • start_date : DAG의 첫 실행 날짜를 지정. datetime.datetime을 사용.
  • end_date : DAG의 마지막 실행 날짜를 지정. datetime.datetime을 사용.
  • catchup : 과거에 지나간 일자에 대해 DAG 실행 여부
  • default_args : DAG 내 operator에게 공통적으로 적용할 인자를 딕셔너리로 전달. 
  • tags : DAG에 태그를 지정해 Airflow UI에서 필터링하고 관리하는데 사용. 
더보기

Cron 표현

* * * * * 에서 순서대로 분(0~59), 시(0~23), 일(1~31), 월(1~12), 요일(0~6)

Operator

Airflow에서 Operator는 또 다른 핵심 구성 요소로 작업을 정의하고 실행하는데 사용됩니다. 각각의 Operator는 고유한 작업을 수행하도록 설계되어 있으며 DAG 내에서 작업(task)로 사용됩니다. 주요하게 사용되는 Operator는 다음과 같습니다.

 

PythonOperator은 파이썬 함수와 같은 Callable 객체를 넘겨서 실행이 가능한 Operator입니다. 특정 데이터를 처리하거나 외부 API를 호출하는 등 작업을 수행하는데 사용할 수 있습니다. 사용 예제는 다음과 같습니다. 

from airflow.operators.python import PythonOperator

def my_function():
    print("Hello, Airflow!")

my_task = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
)

 

특정 조건에 따라 다른 작업으로 넘어갈 때는 BranchPythonOperator를 사용할 수 있습니다. 사용 예제는 다음과 같습니다.

from airflow.operators.python import BranchPythonOperator

def choose_branch():
    return 'task_a' if some_condition else 'task_b'

branching_task = BranchPythonOperator(
    task_id='branching_task',
    python_callable=choose_branch,
)

 

BashOperator는 Bash 스크립트를 실행해야할 때 사용합니다. 쉘 스크립트를 실행하거나 시스템 명령어를 호출하는데 사용합니다. 사용 예제는 다음과 같습니다.

from airflow.operators.bash import BashOperator

my_task = BashOperator(
    task_id='my_task',
    bash_command='echo "Hello, Airflow!"',
)

 

이외에도 아무것도 실행하지 않는 DummyOperator (DAG 의 여러 태스크 중 기다려야 하는 상황에서 사용), 특정 호스트로 HTTP 요청을 보내고 응답을 반환하는데 사용하는 SimmpleHttpOperator 등 개발 작업을 하는데 필요한 많은 오퍼레이터들이 정의되어 있습니다.   


그리고 앞서 DAG에서 default_args를 통해 operator에 전달하는데, 작업에 적용되는 아래와 같은 인자들을 공통적으로 적용할 수 있습니다. 아래 언급한 인자 이외에 더 많은 것들도 있으니 공식 문서를 통해 확인하시면 좋습니다.[2]

  • owner : 작업의 소유자를 지정
  • depends_on_past : 특정 task가 이전 DAG 실행 결과에 의존할지 여부
  • start_date : 작업 시작 날짜
  • end_date : 작업 종료 날짜
  • retries : 작업 실패 시 재시도 횟수
  • retry_delay : 재시도 간 대기 시간을 timedelta 객체로 지정

그럼 이 내용까지 포함해서 DAG와 함께 사용하는 예제를 살펴보겠습니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email': ['test@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

with DAG(
    dag_id='example_dag',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    description='An example DAG',
    tags=['example'],
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "Hello, Airflow!"',
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command='echo "This is task 2."',
    )

    task1 >> task2   # task1 하고 task2 순서

 

Airflow의 전체 구조

Airflow는 크게 다음 그림과 같은 구조로 이뤄져 있습니다. 각 요소들의 역할은 아래 처럼 정리할 수 있습니다. 

  • Scheduler : 각종 메타 정보를 기록하는 역할을 수행합니다. DAG 디렉토리의 .py 파일들을 파싱해서 DB에 저장하고, DAG들의 스케줄링과 Executor를 통해 실행 시점이 되면 DAG을 실행하고 결과를 저장합니다.
  • Executor : 태스크를 실행하는 역할을 수행하나, Executor의 유형에 따라 외부 프로세스로 실행하는 Remote Executor를 사용하는 경우 외부 Worker로 전달합니다.
  • Worker : DAG의 작업을 수행하고, 생성되는 로그를 저장합니다.
  • Metadaa Database : Scheduler에 의해 메타데이터를 저장합니다. MySQL이나 PostgreSQL 등을 사용합니다.
  • Webserver : Web UI를 담당하고, 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화하는 역할을 수행합니다.

Airflow의 전체 구조 [3]

 

참고자료

[1] https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG

[2] https://airflow.apache.org/docs/apache-airflow/1.10.8/tutorial.html#default-arguments

[3] Architecture Overview — Airflow Documentation

[4] 변성윤. "[Product Serving] Batch Serving과 Airflow" 

반응형
반응형

이전에 머신러닝, 딥러닝 프로덕트의 디자인 패턴에 대해 살펴본 적이 있습니다. (아래 링크 참고)

 

2024.12.09 - [Note/IT 노트] - 머신러닝 / 딥러닝의 Product Serving과 디자인 패턴

 

그 중 하나인 Web Single 패턴은 API 서버 코드에 모델을 포함시켜 배포하는 방식으로 서비스(모델의 예측과 추론)가 필요한 곳에서 요청에 따라 모델의 결과를 반환하는 방식입니다. 이를 직접 구현하기 위해서는 아래와 같은 요소들을 구현할 필요가 있습니다.

 

  • Config 구현
  • 데이터베이스 구현
  • FastAPI 서버 구현
    • 모델 불러오기
    • 예측 결과 반환 및 저장 
    • 데이터베이스 조회

Config 구현

웹이 동작할 수 있도록 필요한 요소들을 정리해둔 config 파일을 만듭니다. 여기서 pydantic을 통해 데이터 검증을 수행합니다. pydantic에 대한 사용방법은 이전 글(아래)을 참고하시기 바랍니다.

# config.py
from pydantic import Field
from pydantic_settings import BaseSettings


class Config(BaseSettings):
    db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL")
    model_path: str = Field(default="model.joblib", env="MODEL_PATH")
    app_env: str = Field(default="local", env="APP_ENV")


config = Config()

데이터베이스 구현

데이터를 저장하고 가져오기 위한 데이터베이스를 구현합니다. FastAPI에서 활용할 수 있는 SQLModel을 활용하여 config에서 정의한 db_url로 구동을 위한 엔진을 만듭니다. 

# database.py
import datetime
from sqlmodel import SQLModel, Field, create_engine
from config import config 


class PredictionResult(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    result: int
    created_at : str = Field(default_factory=datetime.datetime.now) # 동적으로 기본값 설정


engine = create_engine(config.db_url)

모델 불러오기

모델을 불러올 수 있도록 별도의 의존성 파이썬 파일을 만듭니다. 모델을 가져오는 등의 기능을 구현할 파일입니다. 

# dependency.py
model = None


def load_model(model_path: str):
    import joblib

    global model
    model = joblib.load(model_path)

 

그리고 FastAPI 서버의 메인 코드에 아래와 같이 구현해 최초 생성할 때 데이터베이스 테이블을 만들고 모델을 로드합니다. lifespan을 통해 애플리케이션이 구동될 때 작동할 수 있도록 합니다. 

# main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from loguru import logger
from sqlmodel import SQLModel

from config import config
from database import engine
from dependency import load_model 


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 데이터베이스 테이블 생성
    logger.info("Creating databas table")
    SQLModel.metadata.create_all(engine)

    # 모델 로드
    logger.info("Loading model")
    load_model(config.model_path)

    yield


app = FastAPI(lifespan=lifespan)

예측 결과 반환 및 저장

애플리케이션이 동작할 때 모델이 로드되었으므로 이 모델을 활용해서 예측을 요청받으면 예측 결과를 반환하고 저장해야 합니다. 하지만, 어떤 방식으로 요청을 받고 응답을 할지 정의되어 있지 않으므로 추가적인 정의가 필요한데, 이를 위해 schema라는 파일을 만듭니다. schema는 네트워크를 통해 데이터를 주고 받을 때 어떤 형태로 주고 받을지 정의하는 것을 말합니다.

 

아래 코드에서는 요청을 받을 때는 feature를 리스트로 받고, 예측 결과를 반환할 때는 id와 result를 정수형으로 반환하도록 했습니다.   

# schemas.py
from pydantic import BaseModel


class PredictionRequest(BaseModel):
    feature: list


class PredictionResponse(BaseModel):
    id: int
    result: int

 

그리고 조금 더 유연한 적용을 위해 모델을 가져올 수 있도록 dependency 파일에 get_model 함수를 추가합니다. 

# dependency.py

def get_model():
    global model
    return model

 

 

이제 api.py 파일을 작성합니다. api.py는 클라이언트에서 API를 호출하고, 학습 결과를 반환하는 역할을 수행합니다. 스크립트는 아래 같은 순서로 작성됩니다.

  1. APIRouter를 이용해 라우팅을 관리합니다. (앱이 작을 때는 없어도 무관하나, 앱이 커질 경우를 대비해 미래 관리하는 것이 좋을 것 같습니다)
  2. get_model 함수로 모델을 가져오고 예측을 수행하게끔 합니다. (이 부분은 예측 모델 함수의 I/O에 ㄸ라서 달라질 수 있습니다)
  3. 정의된 schema인 PredictionResult에 넣습니다.
  4. SQLModel의 Session을 통해 데이터베이스에 결과를 넣고 저장합니다. 
# api.py
from fastapi import APIRouter
from sqlmodel import Session

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


router = APIRouter()

@router.post("/predict")
def predict(request:PredictionRequest) -> PredictionResponse:
    model  = get_model()
    prediction = int(model.predict([request.feature])[0]) # 반환 결과에 따라 변경
    prediction_result = PredictionResult(result=prediction)

	# 데이터베이스 연결 관리 
    with Session(engine) as session: 
        session.add(prediction_result)      # 새로운 객체를 세션에 추가
        session.commit()                    # 세션 변경사항을 DB에 저장
        session.refresh(prediction_result)  # 세션에 있는 객체를 업데이트 

    return PredictionResponse(id=prediction_result.id, result=prediction)

 

이를 동작하게 하려면 main.py에 아래 코드를 추가하면 됩니다. 앞서 라우팅을 선언했으니 include_router로 충분합니다.

# main.py
...
from fastapi import router 

...
app = FastAPI(lifespan=lifespan)
app.include_router(router)  # router를 포함하도록 추가

데이터베이스 조회

저장한 데이터베이스를 GET 메서드로 조회하기 위한 코드입니다. 아래 코드는 Session 객체에서 데이터베이스의 전체를 선택하고 가져오는 방법입니다. SQLModel에서 select를 통해 SQL의 SELECT와 동일한 구문을 생성하고 결과를 조회해서 PredictionResponse schema에 가져옵니다. 

# api.py
from sqlmodel import Session, select

from schemas import PredictionRequest, PredictionResponse
from dependency import get_model
from database import PredictionResult, engine


@router.get("/predict")
def get_predictions() -> list[PredictionResponse]:
    with Session(engine) as session:
        statement = select(PredictionResult)
        prediction_results = session.exec(statement).all()

        return [
            PredictionResponse(id=prediction_results.id, result=prediction_results.result)
            for prediction_result in prediction_results
        ]

이번에는 id를 필터링해서 데이터베이스에서 id로 필터링해서 조회하는 방법입니다. 예외 처리를 추가했고, id로 get 조회하는 것이 다릅니다. 

# api.py
from fastapi import HTTPException, status


def get_prediction(id: int) -> PredictionResponse:
    with Session(engine) as session:
        prediction_result = session.get(PredictionResult, id)
        if not prediction_result:
            raise HTTPException(
                detail="Not found", status_code=status.HTTP_404_NOT_FOUND
            )
        return PredictionResponse(
            id=prediction_result.id, result=prediction_result.result
        )

 

참고자료

[1] https://github.com/zzsza/Boostcamp-AI-Tech-Product-Serving/tree/main/02-online-serving(fastapi)/projects/web_single

 

반응형