Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,47 @@ async fn search_partial_hits_phase_with_scroll(
}
}

/// Check if the request is a count request without any filters, so we can just return the split
/// metadata count.
///
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
fn is_metadata_count_request(request: &SearchRequest) -> bool {
let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
if query_ast != QueryAst::MatchAll {
return false;
}
if request.max_hits != 0 {
return false;
}

// TODO: if the start and end timestamp encompass the whole split, it is still a count query
// So some could be checked on metadata
if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
return false;
}
if request.aggregation_request.is_some()
|| !request.snippet_fields.is_empty()
|| request.search_after.is_some()
{
return false;
}
true
}

/// Get a leaf search response that returns the num_docs of the split
fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
split_metadatas
.iter()
.map(|metadata| LeafSearchResponse {
num_hits: metadata.num_docs as u64,
partial_hits: Vec::new(),
failed_splits: Vec::new(),
num_attempted_splits: 1,
intermediate_aggregation_result: None,
})
.collect()
}

#[instrument(level = "debug", skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
Expand All @@ -584,20 +625,29 @@ pub(crate) async fn search_partial_hits_phase(
split_metadatas: &[SplitMetadata],
cluster_client: &ClusterClient,
) -> crate::Result<LeafSearchResponse> {
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
let assigned_leaf_search_jobs = cluster_client
.search_job_placer
.assign_jobs(jobs, &HashSet::default())
.await?;
let mut leaf_request_tasks = Vec::new();
for (client, client_jobs) in assigned_leaf_search_jobs {
let leaf_requests =
jobs_to_leaf_requests(search_request, indexes_metas_for_leaf_search, client_jobs)?;
for leaf_request in leaf_requests {
leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
}
}
let leaf_search_responses: Vec<LeafSearchResponse> = try_join_all(leaf_request_tasks).await?;
let leaf_search_responses: Vec<LeafSearchResponse> =
if is_metadata_count_request(search_request) {
get_count_from_metadata(split_metadatas)
} else {
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
let assigned_leaf_search_jobs = cluster_client
.search_job_placer
.assign_jobs(jobs, &HashSet::default())
.await?;
let mut leaf_request_tasks = Vec::new();
for (client, client_jobs) in assigned_leaf_search_jobs {
let leaf_requests = jobs_to_leaf_requests(
search_request,
indexes_metas_for_leaf_search,
client_jobs,
)?;
for leaf_request in leaf_requests {
leaf_request_tasks
.push(cluster_client.leaf_search(leaf_request, client.clone()));
}
}
try_join_all(leaf_request_tasks).await?
};

// Creates a collector which merges responses into one
let merge_collector =
Expand Down
12 changes: 12 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use warp::{Filter, Rejection};

use super::model::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams,
SearchQueryParamsCount,
};
use crate::elastic_search_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
Expand Down Expand Up @@ -159,6 +160,17 @@ pub(crate) fn elastic_field_capabilities_filter() -> impl Filter<
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Count", path = "/{index}/_count")]
pub(crate) fn elastic_index_count_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone
{
warp::path!("_elastic" / String / "_count")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
.and(serde_qs::warp::query(serde_qs::Config::default()))
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use rest_handler::{
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use self::rest_handler::es_compat_index_field_capabilities_handler;
use self::rest_handler::{
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
};
use crate::elastic_search_api::model::ElasticSearchError;
use crate::json_api_response::JsonApiResponse;
use crate::{BodyFormat, BuildInfo};
Expand All @@ -57,6 +59,7 @@ pub fn elastic_api_handlers(
es_compat_cluster_info_handler(node_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_index_count_handler(search_service.clone()))
.or(es_compat_scroll_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service.clone()))
.or(es_compat_index_field_capabilities_handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use multi_search::{
use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::SearchQueryParams;
pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,64 @@ pub struct SearchQueryParams {
pub version: Option<bool>,
}

#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SearchQueryParamsCount {
#[serde(default)]
pub allow_no_indices: Option<bool>,
#[serde(default)]
pub analyze_wildcard: Option<bool>,
#[serde(default)]
pub analyzer: Option<String>,
#[serde(default)]
pub default_operator: Option<BooleanOperand>,
#[serde(default)]
pub df: Option<String>,
#[serde(serialize_with = "to_simple_list")]
#[serde(deserialize_with = "from_simple_list")]
#[serde(default)]
pub expand_wildcards: Option<Vec<ExpandWildcards>>,
#[serde(default)]
pub ignore_throttled: Option<bool>,
#[serde(default)]
pub ignore_unavailable: Option<bool>,
#[serde(default)]
pub lenient: Option<bool>,
#[serde(default)]
pub max_concurrent_shard_requests: Option<u64>,
#[serde(default)]
pub preference: Option<String>,
#[serde(default)]
pub q: Option<String>,
#[serde(default)]
pub request_cache: Option<bool>,
#[serde(serialize_with = "to_simple_list")]
#[serde(deserialize_with = "from_simple_list")]
#[serde(default)]
pub routing: Option<Vec<String>>,
}
impl From<SearchQueryParamsCount> for SearchQueryParams {
fn from(value: SearchQueryParamsCount) -> Self {
SearchQueryParams {
allow_no_indices: value.allow_no_indices,
analyze_wildcard: value.analyze_wildcard,
analyzer: value.analyzer,
default_operator: value.default_operator,
df: value.df,
expand_wildcards: value.expand_wildcards,
ignore_throttled: value.ignore_throttled,
ignore_unavailable: value.ignore_unavailable,
preference: value.preference,
q: value.q,
request_cache: value.request_cache,
routing: value.routing,
size: Some(0),
..Default::default()
}
}
}

// Parse a single sort field parameter from ES sort query string parameter.
fn parse_sort_field_str(sort_field_str: &str) -> Result<SortField, SearchError> {
if let Some((field, order_str)) = sort_field_str.split_once(':') {
Expand Down
35 changes: 34 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ use quickwit_proto::ServiceErrorCode;
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
use quickwit_search::{SearchError, SearchService};
use serde::{Deserialize, Serialize};
use serde_json::json;
use warp::{Filter, Rejection};

use super::filter::{
elastic_cluster_info_filter, elastic_field_capabilities_filter,
elastic_cluster_info_filter, elastic_field_capabilities_filter, elastic_index_count_filter,
elastic_index_field_capabilities_filter, elastic_index_search_filter,
elastic_multi_search_filter, elastic_scroll_filter, elastic_search_filter,
};
Expand All @@ -51,6 +52,7 @@ use super::model::{
ElasticSearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
SearchQueryParamsCount,
};
use super::{make_elastic_api_response, TrackTotalHits};
use crate::format::BodyFormat;
Expand Down Expand Up @@ -119,6 +121,16 @@ pub fn es_compat_index_search_handler(
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET or POST _elastic/{index}/_count
pub fn es_compat_index_count_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_count_filter()
.and(with_arg(search_service))
.then(es_compat_index_count)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// POST _elastic/_search
pub fn es_compat_index_multi_search_handler(
search_service: Arc<dyn SearchService>,
Expand Down Expand Up @@ -287,6 +299,27 @@ fn partial_hit_from_search_after_param(
Ok(Some(parsed_search_after))
}

#[derive(Debug, Serialize, Deserialize)]
struct ElasticSearchCountResponse {
count: u64,
}

async fn es_compat_index_count(
index_id_patterns: Vec<String>,
search_params: SearchQueryParamsCount,
search_body: SearchBody,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticSearchCountResponse, ElasticSearchError> {
let search_params: SearchQueryParams = search_params.into();
let (search_request, _append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let search_response_rest: ElasticSearchCountResponse = ElasticSearchCountResponse {
count: search_response.num_hits,
};
Ok(search_response_rest)
}

async fn es_compat_index_search(
index_id_patterns: Vec<String>,
search_params: SearchQueryParams,
Expand Down
10 changes: 10 additions & 0 deletions quickwit/rest-api-tests/scenarii/es_compatibility/0019-count.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
endpoint: "gharchive/_count"
params:
q: type:PushEvent
expected:
count: 60
---
endpoint: "gharchive/_count"
expected:
count: 100