Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use anyhow::{bail, Context};
use clap::{arg, Arg, ArgAction, ArgMatches, Command};
use quickwit_serve::EnvFilterReloadFn;
use tracing::Level;

use crate::index::{build_index_command, IndexCliCommand};
Expand Down Expand Up @@ -90,10 +91,10 @@ impl CliCommand {
}
}

pub async fn execute(self) -> anyhow::Result<()> {
pub async fn execute(self, env_filter_reload_fn: EnvFilterReloadFn) -> anyhow::Result<()> {
match self {
CliCommand::Index(subcommand) => subcommand.execute().await,
CliCommand::Run(subcommand) => subcommand.execute().await,
CliCommand::Run(subcommand) => subcommand.execute(env_filter_reload_fn).await,
CliCommand::Source(subcommand) => subcommand.execute().await,
CliCommand::Split(subcommand) => subcommand.execute().await,
CliCommand::Tool(subcommand) => subcommand.execute().await,
Expand Down
16 changes: 11 additions & 5 deletions quickwit/quickwit-cli/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::env;
use std::sync::Arc;

use anyhow::Context;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use quickwit_serve::BuildInfo;
use quickwit_serve::{BuildInfo, EnvFilterReloadFn};
use tracing::Level;
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
Expand All @@ -39,20 +40,21 @@ pub fn setup_logging_and_tracing(
level: Level,
ansi_colors: bool,
build_info: &BuildInfo,
) -> anyhow::Result<()> {
) -> anyhow::Result<EnvFilterReloadFn> {
#[cfg(feature = "tokio-console")]
{
if std::env::var_os(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY).is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a type error when that feature is enabled Result<()> vs Result<EnvFilterReloadFn>

console_subscriber::init();
return Ok(());
return Ok(quickwit_serve::do_nothing_env_filter_reload_fn());
}
}
let env_filter = env::var("RUST_LOG")
.map(|_| EnvFilter::from_default_env())
.or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN")))
.context("failed to set up tracing env filter")?;
global::set_text_map_propagator(TraceContextPropagator::new());
let registry = tracing_subscriber::registry().with(env_filter);
let (reloadable_env_filter, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter);
let registry = tracing_subscriber::registry().with(reloadable_env_filter);
let event_format = tracing_subscriber::fmt::format()
.with_target(true)
.with_timer(
Expand Down Expand Up @@ -102,5 +104,9 @@ pub fn setup_logging_and_tracing(
.try_init()
.context("failed to register tracing subscriber")?;
}
Ok(())
Ok(Arc::new(move |env_filter_def: &str| {
let new_env_filter = EnvFilter::try_new(env_filter_def)?;
reload_handle.reload(new_env_filter)?;
Ok(())
}))
}
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ async fn main_impl() -> anyhow::Result<()> {
start_jemalloc_metrics_loop();

let build_info = BuildInfo::get();
setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?;
let return_code: i32 = if let Err(err) = command.execute().await {
let env_filter_reload_fn =
setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?;
let return_code: i32 = if let Err(err) = command.execute(env_filter_reload_fn).await {
eprintln!("{} command failed: {:?}\n", "✘".color(RED_COLOR), err);
1
} else {
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::{Protocol, Uri};
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_serve::{serve_quickwit, BuildInfo};
use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn};
use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent};
use tokio::signal;
use tracing::{debug, info};
Expand Down Expand Up @@ -74,7 +74,7 @@ impl RunCliCommand {
})
}

pub async fn execute(&self) -> anyhow::Result<()> {
pub async fn execute(&self, env_filter_reload_fn: EnvFilterReloadFn) -> anyhow::Result<()> {
debug!(args = ?self, "run-service");
let version_text = BuildInfo::get_version_text();
info!("quickwit version: {version_text}");
Expand Down Expand Up @@ -115,6 +115,7 @@ impl RunCliCommand {
metastore_resolver,
storage_resolver,
shutdown_signal,
env_filter_reload_fn,
)
.await;
let return_code = match serve_result {
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ impl TestEnv {
services: Some(QuickwitService::supported_services()),
};
tokio::spawn(async move {
if let Err(error) = run_command.execute().await {
if let Err(error) = run_command
.execute(quickwit_serve::do_nothing_env_filter_reload_fn())
.await
{
error!(err=?error, "failed to start a quickwit server");
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl ClusterSandbox {
metastore_resolver,
storage_resolver,
shutdown_signal,
quickwit_serve::do_nothing_env_filter_reload_fn(),
)
.await?;
Result::<_, anyhow::Error>::Ok(result)
Expand Down
11 changes: 11 additions & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod index_api;
mod indexing_api;
mod ingest_api;
mod jaeger_api;
mod log_level_handler;
mod metrics;
mod metrics_api;
mod node_info_handler;
Expand Down Expand Up @@ -131,6 +132,12 @@ const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_
const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6;
const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE";

pub type EnvFilterReloadFn = Arc<dyn Fn(&str) -> anyhow::Result<()> + Send + Sync>;

pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn {
Arc::new(|_| Ok(()))
}

fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
.and_then(|metastore_client_max_concurrency_str| {
Expand Down Expand Up @@ -187,6 +194,8 @@ struct QuickwitServices {
/// the root requests.
pub search_service: Arc<dyn SearchService>,

pub env_filter_reload_fn: EnvFilterReloadFn,

/// The control plane listens to various events.
/// We must maintain a reference to the subscription handles to continue receiving
/// notifications. Otherwise, the subscriptions are dropped.
Expand Down Expand Up @@ -359,6 +368,7 @@ pub async fn serve_quickwit(
metastore_resolver: MetastoreResolver,
storage_resolver: StorageResolver,
shutdown_signal: BoxFutureInfaillible<()>,
env_filter_reload_fn: EnvFilterReloadFn,
) -> anyhow::Result<HashMap<String, ActorExitStatus>> {
let cluster = start_cluster_service(&node_config).await?;

Expand Down Expand Up @@ -627,6 +637,7 @@ pub async fn serve_quickwit(
otlp_logs_service_opt,
otlp_traces_service_opt,
search_service,
env_filter_reload_fn,
});
// Setup and start gRPC server.
let (grpc_readiness_trigger_tx, grpc_readiness_signal_rx) = oneshot::channel::<()>();
Expand Down
61 changes: 61 additions & 0 deletions quickwit/quickwit-serve/src/log_level_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use hyper::StatusCode;
use serde::Deserialize;
use tracing::{error, info};
use warp::{Filter, Rejection};

use crate::{with_arg, EnvFilterReloadFn};

#[derive(Deserialize)]
struct EnvFilter {
filter: String,
}

#[utoipa::path(get, tag = "Log level", path = "/log_level")]
pub fn log_level_handler(
env_filter_reload_fn: EnvFilterReloadFn,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
warp::path("log_level")
.and(warp::get().or(warp::post()).unify())
.and(warp::path::end())
.and(with_arg(env_filter_reload_fn))
.and(warp::query::<EnvFilter>())
.then(
|env_filter_reload_fn: EnvFilterReloadFn, env_filter: EnvFilter| async move {
match env_filter_reload_fn(env_filter.filter.as_str()) {
Ok(_) => {
info!(filter = env_filter.filter, "change log level");
warp::reply::with_status(
format!("changed log level to:[{}]", env_filter.filter),
StatusCode::OK,
)
}
Err(_) => {
error!(filter = env_filter.filter, "invalid log level");
warp::reply::with_status(
format!("invalid log level:[{}]", env_filter.filter),
StatusCode::BAD_REQUEST,
)
}
}
},
)
}
5 changes: 5 additions & 0 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::index_api::index_management_handlers;
use crate::indexing_api::indexing_get_handler;
use crate::ingest_api::ingest_api_handlers;
use crate::jaeger_api::jaeger_api_handlers;
use crate::log_level_handler::log_level_handler;
use crate::metrics_api::metrics_handler;
use crate::node_info_handler::node_info_handler;
use crate::otlp_api::otlp_ingest_api_handlers;
Expand Down Expand Up @@ -172,6 +173,9 @@ fn api_v1_routes(
RuntimeInfo::get(),
quickwit_services.node_config.clone(),
))
.or(log_level_handler(
quickwit_services.env_filter_reload_fn.clone(),
))
.or(indexing_get_handler(
quickwit_services.indexing_service_opt.clone(),
))
Expand Down Expand Up @@ -630,6 +634,7 @@ mod tests {
node_config: Arc::new(node_config.clone()),
search_service: Arc::new(MockSearchService::new()),
jaeger_service_opt: None,
env_filter_reload_fn: crate::do_nothing_env_filter_reload_fn(),
};

let handler = api_v1_routes(Arc::new(quickwit_services))
Expand Down