사실 ES 에서 이 Aggregation 이 제일 헬이고 핵심이다.
쿼리를 어떤 순서로 짜느냐에 따라서 결과의 depth 도 달라진다. 상당히 애를 많이 먹었기 때문에 머리가 땃땃할 때 정리해 놓는 게 좋다.
일단 Search 객체로 필터링 조건을 정의하고 나면, mysql 로 따졌을 때 WHERE 조건을 정의했다고 보면 된다. 이제 group by, order by 를 해야 되는데 이 부분에 대해 정리해본다.
elasticsearh dsl 패키지에서 집계를 위해 A 객체를 제공한다.
Q 와 유사한 형태로 A 객체 안에 쿼리 조건을 정의한다. 이 부분이 group by 를 위해 어떤 필드를 select 할 것인가라고 볼 수 있다.
search 의 결과에 집계를 이어가고 싶은 경우에는 aggs 키워드를 사용할 수 있다.
예시로 보자.
from elasticsearch_dsl import Search, A, Q
s = Search(using=es, index="test-index-v1").query(
"bool",
filter=[
Q("range", **{"craeated_dt": {"gte": "2021-01-01", "lte": "2021-02-01"}}),
Q("terms", **{"gender": {"male"}})
]
)
s.aggs.bucket("group_by", "terms", field="region") \
.metric("distinct_count", "cardinality", field="user_id")
s = s[:0]
res = s.execute()
필터링 조건을 완성한 뒤, 해당 객체에 aggregation 쿼리를 추가한다. 기본적으로 group by 를 적용하기 위한 필드 하나 당 bucket 하나다. 해당 bucket 뒤에 바로 metric 을 통해 어떤 연산을 하고 싶은지를 붙인다. cardinality 는 distinct value 를 카운트 한다.
예제에서는 남성이면서 db 기준 생성일자가 2021년 1월 ~ 2월 사이의 유저를 대상으로 각 지역별 중복 제거된 유저 수를 구하는 쿼리다.
참고) cardinality 는 항상 정확하지 않다. 유사도를 기초로한 카운트라서 동일하지 않음에도 동일한 것으로 간주하고 카운트하는 경우가 발생할 수 있다. 이럴 때는 precision_threshold 인자를 최대 40000 까지 적용가능하며 (default=3000) 그 이상은 의미가 없다. 그럼에도 정확하게 안나올 때는 다른 방식을 사용해야 한다.
https://pratik-patil.medium.com/accurate-distinct-count-and-values-from-elasticsearch-dadce51ad4a7
bucket script 라는 방식으로 집계 가능하며 내장된 default 언어인 painless 혹은 java 로 직접 스크립트를 짜서 적용할 수 있다.
s.aggs.metric(
"count",
"scripted_metric",
params={"fieldName": "id"},
init_script="state.list = []",
map_script="""
if(doc[params.fieldName] != null)
state.list.add(doc[params.fieldName].value);
""",
combine_script="return state.list;",
reduce_script="""
Map uniqueValueMap = new HashMap();
int
count = 0;
for (shardList in states) {
if (shardList != null) {
for (key in shardList) {
if (!uniqueValueMap.containsKey(key)) {
count += 1;
uniqueValueMap.put(key, key);
}
}
}
}
return count;
""",
)
위 코드를 실행하면 물론 잘 나온다. 다만, 데이터가 커지니까 JVM memory pressure 가 75% 넘어가면서 429 에러를 뱉은 후에 죽어버렸다. 어쩔 수 없이 위 코드를 좀 만져보았고, 그나마 메모리 측면에서 조금 개선할 수 있었다.
s.aggs.metric(
"count",
"scripted_metric",
params={"fieldName": "id"},
init_script="state.set = new HashSet();",
map_script="""
state.set.add(doc[params.fieldName].value);
""",
combine_script="return state.set;",
reduce_script="""
int count = 0;
for (eachSet in states) {
count += eachSet.size()
}
return count;
""",
)
일단 HashSet 을 사용해서 애초에 담을 때부터 중복 없이 담았다. ArrayList 보다 메모리를 많이 차지하고 느리지만 이후에 반복문으로 다시 중복 여부를 걸러내고, 따로 또 담는 작업을 하는 것보다는 훨씬 효율적이다.
각 shard 에서 combine_script 까지 실행된 후에 reduce_script 에서 합치는데, 이 때 states 밑에 저장된 각각의 HashSet 을 반복문으로 가져오면서 size 를 count 변수에 더한다. 이미 중복이 제거된 상태로 담겨 있기 때문에 각 shard 에서 만들어진 HashSet 의 크기만 더해주면 전체 수를 알 수 있다.
파이썬만 해봐서 자바는 잘 몰랐는데(위 코드는 자바를 기반으로 한 painless 라는 언어임) 기본을 잘 알고 있으면 언어는 크게 중요하지 않다는 말이 어떤 뜻인지 처음 깨달았다. 가뜩이나 빠른 개발의 세계에서도 시대의 흐름과 무관한 기초가 탄탄해야 함을 다시 느꼈다.
본론으로 와서 위의 예시를 s.to_dict() 함수를 통해서 json 형태로 볼 수 있다. 매우 유용하니 쿼리가 감이 안올 때는 계속 프린트해서 확인하면서 잘못된 점을 찾아나가야 한다.
쿼리가 완성된 후에는 슬라이싱을 통해서 s=s[:0] 로 hits 를 결과에서 제외할 수 있다. hits 는 위의 모든 조건을 만족하는 document 를 의미하는데 default 10개이고 최대 10000개 까지 가져올 수 있다. 집계 결과만 필요하므로 굳이 document 를 모두 가져올 필요가 없을 때는 위와 같이 페이지네이션을 0으로 할 수 있다.
위의 예제를 mysql 로 바꾸면 아래와 같은 의미라고 볼 수 있다.
SELECT REGION
, COUNT(DISTINCT USER_ID) AS DISTINCT_COUNT
FROM TEST_INDEX_V1
WHERE CREATED_DT BETWEEN '2021-01-01' AND '2021-02-01'
AND GENDER = 'MALE'
GROUP BY REGION
여기에 order_by 를 추가하고 싶으면 어떻게 할까?
from elasticsearch_dsl import Search, A, Q
s = Search(using=es, index="test-index-v1").query(
"bool",
filter=[
Q("range", **{"craeated_dt": {"gte": "2021-01-01", "lte": "2021-02-01"}}),
Q("terms", **{"gender": {"male"}})
]
)
s.aggs.bucket("group_by", "terms", field="region", order={"distinct_count": "desc"}, size=1000) \
.metric("distinct_count", "cardinality", field="user_id")
s = s[:0]
res = s.execute()
region bucket 에서 바로 아래 metric 의 이름을 order 인자에 키로 넘겨주면 된다. 해당 필드의 key 값에 따라 정렬하고 싶다면, 즉 region 의 이름 순으로 정렬하고 싶다면 order={"_key": "asc"} 와 같이 _key 라는 값을 넘겨주면 된다.
terms 쿼리에서는 최대 집계 대상을 약 21억건까지 할 수 있다. 즉, 21억 row 의 데이터를 대상으로 집계가 가능하다는 뜻이다. 나중에 정리할 composite 쿼리는 최대가 10000개이다.
nested 된 필드를 활용한 집계는 어떻게 해야하는가
s.aggs.bucket("inner_nested", "nested", path="innerdoc") \
.bucket("inner", "terms", field="innerdoc.inner_str_field")
.bucket("reverse", "reverse_nested") \
.metric("sum_score", "sum", field="score")
일단 nested field 에 접근하기 위해서는 nested 쿼리로 경로를 지정해준다. 그 이후에 group by 하고자 하는 해당 필드에 접근한다. 만약 그 필드를 기준으로 nested 필드 밖에 있는 또다른 필드로 집계를 하고 싶다면 nested 필드를 밖으로 꺼내는 reverse_nested 쿼리를 추가해야 한다. 이를 통해 depth 가 서로 달랐던 두 필드를 동일 depth 로 가져오는 것이다.
앞서 말했듯 각 bucket 이 1depth 를 갖는다. 위 코드는 metric 결과까지 총 4depth 를 갖는다. 결과는 빠르게 가져오지만 쿼리에 따라 depth 가 달라지므로 document 를 정의했던 것과 같이 일정한 모델로 만들기가 매우 까다롭다.
가지고 온 데이터를 로직에서 후처리해주는 과정이 다소 복잡하다.
이번에는 동시에 여러 필드를 기준으로 집계하는 composite 쿼리에 대해서 본다.
만약에 날짜 정보가 년,월,주로 구분되어 있는데 이 세가지 필드로 group by 를 하고 싶다면 각각을 bucket 으로 만들면 된다. 하지만 그럴 필요 없이 하나의 bucket 안에 담을 수도 있다.
sources = [
{"year": A("terms", field=year_value)},
{"month": A("terms", field=month_value)},
{"week": A("terms", field=week_value)}
]
s.aggs.bucket("group_by", "composite", sources=sources, size=10000)\
.metric("date_count", "cardinality", field="created_dt")
위 코드는 예를 들어 filter 에서 2020년 1월 1일 ~ 2021년 2월 1일 까지 범위로 잡았다고 했을 때,
년,월,주 별로 며칠씩이 있는지를 카운트하는 쿼리이다. 1월 1일이 월요일이고 2월 1일이 일요일이라면 당연히 모든 주차별로 7일씩 카운트 될 것이다.
composite 쿼리는 최대 10000건을 대상으로 집계할 수 있으며 그 이상은 after_key 라는 값을 통해서 반복문으로 가져와야 한다.
이번에는 bucket 내부에서 한 번 더 계산하는 bucket_script 쿼리이다.
s.aggs.bucket("group_by", "terms", field="user_id", order={"avg_score": "desc"}, size=1000)\
.pipeline(
"avg_score",
"bucket_script",
buckets_path={"sum_score": "sum_score"},
script=f"params.sum_score / 31 * 100",
)\
.metric( "sum_score", "sum", field="score")\
.bucket(
"get_source", "top_hits", sort=[{"created_dt": {"order": "desc"}}], size=1
)
위 쿼리는 꽤 복잡해보인다.
의도는 각 유저별로 일별 점수가 있다고 가정했을 때, filter 에서 정의한 기간 동안의 유저의 모든 점수를 가져와서 합산하고 이를 31로 나눈뒤에 100으로 나누는 평균 점수를 계산하고자 한 것이다.
추가로 합산 점수를 기준으로 내림차순으로 정렬한 뒤, 해당 유저에 대한 필터링 기간 내에서 가장 최근 document 를 가져왔다.
- group_by 라는 이름으로 user_id 필드 기준 bucket 을 만들었다. 정렬은 평균 점수로 했으며 해당 집계결과는 바로 다음 pipeline 에 정의되어 있다.
- pipeline 은 bucket 기준으로 집계된 metric 을 다시 한 번 계산하거나 정렬하거나 페이지네이션할 때 사용한다. 위에서는 bucket_script 를 통해서 집계를 해주었다. 인자로는 bucket_path 와 script 를 받는다. bucket_path 의 키는 script 에서 사용할 변수명이고 value 는 사용하고자 하는 metric 의 경로이다. 만약 밑의 depth 에 있다면 "parent bucket name > metric name" 과 같이 부등호를 통해서 경로를 지정한다. 반대로 depth 위로 올라가는 경로는 불가능하다. 또한 완전히 분리된 서로 다른 bucket 으로 접근하는 것도 불가능하다. script 는 일반적인 연산방식과 동일하게 표현할 수 있다. 다만 bucket_path 에 지정된 키 값을 활용하여 params.key 와 같은 표현식으로 접근해야 한다.
- top_hits 쿼리는 정렬 기준에 만족하는 document 를 가져오며 결과는 hit 와 같다. hit 는 bool 에 필터에 모두 만족하는 결과를 전부 다 가져오므로 집계 기준에 부합하는 document 만 가져오고 싶을 때 top_hits 를 사용한다.
만약에 집계를 특정 두 기간에 대해서 따로 하고 싶을 때는 어떻게 해야 할까?
s.aggs.bucket(
"group_by",
"range",
field="created_dt",
ranges=[
{"from": "2020-01-01", "to": "2020-02-01||+1d"},
{"from": "2021-01-01", "to": "2021-02-01||+1d"},
],
format="yyyy-MM-dd",
)
bucket 을 range 쿼리로 작성하면 되는데, 이 때 범위를 리스트로 넣으면 된다. gte, lte 키워드는 사용이 안되며, 대신 from / to 를 사용할 수 있다. from / to 는 filter 에서도 사용 가능한데 범위의 차이가 있다. aggregation 에서의 from / to 는 이상/미만이다. 즉 to 의 일자는 해당 일자를 포함하지 않는다. 따라서 이를 위해서는 종료일자에 하루를 더해주어야 하는데 ES 내부적으로 Date math 라는 방식으로 일자를 조정할 수 있다.
https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
마지막으로 bucket_sort 는 bucket 을 정렬하고 페이지네이션 하는 기능이다.
s.aggs.bucket(
"group_by", "terms", field="user_id", size=max_size)\
.metric("sum_score", "sum", field="score")\
.pipeline(
"sorted_result",
"bucket_sort",
"sort": [{"sum_score": {"order": "desc"}}],
**{"from": 0, "size": 100}
)
bucket_sort 쿼리에서 from, size 인자를 넘겨주면 페이지네이션이 가능하며, sort 인자를 넘겨주면 기준에 따라 정렬도 가능하다.
쿼리를 짜는 과정에서 terms 에 한 번에 여러 필드를 넣고 싶었는데 ES 에서 multi_terms 라는 쿼리를 제공하고 있다. 다만 아직 파이썬 DSL 패키지에는 적용이 안되어있다. 나중에 업데이트 되면 사용해보면 좋을 것 같다.
'elastic search' 카테고리의 다른 글
Elastic Search DSL(Search) (0) | 2021.11.10 |
---|---|
Elastic Search DSL(Connection, Index, Document) (0) | 2021.11.08 |