본문 바로가기

elastic search

Elastic Search DSL(Aggregation)

사실 ES 에서 이 Aggregation 이 제일 헬이고 핵심이다.

 

쿼리를 어떤 순서로 짜느냐에 따라서 결과의 depth 도 달라진다. 상당히 애를 많이 먹었기 때문에 머리가 땃땃할 때 정리해 놓는 게 좋다.

 

일단 Search 객체로 필터링 조건을 정의하고 나면, mysql 로 따졌을 때 WHERE 조건을 정의했다고 보면 된다. 이제 group by, order by 를 해야 되는데 이 부분에 대해 정리해본다.

 

elasticsearh dsl 패키지에서 집계를 위해 A 객체를 제공한다.

Q 와 유사한 형태로 A 객체 안에 쿼리 조건을 정의한다. 이 부분이 group by 를 위해 어떤 필드를 select 할 것인가라고 볼 수 있다.

search 의 결과에 집계를 이어가고 싶은 경우에는 aggs 키워드를 사용할 수 있다.

 

예시로 보자.

from elasticsearch_dsl import Search, A, Q

s = Search(using=es, index="test-index-v1").query(
    "bool",
    filter=[
        Q("range", **{"craeated_dt": {"gte": "2021-01-01", "lte": "2021-02-01"}}),
        Q("terms", **{"gender": {"male"}})
    ]
)

s.aggs.bucket("group_by", "terms", field="region") \
    .metric("distinct_count", "cardinality", field="user_id")
    

s = s[:0]
res = s.execute()

 

필터링 조건을 완성한 뒤, 해당 객체에 aggregation 쿼리를 추가한다. 기본적으로 group by 를 적용하기 위한 필드 하나 당 bucket 하나다. 해당 bucket 뒤에 바로 metric 을 통해 어떤 연산을 하고 싶은지를 붙인다. cardinality 는 distinct value 를 카운트 한다.

 

예제에서는 남성이면서 db 기준 생성일자가 2021년 1월 ~ 2월 사이의 유저를 대상으로 각 지역별 중복 제거된 유저 수를 구하는 쿼리다.

 

참고) cardinality 는 항상 정확하지 않다. 유사도를 기초로한 카운트라서 동일하지 않음에도 동일한 것으로 간주하고 카운트하는 경우가 발생할 수 있다. 이럴 때는 precision_threshold 인자를 최대 40000 까지 적용가능하며 (default=3000) 그 이상은 의미가 없다. 그럼에도 정확하게 안나올 때는 다른 방식을 사용해야 한다.

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html

 

Cardinality aggregation | Elasticsearch Guide [7.15] | Elastic

Pre-computing hashes is usually only useful on very large and/or high-cardinality fields as it saves CPU and memory. However, on numeric fields, hashing is very fast and storing the original values requires as much or less memory than storing the hashes. T

www.elastic.co

https://pratik-patil.medium.com/accurate-distinct-count-and-values-from-elasticsearch-dadce51ad4a7

 

Accurate Distinct Count and Values from Elasticsearch.

Need Accurate Distinct count of fields from Elasticsearch documents ? Cardinality won’t always work.

pratik-patil.medium.com

 

bucket script 라는 방식으로 집계 가능하며 내장된 default 언어인 painless 혹은 java 로 직접 스크립트를 짜서 적용할 수 있다.

s.aggs.metric(
            "count",
            "scripted_metric",
            params={"fieldName": "id"},
            init_script="state.list = []",
            map_script="""
                if(doc[params.fieldName] != null)
                state.list.add(doc[params.fieldName].value);
            """,
            combine_script="return state.list;",
            reduce_script="""
                Map uniqueValueMap = new HashMap();
                int
                count = 0;
                for (shardList in states) {
                    if (shardList != null) {
                        for (key in shardList) {
                            if (!uniqueValueMap.containsKey(key)) {
                                count += 1;
                                uniqueValueMap.put(key, key);
                            }
                        }
                    }
                }
                return count;
                """,
        )

위 코드를 실행하면 물론 잘 나온다. 다만, 데이터가 커지니까 JVM memory pressure 가 75% 넘어가면서 429 에러를 뱉은 후에 죽어버렸다. 어쩔 수 없이 위 코드를 좀 만져보았고, 그나마 메모리 측면에서 조금 개선할 수 있었다.

s.aggs.metric(
            "count",
            "scripted_metric",
            params={"fieldName": "id"},
            init_script="state.set = new HashSet();",
            map_script="""
                state.set.add(doc[params.fieldName].value);
            """,
            combine_script="return state.set;",
            reduce_script="""
                int count = 0;
                for (eachSet in states) {
                    count += eachSet.size()
                }
                return count;
            """,
        )

 

일단 HashSet 을 사용해서 애초에 담을 때부터 중복 없이 담았다. ArrayList 보다 메모리를 많이 차지하고 느리지만 이후에 반복문으로 다시 중복 여부를 걸러내고, 따로 또 담는 작업을 하는 것보다는 훨씬 효율적이다.

각 shard 에서 combine_script 까지 실행된 후에 reduce_script 에서 합치는데, 이 때 states 밑에 저장된 각각의 HashSet 을 반복문으로 가져오면서 size 를 count 변수에 더한다. 이미 중복이 제거된 상태로 담겨 있기 때문에 각 shard 에서 만들어진 HashSet 의 크기만 더해주면 전체 수를 알 수 있다.

 

파이썬만 해봐서 자바는 잘 몰랐는데(위 코드는 자바를 기반으로 한 painless 라는 언어임) 기본을 잘 알고 있으면 언어는 크게 중요하지 않다는 말이 어떤 뜻인지 처음 깨달았다. 가뜩이나 빠른 개발의 세계에서도 시대의 흐름과 무관한 기초가 탄탄해야 함을 다시 느꼈다.

 

 

본론으로 와서 위의 예시를 s.to_dict() 함수를 통해서 json 형태로 볼 수 있다. 매우 유용하니 쿼리가 감이 안올 때는 계속 프린트해서 확인하면서 잘못된 점을 찾아나가야 한다.

쿼리가 완성된 후에는 슬라이싱을 통해서 s=s[:0] 로 hits 를 결과에서 제외할 수 있다. hits 는 위의 모든 조건을 만족하는 document 를 의미하는데 default 10개이고 최대 10000개 까지 가져올 수 있다. 집계 결과만 필요하므로 굳이 document 를 모두 가져올 필요가 없을 때는 위와 같이 페이지네이션을 0으로 할 수 있다.

 

위의 예제를 mysql 로 바꾸면 아래와 같은 의미라고 볼 수 있다.

SELECT	REGION
    ,   COUNT(DISTINCT USER_ID) AS DISTINCT_COUNT
FROM	TEST_INDEX_V1
WHERE	CREATED_DT BETWEEN '2021-01-01' AND '2021-02-01'
    AND GENDER = 'MALE'
GROUP BY REGION

 

여기에 order_by 를 추가하고 싶으면 어떻게 할까?

from elasticsearch_dsl import Search, A, Q

s = Search(using=es, index="test-index-v1").query(
    "bool",
    filter=[
        Q("range", **{"craeated_dt": {"gte": "2021-01-01", "lte": "2021-02-01"}}),
        Q("terms", **{"gender": {"male"}})
    ]
)

s.aggs.bucket("group_by", "terms", field="region", order={"distinct_count": "desc"}, size=1000) \
    .metric("distinct_count", "cardinality", field="user_id")
    

s = s[:0]
res = s.execute()

 

region bucket 에서 바로 아래 metric 의 이름을 order 인자에 키로 넘겨주면 된다. 해당 필드의 key 값에 따라 정렬하고 싶다면, 즉 region 의 이름 순으로 정렬하고 싶다면 order={"_key": "asc"} 와 같이 _key 라는 값을 넘겨주면 된다.

terms 쿼리에서는 최대 집계 대상을 약 21억건까지 할 수 있다. 즉, 21억 row 의 데이터를 대상으로 집계가 가능하다는 뜻이다. 나중에 정리할 composite 쿼리는 최대가 10000개이다.

 

nested 된 필드를 활용한 집계는 어떻게 해야하는가

s.aggs.bucket("inner_nested", "nested", path="innerdoc") \
    .bucket("inner", "terms", field="innerdoc.inner_str_field")
    .bucket("reverse", "reverse_nested") \
    .metric("sum_score", "sum", field="score")

 

일단 nested field 에 접근하기 위해서는 nested 쿼리로 경로를 지정해준다. 그 이후에 group by 하고자 하는 해당 필드에 접근한다. 만약 그 필드를 기준으로 nested 필드 밖에 있는 또다른 필드로 집계를 하고 싶다면 nested 필드를 밖으로 꺼내는 reverse_nested 쿼리를 추가해야 한다. 이를 통해 depth 가 서로 달랐던 두 필드를 동일 depth 로 가져오는 것이다.

 

앞서 말했듯 각 bucket 이 1depth 를 갖는다. 위 코드는 metric 결과까지 총 4depth 를 갖는다. 결과는 빠르게 가져오지만 쿼리에 따라 depth 가 달라지므로 document 를 정의했던 것과 같이 일정한 모델로 만들기가 매우 까다롭다.

가지고 온 데이터를 로직에서 후처리해주는 과정이 다소 복잡하다.

 

이번에는 동시에 여러 필드를 기준으로 집계하는 composite 쿼리에 대해서 본다.

 

만약에 날짜 정보가 년,월,주로 구분되어 있는데 이 세가지 필드로 group by 를 하고 싶다면 각각을 bucket 으로 만들면 된다. 하지만 그럴 필요 없이 하나의 bucket 안에 담을 수도 있다.

 

sources = [
    {"year": A("terms", field=year_value)},
    {"month": A("terms", field=month_value)},
    {"week": A("terms", field=week_value)}
]

s.aggs.bucket("group_by", "composite", sources=sources, size=10000)\
    .metric("date_count", "cardinality", field="created_dt")

 

위 코드는 예를 들어 filter 에서 2020년 1월 1일 ~ 2021년 2월 1일 까지 범위로 잡았다고 했을 때,

년,월,주 별로 며칠씩이 있는지를 카운트하는 쿼리이다. 1월 1일이 월요일이고 2월 1일이 일요일이라면 당연히 모든 주차별로 7일씩 카운트 될 것이다.

 

composite 쿼리는 최대 10000건을 대상으로 집계할 수 있으며 그 이상은 after_key 라는 값을 통해서 반복문으로 가져와야 한다.

 

 

이번에는 bucket 내부에서 한 번 더 계산하는 bucket_script 쿼리이다.

s.aggs.bucket("group_by", "terms", field="user_id", order={"avg_score": "desc"}, size=1000)\
    .pipeline(
        "avg_score",
        "bucket_script",
        buckets_path={"sum_score": "sum_score"},
        script=f"params.sum_score / 31 * 100",
    )\
    .metric( "sum_score", "sum", field="score")\
    .bucket(
        "get_source", "top_hits", sort=[{"created_dt": {"order": "desc"}}], size=1
    )

 

위 쿼리는 꽤 복잡해보인다.

의도는 각 유저별로 일별 점수가 있다고 가정했을 때, filter 에서 정의한 기간 동안의 유저의 모든 점수를 가져와서 합산하고 이를 31로 나눈뒤에 100으로 나누는 평균 점수를 계산하고자 한 것이다.

추가로 합산 점수를 기준으로 내림차순으로 정렬한 뒤, 해당 유저에 대한 필터링 기간 내에서 가장 최근 document 를 가져왔다.

 

  1. group_by 라는 이름으로 user_id 필드 기준 bucket 을 만들었다. 정렬은 평균 점수로 했으며 해당 집계결과는 바로 다음 pipeline 에 정의되어 있다.
  2. pipeline 은 bucket 기준으로 집계된 metric 을 다시 한 번 계산하거나 정렬하거나 페이지네이션할 때 사용한다. 위에서는 bucket_script 를 통해서 집계를 해주었다. 인자로는 bucket_path 와 script 를 받는다. bucket_path 의 키는 script 에서 사용할 변수명이고 value 는 사용하고자 하는 metric 의 경로이다. 만약 밑의 depth 에 있다면 "parent bucket name > metric name" 과 같이 부등호를 통해서 경로를 지정한다. 반대로 depth 위로 올라가는 경로는 불가능하다. 또한 완전히 분리된 서로 다른 bucket 으로 접근하는 것도 불가능하다. script 는 일반적인 연산방식과 동일하게 표현할 수 있다. 다만 bucket_path 에 지정된 키 값을 활용하여 params.key 와 같은 표현식으로 접근해야 한다.
  3. top_hits 쿼리는 정렬 기준에 만족하는 document 를 가져오며 결과는 hit 와 같다. hit 는 bool 에 필터에 모두 만족하는 결과를 전부 다 가져오므로 집계 기준에 부합하는 document 만 가져오고 싶을 때 top_hits 를 사용한다.

만약에 집계를 특정 두 기간에 대해서 따로 하고 싶을 때는 어떻게 해야 할까?

s.aggs.bucket(
    "group_by",
    "range",
    field="created_dt",
    ranges=[
        {"from": "2020-01-01", "to": "2020-02-01||+1d"},
        {"from": "2021-01-01", "to": "2021-02-01||+1d"},
    ],
format="yyyy-MM-dd",
)

 

bucket 을 range 쿼리로 작성하면 되는데, 이 때 범위를 리스트로 넣으면 된다. gte, lte 키워드는 사용이 안되며, 대신 from / to 를 사용할 수 있다. from / to 는 filter 에서도 사용 가능한데 범위의 차이가 있다. aggregation 에서의 from / to 는 이상/미만이다. 즉 to 의 일자는 해당 일자를 포함하지 않는다. 따라서 이를 위해서는 종료일자에 하루를 더해주어야 하는데 ES 내부적으로 Date math 라는 방식으로 일자를 조정할 수 있다.

https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math

 

Common options | Elasticsearch Guide [7.15] | Elastic

The following options can be applied to all of the REST APIs. Pretty Resultsedit When appending ?pretty=true to any request made, the JSON returned will be pretty formatted (use it for debugging only!). Another option is to set ?format=yaml which will caus

www.elastic.co

 

마지막으로 bucket_sort 는 bucket 을 정렬하고 페이지네이션 하는 기능이다.

s.aggs.bucket(
    "group_by", "terms", field="user_id", size=max_size)\
    .metric("sum_score", "sum", field="score")\
    .pipeline(
        "sorted_result", 
        "bucket_sort", 
        "sort": [{"sum_score": {"order": "desc"}}], 
        **{"from": 0, "size": 100}
    )

 

bucket_sort 쿼리에서 from, size 인자를 넘겨주면 페이지네이션이 가능하며, sort 인자를 넘겨주면 기준에 따라 정렬도 가능하다.

 

 

쿼리를 짜는 과정에서 terms 에 한 번에 여러 필드를 넣고 싶었는데 ES 에서 multi_terms 라는 쿼리를 제공하고 있다. 다만 아직 파이썬 DSL 패키지에는 적용이 안되어있다. 나중에 업데이트 되면 사용해보면 좋을 것 같다.

https://www.elastic.co/guide/en/elasticsearch//reference/master/search-aggregations-bucket-multi-terms-aggregation.html#:~:text=Multi%20Terms%20aggregationedit,and%20will%20consume%20more%20memory.

'elastic search' 카테고리의 다른 글

Elastic Search DSL(Search)  (0) 2021.11.10
Elastic Search DSL(Connection, Index, Document)  (0) 2021.11.08