Skip to content

Commit eb70bec

Browse files
committed
add es _count API
add elastic search /{index}/_count API add count optimization in search request to use metadata if possible
1 parent ee21256 commit eb70bec

File tree

7 files changed

+182
-17
lines changed

7 files changed

+182
-17
lines changed

quickwit/quickwit-search/src/root.rs

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,47 @@ async fn search_partial_hits_phase_with_scroll(
576576
}
577577
}
578578

579+
/// Check if the request is a count request without any filters, so we can just check the split
580+
/// metadata.
581+
///
582+
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
583+
fn is_metadata_count_request(request: &SearchRequest) -> bool {
584+
let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
585+
if query_ast != QueryAst::MatchAll {
586+
return false;
587+
}
588+
if request.max_hits != 0 || request.start_offset != 0 {
589+
return false;
590+
}
591+
592+
// TODO: if the start and end timestamp encompass the whole split, it is still a count query
593+
// So some could be checked on metadata
594+
if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
595+
return false;
596+
}
597+
if request.aggregation_request.is_some()
598+
|| !request.snippet_fields.is_empty()
599+
|| request.search_after.is_some()
600+
{
601+
return false;
602+
}
603+
true
604+
}
605+
606+
/// Get a leaf search response that returns the num_docs of the split
607+
fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
608+
split_metadatas
609+
.iter()
610+
.map(|metadata| LeafSearchResponse {
611+
num_hits: metadata.num_docs as u64,
612+
partial_hits: Vec::new(),
613+
failed_splits: Vec::new(),
614+
num_attempted_splits: 1,
615+
intermediate_aggregation_result: None,
616+
})
617+
.collect()
618+
}
619+
579620
#[instrument(level = "debug", skip_all)]
580621
pub(crate) async fn search_partial_hits_phase(
581622
searcher_context: &SearcherContext,
@@ -584,20 +625,29 @@ pub(crate) async fn search_partial_hits_phase(
584625
split_metadatas: &[SplitMetadata],
585626
cluster_client: &ClusterClient,
586627
) -> crate::Result<LeafSearchResponse> {
587-
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
588-
let assigned_leaf_search_jobs = cluster_client
589-
.search_job_placer
590-
.assign_jobs(jobs, &HashSet::default())
591-
.await?;
592-
let mut leaf_request_tasks = Vec::new();
593-
for (client, client_jobs) in assigned_leaf_search_jobs {
594-
let leaf_requests =
595-
jobs_to_leaf_requests(search_request, indexes_metas_for_leaf_search, client_jobs)?;
596-
for leaf_request in leaf_requests {
597-
leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
598-
}
599-
}
600-
let leaf_search_responses: Vec<LeafSearchResponse> = try_join_all(leaf_request_tasks).await?;
628+
let leaf_search_responses: Vec<LeafSearchResponse> =
629+
if is_metadata_count_request(search_request) {
630+
get_count_from_metadata(split_metadatas)
631+
} else {
632+
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
633+
let assigned_leaf_search_jobs = cluster_client
634+
.search_job_placer
635+
.assign_jobs(jobs, &HashSet::default())
636+
.await?;
637+
let mut leaf_request_tasks = Vec::new();
638+
for (client, client_jobs) in assigned_leaf_search_jobs {
639+
let leaf_requests = jobs_to_leaf_requests(
640+
search_request,
641+
indexes_metas_for_leaf_search,
642+
client_jobs,
643+
)?;
644+
for leaf_request in leaf_requests {
645+
leaf_request_tasks
646+
.push(cluster_client.leaf_search(leaf_request, client.clone()));
647+
}
648+
}
649+
try_join_all(leaf_request_tasks).await?
650+
};
601651

602652
// Creates a collector which merges responses into one
603653
let merge_collector =

quickwit/quickwit-serve/src/elastic_search_api/filter.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use warp::{Filter, Rejection};
2525

2626
use super::model::{
2727
FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams,
28+
SearchQueryParamsCount,
2829
};
2930
use crate::elastic_search_api::model::{
3031
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
@@ -159,6 +160,17 @@ pub(crate) fn elastic_field_capabilities_filter() -> impl Filter<
159160
.and(json_or_empty())
160161
}
161162

163+
#[utoipa::path(get, tag = "Count", path = "/{index}/_count")]
164+
pub(crate) fn elastic_index_count_filter(
165+
) -> impl Filter<Extract = (Vec<String>, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone
166+
{
167+
warp::path!("_elastic" / String / "_count")
168+
.and_then(extract_index_id_patterns)
169+
.and(warp::get().or(warp::post()).unify())
170+
.and(serde_qs::warp::query(serde_qs::Config::default()))
171+
.and(json_or_empty())
172+
}
173+
162174
#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
163175
pub(crate) fn elastic_index_search_filter(
164176
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone

quickwit/quickwit-serve/src/elastic_search_api/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ use rest_handler::{
3939
use serde::{Deserialize, Serialize};
4040
use warp::{Filter, Rejection};
4141

42-
use self::rest_handler::es_compat_index_field_capabilities_handler;
42+
use self::rest_handler::{
43+
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
44+
};
4345
use crate::elastic_search_api::model::ElasticSearchError;
4446
use crate::json_api_response::JsonApiResponse;
4547
use crate::{BodyFormat, BuildInfo};
@@ -57,6 +59,7 @@ pub fn elastic_api_handlers(
5759
es_compat_cluster_info_handler(node_config, BuildInfo::get())
5860
.or(es_compat_search_handler(search_service.clone()))
5961
.or(es_compat_index_search_handler(search_service.clone()))
62+
.or(es_compat_index_count_handler(search_service.clone()))
6063
.or(es_compat_scroll_handler(search_service.clone()))
6164
.or(es_compat_index_multi_search_handler(search_service.clone()))
6265
.or(es_compat_index_field_capabilities_handler(

quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub use multi_search::{
3939
use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
4040
pub use scroll::ScrollQueryParams;
4141
pub use search_body::SearchBody;
42-
pub use search_query_params::SearchQueryParams;
42+
pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount};
4343
use serde::{Deserialize, Serialize};
4444

4545
#[derive(Debug, Clone, Eq, PartialEq)]

quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,63 @@ pub struct SearchQueryParams {
152152
pub version: Option<bool>,
153153
}
154154

155+
#[serde_with::skip_serializing_none]
156+
#[derive(Default, Debug, Serialize, Deserialize)]
157+
#[serde(deny_unknown_fields)]
158+
pub struct SearchQueryParamsCount {
159+
#[serde(default)]
160+
pub allow_no_indices: Option<bool>,
161+
#[serde(default)]
162+
pub analyze_wildcard: Option<bool>,
163+
#[serde(default)]
164+
pub analyzer: Option<String>,
165+
#[serde(default)]
166+
pub default_operator: Option<BooleanOperand>,
167+
#[serde(default)]
168+
pub df: Option<String>,
169+
#[serde(serialize_with = "to_simple_list")]
170+
#[serde(deserialize_with = "from_simple_list")]
171+
#[serde(default)]
172+
pub expand_wildcards: Option<Vec<ExpandWildcards>>,
173+
#[serde(default)]
174+
pub ignore_throttled: Option<bool>,
175+
#[serde(default)]
176+
pub ignore_unavailable: Option<bool>,
177+
#[serde(default)]
178+
pub lenient: Option<bool>,
179+
#[serde(default)]
180+
pub max_concurrent_shard_requests: Option<u64>,
181+
#[serde(default)]
182+
pub preference: Option<String>,
183+
#[serde(default)]
184+
pub q: Option<String>,
185+
#[serde(default)]
186+
pub request_cache: Option<bool>,
187+
#[serde(serialize_with = "to_simple_list")]
188+
#[serde(deserialize_with = "from_simple_list")]
189+
#[serde(default)]
190+
pub routing: Option<Vec<String>>,
191+
}
192+
impl From<SearchQueryParamsCount> for SearchQueryParams {
193+
fn from(value: SearchQueryParamsCount) -> Self {
194+
SearchQueryParams {
195+
allow_no_indices: value.allow_no_indices,
196+
analyze_wildcard: value.analyze_wildcard,
197+
analyzer: value.analyzer,
198+
default_operator: value.default_operator,
199+
df: value.df,
200+
expand_wildcards: value.expand_wildcards,
201+
ignore_throttled: value.ignore_throttled,
202+
ignore_unavailable: value.ignore_unavailable,
203+
preference: value.preference,
204+
q: value.q,
205+
request_cache: value.request_cache,
206+
routing: value.routing,
207+
..Default::default()
208+
}
209+
}
210+
}
211+
155212
// Parse a single sort field parameter from ES sort query string parameter.
156213
fn parse_sort_field_str(sort_field_str: &str) -> Result<SortField, SearchError> {
157214
if let Some((field, order_str)) = sort_field_str.split_once(':') {

quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ use quickwit_proto::ServiceErrorCode;
3838
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
3939
use quickwit_query::BooleanOperand;
4040
use quickwit_search::{SearchError, SearchService};
41+
use serde::{Deserialize, Serialize};
4142
use serde_json::json;
4243
use warp::{Filter, Rejection};
4344

4445
use super::filter::{
45-
elastic_cluster_info_filter, elastic_field_capabilities_filter,
46+
elastic_cluster_info_filter, elastic_field_capabilities_filter, elastic_index_count_filter,
4647
elastic_index_field_capabilities_filter, elastic_index_search_filter,
4748
elastic_multi_search_filter, elastic_scroll_filter, elastic_search_filter,
4849
};
@@ -51,6 +52,7 @@ use super::model::{
5152
ElasticSearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
5253
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
5354
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
55+
SearchQueryParamsCount,
5456
};
5557
use super::{make_elastic_api_response, TrackTotalHits};
5658
use crate::format::BodyFormat;
@@ -119,6 +121,16 @@ pub fn es_compat_index_search_handler(
119121
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
120122
}
121123

124+
/// GET or POST _elastic/{index}/_count
125+
pub fn es_compat_index_count_handler(
126+
search_service: Arc<dyn SearchService>,
127+
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
128+
elastic_index_count_filter()
129+
.and(with_arg(search_service))
130+
.then(es_compat_index_count)
131+
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
132+
}
133+
122134
/// POST _elastic/_search
123135
pub fn es_compat_index_multi_search_handler(
124136
search_service: Arc<dyn SearchService>,
@@ -287,6 +299,27 @@ fn partial_hit_from_search_after_param(
287299
Ok(Some(parsed_search_after))
288300
}
289301

302+
#[derive(Debug, Serialize, Deserialize)]
303+
struct ElasticSearchCountResponse {
304+
count: u64,
305+
}
306+
307+
async fn es_compat_index_count(
308+
index_id_patterns: Vec<String>,
309+
search_params: SearchQueryParamsCount,
310+
search_body: SearchBody,
311+
search_service: Arc<dyn SearchService>,
312+
) -> Result<ElasticSearchCountResponse, ElasticSearchError> {
313+
let search_params: SearchQueryParams = search_params.into();
314+
let (search_request, _append_shard_doc) =
315+
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
316+
let search_response: SearchResponse = search_service.root_search(search_request).await?;
317+
let search_response_rest: ElasticSearchCountResponse = ElasticSearchCountResponse {
318+
count: search_response.num_hits,
319+
};
320+
Ok(search_response_rest)
321+
}
322+
290323
async fn es_compat_index_search(
291324
index_id_patterns: Vec<String>,
292325
search_params: SearchQueryParams,
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
endpoint: "gharchive/_count"
2+
params:
3+
q: type:PushEvent
4+
expected:
5+
count: 60
6+
---
7+
endpoint: "gharchive/_count"
8+
expected:
9+
count: 100
10+

0 commit comments

Comments
 (0)