메시지 버스를 타고 시내로 나가기
8장에서 만든 메시지 버스를 보다 근본적인 요소로 만들어본다.
메시지 버스가 서비스 계층의 주 진입 지점으로 만든다.
9.1 새로운 아키텍처가 필요한 새로운 요구 사항
다음의 예기치 못한 상황이 발생했다고 가정해본다.
- 재고 조사를 하는 동안 지붕에 물이 새서 SPRINGY-MATTRESS 3개가 손상된 것을 발견했다.
- RELIABLE-FORK 배송에서 필요한 문서가 빠져서 몇 주 동안 세관에 머물러야 했다. 이후 RELIABLE-FORK 3개가 안전 검사에서 실패해 폐기했다.
- 세계적으로 반짝이는 금속이 부족해져서 다음 SPARKLY-BOOKCASE 배치를 생산할 수 없게 됐다.
이런 유형의 상황을 통해 시스템에 있는 배치 수량을 변경해야 한다는 사실을 알게 됐다.
시스템에서 BatchQuantityChanged 라는 예외조건을 정의하여, 해당 이벤트가 발생하면 배치의 수량을 변경해야 한다. 변경 후 수량이 이미 할당된 수량보다 적어지면, 배치를 할당 해제(deallocate) 해야 한다. 이후 각각을 새로 할당한다. 이를 AllocationRequired 라는 이벤트로 표현한다.
배치 수량을 조정하고 과도한 주문 라인을 할당 해제하는 change_batch_quantity 라는 서비스를 정의하고, 할당 해제가 일어날 때마다 AllocationRequired 이벤트를 발생시켜 기존 allocate 서비스에게 별도의 트랜잭션으로 전달한다.
9.1.1 구조 변경 상상해보기: 모든 것이 이벤트 핸들러다
시작에 앞서, 시스템의 두 가지 종류의 흐름에 대해 생각해본다.
- 서비스 계층 함수에 의해 처리되는 API 호출
- 내부 이벤트와 그 이벤트에 대한 핸들러
서비스 계층 함수와 이벤트 핸들러를 구분하는 것보다 모든 것이 이벤트 핸들러라면 보다 간단할 것 같다. 서비스 계층 함수도 하나의 이벤트로 생각하면, 더 이상 내부와 외부 이벤트 핸들러를 구분할 필요가 없다.
- services.allocate() 는 AllocationRequired 이벤트의 핸들러이거나 Allocate 이벤트를 출력으로 보낼 수 있다.
- services.add_batch() 도 BatchCreated 이벤트의 핸들러라고 볼 수 있다.
기존의 service 내부 함수를 이벤트로 봤을 때, 위와 같이 생각할 수 있으며, 새로운 요구 사항도 같은 패턴으로 볼 수 있다.
- BatchQuantityChanged 이벤트는 change_batch_quantity() 라는 핸들러를 호출할 수 있다.
- 새로운 AllocationRequired 이벤트가 services.allocate() 를 호출할 수 있다. 따라서 API 에서 새로운 할당 요청이 들어오는 것과 내부에서 할당 해제에 의해 발생하는 재할당은 개념상 구분되지 않는다.
서비스 계층 함수가 동작하는 사전 조건도 이벤트로 취급하여 이벤트 핸들러로 통합한다면, 새로운 요구 사항이 추가되었을 때 각 요구 사항을 이벤트로 정의하고 이에 해당하는 함수를 추가하거나 기존 함수를 호출할 수 있다.
점진적으로 수정해본다.
- 서비스 계층을 이벤트 핸들러로 리팩토링한다. 기존 services.allocate() 함수는 AllocationRequired 라는 이벤트의 핸들러가 된다.
- BatchQuantityChanged 이벤트를 시스템에 넣고 Allocated 이벤트가 발생하는지 검사하는 엔드 투 엔드 테스트를 만든다.
- BatchQuantityChanged 에 대한 새로운 핸들러를 만들고, 이 핸들러 구현은 AllocationRequired 이벤트를 발생시킨다. 그리고 API 에서 사용하는 핸들러와 같은 핸들러(=services.allocate())가 AllocationRequired 이벤트를 처리한다.
이 과정에서 메시지 버스와 UoW를 약간 변경하여 새 이벤트를 메시지 버스에 넣는 책임을 버스 자체로 옮긴다.
9.2 서비스 함수를 메시지 핸들러로 리팩터링하기
현재 API 입력을 받는 두 이벤트를 정의하는 것부터 시작한다. AllocationRequired, BatchCreated 두 이벤트다.
@dataclass
class BatchCreated(Event):
ref: str
sku: str
qty: int
eta: Optional[date] = None
@dataclass
class AllocationRequired(Event):
orderid: str
sku: str
qty: int
파일명 service.py 를 handler.py 로 바꾼다. 그리고 messagebus.py 에 있던 기존 메시지 핸들러인 send_out_of_stock_notification 을 추가한다. 주의할 점은 기존 핸들러를 포함한 모든 핸들러가 같은 입력(UoW 와 이벤트)을 갖도록 변경하는 것이다. 기존 서비스 계층 함수들은 이벤트를 인자로 받지 않았으나 각 함수가 그에 정의된 목적과 일치하는 이벤트를 인자로 받도록 한다.
def add_batch(
event: events.BatchCreated,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get(sku=event.sku)
if product is None:
product = model.Product(event.sku, batches=[])
uow.products.add(product)
product.batches.append(
model.Batch(event.ref, event.sku, event.qty, event.eta)
)
uow.commit()
def allocate(
event: events.AllocationRequired,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(event.orderid, event.sku, event.qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
def change_batch_quantity(
event: events.BatchQuantityChanged,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get_by_batchref(batchref=event.ref)
product.change_batch_quantity(ref=event.ref, qty=event.qty)
uow.commit()
def send_out_of_stock_notification(
event: events.OutOfStock,
uow: unit_of_work.AbstractUnitOfWork,
):
email.send(
"stock@made.com",
f"Out of stock for {event.sku}",
)
또한 기본 원시 타입으로 받던 인자를 정의된 객체로 변경시켰다. (sku -> even.sku)
앞서 5장에서 도메인 객체로 정의된 인자를 원시타입으로 바꾸는 작업을 했었는데, 다시 도메인 객체만 아닐 뿐 어찌됐든 객체로 수정했다. 아무런 생각없이 기본 타입에 대한 집착을 갖는 것은 분명 안티 패턴이다. 다만 5장에서의 작업은 도메인 객체와의 의존성을 낮추고 이를 분리시키기 위한 작업이었으므로 복잡도를 높이는 일은 아니었다.
이번에 다시 객체를 활용한 것은 도메인 객체와의 연결이 끊어진 상태에서 이벤트 객체를 도입하면, 애플리케이션에서 특정 유스 케이스를 호출할 때, 기본 타입들의 조합을 기억할 필요가 없다. 보다 효율적으로 테스트할 수 있게 된다.
9.2.1 메시지 버스는 이제 이벤트를 UoW로부터 수집한다
기존 메시지 버스에 있던 send_out_of_stock_notification 핸들러가 서비스 계층으로 옮겨가면서 UoW 를 인자로 받는다.
이벤트 핸들러는 이제 UoW 가 필요하다. 추가로 애플리케이션에서 메시지 버스는 더 중심 위치를 차지하게 됐다. 메시지 버스가 명시적으로 새 이벤트를 수집하고 처리하는 것을 담당하는 편이 더 타당하다.
현재는 UoW 와 메시지 버스 사이에 순환적인 의존성이 존재한다. 이를 단방향으로 만들어야 한다.
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork, # 1
):
queue = [event] # 2
while queue:
event = queue.pop(0) # 3
for handler in HANDLERS[type(event)]: # 3
handler(event, uow=uow) # 4
queue.extend(uow.collect_new_events()) # 5
- 메시지 버스는 이제 시작할 때마다 UoW를 전달받는다.
- 첫 번째 이벤트를 처리할 때 대기열을 시작한다.
- 대기열의 앞에서 이벤트를 pop 해서 적절한 핸들러에 넘긴다.
- 메시지 버스는 UoW 를 각 핸들러에 전달한다.
- 핸들러가 종료되면 새롭게 생성된 이벤트를 수집하고, 이 이벤트를 대기열에 추가한다.
기존 UoW 의 publish_events() 는 직접 메시지 버스의 핸들러를 호출하는 능동적인 함수였으나 collect_new_events 는 이벤트를 수집하는 행위만을 담당하는 덜 능동적인 함수로 바꼈다.
# 기존 커밋 함수
def commit(self):
self._commit()
self.publish_events()
# 기존 UoW 의 publish_events
def publish_events(self):
for product in self.products.seen:
while product.events:
event = product.events.pop(0)
messagebus.handle(event)
# 변경된 커밋 함수
def commit(self):
self._commit()
# 변경된 collect_new_events
def collect_new_events(self):
for product in self.products.seen:
while product.events:
yield product.events.pop(0)
- collect_new_events 함수는 더 이상 메시지 버스에 의존하지 않는다.
- 커밋이 일어나도 더 이상 publish_events 함수를 자동으로 호출하지 않는다. 대신 메시지 버스는 이벤트 대기열을 추적한다.
- UoW 는 더 이상 메시지 버스에 메시지를 능동적으로 추가하지 않는다. 단지 이벤트만 제공한다.
9.2.2 모든 테스트는 이벤트 바탕으로 다시 쓸 수 있다
이제 모든 테스트가 서비스 계층의 함수를 직접 호출하는 대신, 이벤트를 만들어서 메시지 버스에 넣는 방식으로 작동하게 만들 수 있다.
class TestAddBatch:
def test_for_new_product(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
)
assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
assert uow.committed
class TestAllocate:
def test_returns_allocation(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
)
results = messagebus.handle(
events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
)
assert results.pop(0) == "batch1"
9.2.3 임시 : 결과를 반환해야 하는 메시지 버스
API 와 서비스 계층은 allocate() 핸드러를 호출할 때 할당된 배치에 대한 참조를 알고 싶다. 즉, 메시지 버스를 임시로 고쳐서 이벤트를 반환하게 해야 한다.
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork,
):
results = []
queue = [event]
while queue:
event = queue.pop(0)
for handler in HANDLERS[type(event)]:
results.append(handler(event, uow=uow))
queue.extend(uow.collect_new_events())
return results
이렇게 해야 하는 이유는 시스템에서 읽기와 쓰기의 책임이 혼합되어 있기 때문인데 자세한 사항은 12장에서 더 살펴본다.
9.2.4 이벤트로 작동하도록 API 변경
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
try:
event = events.AllocationRequired(
request.json["orderid"], request.json["sku"], request.json["qty"]
)
results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())
batchref = results.pop(0)
except InvalidSku as e:
return {"message": str(e)}, 400
return {"batchref": batchref}, 201
- json 요청에서 추출한 여러 기본 타입 값으로 서비스 계층을 호출하는 대신, 이벤트를 인스턴스화 하고 메시지 버스에 이벤트를 전달한다.
애플리케이션이 이벤트 기반으로 작동할 수 있게 됐다.
- 서비스 계층 함수들은 이제 이벤트 핸들러가 됐다.
- 서비스 계층 함수 호출과 도메인 모델에서 발생한 내부 이벤트를 처리하기 위한 함수 호출이 동일해졌다.
- 이벤트는 시스템 입력을 잡아내는 데이터 구조로 사용한다. 동시에 내부 작업 단위를 전달하기 위한 데이터 구조로도 사용한다.
9.3 새로운 요구 사항 구현하기
리팩터링 단계를 마치고, 새로운 요구 사항을 구현해본다.
입력으로 새 BatchQuantityChanged 이벤트를 받아 핸들러에 넘기고, 이 핸들러는 다시 어떤 AllocationRequired 이벤트를 발생시킨다. 이는 다시 기존 핸들러에 넘겨져서 재할당을 일으킬 수 있다.
9.3.1 새로운 이벤트
배치 수량의 변경을 알려주는 이벤트는 단순하다. 단지 배치에 대한 참조와 새로운 수량만 있으면 된다.
@dataclass
class BatchQuantityChanged(Event):
ref: str
qty: int
9.4 새 핸들러 시범운영하기
class TestChangeBatchQuantity:
def test_changes_available_quantity(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
)
[batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
assert batch.available_quantity == 100
messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)
assert batch.available_quantity == 50 # 1
def test_reallocates_if_necessary(self):
uow = FakeUnitOfWork()
event_history = [
events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
]
for e in event_history:
messagebus.handle(e, uow)
[batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
assert batch1.available_quantity == 10
assert batch2.available_quantity == 50
messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
# order1 or order2 will be deallocated, so we'll have 25 - 20
assert batch1.available_quantity == 5 # 2
# and 20 will be reallocated to the next batch
assert batch2.available_quantity == 30 # 2
- 수량만 변경하면 되는 간단한 경우는 구현이 간단하다.
- 할당된 수량보다 더 작게 수량을 바꾸면 최소 주문 한 개를 할당 해제하고 새로운 배치에 이 주문을 재할당해야 한다.
9.4.1 구현
새로운 핸들러를 구현한다.
def change_batch_quantity(
event: events.BatchQuantityChanged,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get_by_batchref(batchref=event.ref)
product.change_batch_quantity(ref=event.ref, qty=event.qty)
uow.commit()
batchref 로 product 를 가져오는 질의 타입 추가가 필요하다.
class AbstractRepository(abc.ABC):
...
def get_by_batchref(self, batchref) -> model.Product:
product = self._get_by_batchref(batchref)
if product:
self.seen.add(product)
return product
...
@abc.abstractmethod
def _get_by_batchref(self, batchref) -> model.Product:
raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
...
def _get_by_batchref(self, batchref):
return (
self.session.query(model.Product)
.join(model.Batch)
.filter(orm.batches.c.reference == batchref)
.first()
)
FakeRepository 도 변경한다.
class FakeRepository(repository.AbstractRepository):
...
def _get_by_batchref(self, batchref):
return next(
(p for p in self._products for b in p.batches if b.reference == batchref),
None,
)
9.2.4 도메인 모델의 새 메서드
모델에 새 메서드를 추가한다. 이 메서드는 수량을 변경하자마자 인라인으로 할당을 해제하고 새 이벤트를 발행한다. 그리고 기존 할당 함수를 이벤트가 발생하도록 변경한다.
class Product:
...
def change_batch_quantity(self, ref: str, qty: int):
batch = next(b for b in self.batches if b.reference == ref)
batch._purchased_quantity = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(
events.AllocationRequired(line.orderid, line.sku, line.qty)
class Batch:
...
def deallocate_one(self) -> OrderLine:
return self._allocations.pop()
새 핸들러를 이벤트와 연결한다.
HANDLERS = {
events.BatchCreated: [handlers.add_batch],
events.BatchQuantityChanged: [handlers.change_batch_quantity],
events.AllocationRequired: [handlers.allocate],
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
이로써 요구 사항을 모두 구현했다.
9.5 선택: 가짜 메시지 버스와 독립적으로 이벤트 핸들러 단위 테스트하기
재할당 워크플로에 대한 주 테스트는 edge to edge 테스트다. 테스트는 실제 메시지 버스를 사용하고 전체 워크플로를 테스트한다. 이 워크플로에서는 BatchQuantityChanged 핸들러가 할당 해제를 일으키고, 새 AllocationRequired 이벤트가 발생한다. 이 이벤트에 대한 핸들러가 다시 호출된다. 이 테스트는 여러 이벤트와 핸들러의 연쇄를 테스트한다.
이벤트 연쇄의 복잡도에 따라 다른 핸들러와 독립적으로 일부 핸들러를 테스트하고 싶을 수도 있다. '가짜' 메시지 버스를 사용하면 이런 테스트를 할 수 있다.
다음 예제에서 FakeUnitOfWork 의 publish_events() 메서드를 변경해 실제 메시지 버스와 분리할 수 있다. 이 때 메시지 버스에 이벤트를 넣는 대신 발생시킨 이벤트를 리스트에 저장한다.
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
def __init__(self):
super().__init__()
self.events_published = [] # type: List[events.Event]
def publish_events(self):
for product in self.products.seen:
while product.events:
self.events_published.append(product.events.pop(0))
이제 FakeUnitOfWorkWithFakeMessageBus 를 사용해 messagebus.handle() 를 호출하면 이벤트의 핸들러만 실행된다. 따라서 서로 더 격리된 단위 테스트를 작성할 수 있다. 모든 부수 효과를 검사하는 대신 이미 할당된 수량이 수량 이하로 내려갈 경우 BatchQuantityChanged 가 AllocationRequired 이벤트를 발생시키는지만 검사할 수 있다.
def test_reallocates_if_necessary(self):
uow = FakeUnitOfWorkWithFakeMessageBus()
event_history = [
events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
]
for e in event_history:
messagebus.handle(e, uow)
[batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
assert batch1.available_quantity == 10
assert batch2.available_quantity == 50
messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
# 부수 효과로 이벤트 방출 대신 발생한 이벤트에 대해 어서션으로 조건 검사
[reallocation_event] = uow.events_published
assert isinstance(reallocation_event, events.AllocationRequired)
assert reallocation_event.orderid in {'order1', 'order2'}
assert reallocation_event.sku == 'INDIFFERENT-TABLE'
이런 식의 테스트를 수행할 지는 이벤트 연쇄의 복잡도에 달려 있다. 처음에는 edge to edge 테스트를 만들고 꼭 필요할 때만 독립적인 테스트를 작성하는 것이 좋다.
9.6 마치며
9.6.1 시스템을 어떻게 변경했는가?
이벤트는 시스템 안의 내부 메시지와 입력에 대한 데이터 구조를 정의하는 간단한 데이터 클래스다. 이벤트는 비즈니스 언어로 아주 잘 번역되기 때문에 DDD 관점에서 이벤트는 상당히 강력한 개념이다. (참고 : 이벤트 스토밍)
핸들러는 이벤트에 반응하는 방법이다. 핸들러는 모델을 호출하거나 외부 서비스를 호출할 수 있다. 원한다면 한 이벤트에 여러 핸들러를 정의할 수도 있다. 또한 핸들러는 다른 이벤트를 만들어낼 수도 있다. 이를 통해 핸들러가 수행하는 일의 크기를 세밀하게 조절해서 SRP를 유지할 수도 있다.
9.6.2 왜 이렇게 시스템을 변경했는가?
이런 아키텍처 패턴을 채택한 목적은 애플리케이션의 크기가 커지는 속도보다 복잡도가 증가하는 속도를 느리게 만들기 위해서이다.
메시지 버스에 실으면 아키텍처의 복잡도 측면에서는 비용을 지불하지만, 필요한 작업을 수행하기 위해 더 이상 개념적으로나 아키텍처적으로 코드를 변경할 필요 없이 요구 사항 대부분을 거의 다 처리할 수 있는 패턴을 얻게 된다. 여기서는 꽤 복잡한 유스 케이스(수량 변경, 할당 해제, 새로운 트랜잭션 시작, 재할당, 외부에 통지)를 추가 했지만, 아키텍처 측면에서 보면 이런 내용을 추가해도 아무 복잡도 추가가 없다.
새로운 이벤트와 새로운 핸들러를 추가하고 외부 어댑터(이메일)를 새로 붙였지만, 이 모두는 이미 우리가 이해하고 어떻게 추론해야 할지 잘 아는 아키텍처에 속한 object 의 범주에 들어간다. 따라서 이를 새로운 개발자에게 설명하기도 쉽다. 이 아키텍처에서 움직이는 부품은 모두 한 가지 작업만 담당하고, 서로 잘 정의된 방식으로 연결되며 예상할 수 없는 부작용도 없다.
- 전체 앱이 메시지 버스인 경우의 트레이드 오프
장점
- 핸들러와 서비스가 똑같은 object 라서 더 단순하다.
- 시스템 입력을 처리하기 좋은 데이터 구조가 있다.
단점
- 웹 이라는 관점에서 메시지 버스를 보면 여전히 예측하기 어려운 처리 방법이다. 작업이 언제 끝나는지 예측하기 어렵다.
- 모델 객체와 이벤트 사이에 필드 구조 중복이 있고, 이에 대한 유지보수가 필요하다. 한 쪽에 필드를 추가한다면 다른 쪽에 속한 객체에 두 개 이상 필드를 추가해야 한다.
'python' 카테고리의 다른 글
Architecture Patterns with Python(11장) (0) | 2022.01.07 |
---|---|
Architecture Patterns with Python(10장) (0) | 2021.12.17 |
Fluent Python (챕터 9) - 파이썬스러운 객체 (0) | 2021.12.09 |
Architecture Patterns with Python(8장) (0) | 2021.12.04 |
Fluent Python (챕터 8) - 객체 참조, 가변성, 재활용 (0) | 2021.11.19 |