Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 42 additions & 24 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ fn create_physical_plan_impl(

let cache_id = UniqueId::default();

// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
assert!(
!cache_nodes.contains_key(&cache_id),
"generated duplicate unique ID"
);

// Use cache so that this runs during the cache pre-filling stage and not on the
// thread pool; otherwise it could deadlock, since the streaming engine uses the
// thread pool internally.
Expand Down Expand Up @@ -445,7 +451,6 @@ fn create_physical_plan_impl(
scan_type,
predicate,
unified_scan_args,
id: scan_mem_id,
} => {
let mut expr_conversion_state = ExpressionConversionState::new(true);

Expand Down Expand Up @@ -501,31 +506,39 @@ fn create_physical_plan_impl(
state.has_cache_parent = true;
state.has_cache_child = true;

if !cache_nodes.contains_key(&scan_mem_id) {
let build_func = build_streaming_executor
.expect("invalid build. Missing feature new-streaming");

let executor = build_func(root, lp_arena, expr_arena)?;

cache_nodes.insert(
scan_mem_id.clone(),
Box::new(executors::CacheExec {
input: Some(executor),
id: scan_mem_id.clone(),
// This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
count: 0,
is_new_streaming_scan: true,
}),
);
} else {
// Already exists - this scan IR is under a CSE (subplan). We need to
// increment the cache hit count here.
let cache_exec = cache_nodes.get_mut(&scan_mem_id).unwrap();
cache_exec.count = cache_exec.count.saturating_add(1);
}
let build_func = build_streaming_executor
.expect("invalid build. Missing feature new-streaming");

let executor = build_func(root, lp_arena, expr_arena)?;

// Generate a unique ID for this scan. Currently this scan can be visited only
// once, since common subplans are always behind a cache, which prevents
// multiple traversals of the common subplan.
//
// If this property changes in the future, the same scan could be visited
// and executed multiple times. It is the responsibility of the caller to
// insert a cache if multiple executions are not desirable.
let id = UniqueId::default();

// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
assert!(
!cache_nodes.contains_key(&id),
"generated duplicate unique ID"
);

cache_nodes.insert(
id.clone(),
Box::new(executors::CacheExec {
input: Some(executor),
id: id.clone(),
// This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
count: 0,
is_new_streaming_scan: true,
}),
);

Ok(Box::new(executors::CacheExec {
id: scan_mem_id,
id,
// Rest of the fields don't matter - the actual node was inserted into
// `cache_nodes`.
input: None,
Expand Down Expand Up @@ -621,6 +634,11 @@ fn create_physical_plan_impl(
});

cache_nodes.insert(id.clone(), cache);
} else {
// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
if let Some(exec) = cache_nodes.get(&id) {
assert!(!exec.is_new_streaming_scan, "generated duplicate unique ID");
}
}

Ok(Box::new(executors::CacheExec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ pub(super) fn dsl_to_ir(
scan_type: Box::new(scan_type_ir),
output_schema: None,
unified_scan_args,
id: Default::default(),
}
};

Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/plans/ir/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ impl<'a> IRDotDisplay<'a> {
scan_type,
unified_scan_args,
output_schema: _,
id: _,
} => {
let name: &str = (&**scan_type).into();
let path = ScanSourcesDisplay(sources);
Expand Down
9 changes: 0 additions & 9 deletions crates/polars-plan/src/plans/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use polars_core::schema::Schema;
use polars_io::RowIndex;
use polars_utils::format_list_truncated;
use polars_utils::slice_enum::Slice;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

use self::ir::dot::ScanSourcesDisplay;
Expand Down Expand Up @@ -74,7 +73,6 @@ fn write_scan(
predicate: &Option<ExprIRDisplay<'_>>,
pre_slice: Option<Slice>,
row_index: Option<&RowIndex>,
scan_mem_id: Option<&UniqueId>,
deletion_files: Option<&DeletionFilesList>,
) -> fmt::Result {
write!(
Expand All @@ -84,10 +82,6 @@ fn write_scan(
ScanSourcesDisplay(sources),
)?;

if let Some(scan_mem_id) = scan_mem_id {
write!(f, " [id: {scan_mem_id}]")?;
}

let total_columns = total_columns - usize::from(row_index.is_some());
if n_columns > 0 {
write!(
Expand Down Expand Up @@ -688,7 +682,6 @@ pub fn write_ir_non_recursive(
.map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len }),
None,
None,
None,
)
},
IR::Slice {
Expand All @@ -715,7 +708,6 @@ pub fn write_ir_non_recursive(
unified_scan_args,
hive_parts: _,
output_schema: _,
id: scan_mem_id,
} => {
let n_columns = unified_scan_args
.projection
Expand All @@ -735,7 +727,6 @@ pub fn write_ir_non_recursive(
&predicate,
unified_scan_args.pre_slice.clone(),
unified_scan_args.row_index.as_ref(),
Some(scan_mem_id),
unified_scan_args.deletion_files.as_ref(),
)
},
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/ir/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl IR {
predicate,
unified_scan_args,
scan_type,
id: _,
} => Scan {
sources: sources.clone(),
file_info: file_info.clone(),
Expand All @@ -111,7 +110,6 @@ impl IR {
unified_scan_args: unified_scan_args.clone(),
predicate: predicate.is_some().then(|| exprs.pop().unwrap()),
scan_type: scan_type.clone(),
id: Default::default(),
},
DataFrameScan {
df,
Expand Down
9 changes: 0 additions & 9 deletions crates/polars-plan/src/plans/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,6 @@ pub enum IR {
scan_type: Box<FileScanIR>,
/// generic options that can be used for all file types.
unified_scan_args: Box<UnifiedScanArgs>,
/// This is used as part of a hack to prevent deadlocks when we run the in-memory engine with
/// scans dispatched to new-streaming. This ID is used as the ID of the CacheExec that
/// wraps this scan. It will not be needed once everything runs in new-streaming.
///
/// We use this instead of the Arc-address of the ScanSources as it's possible to pass the
/// same set of ScanSources with different scan options.
///
/// NOTE: This must be reset to a new ID during e.g. predicate / slice pushdown.
id: UniqueId,
},
DataFrameScan {
df: Arc<DataFrame>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl OptimizationRule for ExpandDatasets {
hive_parts: _,
predicate: _,
output_schema: _,
id: _,
} = &mut ir
else {
unreachable!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ impl PredicatePushDown<'_> {
scan_type,
unified_scan_args,
output_schema,
id: _,
} => {
let mut blocked_names = Vec::with_capacity(2);

Expand Down Expand Up @@ -412,7 +411,6 @@ impl PredicatePushDown<'_> {
unified_scan_args,
output_schema,
scan_type,
id: Default::default(),
}
} else {
let lp = Scan {
Expand All @@ -423,7 +421,6 @@ impl PredicatePushDown<'_> {
unified_scan_args,
output_schema,
scan_type,
id: Default::default(),
};
if let Some(predicate) = predicate {
let input = lp_arena.add(lp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,6 @@ impl ProjectionPushDown {
predicate,
mut unified_scan_args,
mut output_schema,
id: _,
} => {
let do_optimization = match &*scan_type {
FileScanIR::Anonymous { function, .. } => function.allows_projection_pushdown(),
Expand Down Expand Up @@ -574,7 +573,6 @@ impl ProjectionPushDown {
scan_type,
predicate,
unified_scan_args,
id: Default::default(),
};

Ok(lp)
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ impl SlicePushDown {
mut unified_scan_args,
predicate,
scan_type,
id: _,
}, Some(state)) if predicate.is_none() && match &*scan_type {
#[cfg(feature = "parquet")]
FileScanIR::Parquet { .. } => true,
Expand Down Expand Up @@ -252,7 +251,6 @@ impl SlicePushDown {
scan_type,
unified_scan_args,
predicate,
id: Default::default(),
};

Ok(lp)
Expand Down
3 changes: 0 additions & 3 deletions crates/polars-plan/src/plans/visitor/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl Hash for HashableEqLP<'_> {
output_schema: _,
scan_type,
unified_scan_args,
id: _,
} => {
// We don't have to traverse the schema, hive partitions etc. as they are derivative from the paths.
scan_type.hash(state);
Expand Down Expand Up @@ -263,7 +262,6 @@ impl HashableEqLP<'_> {
output_schema: _,
scan_type: stl,
unified_scan_args: ol,
id: _,
},
IR::Scan {
sources: pr,
Expand All @@ -273,7 +271,6 @@ impl HashableEqLP<'_> {
output_schema: _,
scan_type: str,
unified_scan_args: or,
id: _,
},
) => {
pl == pr
Expand Down
1 change: 0 additions & 1 deletion crates/polars-python/src/lazyframe/visitor/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
output_schema: _,
scan_type,
unified_scan_args,
id: _,
} => {
Scan {
paths: {
Expand Down
1 change: 0 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ pub fn lower_ir(
scan_type,
predicate,
unified_scan_args,
id: _,
} = v.clone()
else {
unreachable!();
Expand Down
1 change: 0 additions & 1 deletion crates/polars-stream/src/utils/late_materialized_df.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ impl LateMaterializedDataFrame {
function: self,
}),
unified_scan_args: Box::new(UnifiedScanArgs::default()),
id: Default::default(),
}
}
}
Expand Down
Loading