Skip to content

Commit 883bebd

Browse files
nameexhaustionWashiil
authored and committed
fix: Fix errors on native scan_iceberg (pola-rs#23811)
1 parent 1a6b850 commit 883bebd

File tree

3 files changed

+99
-93
lines changed

3 files changed

+99
-93
lines changed

crates/polars-parquet/src/arrow/read/schema/convert.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
2-
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, TimeUnit};
2+
use std::sync::Arc;
3+
4+
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, Metadata, TimeUnit};
5+
use polars_utils::format_pl_smallstr;
36
use polars_utils::pl_str::PlSmallStr;
47

58
use crate::arrow::read::schema::SchemaInferenceOptions;
@@ -309,11 +312,27 @@ pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
309312
/// Returns `None` iff the parquet type has no associated primitive types,
310313
/// i.e. if it is a column-less group type.
311314
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
312-
Some(Field::new(
313-
type_.get_field_info().name.clone(),
315+
let field_info = type_.get_field_info();
316+
317+
let metadata: Option<Arc<Metadata>> = field_info.id.map(|x: i32| {
318+
Arc::new(
319+
[(
320+
PlSmallStr::from_static("PARQUET:field_id"),
321+
format_pl_smallstr!("{x}"),
322+
)]
323+
.into(),
324+
)
325+
});
326+
327+
let mut arrow_field = Field::new(
328+
field_info.name.clone(),
314329
to_dtype(type_, options)?,
315330
is_nullable(type_.get_field_info()),
316-
))
331+
);
332+
333+
arrow_field.metadata = metadata;
334+
335+
Some(arrow_field)
317336
}
318337

319338
/// Converts a parquet list to arrow list.

crates/polars-plan/src/plans/optimizer/expand_datasets.rs

Lines changed: 70 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -29,66 +29,82 @@ impl OptimizationRule for ExpandDatasets {
2929
_expr_arena: &mut Arena<crate::prelude::AExpr>,
3030
node: Node,
3131
) -> PolarsResult<Option<IR>> {
32-
let ir = lp_arena.get(node);
33-
32+
// # Note
33+
// This function mutates the IR node in-place rather than returning the new IR - the
34+
// StackOptimizer will re-call this function otherwise.
3435
if let IR::Scan {
36+
sources,
3537
scan_type,
3638
unified_scan_args,
37-
..
38-
} = ir
39+
40+
file_info: _,
41+
hive_parts: _,
42+
predicate: _,
43+
output_schema: _,
44+
} = lp_arena.get_mut(node)
3945
{
4046
let projection = unified_scan_args.projection.clone();
4147
let limit = match unified_scan_args.pre_slice.clone() {
4248
Some(v @ Slice::Positive { .. }) => Some(v.end_position()),
4349
_ => None,
4450
};
4551

46-
match scan_type.as_ref() {
52+
match scan_type.as_mut() {
4753
#[cfg(feature = "python")]
4854
FileScanIR::PythonDataset {
4955
dataset_object,
5056
cached_ir,
5157
} => {
5258
let cached_ir = cached_ir.clone();
53-
5459
let mut guard = cached_ir.lock().unwrap();
5560

56-
// Note: We always get called twice in succession from the stack optimizer,
57-
// as it was designed to optimize until fixed point. Ensure we return
58-
// Ok(None) if the mutex contains the initialized state.
59-
if match guard.as_ref() {
60-
// Reject cached if limit or projection does not match. This can happen if a scan is reused.
61+
if config::verbose() {
62+
eprintln!(
63+
"expand_datasets(): python[{}]: limit: {:?}, project: {}",
64+
dataset_object.name(),
65+
limit,
66+
projection.as_ref().map_or(
67+
PlSmallStr::from_static("all"),
68+
|x| format_pl_smallstr!("{}", x.len())
69+
)
70+
)
71+
}
72+
73+
let can_use_existing = match guard.as_ref() {
6174
Some(resolved) => {
6275
let ExpandedDataset {
6376
limit: cached_limit,
6477
projection: cached_projection,
65-
resolved_ir: _,
78+
expanded_dsl: _,
6679
python_scan: _,
6780
} = resolved;
6881

6982
cached_limit == &limit && cached_projection == &projection
7083
},
7184

7285
None => false,
73-
} {
74-
return Ok(None);
75-
}
86+
};
7687

77-
if config::verbose() {
78-
eprintln!(
79-
"expand_datasets(): python[{}]: limit: {:?}, project: {}",
80-
dataset_object.name(),
88+
if !can_use_existing {
89+
let expanded_dsl =
90+
dataset_object.to_dataset_scan(limit, projection.as_deref())?;
91+
92+
*guard = Some(ExpandedDataset {
8193
limit,
82-
projection.as_ref().map_or(
83-
PlSmallStr::from_static("all"),
84-
|x| format_pl_smallstr!("{}", x.len())
85-
)
86-
)
94+
projection,
95+
expanded_dsl,
96+
python_scan: None,
97+
})
8798
}
8899

89-
let plan = dataset_object.to_dataset_scan(limit, projection.as_deref())?;
100+
let ExpandedDataset {
101+
limit: _,
102+
projection: _,
103+
expanded_dsl,
104+
python_scan,
105+
} = guard.as_mut().unwrap();
90106

91-
let (resolved_ir, python_scan) = match plan {
107+
match expanded_dsl {
92108
DslPlan::Scan {
93109
sources: resolved_sources,
94110
unified_scan_args: resolved_unified_scan_args,
@@ -97,22 +113,6 @@ impl OptimizationRule for ExpandDatasets {
97113
} => {
98114
use crate::dsl::FileScanDsl;
99115

100-
let mut ir = ir.clone();
101-
102-
let IR::Scan {
103-
sources,
104-
scan_type,
105-
unified_scan_args,
106-
107-
file_info: _,
108-
hive_parts: _,
109-
predicate: _,
110-
output_schema: _,
111-
} = &mut ir
112-
else {
113-
unreachable!()
114-
};
115-
116116
// We only want a few configuration flags from here (e.g. column casting config).
117117
// The rest we either expect to be None (e.g. projection / row_index), or ignore.
118118
let UnifiedScanArgs {
@@ -131,25 +131,26 @@ impl OptimizationRule for ExpandDatasets {
131131
include_file_paths: _include_file_paths @ None,
132132
deletion_files,
133133
column_mapping,
134-
} = *resolved_unified_scan_args
134+
} = resolved_unified_scan_args.as_ref()
135135
else {
136136
panic!(
137137
"invalid scan args from python dataset resolve: {:?}",
138138
&resolved_unified_scan_args
139139
)
140140
};
141141

142-
unified_scan_args.cloud_options = cloud_options;
143-
unified_scan_args.rechunk = rechunk;
144-
unified_scan_args.cache = cache;
145-
unified_scan_args.cast_columns_policy = cast_columns_policy;
146-
unified_scan_args.missing_columns_policy = missing_columns_policy;
147-
unified_scan_args.extra_columns_policy = extra_columns_policy;
148-
unified_scan_args.deletion_files = deletion_files;
149-
unified_scan_args.column_mapping = column_mapping;
150-
151-
*sources = resolved_sources;
152-
*scan_type = Box::new(match *resolved_scan_type {
142+
unified_scan_args.cloud_options = cloud_options.clone();
143+
unified_scan_args.rechunk = *rechunk;
144+
unified_scan_args.cache = *cache;
145+
unified_scan_args.cast_columns_policy = cast_columns_policy.clone();
146+
unified_scan_args.missing_columns_policy = *missing_columns_policy;
147+
unified_scan_args.extra_columns_policy = *extra_columns_policy;
148+
unified_scan_args.deletion_files = deletion_files.clone();
149+
unified_scan_args.column_mapping = column_mapping.clone();
150+
151+
*sources = resolved_sources.clone();
152+
153+
*scan_type = Box::new(match *resolved_scan_type.clone() {
153154
#[cfg(feature = "csv")]
154155
FileScanDsl::Csv { options } => FileScanIR::Csv { options },
155156

@@ -182,18 +183,15 @@ impl OptimizationRule for ExpandDatasets {
182183
file_info: _,
183184
} => FileScanIR::Anonymous { options, function },
184185
});
185-
186-
(ir, None)
187186
},
188187

189-
DslPlan::PythonScan { options } => (
190-
ir.clone(),
191-
Some(ExpandedPythonScan {
188+
DslPlan::PythonScan { options } => {
189+
*python_scan = Some(ExpandedPythonScan {
192190
name: dataset_object.name(),
193-
scan_fn: options.scan_fn.unwrap(),
194-
variant: options.python_source,
195-
}),
196-
),
191+
scan_fn: options.scan_fn.clone().unwrap(),
192+
variant: options.python_source.clone(),
193+
})
194+
},
197195

198196
dsl => {
199197
polars_bail!(
@@ -203,24 +201,12 @@ impl OptimizationRule for ExpandDatasets {
203201
)
204202
},
205203
};
206-
207-
let resolved = ExpandedDataset {
208-
limit,
209-
projection,
210-
resolved_ir,
211-
python_scan,
212-
};
213-
214-
*guard = Some(resolved);
215-
216-
let resolved_ir = guard.as_ref().map(|x| x.resolved_ir.clone()).unwrap();
217-
218-
return Ok(Some(resolved_ir));
219204
},
220205

221206
_ => {},
222207
}
223208
}
209+
224210
Ok(None)
225211
}
226212
}
@@ -229,7 +215,7 @@ impl OptimizationRule for ExpandDatasets {
229215
pub struct ExpandedDataset {
230216
limit: Option<usize>,
231217
projection: Option<Arc<[PlSmallStr]>>,
232-
resolved_ir: IR,
218+
expanded_dsl: DslPlan,
233219

234220
/// Fallback python scan
235221
#[cfg(feature = "python")]
@@ -256,7 +242,7 @@ impl Debug for ExpandedDataset {
256242
let ExpandedDataset {
257243
limit,
258244
projection,
259-
resolved_ir,
245+
expanded_dsl,
260246

261247
#[cfg(feature = "python")]
262248
python_scan,
@@ -265,8 +251,10 @@ impl Debug for ExpandedDataset {
265251
return display::ExpandedDataset {
266252
limit,
267253
projection,
268-
resolved_ir,
269-
254+
expanded_dsl: &match expanded_dsl.display() {
255+
Ok(v) => v.to_string(),
256+
Err(e) => e.to_string(),
257+
},
270258
#[cfg(feature = "python")]
271259
python_scan: python_scan.as_ref().map(
272260
|ExpandedPythonScan {
@@ -281,18 +269,17 @@ impl Debug for ExpandedDataset {
281269
.fmt(f);
282270

283271
mod display {
272+
use std::fmt::Debug;
284273
use std::sync::Arc;
285274

286275
use polars_utils::pl_str::PlSmallStr;
287276

288-
use crate::prelude::IR;
289-
290277
#[derive(Debug)]
291278
#[expect(unused)]
292279
pub struct ExpandedDataset<'a> {
293280
pub limit: &'a Option<usize>,
294281
pub projection: &'a Option<Arc<[PlSmallStr]>>,
295-
pub resolved_ir: &'a IR,
282+
pub expanded_dsl: &'a str,
296283

297284
#[cfg(feature = "python")]
298285
pub python_scan: Option<PlSmallStr>,

py-polars/tests/unit/io/test_iceberg.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ class TestIcebergScanIO:
5353
"""Test coverage for `iceberg` scan ops."""
5454

5555
def test_scan_iceberg_plain(self, iceberg_path: str) -> None:
56-
df = pl.scan_iceberg(iceberg_path)
57-
assert len(df.collect()) == 3
58-
assert df.collect_schema() == {
56+
q = pl.scan_iceberg(iceberg_path)
57+
assert len(q.collect()) == 3
58+
assert q.collect_schema() == {
5959
"id": pl.Int32,
6060
"str": pl.String,
6161
"ts": pl.Datetime(time_unit="us", time_zone=None),
6262
}
6363

6464
def test_scan_iceberg_snapshot_id(self, iceberg_path: str) -> None:
65-
df = pl.scan_iceberg(iceberg_path, snapshot_id=7051579356916758811)
66-
assert len(df.collect()) == 3
67-
assert df.collect_schema() == {
65+
q = pl.scan_iceberg(iceberg_path, snapshot_id=7051579356916758811)
66+
assert len(q.collect()) == 3
67+
assert q.collect_schema() == {
6868
"id": pl.Int32,
6969
"str": pl.String,
7070
"ts": pl.Datetime(time_unit="us", time_zone=None),

0 commit comments

Comments (0)