본문 바로가기

elastic search

Elastic Search DSL(Connection, Index, Document)

업무적으로 기존 mysql DB 를 통해서 가져오던 데이터를 Elastic search 로 이관하여 쿼리를 재작성해야 하는 일이 생겼다.

사용자가 필터를 통해 선택한 사항에 맞는 데이터를 실시간으로 집계해서 보여줘야 하는 API 였는데, RDB 를 사용하는 경우 테이블 간의 join 이 많다보니 쿼리 수행시간이 오래걸렸다. 그렇다고 DW 를 구축하는 일도 제한적이었다. 사용자가 필터에서 어떤 항목을 선택할 지 알 수 없었기 때문이다.

 

최초에는 RDB 의 테이블 스키마를 최대한 nosql 처럼 컬럼으로 펼쳤다. join 을 최소화하기 위함이었다. 그럼에도 한계가 있었고, 그 중에서도 특히 시계열 데이터를 표현하는 데에 있어서 특히 어려움이 있었다. 필터에서 날짜 정보가 가장 중요한 필터 요소 중 하나였고, 6개월 이상에 걸친 데이터를 실시간 집계하는데에 10초 이상 소요되었다.

 

이러저러한 이유로 ES 를 도입하게 되었고, 기존 SQL 쿼리를 ES 의 DSL 쿼리로 전환하는 과정에서 겪은 어려움과 또 알게 된 점들을 정리하고자 하는 목적이다.

 

ES 에 대한 기본적인 설명은 김종민님의 Elastic 가이드북에 너무나도 친절하고 상세히 나와있다.

https://esbook.kimjmin.net/

 

Elastic 가이드 북 - Elastic 가이드북

7. 인덱스 설정과 매핑 - Settings & Mappings

esbook.kimjmin.net

 

더불어, python 을 사용하였기 때문에 Elasticsearch 에서 지원하는 Elasticsearch 패키지와 Elasticsearch DSL 을 사용하였다.

Elasticsearch 의 쿼리는 기본적으로 json 형태이다. 이를 orm 처럼 사용할 수 있게 도와주는 패키지가 Elasticsearch DSL 이다.

 

참고) Elasticsearch, Elasticsearch DSL 모두 구간 버전을 명확히 명시하여 설치해야 한다.

  • elasticsearch = ">=7.0.0, <8.0.0"
  • elasticsearch-dsl = ">=7.0.0, <8.0.0"

 

이번 글에서는 우선 connection, index, document 생성 방식에 대해서 먼저 정리하고자 한다.

 

회사에서 Elastic Cloud 를 사용하였기 때문에 계정 정보를 이용하여 connection 을 맺었다.

connection 은 여러가지 방법을 통해서 맺을 수 있는데, cloud 를 사용하는 경우에는 cloud_id, api_key_id, api_key_detail 의 정보를 사용할 수 있고, 이는 Elasticsearch 기본 패키지에서 가능하다.

 

Elasticsearch_dsl 을 사용하는 경우에는 host, username, password 를 통해서 가능하며 아래와 같다.

from elasticsearch import Elasticsearch
from elasticsearch_dsl.connections import connections


# basic
conn = Elasticsearch(
    cloud_id=self.cloud_id,
    api_key=(self.api_key_id, self.api_key_detail),
)

# dsl
connections.create_connection(
    alias="test",
    hosts=hosts,
    http_auth=f"{username}:{password}"
)
conn = connections.get_connection(alias="test")


# add connection
connections.add_connection(alias="test2", conn=conn)
new_conn = connections.get_connection(alias="test2")

 

DSL 을 사용하는 경우에는 최초 connection 을 생성하고 alias 를 지정하면 그에 맞는 connection 을 가져올 수 있다.

add_connection 함수를 통해서 새로운 connection 을 추가할 수 있으며 마찬가지로 그에 맞는 alias 를 통해 connection 을 가져올 수 있다.

connection 은 이 정도로 정리하고, index 와 document 에 대해서 알아본다.

 

 

RDB의 테이블과 유사한 개념인 index 안에 각각의 document 가 들어간다. 마치 row 가 한 줄 쌓이는 것과 같다.

document 가 공통적으로 갖는 항목들을 field 라고 하며, 이는 컬럼과 같다.

 

index 를 생성할 때, Elasticsearch DSL 을 활용하면 보다 손쉽게 document schema 를 정의하고 생성할 수 있다.

from elasticsearch_dsl import Document, Date, InnerDoc, Text, Keyword, Nested, Integer, Short

class TestInnerDoc(InnerDoc):
    inner_str_field = Keyword()
    inner_int_field = Integer()
    inner_year_field = Short()


class TestDoc(Document):
    test_str_field = Keyword()
    test_text_field = Text()
    inner_doc = Nested(TestInnerDoc)
    create_at = Date(default_timezone='UTC')
    
    class Index:
    	name = "test-index-v1"
        settings = {
            "number_of_shards": 5,
            "number_of_replicas": 1
        }

 

이런 식의 document 필드를 dataclass 모듈을 사용하듯이 적용할 수 있다.  필드 중에서 nested 는 depth 가 한 단계 더 내려간 형태를 말한다. 이렇게 하는 이유는 하나의 document 가 어떠한 키 값을 기준으로 특정 항목이 동시에 여러 값을 가질 수 있는 경우를 위해 nested 라는 필드로 만들고 그 안에 리스트 형태로 데이터를 넣기 위함이다.

 

생성하고자 하는 document 안에 index class 를 정의하여, 어떤 이름의 인덱스에 어떠한 설정 값으로 생성할지를 정의할 수 있다.

이렇게 설계가 완료되면, init 을 해준다.

 

TestDoc.init()
doc_list = [
    TestDoc(
        test_str_field=row["name"]
        ...
        meta={"id": idx)
    )
    for idx, row in enumerate(data)
]

 

init 과 함께 index, document 가 동시에 생성된다.

이 후에는 데이터를 넣고 list 형태로 만든다. 그 이유는 bulk insert 를 하기 위함이다.

ES 에서는 대량의 데이터를 삽입하고 불러올 때에 사용할 수 있는 기능이 있다.

 

from elasticsearch import helpers

helpers.bulk(es, (doc.to_dict(include_meta=True) for doc in doc_list))
print(es.count(index="test-index-v1"))

 

helpers 에는 bulk, scan 함수가 있으며 각각 데이터를 삽입, 호출하기 위한 목적이다.

bulk 함수에는 두 개의 인자를 넣어주는다. 첫 번째는 현재 커넥션이 연결되어 있는 ES 객체이며, 두 번째는 삽입할 대상이다. 위 코드에서는 각각의 document 를 dictionary 로 변환하여 삽입해주었고, 앞서 정의한 meta 정보(id) 를 포함시켰다.

 

DSL 을 사용하지 않으면 json 형태로 정의해야 한다.

index_body = {
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
        "str_field": {"type": "keyword"}
        ....
        }
    }
}

es.indices.create(index="test-index-v1", body=index_body)

data_body = [
    {
        "_index": "test-index-v1",
        "_id": idx,
        "_source": {
            "str_field": row["name"]
            ....
        }
    }
    for idx, row in enumerate(data)
]
helpers.bulk(es, data_body)
es.indices.refresh(index="test-index-v1")
print(es.count(index="test-index-v1", pretty=True)["count"])

 

기본 ES 패키지를 사용하면 위와 같이 생성할 수 있다. index 필드나 document 를 json 형태로 구현하면 된다.

'elastic search' 카테고리의 다른 글

Elastic Search DSL(Aggregation)  (0) 2021.11.10
Elastic Search DSL(Search)  (0) 2021.11.10