명령-질의 책임 분리(CQRS)
읽기와 쓰기는 다르고, 또 다르게 취급해야 한다는 의견은 논란의 여지가 없다.
12.1 쓰기 위해 존재하는 도메인 모델
지금까지 비즈니스 상황에 부합하는 도메인 규칙을 만족시키는 소프트웨어를 만드는 방법들에 대한 설명을 이어왔다.
- '현재 사용 가능한 재고보다 더 많은 재고를 할당할 수 없다' 와 같은 명시적인 제약과 '각 주문 라인은 한 배치에만 할당된다' 와 같은 암시적인 제약을 설정하였다.
- 위의 규칙을 일관성 있게 지키기 위해 작은 작업 덩이리로 쪼개고 이를 커밋할 때 도움이 되는 작업 단위(UoW)나 애그리게이트 패턴을 도입했다.
- 이런 작은 작업 덩어리 사이에서 변경된 내용을 통신하기 위해 도메인 이벤트 패턴을 도입하여 '재고가 손상되거나 분실될 경우 배치의 사용 가능한 수량을 조정하고 필요하다면 주문을 재할당하라' 와 같은 규칙을 작성할 수 있다.
이러한 모든 복잡도는 결국 데이터를 유연하게 쓰기 위한 도구이다.
12.2 가구를 구매하지 않는 사용자
주문에 재고를 할당해주는 위와 같은 시스템 뿐만 아니라, 제품 뷰 건수가 초당 수 백건에 달하는 경우가 있을 수도 있다. 상품 재고가 아직 있는지 확인하거나 배송이 얼마나 오래 걸리는지를 예측하기 위해 누군가가 상품 목록 페이지나 상품 페이지를 계속 방문할 수 있다.
재고 배치, 배치의 도착 날짜, 가용 수량 등을 접근하는 패턴은 매우 다양하며, 고객은 매 페이지마다 질의가 몇 초 지났는지를 알 수 없다. 데이터를 읽어올 때 일관성이 없다면 서비스가 엉킬 수 있다.
예를 들어, 밥이라는 고객이 상품 페이지를 방문했을 때 봤던 가용 재고가 20초 정도 늦은 정보라고 가정한다. 이미 마지막 상품은 해리가 구매했다. 밥의 주문이 할당하려고 한다면 실패할 것이다.
상품 페이지를 표시한 순간부터 이미 표시된 데이터는 최신 정보가 아니다. 모든 분산 시스템은 일관성이 없어서 재고를 할당하려면 반드시 시스템의 현재 상태를 검사해야 한다. 웹 서버와 고객이 2명 이상있다면 항상 웹 페이지에 보이는 데이터 중 일부가 잘못된 데이터일 가능성이 높다.
만약 어떻게든 이러한 문제를 해결했다고 가정해보자.
밥이 상품을 구매하고, 창고 담당자가 밥의 상품을 배송하다가 떨어뜨려 상품이 손상되었다. 유일한 방법은 환불해주거나 더 많은 재고를 주문하여 배송을 늦추는 방법이다.
무엇을 하든, 현실은 소프트웨어 시스템과 일관성이 없다. 따라서 비즈니스 프로세스는 이런 이상한 경우를 모두 처리할 수 있어야 한다. 일관성이 없는 데이터를 근본적으로 피할 수는 없으므로 읽기 측면에서 성능과 일관성을 바꿔도 좋다. 즉, 일관성을 다소 포기하더라도 읽기의 최종 일관성을 유지하여 성능을 향상시킬 수 있다.
읽기 | 쓰기 | |
동작 | 간단한 읽기 | 복잡한 비즈니스 로직 |
캐시 가능성 | 높음 | 캐시 불가능 |
일관성 | 오래된 값 제공 가능 | 트랜잭션 일관성이 있어야 함 |
12.3 Post/리디렉션/Get 과 CQS
Post/리디렉션/Get 패턴에서 웹 엔드포인트는 HTTP POST 요청을 받고, 처리한 결과를 보여주기 위해 리디렉션으로 응답한다. 예를 들어 /batches 에 POST 를 해서 새로운 배치를 만들면, /batches/123 으로 리디렉션을 해서 새로운 배치를 보여줄 수 있다.
이런 접근 방법은 사용자가 브라우저에서 결과를 보기 위해 POST 요청 페이지를 다시 읽거나 POST 결과를 북마크할 때 발생할 수 있는 문제를 해결해준다. 다시 읽기를 하면 POST 가 두 번 제출될 수 있고, 북마크의 경우 POST 엔드포인트에 GET 요청을 보내서 오류 페이지를 볼 수도 있는 문제가 있기 때문이다.
두 가지 문제 모두 쓰기 연산에 대한 응답으로 데이터를 보내서 발생하는 문제이다. Post/리디렉션/Get 은 연산의 쓰기와 읽기 단계를 분리해서 이런 문제를 피할 수 있다.
이 기법은 명령-질의 분리(CQS) 의 간단한 예다. CQS 에서는 한 가지 간단한 규칙을 따른다. 함수는 상태를 변경하거나 질문에 답하는 일 중에 한 가지만 해야 한다. 두 가지 일을 모두 다 해서는 안된다. 전등을 껐다 켜지 않고도 "전등이 켜저 있는가?" 라는 질문에 항상 답할 수 있어야 한다.
기존 코드도 CQS 위반을 해결할 수 있다.
@pytest.mark.usefixtures("postgres_db")
@pytest.mark.usefixtures("restart_api")
def test_happy_path_returns_202_and_batch_is_allocated():
orderid = random_orderid()
sku, othersku = random_sku(), random_sku("other")
earlybatch = random_batchref(1)
laterbatch = random_batchref(2)
otherbatch = random_batchref(3)
api_client.post_to_add_batch(laterbatch, sku, 100, "2011-01-02")
api_client.post_to_add_batch(earlybatch, sku, 100, "2011-01-01")
api_client.post_to_add_batch(otherbatch, othersku, 100, None)
r = api_client.post_to_allocate(orderid, sku, qty=3)
assert r.status_code == 202
r = api_client.get_allocation(orderid)
assert r.ok
assert r.json() == [
{"sku": sku, "batchref": earlybatch},
]
@pytest.mark.usefixtures("postgres_db")
@pytest.mark.usefixtures("restart_api")
def test_unhappy_path_returns_400_and_error_message():
unknown_sku, orderid = random_sku(), random_orderid()
r = api_client.post_to_allocate(
orderid, unknown_sku, qty=20, expect_success=False
)
assert r.status_code == 400
assert r.json()["message"] == f"Invalid sku {unknown_sku}"
r = api_client.get_allocation(orderid)
assert r.status_code == 404
테스트 코드에서 POST 와 GET 요청을 따로 분리하여 할당 요청과 그에 대한 배치 결과를 각각 POST, GET 으로 요청 및 반환 받는지 확인한다.
@app.route("/allocations/<orderid>", methods=["GET"])
def allocations_view_endpoint(orderid):
uow = unit_of_work.SqlAlchemyUnitOfWork()
result = views.allocations(orderid, uow)
if not result:
return "not found", 404
return jsonify(result), 200
할당의 결과를 반환하는 엔드포인트이다. 읽기 전용 view 를 만들어서 활용하였다.
from allocation.service_layer import unit_of_work
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
results = list(uow.session.execute(
"""
SELECT ol.sku, b.reference
FROM allocations AS a
JOIN batches AS b ON a.batch_id = b.id
JOIN order_lines as ol ON a.orderline_id = ol.id
WHERE ol.orderid = :orderid
""",
dict(orderid=orderid),
))
return [{'sku': sku, 'batchref': batchref} for sku, batchref in results]
위와 같이 저장소 객체에서 배치 목록을 반환해주는 메서드를 추가할 수 있다.
12.5 CQRS 뷰 테스트하기
def test_allocations_view(sqlite_session_factory):
uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
messagebus.handle(commands.CreateBatch("sku1batch", "sku1", 50, None), uow)
messagebus.handle(commands.CreateBatch("sku2batch", "sku2", 50, today), uow)
messagebus.handle(commands.Allocate("order1", "sku1", 20), uow)
messagebus.handle(commands.Allocate("order1", "sku2", 20), uow)
# 제대로 데이터를 얻는지 보기 위해 여러 배치와 주문을 추가
messagebus.handle(commands.CreateBatch("sku1batch-later", "sku1", 50, today), uow)
messagebus.handle(commands.Allocate("otherorder", "sku1", 30), uow)
messagebus.handle(commands.Allocate("otherorder", "sku2", 10), uow)
assert views.allocations("order1", uow) == [
{"sku": "sku1", "batchref": "sku1batch"},
{"sku": "sku2", "batchref": "sku2batch"},
]
12.6 대안 1 : 기존 저장소 사용하기
views.py 에 배치 목록을 반환하는 메서드에는 raw SQL 을 사용하였다. 이에 대한 저장소와 도메인 객체 등을 활용한 대안들에 대해서 알아본다.
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
products = uow.products.for_order(orderid=orderid) # 1
batches = [b for p in products for b in p.batches] # 2
return [
{'sku': b.sku, 'batchref': b.reference}
for b in batches
if orderid in b.orderids # 3
]
- 저장소는 Product 객체를 반환하며 주어진 주문에서 SKU에 해당하는 모든 상품을 찾아야 한다. 저장소에 .for_order() 라는 도우미 메서드를 만든다.
- 이 시점에서는 상품이 있지만 실제로 바라는 것은 배치에 대한 참조다. 따라서 리스트 컴프리헨션을 사용해 모든 배치를 가져온다.
- 원하는 주문에 대한 배치만 찾기 위해 다시 배치를 걸러낸다. 이 과정은 다시 Batch 객체가 자신이 어떤 주문에 할당됐는지 알려준다는 사실에 의존한다.
배치가 자신이 할당된 주문을 알려주는 부분은 .orderid 프로퍼티를 통해 구현할 수 있다.
class Batch:
...
@property
def orderids(self):
return {l.orderid for l in self._allocations}
기존 저장소와 도메인 모델 클래스를 재사용하는 방법이 그렇게 쉽지는 않다. 새로운 도우미 메서드를 양쪽에 다 추가하고 파이썬 수준에서 루프와 걸러내는 작업을 여러 번 수행해야 한다. 데이베이스를 사용하면 이 모든 작업을 훨씬 더 효율적으로 수행할 수 있다.
이런 방식은 기존 추상화를 재사용한다는 장점이 있지만, 반대로 구현이 꽤 투박하게 느껴진다는 단점이 있다.
12.7 읽기 연산에 최적화되지 않은 도메인 모델
도메인 모델을 만드는 데 든 노력은 주로 쓰기 연산을 위한 것이었다. 읽기를 위한 요구 사항은 개념적으로 도메인 모델을 위한 요구 사항과 상당히 다르다. 간단한 CRUD 앱을 만든다면 도메인 모델이나 CQRS 가 필요 없다. 하지만 도메인이 복잡하면 복잡할수록 도메인 모델과 CQRS 모두가 더 많이 필요해진다.
도메인 모델의 복잡도가 커질수록 모델을 구성하는 방법에 대한 선택의 폭이 넓어지고, 읽기 연산에 도메인 모델을 사용하는 게 더 어려워진다.
12.8 대안 2 : ORM 사용하기
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
uow.session.query(model.Batch).join(
model.OrderLine, model.Batch._allocations
).filter(
model.OrderLine.orderid == orderid
)
return [
{'sku': b.sku, 'batchref': b.batchref}
for b in batches
]
과연 이 코드가 raw SQL 보다 더 쉽게 이해되고 작성하기 쉬운가? 글쎄, 별 차이가 없을 뿐더러 오히려 SQL 이 더 나을 수도 있다. ORM 을 사용하려면 몇 번의 시도가 필요하고 문서를 많이 살펴봐야한다. 더불어 ORM은 몇 가지 성능상의 문제를 야기한다.
12.9 SELECT N+1
SELECT N+1 은 ORM 에서 일반적인 성능 문제다. 객체 리스트를 가져올 때 ORM 은 보통 필요한 모든 객체의 ID 를 가져오는 질의를 먼저 수행한다. 그 후 각 객체의 애트리뷰트를 얻기 위한 개별 질의를 수행한다. 특히 객체에 외래키 관계가 많은 경우 이런 일이 더 자주 발생한다.
참고 : https://velog.io/@kim6515516/npuls
물론 SQLAlchemy 를 포함한 ORM 은 이러한 문제를 해결하도록 여러 기능을 제공하고 있다.
12.10 이벤트 기반 아키텍처 재사용
기존 raw SQL 로 작성된 부분을 좀 더 다듬어 볼 수 있다.
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
with uow:
results = uow.session.execute(
"""
SELECT sku, batchref FROM allocations_view WHERE orderid = :orderid
""",
dict(orderid=orderid),
)
return [dict(r) for r in results]
이 뷰 모델을 위해 정규화하지 않은 데이터를 별도로 보관해야 한다.
allocations_view = Table(
"allocations_view",
metadata,
Column("orderid", String(255)),
Column("sku", String(255)),
Column("batchref", String(255)),
)
아무리 잘 튜닝한 인덱스가 있어도, 결국 관계형 데이터베이서는 조인을 위해 CPU 를 많이 사용한다. 가장 빠른 질의는 항상 SELECT * FROM MYTABLE WHERE KEY =: VALUE 일 것이다.
이런 접근 방식은 질의 실행 속도가 빨라지는 점 이외에도 규모를 확장할 수 있다는 장점이 있다.
하지만 읽기 모델을 최신 상태로 유지하는 일이 쉽지 않다. 데이터베이스 뷰나 트리거가 일반적인 해법이지만 데이터베이스 종류에 따라 한계가 정해진다. 이번에는 데이터베이스 기능을 활용하는 대신 이벤트 기반 아키텍처를 재사용하는 방법에 대해 알아본다.
12.10.1 이벤트 핸들러를 사용해 읽기 모델 테이블 업데이트하기
메시지버스의 Allocated 이벤트에 두 번째 핸들러를 추가한다.
EVENT_HANDLERS = {
events.Allocated: [
handlers.publish_allocated_event,
handlers.add_allocation_to_read_model,
],
...
}
다음은 업데이트 뷰 모델의 코드이다.
def add_allocation_to_read_model(
event: events.Allocated,
uow: unit_of_work.SqlAlchemyUnitOfWork,
):
with uow:
uow.session.execute(
"""
INSERT INTO allocations_view (orderid, sku, batchref)
VALUES (:orderid, :sku, :batchref)
""",
dict(orderid=event.orderid, sku=event.sku, batchref=event.batchref),
)
uow.commit()
Deallocated 도 추가한다.
EVENT_HANDLERS = {
events.Allocated: [
handlers.publish_allocated_event,
handlers.add_allocation_to_read_model,
],
events.Deallocated: [
handlers.remove_allocation_from_read_model,
handlers.reallocate,
],
...
}
def remove_allocation_from_read_model(
event: events.Deallocated,
uow: unit_of_work.SqlAlchemyUnitOfWork,
):
with uow:
uow.session.execute(
"""
DELETE FROM allocations_view
WHERE orderid = :orderid AND sku = :sku
""",
dict(orderid=event.orderid, sku=event.sku),
)
uow.commit()
이를 통해 다음과 같은 순서로 쓰기와 읽기가 이루어진다.
- 쓰기
- allocations 엔드포인트로 POST 요청
- 플라스크 앱에서 메시지버스로 Allocate 커맨드 생성
- 도메인 모델에 allocate() 메서드가 수행되고 DB 에 커밋
- 도메인 모델에서 Allocated 이벤트가 생성되어 메시지 버스로 전달
- DB 의 읽기 전용 뷰 모델 업데이트
- 202 OK 반환
- 읽기
- allocations 엔드포인트로 GET 요청
- 플라스크에서 뷰를 통해 읽기 전용 뷰 모델 SELECT
- 해당 정보를 반환
프로그램이 깨지만 어떤 일이 발생하는가?
만약 뷰 모델이 업데이트 되지 않았다면 재고가 계속해서 표시되어 고객들이 주문을 하지만 실패가 될 것이다. 이 때, 뷰 모델을 다시 만드는 것은 쉽다.
- 쓰기 쪽의 현재 상태를 질의하여 현재 할당된 내용을 찾는다.
- 할당된 원소마다 add_allocate_to_read_model 핸들러를 호출한다.
12.11 읽기 모델 구현을 변경하기 쉽다
완전히 다른 저장소 엔진인 레디스를 사용해 읽기 모델을 구축하기로 결정하면 어떤 일이 발생할지에 대해 살펴보면 이벤트 기반 모델이 실제 얼마나 유연한지 알 수 있다.
def add_allocation_to_read_model(event: events.Allocated, _):
redis_eventpublisher.update_readmodel(event.orderid, event.sku, event.batchref)
def remove_allocation_from_read_model(event: events.Deallocated, _):
redis_eventpublisher.update_readmodel(event.orderid, event.sku, None)
def update_readmodel(orderid, sku, batchref):
r.hset(orderid, sku, batchref)
def get_readmodel(orderid):
return r.hgetall(orderid)
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
batches = redis_eventpublisher.get_readmodel(orderid)
return [
{'batchref': b.decode(), 'sku': s.decode()}
for s, b in batches.items()
]
매우 간단하게 구현이 가능하다.
읽기 모델 업데이트를 관리하는 기능이 필요하다고 결정했다면 이벤트 핸들러가 좋은 방법이다. 이벤트 핸들러를 사용해 읽기 모델을 업데이트하면 나중에 해당 읽기 모델의 구현을 변경하기도 쉬워진다.
12.12 마치며
방법 | 장점 | 단점 |
저장소를 그냥 사용한다. | 간단하고 일관성 있는 접근 가능 | 복잡한 패턴의 질의의 경우 성능 문제 발생 |
ORM 과 커스텀 질의를 사용한다. | DB 설정과 모델 정의 재사용 가능 | 자체 문법이 있고, 나름대로의 문제점이 있는 다른 질의 언어를 한 가지 더 도입해야 한다. |
수기로 작성한 SQL 을 사용한다. | 표준 질의 문법을 사용해 성능을 세밀하게 제어 가능 | DB 스키마 변경 시 수기로 작성한 질의와 ORM을 함께 바꿔야 한다. 정규화가 잘 된 스키마는 여전히 성능상 한계가 있을 수 있다. |
이벤트를 사용해 별도로 읽기 저장소 만들기 | 읽기 전용 복사본은 규모를 키우기 쉽다. 데이터가 바뀔 때 뷰를 구축해 질의를 가능한 한 간단하게 만들 수 있다. | 복잡하다. |
'python' 카테고리의 다른 글
파이썬 클린 코드 - 1장 (코드 포매팅과 도구) (0) | 2022.01.25 |
---|---|
Architecture Patterns with Python(13장) (0) | 2022.01.20 |
Architecture Patterns with Python(11장) (0) | 2022.01.07 |
Architecture Patterns with Python(10장) (0) | 2021.12.17 |
Architecture Patterns with Python(9장) (0) | 2021.12.11 |