업무적으로 기존 mysql DB 를 통해서 가져오던 데이터를 Elastic search 로 이관하여 쿼리를 재작성해야 하는 일이 생겼다.
사용자가 필터를 통해 선택한 사항에 맞는 데이터를 실시간으로 집계해서 보여줘야 하는 API 였는데, RDB 를 사용하는 경우 테이블 간의 join 이 많다보니 쿼리 수행시간이 오래걸렸다. 그렇다고 DW 를 구축하는 일도 제한적이었다. 사용자가 필터에서 어떤 항목을 선택할 지 알 수 없었기 때문이다.
최초에는 RDB 의 테이블 스키마를 최대한 nosql 처럼 컬럼으로 펼쳤다. join 을 최소화하기 위함이었다. 그럼에도 한계가 있었고, 그 중에서도 특히 시계열 데이터를 표현하는 데에 있어서 특히 어려움이 있었다. 필터에서 날짜 정보가 가장 중요한 필터 요소 중 하나였고, 6개월 이상에 걸친 데이터를 실시간 집계하는데에 10초 이상 소요되었다.
이러저러한 이유로 ES 를 도입하게 되었고, 기존 SQL 쿼리를 ES 의 DSL 쿼리로 전환하는 과정에서 겪은 어려움과 또 알게 된 점들을 정리하고자 하는 목적이다.
ES 에 대한 기본적인 설명은 김종민님의 Elastic 가이드북에 너무나도 친절하고 상세히 나와있다.
더불어, python 을 사용하였기 때문에 Elasticsearch 에서 지원하는 Elasticsearch 패키지와 Elasticsearch DSL 을 사용하였다.
Elasticsearch 의 쿼리는 기본적으로 json 형태이다. 이를 orm 처럼 사용할 수 있게 도와주는 패키지가 Elasticsearch DSL 이다.
참고) Elasticsearch, Elasticsearch DSL 모두 구간 버전을 명확히 명시하여 설치해야 한다.
- elasticsearch = ">=7.0.0, <8.0.0"
- elasticsearch-dsl = ">=7.0.0, <8.0.0"
이번 글에서는 우선 connection, index, document 생성 방식에 대해서 먼저 정리하고자 한다.
회사에서 Elastic Cloud 를 사용하였기 때문에 계정 정보를 이용하여 connection 을 맺었다.
connection 은 여러가지 방법을 통해서 맺을 수 있는데, cloud 를 사용하는 경우에는 cloud_id, api_key_id, api_key_detail 의 정보를 사용할 수 있고, 이는 Elasticsearch 기본 패키지에서 가능하다.
Elasticsearch_dsl 을 사용하는 경우에는 host, username, password 를 통해서 가능하며 아래와 같다.
from elasticsearch import Elasticsearch
from elasticsearch_dsl.connections import connections
# basic
conn = Elasticsearch(
cloud_id=self.cloud_id,
api_key=(self.api_key_id, self.api_key_detail),
)
# dsl
connections.create_connection(
alias="test",
hosts=hosts,
http_auth=f"{username}:{password}"
)
conn = connections.get_connection(alias="test")
# add connection
connections.add_connection(alias="test2", conn=conn)
new_conn = connections.get_connection(alias="test2")
DSL 을 사용하는 경우에는 최초 connection 을 생성하고 alias 를 지정하면 그에 맞는 connection 을 가져올 수 있다.
add_connection 함수를 통해서 새로운 connection 을 추가할 수 있으며 마찬가지로 그에 맞는 alias 를 통해 connection 을 가져올 수 있다.
connection 은 이 정도로 정리하고, index 와 document 에 대해서 알아본다.
RDB의 테이블과 유사한 개념인 index 안에 각각의 document 가 들어간다. 마치 row 가 한 줄 쌓이는 것과 같다.
document 가 공통적으로 갖는 항목들을 field 라고 하며, 이는 컬럼과 같다.
index 를 생성할 때, Elasticsearch DSL 을 활용하면 보다 손쉽게 document schema 를 정의하고 생성할 수 있다.
from elasticsearch_dsl import Document, Date, InnerDoc, Text, Keyword, Nested, Integer, Short
class TestInnerDoc(InnerDoc):
inner_str_field = Keyword()
inner_int_field = Integer()
inner_year_field = Short()
class TestDoc(Document):
test_str_field = Keyword()
test_text_field = Text()
inner_doc = Nested(TestInnerDoc)
create_at = Date(default_timezone='UTC')
class Index:
name = "test-index-v1"
settings = {
"number_of_shards": 5,
"number_of_replicas": 1
}
이런 식의 document 필드를 dataclass 모듈을 사용하듯이 적용할 수 있다. 필드 중에서 nested 는 depth 가 한 단계 더 내려간 형태를 말한다. 이렇게 하는 이유는 하나의 document 가 어떠한 키 값을 기준으로 특정 항목이 동시에 여러 값을 가질 수 있는 경우를 위해 nested 라는 필드로 만들고 그 안에 리스트 형태로 데이터를 넣기 위함이다.
생성하고자 하는 document 안에 index class 를 정의하여, 어떤 이름의 인덱스에 어떠한 설정 값으로 생성할지를 정의할 수 있다.
이렇게 설계가 완료되면, init 을 해준다.
TestDoc.init()
doc_list = [
TestDoc(
test_str_field=row["name"]
...
meta={"id": idx)
)
for idx, row in enumerate(data)
]
init 과 함께 index, document 가 동시에 생성된다.
이 후에는 데이터를 넣고 list 형태로 만든다. 그 이유는 bulk insert 를 하기 위함이다.
ES 에서는 대량의 데이터를 삽입하고 불러올 때에 사용할 수 있는 기능이 있다.
from elasticsearch import helpers
helpers.bulk(es, (doc.to_dict(include_meta=True) for doc in doc_list))
print(es.count(index="test-index-v1"))
helpers 에는 bulk, scan 함수가 있으며 각각 데이터를 삽입, 호출하기 위한 목적이다.
bulk 함수에는 두 개의 인자를 넣어주는다. 첫 번째는 현재 커넥션이 연결되어 있는 ES 객체이며, 두 번째는 삽입할 대상이다. 위 코드에서는 각각의 document 를 dictionary 로 변환하여 삽입해주었고, 앞서 정의한 meta 정보(id) 를 포함시켰다.
DSL 을 사용하지 않으면 json 형태로 정의해야 한다.
index_body = {
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"str_field": {"type": "keyword"}
....
}
}
}
es.indices.create(index="test-index-v1", body=index_body)
data_body = [
{
"_index": "test-index-v1",
"_id": idx,
"_source": {
"str_field": row["name"]
....
}
}
for idx, row in enumerate(data)
]
helpers.bulk(es, data_body)
es.indices.refresh(index="test-index-v1")
print(es.count(index="test-index-v1", pretty=True)["count"])
기본 ES 패키지를 사용하면 위와 같이 생성할 수 있다. index 필드나 document 를 json 형태로 구현하면 된다.
'elastic search' 카테고리의 다른 글
Elastic Search DSL(Aggregation) (0) | 2021.11.10 |
---|---|
Elastic Search DSL(Search) (0) | 2021.11.10 |