본문 바로가기

python

Architecture Patterns with Python(11장)

이벤트 기반 아키텍처 : 이벤트를 사용한 마이크로서비스 통합

 

앞선 장에서 커맨드와 이벤트를 분리하여 시스템 내부적으로 어떻게 이벤트를 구분하고 이를 처리할 것인지를 정의하였다.

이번 장에서는 레디스와 같은 외부 메시지 버스(혹으 메시지 브로커) 를 통해 이벤트를 주고 받는 구조에 대해 알아본다.

 

 

11.1 분산된 진흙 공, 명사로 생각하기

보통 시스템의 이벤트를 명사로 나눈다. 재고 배치, 주문, 상품, 고객 등이다. 행위가 아닌 말그대로 명사 기준이다. 할당보다는 배치로 표현한 것이 이에 해당한다.

 

정상 경로를 예시로 들어보면 아래와 같이 진행된다.

  • 사용자가 웹 사이트에 방문해 재고가 있는 상품을 선택한다.(주문)
  • 상품을 장바구니에 담고 재고를 예약한다.(배치)
  • 주문이 완료되면 예약을 확정하고 창고에 출고를 지시한다.(창고)
  • 3번째 주문일 경우 고객 레코드를 변경해 일반 고객을 VIP 로 승격시킨다.(CRM)

 

각 단계의 시스템에서는 ReserveStock, ConfirmReservation, DispatchGoods, MakeCustomerVIP 등의 명령으로 생각할 수 있다.

 

이러한 스타일의 아키텍처에서는 데이터베이스 테이블 단위로 마이크로서비스를 만들고 HTTP API 를 빈약한(비즈니스 로직이 없는) 모델에 대한 CRUD 인터페이스로 취급하며, 서비스 중심의 설계를 처음 하는 사람들이 일반적으로 취하는 방식이다.

 

간단한 시스템의 경우에는 문제가 없지만 곧 분산된 진흙 공으로 악화되기 쉽다.

 

그 이유를 알아보기 위해 다른 경우를 생각해 볼 수 있다. 창고에 새로운 재고가 도착하면 상품이 운송 중에 손상된 경우가 있다. 이를 폐기하고 파트너사에 더 많은 재고를 요청해야 한다. 재고 모델을 업데이트해야 할 수도 있고 그로 인해 고객의 주문을 재할당해야 할 수도 있다.

 

그렇다면 이 로직은 어디에 들어갈 수 있을까?

 

창고 시스템이 재고의 손상 여부를 알고 있으므로 아래와 같은 처리 과정대로 진행된다.

  • 창고 담당자가 시스템에 재고 손상을 보고한다.
  • 창고 시스템은 사용 가능 재고를 감소시킨다.(창고)
  • 배치 시스템에서 주문을 재할당한다.(배치)
  • 주문 시스템에서 주문 상태를 업데이트 한다.(주문)
  • 주문 이력을 업데이트 한다.(CRM)

정상 경로에서는 재고를 할당하기 위해서는 주문 서비스가 배치 시스템을 제어해야 한다. 배치 시스템은 다시 창고 시스템을 제어한다. 하지만 창고에 문제가 생겼을 때 이를 해결하기 위해서는 반대로 창고 시스템은 배치 시스템을 제어해야 하고, 배치 시스템은 주문 시스템을 제어해야 한다.

시스템이 엉망진창이 될 수 있다.

 

11.2 분산 시스템에서 오류 처리하기

'모든 것은 망가진다' 는 소프트웨어 엔지니어링에서 일반적인 규칙이다. 어떤 주문 요청이 실패하면 이에 대한 처리 방법은 두 가지다. 어쨋든 주문을 넣되 할당은 하지 않거나 할당을 보장할 수 없으므로 주문을 거부할 수 있다.

 

배치와 주문 두 가지를 모두 바꿔야 하는 경우를 '결합됐다' 고 말한다. 시스템의 모든 부분이 동시에 제대로 작동할 때만 정상으로 작동하는 경우를 시간적 결합 이라고 한다. 시스템이 커지면 시스템 중 일부의 성능이 나빠질 확률이 지수적으로 높아진다.

 

결합(coupling) 이라는 용어 말고도 동시생산(connascence) 이라는 말로 다른 유형의 결합을 묘사하기도 한다. 연산을 성공하려면 여러 구성 요소의 정확한 작업 순서를 알고 있어야 하는 것을 실행의 동시생산(Connascence of execution (CoE))이라고 말한다. 주문 오류에 대한 결합은 타이밍의 동시생산(Connascence of timing (CoT)) 을 의미한다. 이는 전체 연산이 성공하려면 여러 가지 일이 차례로 일어나야 한다. 즉 한 가지 일이 일어난 다음에 다음 일이 일어나야 하는 것을 말한다. CoE 나 CoT 같이 두 클래스가 서로 밀접하게 연관되어 있는 경우를 강한 동시 생산이라고 한다.
반대로 이름의 동시생산 (Connascence of name (CoN)) 은 여러 구성 요소가 이벤트의 이름과 이벤트가 전달하는 필드의 이름에만 동의하면 되는 것을 말하며, 약한 동시 생산에 속한다.

소프트웨어가 다른 소프트웨어와 대화하지 않게 하지 않는 이상 결합을 피할 수는 없다. 상황에 따라 이를 잘 조절하는 것이 중요하다. 
참고 : https://connascence.io/

 

11.3 대안: 비동기 메시징을 사용한 시간적 결합

우선, 앞선 챕터에서 도메인 모델을 명사가 아닌 동사로 생각한다는 점을 살펴봤었다. 주문에 대한 시스템과 배치에 대한 시스템을 생각하는 대신 주문 행위에 대한 시스템과 할당 행위에 대한 시스템을 생각하는 것이다.

이런 방식으로 구분하면 어떤 시스템이 어떤 일을 하는지에 대해 생각하기가 쉽다. 주문 행위에 대해 생각해보면 우리가 정말 원하는 일은 주문을 넣었을 때 주문이 들어가는 것이다. 다른 모든 일은 언젠가 발생한다는 것만 보장할 수 있다면 나중에 발생할 수 있다.

 

애그리게이트에 대한 내용과 유사하게 마이크로서비스 자체도 일관성 경계여야 한다. 서로 다른 두 서비스 사이에는 최종 일관성을 받아들일 수 있고, 이는 동기화된 호출에 의존되지 않아도 된다는 뜻이다. 각 서비스는 외부에서 커맨드를 받고 결과를 저장하기 위해 이벤트를 발생시킨다. 해당 이벤트를 listen 하는 다른 서비스는 워크플로의 다음 단계를 촉발한다.

 

진흙 공 안티 패턴을 방지하기 위해 시간적으로 결합된 HTTP API 를 호출하는 대신 비동기 메시지로 시스템을 통합한다. BatchQuantityChanged 메시지가 외부 메시지를 기다리며, 메시지를 받으면 Allocated 이벤트를 발행한다.

 

이런 구조가 좋은 이유는 무엇인가?

첫 번째, 각 부분이 서로 독립적으로 실패할 수 있어서 잘못된 동작이 발생했을 때 처리가 쉽다. 할당 시스템에 문제가 있어도 주문을 받을 수 있다.

두 번째, 시스템 사이의 결합 강도를 낮출 수 있다. 처리 과정을 이루는 연산 순서를 바꾸거나 새로운 단계를 도입하고 싶을 때 이를 보다 쉽게 가능하다.

 

11.4 레디스 발행/구독 채널을 통합에 사용하기

이벤트를 시스템 밖으로 보내고 다른 시스템 안으로 넣는 서비스를 위한 메시지 버스 같은 것이 필요하다. 이런 인프라를 메시지 브로커라고 부른다. 메시지 브로커의 역할은 발행자로부터 메시지를 받아서 구독자에게 배달하는 것이다.

eventstore, kafka, Rabbit MQ 등이 있으며 Redis 가 일반적으로 많이 사용된다.

 

레디스를 사용한 새로운 흐름은 레디스가 전체 프로세스를 시작하는 BatchQuantityChanged 를 제공하고 마지막에는 Allocated 이벤트를 다시 레디스에 발행한다.

 

11.5 엔드 투 엔드 테스트를 사용해 모든 기능 시범 운영하기

@pytest.mark.usefixtures("postgres_db")
@pytest.mark.usefixtures("restart_api")
@pytest.mark.usefixtures("restart_redis_pubsub")
def test_change_batch_quantity_leading_to_reallocation():
    # 1 두 배치와 두 배치 중 한 쪽에 할당하는 주문으로 시작
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta="2011-01-01") # 2
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta="2011-01-02")
    response = api_client.post_to_allocate(orderid, sku, 10) # 2
    assert response.json()["batchref"] == earlier_batch 

    subscription = redis_client.subscribe_to("line_allocated") # 3

    # 1 할당된 배치의 수량을 변경해서 주문 수량보다 작게 만든다
    redis_client.publish_message(
        "change_batch_quantity",
        {"batchref": earlier_batch, "qty": 5},
    ) # 3

    # 1 주문이 재할당됐다는 메시지를 받을 때까지 기다린다
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True): # 4
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]["data"])
            assert data["orderid"] == orderid
            assert data["batchref"] == later_batch

 

  1. 주문 라인의 재할당을 일으키는 이벤트를 시스템에 보낸다. 그 다음 레디스에서 이벤트로 재할당이 발생하는지 살펴본다.
  2. api_client 는 두 유형의 테스트 사이에 공유하기 위해 리팩터링한 도우미 함수이다. 이 함수는 request.post 호출을 감싼다.
  3. redis_client 는 여러 레디스 채널에 메시지를 보내거나 여러 채널에서 메시지를 받는 일을 한다. change_batch_quantity 라는 채널을 사용해 배치 수량 변경 요청을 보내고 line_allocated 라는 채널을 리슨해 재할당 이벤트가 도착하는지 살펴본다.
  4. 테스트 대상 시스템이 비동기적으로 tenacity 라이브러리를 재사용해 루프를 추가해야 한다. 이 라이브러리를 사용하는 첫 번째 이유는 line_allocated 메시지가 도착하는 데 시간이 걸리기 때문이고, 두 번째 이유는 line_allocated 메시지가 채널을 통해 전달된 유일한 메시지가 아니기 때문이다.

 

11.5 레디스는 메시지 버스를 감싸는 다른 얇은 어댑터

우리가 사용하는 레디스 발행/구독 리스너(또는 event consumer) 는 플라스크와 비슷하다. 이벤트 리스너는 외부 세계를 변환해 이벤트로 만든다.

 

<redis message listener>

r = redis.Redis(**config.get_redis_host_and_port())


def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity") # 1

    for m in pubsub.listen():
        handle_change_batch_quantity(m)


def handle_change_batch_quantity(m):
    logging.debug("handling %s", m)
    data = json.loads(m["data"]) # 2
    cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"]) # 1
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())


if __name__ == "__main__":
    main()

 

  1. main() 은 시작 시 changed_batch_quantity 채널을 구독한다.
  2. 시스템 진입점에서 해야 할 일은 JSON 을 역직렬화하고 해당 객체를 Command 로 변환하여 서비스 계층으로 넘기는 것이다. 이 처리 과정은 플라스크 어댑터가 하는 일과 같다.

<redis event publisher>

r = redis.Redis(**config.get_redis_host_and_port())


def publish(channel, event: events.Event):
    logging.debug("publishing: channel=%s, event=%s", channel, event) # 1
    r.publish(channel, json.dumps(asdict(event)))

 

  1.  여기서는 하드코딩된 채널을 사용하지만, 이벤트 클래스/이름과 적절한 채널을 매핑하는 정보를 저장할 수도 있다. 이렇게 하면 메시지 유형 중 일부에 대해 다른 채널을 사용할 수도 있다.

 

11.5.2 외부로 나가는 새 이벤트

 

@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str

 

위와 같이 새로운 이벤트를 정의하였다. 주문 라인 상세정보, 어떤 배치에 주문 라인이 할당됐는지 등 할당에 대해 알아야 할 필요가 있는 모든 정보를 저장한다.

이를 모델의 allocate() 메서드에 추가한다.

 

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            self.version_number += 1
            self.events.append(  # 추가
                events.Allocated(
                    orderid=line.orderid,
                    sku=line.sku,
                    qty=line.qty,
                    batchref=batch.reference,
                )
            )
            return batch.reference

 

레디스에 알리는 Allocated 핸들러를 기존 핸들러에 추가한다.

 

EVENT_HANDLERS = {
    events.Allocated: [handlers.publish_allocated_event], # 추가
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

 

def publish_allocated_event(
    event: events.Allocated,
    uow: unit_of_work.AbstractUnitOfWork,
):
    redis_eventpublisher.publish("line_allocated", event)

 

 

11.6 내부 이벤트와 외부 이벤트 비교

내부와 외부 이벤트의 구분을 명확히 하면 좋다. 일부 이벤트는 밖에서 들어오지만 일부 이벤트는 승격되면서 외부에 이벤트를 발행할 수도 있다. 하지만 모든 이벤트가 다 외부에 이벤트를 내보내지는 않는다. 이벤트 소싱(Event Sourcing) 을 사용할 경우 이러한 특징이 특히 중요하다. 또한 외부로 나가는 이벤트는 검증을 적용하는 것이 중요하다.(부록 E 참고)

 

이벤트 소싱 : 발생한 이벤트를 모두 저장하는 방식. 비즈니스 로직을 이벤트 중심으로 구현하고, 에그리게이트를 DB 에 일련의 이벤트로 저장하는 기법이다. 각 이벤트가 에그리게이트의 상태를 의미한다.

 

11.7 마치며

이벤트는 외부에서 들어올 수도 있고, 외부로 발행할 수도 있다. 이벤트를 통하여 외부와 이야기한다. 이런 종류의 시간적인 결합을 사용하면 애플리케이션 통합 시 상당한 유연성을 얻을 수 있다.

 

장점 단점
분산된 큰 진흙 공을 피할 수 있다. 전체 정보 흐름을 알아보기 어렵다
서비스가 서로 결합되지 않는다. 개별 서비스를 변경하거나 새로운 서비스를 추가하기가 쉽다. 메시지 신뢰성과 최대 한 번과 최소 한 번 배달의 선택에 대해 생각해봐야 한다.