Commit 25acb64

Push the limits (#17347)
Add physical optimizer rule to push limits past certain window functions (part 1).
1 parent e353f4e commit 25acb64

9 files changed: +272 −6 lines

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
@@ -727,6 +727,10 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true, the optimizer will attempt to push limit operations
+        /// past window functions, if possible
+        pub enable_window_limits: bool, default = true
+
         /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
         /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
         /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
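The new `enable_window_limits` flag defaults to true, so the rule runs unless explicitly disabled. A minimal sketch of toggling it programmatically (not part of the commit; it assumes the field generated by `config_namespace!` is reached through `ConfigOptions::optimizer`, and that the same setting is reachable from SQL as `SET datafusion.optimizer.enable_window_limits = false`):

use datafusion_common::config::ConfigOptions;

fn main() {
    let mut options = ConfigOptions::new();
    // Generated by config_namespace!: defaults to true per the declaration above.
    assert!(options.optimizer.enable_window_limits);

    // Opt out of the new rule, e.g. to compare plans with and without it.
    options.optimizer.enable_window_limits = false;
    assert!(!options.optimizer.enable_window_limits);
}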

datafusion/common/src/tree_node.rs

Lines changed: 5 additions & 0 deletions
@@ -680,6 +680,11 @@ impl<T> Transformed<T> {
         Self::new(data, true, TreeNodeRecursion::Continue)
     }
 
+    /// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement.
+    pub fn complete(data: T) -> Self {
+        Self::new(data, true, TreeNodeRecursion::Stop)
+    }
+
     /// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement.
     pub fn no(data: T) -> Self {
         Self::new(data, false, TreeNodeRecursion::Continue)
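The new `complete` constructor complements the existing `yes` and `no` helpers: it marks the data as transformed but also stops the traversal, which the rule below uses to halt pushdown once a sort has absorbed the limit. A small sketch of the difference (not part of the commit; it assumes the public `transformed` and `tnr` fields on `Transformed`):

use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};

fn main() {
    // `yes` marks the data as transformed and keeps recursing ...
    let keep_going = Transformed::yes("rewritten");
    assert!(keep_going.transformed);
    assert_eq!(keep_going.tnr, TreeNodeRecursion::Continue);

    // ... while `complete` marks it transformed and halts the traversal.
    let done = Transformed::complete("rewritten");
    assert!(done.transformed);
    assert_eq!(done.tnr, TreeNodeRecursion::Stop);
}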

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+mod limit_pushdown_past_window;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;

datafusion/physical-optimizer/src/limit_pushdown_past_window.rs (new file)

Lines changed: 141 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::limit::GlobalLimitExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::windows::BoundedWindowAggExec;
use datafusion_physical_plan::ExecutionPlan;
use std::cmp;
use std::sync::Arc;

/// This rule inspects [`ExecutionPlan`]s, attempting to find fetch limits that were not pushed
/// down by `LimitPushdown` because [`BoundedWindowAggExec`]s were "in the way". If the window is
/// bounded by [`WindowFrameUnits::Rows`], then we calculate the adjustment needed to grow the
/// limit and continue pushdown.
#[derive(Default, Clone, Debug)]
pub struct LimitPushPastWindows;

impl LimitPushPastWindows {
    pub fn new() -> Self {
        Self
    }
}

impl PhysicalOptimizerRule for LimitPushPastWindows {
    fn optimize(
        &self,
        original: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        if !config.optimizer.enable_window_limits {
            return Ok(original);
        }
        let mut latest_limit: Option<usize> = None;
        let mut latest_max = 0;
        let result = original.transform_down(|node| {
            // helper closure to DRY out most of the early return cases
            let mut reset = |node,
                             max: &mut usize|
             -> datafusion_common::Result<
                Transformed<Arc<dyn ExecutionPlan>>,
            > {
                latest_limit = None;
                *max = 0;
                Ok(Transformed::no(node))
            };

            // traversing sides of joins will require more thought
            if node.children().len() > 1 {
                return reset(node, &mut latest_max);
            }

            // grab the latest limit we see
            if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
                latest_limit = limit.fetch().map(|fetch| fetch + limit.skip());
                latest_max = 0;
                return Ok(Transformed::no(node));
            }

            // grow the limit if we hit a window function
            if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
                for expr in window.window_expr().iter() {
                    let frame = expr.get_window_frame();
                    if frame.units != WindowFrameUnits::Rows {
                        return reset(node, &mut latest_max); // expression-based limits?
                    }
                    let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
                        return reset(node, &mut latest_max);
                    };
                    latest_max = cmp::max(end_bound, latest_max);
                }
                return Ok(Transformed::no(node));
            }

            // Apply the limit if we hit a sort node
            if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
                let latest = latest_limit.take();
                let Some(fetch) = latest else {
                    latest_max = 0;
                    return Ok(Transformed::no(node));
                };
                let fetch = match sort.fetch() {
                    None => fetch + latest_max,
                    Some(existing) => cmp::min(existing, fetch + latest_max),
                };
                let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch)));
                latest_max = 0;
                return Ok(Transformed::complete(sort));
            }

            // we can't push the limit past nodes that decrease row count
            match node.cardinality_effect() {
                CardinalityEffect::Equal => {}
                _ => return reset(node, &mut latest_max),
            }

            Ok(Transformed::no(node))
        })?;
        Ok(result.data)
    }

    fn name(&self) -> &str {
        "LimitPushPastWindows"
    }

    fn schema_check(&self) -> bool {
        false // we don't change the schema
    }
}

fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
    match bound {
        WindowFrameBound::Preceding(_) => Some(0),
        WindowFrameBound::CurrentRow => Some(0),
        WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
            Some(*scalar as usize)
        }
        _ => None,
    }
}

// tests: all branches are covered by sqllogictests
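To make the adjustment concrete, here is a standalone sketch of the arithmetic the rule applies when it finally reaches a sort (the helper and its names are hypothetical, not DataFusion API): the fetch pushed into the sort is the limit's fetch plus skip, grown by the largest `ROWS ... FOLLOWING` end bound seen on the way down, and clamped by any fetch the sort already carries.

use std::cmp;

// Hypothetical helper mirroring the rule's fetch computation.
fn pushed_fetch(
    limit_fetch: usize,            // fetch on the GlobalLimitExec
    limit_skip: usize,             // skip on the GlobalLimitExec
    max_end_bound: usize,          // largest ROWS frame end bound seen (latest_max)
    existing_fetch: Option<usize>, // fetch already present on the SortExec
) -> usize {
    let grown = limit_fetch + limit_skip + max_end_bound;
    match existing_fetch {
        None => grown,
        Some(existing) => cmp::min(existing, grown),
    }
}

fn main() {
    // LIMIT 10 over a window frame ending at `3 FOLLOWING`: the sort must
    // keep 13 rows so the last 3 output rows still see the rows that follow them.
    assert_eq!(pushed_fetch(10, 0, 3, None), 13);
    // A tighter fetch already on the sort wins.
    assert_eq!(pushed_fetch(10, 0, 3, Some(11)), 11);
    // Frames that only look backwards contribute 0, so the limit passes through unchanged.
    assert_eq!(pushed_fetch(10, 0, 0, None), 10);
}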

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 1 deletion
@@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
 use crate::coalesce_async_exec_input::CoalesceAsyncExecInput;
+use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
 use datafusion_physical_plan::ExecutionPlan;
@@ -59,7 +60,7 @@ pub trait PhysicalOptimizerRule: Debug {
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
 
-    /// A flag to indicate whether the physical planner should valid the rule will not
+    /// A flag to indicate whether the physical planner should validate that the rule will not
     /// change the schema of the plan after the rewriting.
     /// Some of the optimization rules might change the nullable properties of the schema
     /// and should disable the schema check.
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
             // into an `order by max(x) limit y`. In this case it will copy the limit value down
             // to the aggregation, allowing it to use only y number of accumulators.
             Arc::new(TopKAggregation::new()),
+            // Tries to push limits down through window functions, growing as appropriate
+            // This can possibly be combined with [LimitPushdown]
+            // It needs to come after [EnforceSorting]
+            Arc::new(LimitPushPastWindows::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
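For orientation, a schematic of the plan shape this rule targets, sitting between TopKAggregation and LimitPushdown in the rule list (illustrative only; operator renderings are abbreviated and are not actual EXPLAIN output):

Before (LimitPushdown alone stops at the window):
  GlobalLimitExec: skip=0, fetch=10
    BoundedWindowAggExec: frame=ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING
      SortExec: expr=[x ASC]

After LimitPushPastWindows (sort fetch grown by the frame's end bound):
  GlobalLimitExec: skip=0, fetch=10
    BoundedWindowAggExec: frame=ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING
      SortExec: expr=[x ASC], fetch=13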

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 0 deletions
@@ -241,6 +241,7 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -321,6 +322,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -365,6 +367,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
@@ -289,6 +289,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true
 datafusion.optimizer.enable_dynamic_filter_pushdown true
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
+datafusion.optimizer.enable_window_limits true
 datafusion.optimizer.expand_views_at_output false
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
@@ -402,6 +403,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru
 datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
 datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
 datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
+datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
 datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
 datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
 datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
