Skip to content

Commit 4036bd5

Browse files
committed
support aggregate
1 parent db16f2a commit 4036bd5

File tree

3 files changed

+137
-3
lines changed

3 files changed

+137
-3
lines changed

datafusion/sql/src/query.rs

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ use datafusion_expr::{
2828
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
2929
};
3030
use sqlparser::ast::{
31-
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
32-
OrderByKind, PipeOperator, Query, SelectInto, SetExpr, SetOperator, SetQuantifier,
33-
TableAlias,
31+
Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
32+
OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
33+
SetOperator, SetQuantifier, TableAlias,
3434
};
3535
use sqlparser::tokenizer::Span;
3636

@@ -190,6 +190,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
190190
queries,
191191
planner_context,
192192
),
193+
PipeOperator::Aggregate {
194+
full_table_exprs,
195+
group_by_expr,
196+
} => self.pipe_operator_aggregate(
197+
plan,
198+
full_table_exprs,
199+
group_by_expr,
200+
planner_context,
201+
),
193202

194203
x => not_impl_err!("`{x}` pipe operator is not supported yet"),
195204
}
@@ -296,6 +305,76 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
296305
}
297306
}
298307

308+
/// Handle AGGREGATE pipe operator
309+
fn pipe_operator_aggregate(
310+
&self,
311+
plan: LogicalPlan,
312+
full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
313+
group_by_expr: Vec<ExprWithAliasAndOrderBy>,
314+
planner_context: &mut PlannerContext,
315+
) -> Result<LogicalPlan> {
316+
// Convert aggregate expressions directly
317+
let aggr_exprs: Vec<Expr> = full_table_exprs
318+
.into_iter()
319+
.map(|expr_with_alias_and_order_by| {
320+
let expr_with_alias = expr_with_alias_and_order_by.expr;
321+
let sql_expr = expr_with_alias.expr;
322+
let alias = expr_with_alias.alias;
323+
324+
// Convert SQL expression to DataFusion expression
325+
let df_expr =
326+
self.sql_to_expr(sql_expr, plan.schema(), planner_context)?;
327+
328+
// Apply alias if present, but handle the case where the expression might already be aliased
329+
match alias {
330+
Some(alias_ident) => {
331+
// If the expression is already an alias, replace the alias name
332+
match df_expr {
333+
Expr::Alias(alias_expr) => {
334+
Ok(alias_expr.expr.alias(alias_ident.value))
335+
}
336+
_ => Ok(df_expr.alias(alias_ident.value)),
337+
}
338+
}
339+
None => Ok(df_expr),
340+
}
341+
})
342+
.collect::<Result<Vec<_>>>()?;
343+
344+
// Convert group by expressions directly
345+
let group_by_exprs: Vec<Expr> = group_by_expr
346+
.into_iter()
347+
.map(|expr_with_alias_and_order_by| {
348+
let expr_with_alias = expr_with_alias_and_order_by.expr;
349+
let sql_expr = expr_with_alias.expr;
350+
let alias = expr_with_alias.alias;
351+
352+
// Convert SQL expression to DataFusion expression
353+
let df_expr =
354+
self.sql_to_expr(sql_expr, plan.schema(), planner_context)?;
355+
356+
// Apply alias if present (though group by aliases are less common)
357+
match alias {
358+
Some(alias_ident) => {
359+
// If the expression is already an alias, replace the alias name
360+
match df_expr {
361+
Expr::Alias(alias_expr) => {
362+
Ok(alias_expr.expr.alias(alias_ident.value))
363+
}
364+
_ => Ok(df_expr.alias(alias_ident.value)),
365+
}
366+
}
367+
None => Ok(df_expr),
368+
}
369+
})
370+
.collect::<Result<Vec<_>>>()?;
371+
372+
// Create the aggregate logical plan
373+
LogicalPlanBuilder::from(plan)
374+
.aggregate(group_by_exprs, aggr_exprs)?
375+
.build()
376+
}
377+
299378
/// Wrap the logical plan in a `SelectInto`
300379
fn select_into(
301380
&self,

datafusion/sqllogictest/test_files/pipe_operator.slt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,44 @@ select * from range(0,10)
128128
2
129129
3
130130
4
131+
132+
# AGGREGATE pipe
133+
query II
134+
(
135+
SELECT 'apples' AS item, 2 AS sales
136+
UNION ALL
137+
SELECT 'bananas' AS item, 5 AS sales
138+
UNION ALL
139+
SELECT 'apples' AS item, 7 AS sales
140+
)
141+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales;
142+
----
143+
3 14
144+
145+
query TII rowsort
146+
(
147+
SELECT 'apples' AS item, 2 AS sales
148+
UNION ALL
149+
SELECT 'bananas' AS item, 5 AS sales
150+
UNION ALL
151+
SELECT 'apples' AS item, 7 AS sales
152+
)
153+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
154+
GROUP BY item;
155+
----
156+
apples 2 9
157+
bananas 1 5
158+
159+
query TII rowsort
160+
(
161+
SELECT 'apples' AS item, 2 AS sales
162+
UNION ALL
163+
SELECT 'bananas' AS item, 5 AS sales
164+
UNION ALL
165+
SELECT 'apples' AS item, 7 AS sales
166+
)
167+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
168+
GROUP BY item
169+
|> WHERE num_items > 1;
170+
----
171+
apples 2 9

docs/source/user-guide/sql/operators.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,3 +784,17 @@ DataFusion currently supports the following pipe operators:
784784
| 4 |
785785
+-------+
786786
```
787+
788+
(pipe_aggregate)=
789+
790+
### AGGREGATE
791+
792+
```sql
793+
> select * from range(0,3)
794+
|> aggregate sum(value) AS total;
795+
+-------+
796+
| total |
797+
+-------+
798+
| 3 |
799+
+-------+
800+
```

0 commit comments

Comments
 (0)