Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

use crate::stack::StackGuard;
use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::expr::{Sort, WildcardOptions};

use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query,
SelectInto, SetExpr,
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
};
use sqlparser::tokenizer::Span;

Expand All @@ -48,8 +49,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.plan_with_clause(with, planner_context)?;
}

let pipe_operators = query.pipe_operators.clone();

let set_expr = *query.body;
match set_expr {
let plan = match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
let plan =
Expand Down Expand Up @@ -78,6 +81,76 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.limit_clause, planner_context)
}
}?;

self.pipe_operators(plan, pipe_operators, planner_context)
}

/// Applies a sequence of pipe operators to `plan`, in the order given.
///
/// Each operator is planned on top of the result of the previous one.
/// Planning stops at the first operator that fails and that error is returned.
fn pipe_operators(
    &self,
    plan: LogicalPlan,
    pipe_operators: Vec<PipeOperator>,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    pipe_operators.into_iter().try_fold(plan, |input, operator| {
        self.pipe_operator(input, operator, planner_context)
    })
}

/// Plans a single pipe operator on top of the input `plan`.
///
/// The `WHERE`, `ORDER BY`, `LIMIT`, `SELECT` and `EXTEND` operators are
/// handled here; every other operator produces a "not implemented" error.
fn pipe_operator(
    &self,
    plan: LogicalPlan,
    pipe_operator: PipeOperator,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    match pipe_operator {
        PipeOperator::Where { expr } => {
            let predicate = Some(expr);
            self.plan_selection(predicate, plan, planner_context)
        }
        PipeOperator::OrderBy { exprs } => {
            let ordering = self.order_by_to_sort_expr(
                exprs,
                plan.schema(),
                planner_context,
                true,
                None,
            )?;
            self.order_by(plan, ordering)
        }
        PipeOperator::Limit { expr, offset } => {
            // Re-express the pipe LIMIT as a regular LIMIT/OFFSET clause so
            // the same `limit` planning routine can be used.
            let offset = offset.map(|value| Offset {
                value,
                rows: OffsetRows::None,
            });
            let clause = LimitClause::LimitOffset {
                limit: Some(expr),
                offset,
                limit_by: vec![],
            };
            self.limit(plan, Some(clause), planner_context)
        }
        PipeOperator::Select { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let projection =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            self.project(plan, projection)
        }
        PipeOperator::Extend { exprs } => {
            let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
            let added =
                self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
            // A leading wildcard keeps every input column; the new
            // expressions are appended after it.
            let mut projection =
                vec![SelectExpr::Wildcard(WildcardOptions::default())];
            projection.extend(added);
            self.project(plan, projection)
        }

        x => not_impl_err!("`{x}` pipe operator is not supported yet"),
    }
}

Expand Down
10 changes: 7 additions & 3 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok((intermediate_plan, intermediate_select_exprs))
}

fn plan_selection(
pub(crate) fn plan_selection(
&self,
selection: Option<SQLExpr>,
plan: LogicalPlan,
Expand Down Expand Up @@ -655,7 +655,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
fn prepare_select_exprs(
pub(crate) fn prepare_select_exprs(
&self,
plan: &LogicalPlan,
projection: Vec<SelectItem>,
Expand Down Expand Up @@ -823,7 +823,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

/// Wrap a plan in a projection
fn project(&self, input: LogicalPlan, expr: Vec<SelectExpr>) -> Result<LogicalPlan> {
pub(crate) fn project(
&self,
input: LogicalPlan,
expr: Vec<SelectExpr>,
) -> Result<LogicalPlan> {
// convert to Expr for validate_schema_satisfies_exprs
let exprs = expr
.iter()
Expand Down
91 changes: 91 additions & 0 deletions datafusion/sqllogictest/test_files/pipe_operator.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# BigQuery supports the pipe operator syntax
# TODO: Make the Generic dialect support the pipe operator syntax
statement ok
set datafusion.sql_parser.dialect = 'BigQuery';

statement ok
CREATE TABLE test(
a INT,
b FLOAT,
c VARCHAR,
n VARCHAR
) AS VALUES
(1, 1.1, 'a', NULL),
(2, 2.2, 'b', NULL),
(3, 3.3, 'c', NULL)
;

# WHERE pipe
query IRTT
SELECT *
FROM test
|> WHERE a > 1
----
2 2.2 b NULL
3 3.3 c NULL

# ORDER BY pipe
query IRTT
SELECT *
FROM test
|> ORDER BY a DESC
----
3 3.3 c NULL
2 2.2 b NULL
1 1.1 a NULL

# ORDER BY pipe, limit
query IRTT
SELECT *
FROM test
|> ORDER BY a DESC
|> LIMIT 1
----
3 3.3 c NULL

# SELECT pipe
query I
SELECT *
FROM test
|> SELECT a
----
1
2
3

# EXTEND pipe
query IRR
SELECT *
FROM test
|> SELECT a, b
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1
2 2.2 4.2
3 3.3 6.3

query IRR
SELECT *
FROM test
|> SELECT a, b
|> where a = 1
|> EXTEND a + b AS a_plus_b
----
1 1.1 2.1
95 changes: 95 additions & 0 deletions docs/source/user-guide/sql/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,98 @@ bar") |
bar |
+-----------------+
```

## Pipe operators

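Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`, which applies
the operation on its right to the result of the query on its left.
To use pipe operators, set the SQL dialect to one that supports them: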

```sql
set datafusion.sql_parser.dialect = 'BigQuery';
```

DataFusion currently supports the following pipe operators:

- [WHERE](#pipe_where)
- [ORDER BY](#pipe_order_by)
- [LIMIT](#pipe_limit)
- [SELECT](#pipe_select)
- [EXTEND](#pipe_extend)

(pipe_where)=

### WHERE

```sql
> select * from range(0,10)
|> where value < 2;
+-------+
| value |
+-------+
| 0 |
| 1 |
+-------+
```

(pipe_order_by)=

### ORDER BY

```sql
> select * from range(0,3)
|> order by value desc;
+-------+
| value |
+-------+
| 2 |
| 1 |
| 0 |
+-------+
```

(pipe_limit)=

### LIMIT

```sql
> select * from range(0,3)
|> order by value desc
|> limit 1;
+-------+
| value |
+-------+
| 2 |
+-------+
```

(pipe_select)=

### SELECT

```sql
> select * from range(0,3)
|> select value + 10;
+---------------------------+
| range().value + Int64(10) |
+---------------------------+
| 10 |
| 11 |
| 12 |
+---------------------------+
```

(pipe_extend)=

### EXTEND

```sql
> select * from range(0,3)
|> extend -value AS minus_value;
+-------+-------------+
| value | minus_value |
+-------+-------------+
| 0 | 0 |
| 1 | -1 |
| 2 | -2 |
+-------+-------------+
```