Skip to content

Commit 07f7e86

Browse files
authored
refactor: Remove scan id (#23697)
1 parent 5e3e030 commit 07f7e86

File tree

14 files changed

+42
-60
lines changed

14 files changed

+42
-60
lines changed

crates/polars-mem-engine/src/planner/lp.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,12 @@ fn create_physical_plan_impl(
362362

363363
let cache_id = UniqueId::default();
364364

365+
// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
366+
assert!(
367+
!cache_nodes.contains_key(&cache_id),
368+
"generated duplicate unique ID"
369+
);
370+
365371
// Use cache so that this runs during the cache pre-filling stage and not on the
366372
// thread pool, it could deadlock since the streaming engine uses the thread
367373
// pool internally.
@@ -445,7 +451,6 @@ fn create_physical_plan_impl(
445451
scan_type,
446452
predicate,
447453
unified_scan_args,
448-
id: scan_mem_id,
449454
} => {
450455
let mut expr_conversion_state = ExpressionConversionState::new(true);
451456

@@ -501,31 +506,39 @@ fn create_physical_plan_impl(
501506
state.has_cache_parent = true;
502507
state.has_cache_child = true;
503508

504-
if !cache_nodes.contains_key(&scan_mem_id) {
505-
let build_func = build_streaming_executor
506-
.expect("invalid build. Missing feature new-streaming");
507-
508-
let executor = build_func(root, lp_arena, expr_arena)?;
509-
510-
cache_nodes.insert(
511-
scan_mem_id.clone(),
512-
Box::new(executors::CacheExec {
513-
input: Some(executor),
514-
id: scan_mem_id.clone(),
515-
// This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
516-
count: 0,
517-
is_new_streaming_scan: true,
518-
}),
519-
);
520-
} else {
521-
// Already exists - this scan IR is under a CSE (subplan). We need to
522-
// increment the cache hit count here.
523-
let cache_exec = cache_nodes.get_mut(&scan_mem_id).unwrap();
524-
cache_exec.count = cache_exec.count.saturating_add(1);
525-
}
509+
let build_func = build_streaming_executor
510+
.expect("invalid build. Missing feature new-streaming");
511+
512+
let executor = build_func(root, lp_arena, expr_arena)?;
513+
514+
// Generate a unique ID for this scan. Currently this scan can be visited only
515+
// once, since common subplans are always behind a cache, which prevents
516+
// multiple traversals of the common subplan.
517+
//
518+
// If this property changes in the future, the same scan could be visited
519+
// and executed multiple times. It is the responsibility of the caller to
520+
// insert a cache if multiple executions are not desirable.
521+
let id = UniqueId::default();
522+
523+
// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
524+
assert!(
525+
!cache_nodes.contains_key(&id),
526+
"generated duplicate unique ID"
527+
);
528+
529+
cache_nodes.insert(
530+
id.clone(),
531+
Box::new(executors::CacheExec {
532+
input: Some(executor),
533+
id: id.clone(),
534+
// This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
535+
count: 0,
536+
is_new_streaming_scan: true,
537+
}),
538+
);
526539

527540
Ok(Box::new(executors::CacheExec {
528-
id: scan_mem_id,
541+
id,
529542
// Rest of the fields don't matter - the actual node was inserted into
530543
// `cache_nodes`.
531544
input: None,
@@ -621,6 +634,11 @@ fn create_physical_plan_impl(
621634
});
622635

623636
cache_nodes.insert(id.clone(), cache);
637+
} else {
638+
// TODO: remove when https://github.com/pola-rs/polars/issues/23674 is resolved
639+
if let Some(exec) = cache_nodes.get(&id) {
640+
assert!(!exec.is_new_streaming_scan, "generated duplicate unique ID");
641+
}
624642
}
625643

626644
Ok(Box::new(executors::CacheExec {

crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ pub(super) fn dsl_to_ir(
161161
scan_type: Box::new(scan_type_ir),
162162
output_schema: None,
163163
unified_scan_args,
164-
id: Default::default(),
165164
}
166165
};
167166

crates/polars-plan/src/plans/ir/dot.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ impl<'a> IRDotDisplay<'a> {
215215
scan_type,
216216
unified_scan_args,
217217
output_schema: _,
218-
id: _,
219218
} => {
220219
let name: &str = (&**scan_type).into();
221220
let path = ScanSourcesDisplay(sources);

crates/polars-plan/src/plans/ir/format.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use polars_core::schema::Schema;
44
use polars_io::RowIndex;
55
use polars_utils::format_list_truncated;
66
use polars_utils::slice_enum::Slice;
7-
use polars_utils::unique_id::UniqueId;
87
use recursive::recursive;
98

109
use self::ir::dot::ScanSourcesDisplay;
@@ -74,7 +73,6 @@ fn write_scan(
7473
predicate: &Option<ExprIRDisplay<'_>>,
7574
pre_slice: Option<Slice>,
7675
row_index: Option<&RowIndex>,
77-
scan_mem_id: Option<&UniqueId>,
7876
deletion_files: Option<&DeletionFilesList>,
7977
) -> fmt::Result {
8078
write!(
@@ -84,10 +82,6 @@ fn write_scan(
8482
ScanSourcesDisplay(sources),
8583
)?;
8684

87-
if let Some(scan_mem_id) = scan_mem_id {
88-
write!(f, " [id: {scan_mem_id}]")?;
89-
}
90-
9185
let total_columns = total_columns - usize::from(row_index.is_some());
9286
if n_columns > 0 {
9387
write!(
@@ -688,7 +682,6 @@ pub fn write_ir_non_recursive(
688682
.map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len }),
689683
None,
690684
None,
691-
None,
692685
)
693686
},
694687
IR::Slice {
@@ -715,7 +708,6 @@ pub fn write_ir_non_recursive(
715708
unified_scan_args,
716709
hive_parts: _,
717710
output_schema: _,
718-
id: scan_mem_id,
719711
} => {
720712
let n_columns = unified_scan_args
721713
.projection
@@ -735,7 +727,6 @@ pub fn write_ir_non_recursive(
735727
&predicate,
736728
unified_scan_args.pre_slice.clone(),
737729
unified_scan_args.row_index.as_ref(),
738-
Some(scan_mem_id),
739730
unified_scan_args.deletion_files.as_ref(),
740731
)
741732
},

crates/polars-plan/src/plans/ir/inputs.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ impl IR {
102102
predicate,
103103
unified_scan_args,
104104
scan_type,
105-
id: _,
106105
} => Scan {
107106
sources: sources.clone(),
108107
file_info: file_info.clone(),
@@ -111,7 +110,6 @@ impl IR {
111110
unified_scan_args: unified_scan_args.clone(),
112111
predicate: predicate.is_some().then(|| exprs.pop().unwrap()),
113112
scan_type: scan_type.clone(),
114-
id: Default::default(),
115113
},
116114
DataFrameScan {
117115
df,

crates/polars-plan/src/plans/ir/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,6 @@ pub enum IR {
6363
scan_type: Box<FileScanIR>,
6464
/// generic options that can be used for all file types.
6565
unified_scan_args: Box<UnifiedScanArgs>,
66-
/// This is used as part of a hack to prevent deadlocks when we run the in-memory engine with
67-
/// scans dispatched to new-streaming. This ID is used as the ID of the CacheExec that
68-
/// wraps this scan. It will not be needed once everything runs in new-streaming.
69-
///
70-
/// We use this instead of the Arc-address of the ScanSources as it's possible to pass the
71-
/// same set of ScanSources with different scan options.
72-
///
73-
/// NOTE: This must be reset to a new ID during e.g. predicate / slice pushdown.
74-
id: UniqueId,
7566
},
7667
DataFrameScan {
7768
df: Arc<DataFrame>,

crates/polars-plan/src/plans/optimizer/expand_datasets.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ impl OptimizationRule for ExpandDatasets {
108108
hive_parts: _,
109109
predicate: _,
110110
output_schema: _,
111-
id: _,
112111
} = &mut ir
113112
else {
114113
unreachable!()

crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ impl PredicatePushDown<'_> {
357357
scan_type,
358358
unified_scan_args,
359359
output_schema,
360-
id: _,
361360
} => {
362361
let mut blocked_names = Vec::with_capacity(2);
363362

@@ -412,7 +411,6 @@ impl PredicatePushDown<'_> {
412411
unified_scan_args,
413412
output_schema,
414413
scan_type,
415-
id: Default::default(),
416414
}
417415
} else {
418416
let lp = Scan {
@@ -423,7 +421,6 @@ impl PredicatePushDown<'_> {
423421
unified_scan_args,
424422
output_schema,
425423
scan_type,
426-
id: Default::default(),
427424
};
428425
if let Some(predicate) = predicate {
429426
let input = lp_arena.add(lp);

crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,6 @@ impl ProjectionPushDown {
446446
predicate,
447447
mut unified_scan_args,
448448
mut output_schema,
449-
id: _,
450449
} => {
451450
let do_optimization = match &*scan_type {
452451
FileScanIR::Anonymous { function, .. } => function.allows_projection_pushdown(),
@@ -574,7 +573,6 @@ impl ProjectionPushDown {
574573
scan_type,
575574
predicate,
576575
unified_scan_args,
577-
id: Default::default(),
578576
};
579577

580578
Ok(lp)

crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ impl SlicePushDown {
222222
mut unified_scan_args,
223223
predicate,
224224
scan_type,
225-
id: _,
226225
}, Some(state)) if predicate.is_none() && match &*scan_type {
227226
#[cfg(feature = "parquet")]
228227
FileScanIR::Parquet { .. } => true,
@@ -252,7 +251,6 @@ impl SlicePushDown {
252251
scan_type,
253252
unified_scan_args,
254253
predicate,
255-
id: Default::default(),
256254
};
257255

258256
Ok(lp)

0 commit comments

Comments
 (0)