본문 바로가기

python

Architecture Patterns with Python(8장)

이벤트와 메시지 버스

 

비즈니스 로직을 만족하는 요소가 구현된 이후에, 무언가를 끼워 넣어야 하는 경우에는 아키텍처를 어떻게 유지할 수 있을까

 

도메인 이벤트 패턴과 이벤트에 대한 메시지 버스 패턴을 활용하는 방법을 예시로 알아본다.


앞서 계속해서 예제로 사용했던 주문과 배치 코드에서 재고가 없는 경우, 구매팀에 이메일로 통지하는 로직을 추가하려고 한다.

일반적으로 이러한 요구 사항이 생기면, 별 생각 없이 웹 컨트롤러에 넣기 쉽다.

 

8.1 지저분해지는 일 막기

8.1.1 웹 컨트롤러가 지저분해지는 일을 막자

@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    line = model.OrderLine(
        request.json["orderid"],
        request.json["sku"],
        request.json["qty"],
    )
    try:
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        batchref = services.allocate(line, uow)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            'out of stock',
            'sotck_admin@made.com',
            f'{line.orderid} - {line.sku}'
        )
        return jsonify({"message": str(e)}), 400

    return jsonify({"batchref": batchref}), 201

 

위와 같이 컨트롤러 여기저기에 기능을 끼워 넣으면 전체가 빠르게 지저분해진다. 이메일을 보내는 일은 HTTP 계층이 처리해야 할 일이 아닐 뿐더러, 새로 추가된 기능에 대한 단위 테스트를 진행할 수 있어야 한다.

 

8.1.2 모델이 지저분해지는 일을 막자

def allocate(self, line: OrderLine) -> str:
    try:
        batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
        batch.allocate(line)
        self.version_number += 1
        return batch.reference
    except StopIteration:
        email.send.mail('stock@made.com', f'Out of stock for {line.sku}')
        raise OutOfStock(f'Out of stock for sku {line.sku}')

 

이 방법도 좋지 않다. 모델이 email.send_mail 과 같은 인프라 구조에 의존하는 것은 바람직하지 않다.

도메인 모델은 단지 '실제 할당할 수 있는 것보다 더 많은 상품을 할당할 수는 없다' 라는 규칙에 집중해야 한다.

도메인 모델의 일은 재고가 부족한지 알아내는 것 뿐, 메일을 보내는 일은 다른 곳에서 해야 한다.

 

이러한 기능은 끄고 켤 수 있어야 하며, 도메인 모델의 규칙을 변경하지 않아도 전송 방식을 바꿀 수 있어야 한다(메일->SMS)

 

8.1.3 서비스 계층이 지저분해지는 일을 막자

 

'재고를 할당하려고 시도하고 할당에 실패하면 이메일을 보내야 한다' 라는 요구 사항은 오케스트레이션에 속한다. 앞서 오케스트레이션을 처리하기 위해 서비스 계층을 작성했다.

 

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
            raise

 

예외처리를 해서 메일을 보내고 다시 예외를 발생시키는 구조가 많이 어색해 보인다.

 

8.2 단일 책임 원칙

더불어 위의 코드는 SOLID 중에서도 단일 책임 원칙(single responsibility principle ; SRP) 위배다. 서비스 함수나 도메인 메서드의 이름은 모두 allocate 이지 allocate_and_send_mail_if_out_of_stock 과 같은 게 아니다.

 

함수 이름에 and, then 등의 단어를 사용했다면 이는 단일 책임 원칙을 위배했을 가능성이 높다.

 

SRP 는 어떤 클래스 혹은 함수를 수정해야 하는 이유가 단 하나만 존재해야 함을 뜻한다고 볼 수도 있다. 따라서 이메일을 SMS로 변경할 때는 allocate() 함수를 변경할 필요가 없어야 한다.

 

이러한 점들을 고려했을 때, 서비스 계층을 분리하여 이벤트와 관련된 추상화에 의존하도록 하는 방향으로 수정한다.

 

8.3 메시지 버스에 전부 다 싣기

8.3.1 이벤트를 기록하는 모델

 

모델은 이메일을 신경 쓰지 않고 이벤트 기록을 담당한다. 이벤트는 발생한 일에 대한 사실을 뜻한다. 이벤트에 응답하고 새로운 연산을 실행하기 위해 메시지 버스를 이용한다.

 

8.3.2 이벤트는 간단한 데이터 클래스다

 

이벤트는 값 객체에 속한다. 이벤트는 순수 데이터 구조이므로 아무 동작이 없다. 이벤트를 model.py 에 저장할 수도 있지만 이벤트만 다루는 별도의 파일로 만들 수도 있다. domain 이라는 디렉토리 밑에 domain/model.py 와 domain/event.py 로 구분한다.

 

domain/event.py

from dataclasses import dataclass

class Event:
    pass
    
@dataclass
class OutOfStock(Even):
    sku: str

 

8.3.3 모델은 이벤트를 발생시킨다

 

도메인 모델은 발생한 사실을 기록하기 위해 이벤트를 발생시킨다.

Product 할당을 요청했을 때 할당이 불가능하면 이벤트가 발생해야 한다.

 

def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine("order1", "SMALL-FORK", 10))

    allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")
    assert allocation is None

 

에그리게이트인 product 는 .events 라는 새로운 attribute 를 외부에 노출한다. product.events 에는 이벤트에 대한 사실을 담아놓은 리스트다.

 

class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            self.version_number += 1
            return batch.reference
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))
            # raise OutOfStock(f'Out of stock for sku {line.sku}')
            return None

 

Product 모델은 이메일을 보내는 코드를 직접 호출하지 않고, 대신 이메일에 필요한 품절 이벤트가 발생한 시점에 이를 기록하여 .event 속성에 추가한다. 그리고 품절이 발생했을 때, 예외를 일으키지 않는다. 왜냐하면 예외 케이스에 따라 수행해야 하는 일이 이어서 수행되기 때문이다. (이메일 보내기)

 

8.3.4 메시지 버스는 이벤트를 핸들러에 매핑한다

 

메시지 버스는 기본적으로 "이 이벤트가 발생하면 다음 핸들러 함수를 호출해야 한다" 라고 말한다. 메시지 버스는 간단한 발생/구독 시스템이다. 핸들러는 수신된 이벤트를 구독한다.

 

from typing import List, Dict, Callable, Type
from allocation.adapters import email
from allocation.domain import events


def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)


def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )


HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

 

8.4 첫 번째 : 서비스 계층이 모델에서 이벤트를 가져와 메시지 버스에 싣는다

도메인 모델은 이벤트를 발생시키고, 메시지 버스는 이벤트가 발생하면 적절한 핸드러를 호출한다. 모델에서 이벤트를 찾아서 메시지 버스에 실어주는 발행(publishing) 단계를 실행할 무언가가 필요하다.

 

간단하 방법은 서비스 계층에 코드를 추가하는 것이다.

 

from . import messagebus

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:
            batch = product.allocate(line)
            uow.commit()
            return batchref            
        finally:
            messagebus.handle(product.events)

 

위의 코드에서 서비스 계층은 이메일 인프라에 직접 의존하는 대신에 모델에서 받은 이벤트를 직접 메시지 버스에 올리는 일만 담당한다.

 

8.5 두 번째 : 서비스 계층은 자신만의 이벤트를 발생시킨다

 

첫 번째와는 조금 다르게, 서비스 계층이 도메인 모델에서 발생한 이벤트를 처리하기보다 직접 이벤트를 만들고 발생시키는 일을 책임지는 방식이 있다.

 

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
        
        if batchref is None:
            messagebus.handle(events.OutOfStock(line.sku))
        return batchref

- 할당에 실패하더라도 커밋을 한다. 무언가 잘못되지 않으면 무조건 커밋이 이루어진다.

 

8.6 세 번째 : UoW 가 메시지 버스에 이벤트를 발행한다.

UoW 는 저장소에 대한 접근을 제공하므로 현재 어떤 에그리게이트가 작업을 수행하는지 알고 있다. 따라서 이벤트를 찾아서 메시지 버스에 전달하기 좋은 곳이다.

 

class AbstractUnitOfWork(abc.ABC):
    products: repository.AbstractRepository

    ...

    def commit(self):
        self._commit()
        self.publish_events()

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError
        
    ...
    

class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    ...

    def _commit(self):
        self.session.commit()

- 커밋 후에 저장소에 전달된 모든 객체를 살펴보고 그 중에 이벤트를 메시지 버스에 전달한다.

- .seen 은 저장소가 모든 에그리게이트를 추적하기 위해 의존하는 속성이다.

 

class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen = set()  # type: Set[model.Product]

    def add(self, product: model.Product):
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku) -> model.Product:
        raise NotImplementedError


class SqlAlchemyRepository(AbstractRepository):
    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):
        self.session.add(product)

    def _get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()

 

- UoW 가 새 이벤트를 발행하기 위해서는 저장소에 요청해 이번 세션에 어떤 Product 객체를 사용했는지 알아야 한다. .seen 이라는 집합을 통해 사용한 Product 객체를 저장한다.

 

UoW 와 저장소가 이런 식으로 협력하면서 자동으로 객체를 추적하고 발생한 이벤트를 처리하면 서비스 계층은 이벤트 처리와는 무관하게 된다.

 

8.7 정리

도메인 이벤트는 시스템에서 워크플로를 다루는 또 다른 방법이다. "X일 때 Y를 하자" 라는 말은 이벤트를 추가하자는 말을 뜻한다. 도메인 이벤트 아키텍처를 활용하면 코드를 더 테스트하기 좋고 관찰하기 쉽게 만들 수 있다.

 

메시지 버스를 사용하면 어떤 요청에 대한 응답으로 여러 동작을 수행하는 경우 분리할 수 잇다. 또한 핵심 애플리케이션과 로직을 분리하여 이벤트 핸들러 구현을 쉽게 변경할 수 있다.

 

다만, 이벤트 처리 코드가 동기적으로 수행되기 때문에 모든 핸들러가 끝나기 전에는 서비스 계층 함수가 종료될 수 없다. 따라서 웹 엔드포인트에서 예상할 수 없는 성능 문제가 발생할 수 있다. 비동기 처리를 추가할 수도 있지만 로직이 복잡해진다.

여러 핸들러에 의해 분할된 워크플로는 혼란을 야기하고 순환적으로 서루 의존하여 무한 루프가 발생할 여지도 있다.