@nameexhaustion nameexhaustion commented Jul 24, 2025

Adds the execution-level implementation needed to apply Iceberg column mappings correctly.

Related prior work

Changes

  • Enum Projection::(Plain|Mapped)

Introduces a new Projection enum that replaces the existing projected_file_schema: SchemaRef. It encapsulates, for example, resolved Iceberg column transforms (ColumnSelector), which can be retrieved via get_mapped_projection_ref_*. The interface is designed so that the Parquet reader can also use it in the future to support filters with casting/renaming.

This enum also replaces the existing projection: SchemaRef parameter inside of BeginReadArgs. Readers that do not indicate the new MAPPED_COLUMN_PROJECTION capability are guaranteed to be passed Projection::Plain and can use the inner SchemaRef directly.

  • missing_columns.rs has been deleted - insertion of missing columns is now handled by ColumnSelector::Constant.
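
As a rough illustration of the Plain/Mapped split described above, the following is a minimal sketch and not the actual Polars definition. The simplified `SchemaRef` alias, the `ColumnSelector` variants, the `ReaderCapabilities` struct, and the `projected_names` helper are all assumptions made for this example. It shows how a reader without the mapped-projection capability could rely on only ever receiving `Projection::Plain`.

```rust
use std::sync::Arc;

/// Simplified stand-in for a schema: an ordered list of column names (assumption).
type SchemaRef = Arc<Vec<String>>;

/// Hypothetical, heavily simplified selector describing where an output column comes from.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ColumnSelector {
    /// Take the file column at this index unchanged.
    Position(usize),
    /// Take the file column at this index and give it a new name.
    Renamed { position: usize, name: String },
}

/// Sketch of a projection that is either a plain schema or carries resolved mappings.
#[allow(dead_code)]
#[derive(Debug, Clone)]
enum Projection {
    Plain(SchemaRef),
    Mapped {
        output_schema: SchemaRef,
        selectors: Arc<Vec<ColumnSelector>>,
    },
}

/// Hypothetical capability flag that a reader could advertise.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ReaderCapabilities {
    mapped_column_projection: bool,
}

/// Returns the output column names a reader would produce.
/// A reader without the capability only accepts `Projection::Plain`.
fn projected_names(
    projection: &Projection,
    caps: ReaderCapabilities,
) -> Result<Vec<String>, String> {
    match projection {
        Projection::Plain(schema) => Ok(schema.to_vec()),
        Projection::Mapped { output_schema, .. } if caps.mapped_column_projection => {
            Ok(output_schema.to_vec())
        }
        Projection::Mapped { .. } => {
            Err("reader does not support mapped projections".to_string())
        }
    }
}
```

In this sketch the error arm is only a safety net. Per the description above, the caller is the one responsible for never passing a mapped projection to a reader that lacks the capability.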
Column Transform
Some(
    [
        Transformed(
            (
                Transformed(
                    (
                        Position(
                            0,
                        ),
                        ListValuesMapping {
                            values_selector: Transformed(
                                (
                                    Position(
                                        0,
                                    ),
                                    StructFieldsMapping {
                                        field_selectors: [
                                            Position(
                                                0,
                                            ),
                                            Transformed(
                                                (
                                                    Position(
                                                        1,
                                                    ),
                                                    StructFieldsMapping {
                                                        field_selectors: [
                                                            Transformed(
                                                                (
                                                                    Transformed(
                                                                        (
                                                                            Position(
                                                                                2,
                                                                            ),
                                                                            Cast {
                                                                                dtype: Int64,
                                                                                options: Overflowing,
                                                                            },
                                                                        ),
                                                                    ),
                                                                    Rename {
                                                                        name: "field_1",
                                                                    },
                                                                ),
                                                            ),
                                                            Transformed(
                                                                (
                                                                    Transformed(
                                                                        (
                                                                            Position(
                                                                                0,
                                                                            ),
                                                                            Cast {
                                                                                dtype: Int64,
                                                                                options: Overflowing,
                                                                            },
                                                                        ),
                                                                    ),
                                                                    Rename {
                                                                        name: "field_2",
                                                                    },
                                                                ),
                                                            ),
                                                        ],
                                                    },
                                                ),
                                            ),
                                        ],
                                    },
                                ),
                            ),
                        },
                    ),
                ),
                Rename {
                    name: "column_1",
                },
            ),
        ),
        Transformed(
            (
                Transformed(
                    (
                        Position(
                            1,
                        ),
                        ListValuesMapping {
                            values_selector: Transformed(
                                (
                                    Position(
                                        0,
                                    ),
                                    StructFieldsMapping {
                                        field_selectors: [
                                            Transformed(
                                                (
                                                    Position(
                                                        2,
                                                    ),
                                                    Rename {
                                                        name: "field_1",
                                                    },
                                                ),
                                            ),
                                            Transformed(
                                                (
                                                    Position(
                                                        0,
                                                    ),
                                                    Rename {
                                                        name: "field_2",
                                                    },
                                                ),
                                            ),
                                        ],
                                    },
                                ),
                            ),
                        },
                    ),
                ),
                Rename {
                    name: "column_2",
                },
            ),
        ),
    ],
)
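
To make the nested debug output above easier to read, here is a small sketch of how such a selector tree could be evaluated. It is a toy model, not the Polars implementation: the `Value`, `Selector`, and `Transform` types, the `select`/`apply` functions, and the pass-through behaviour for non-matching transforms are assumptions for illustration. A selector picks a column by position (or supplies a constant for a missing column), and each `Transformed` layer applies one transform on top of the inner selector's result.

```rust
/// Toy value model (assumption): just enough types to mirror the dump above.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int32(i32),
    Int64(i64),
    List(Vec<Value>),
    Struct(Vec<(String, Value)>),
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
enum Selector {
    /// Select the field at this index from the current level.
    Position(usize),
    /// Produce a fixed column, e.g. for a column missing from the file.
    Constant(String, Value),
    /// Apply `Transform` to the output of the inner selector.
    Transformed(Box<Selector>, Transform),
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
enum Transform {
    Rename(String),
    CastInt64,
    StructFieldsMapping(Vec<Selector>),
    ListValuesMapping(Box<Selector>),
}

/// Evaluates a selector against the named fields of the current level.
fn select(selector: &Selector, fields: &[(String, Value)]) -> (String, Value) {
    match selector {
        Selector::Position(i) => fields[*i].clone(),
        Selector::Constant(name, value) => (name.clone(), value.clone()),
        Selector::Transformed(inner, transform) => {
            let (name, value) = select(inner, fields);
            apply(transform, name, value)
        }
    }
}

fn apply(transform: &Transform, name: String, value: Value) -> (String, Value) {
    match (transform, value) {
        (Transform::Rename(new_name), value) => (new_name.clone(), value),
        (Transform::CastInt64, Value::Int32(v)) => (name, Value::Int64(i64::from(v))),
        (Transform::StructFieldsMapping(selectors), Value::Struct(inner)) => {
            // Each selector picks (and possibly transforms) one field of the struct.
            let mapped = selectors.iter().map(|s| select(s, &inner)).collect();
            (name, Value::Struct(mapped))
        }
        (Transform::ListValuesMapping(values_selector), Value::List(items)) => {
            // The values selector sees each list element as a single field at position 0.
            let mapped = items
                .into_iter()
                .map(|item| select(values_selector, &[("item".to_string(), item)]).1)
                .collect();
            (name, Value::List(mapped))
        }
        // In this sketch, a transform that does not match the value passes it through.
        (_, value) => (name, value),
    }
}

/// Reorders, casts, and renames struct fields, then adds a constant column.
fn example() -> Vec<(String, Value)> {
    let file = vec![(
        "old".to_string(),
        Value::Struct(vec![
            ("a".to_string(), Value::Int32(1)),
            ("b".to_string(), Value::Int32(2)),
        ]),
    )];
    let cast_rename = |pos, name: &str| {
        Selector::Transformed(
            Box::new(Selector::Transformed(
                Box::new(Selector::Position(pos)),
                Transform::CastInt64,
            )),
            Transform::Rename(name.to_string()),
        )
    };
    let fields = vec![cast_rename(1, "field_1"), cast_rename(0, "field_2")];
    let selectors = vec![
        Selector::Transformed(
            Box::new(Selector::Transformed(
                Box::new(Selector::Position(0)),
                Transform::StructFieldsMapping(fields),
            )),
            Transform::Rename("column_1".to_string()),
        ),
        Selector::Constant("missing".to_string(), Value::Null),
    ];
    selectors.iter().map(|s| select(s, &file)).collect()
}
```

Calling `example()` yields a `column_1` struct whose fields are swapped, cast to `Int64`, and renamed, followed by a `missing` column filled from the constant selector.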

@github-actions github-actions bot added fix Bug fix python Related to Python Polars rust Related to Rust Polars labels Jul 24, 2025
predicate: None,
};

let schema_before_selection = if incoming_schema.len() == final_output_schema.len()
@nameexhaustion nameexhaustion Jul 24, 2025

Currently, the column selectors are applied after the row_index column is attached. This is prone to name collisions, because the underlying file can contain a column called "row_index" that is renamed to a different output column.

This PR reorders the operations so that the column selectors are always applied before any other operation that adds or removes columns (e.g. row_index), which resolves the problem. As a result, the code here is no longer needed.
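
The ordering issue can be reproduced in a toy model. The sketch below is an illustration under assumptions, not the Polars code: the `Column` alias and the functions `attach_row_index`, `apply_renames`, `attach_then_select`, and `select_then_attach` are hypothetical, and the collision is modelled as a duplicate-name error. The file contains a physical column named "row_index" that the mapping renames to "id".

```rust
/// A column is a name plus its values (simplified stand-in for a real column type).
type Column = (String, Vec<i64>);

/// Prepends a 0-based row index column, failing if the name is already taken.
fn attach_row_index(mut columns: Vec<Column>, name: &str) -> Result<Vec<Column>, String> {
    if columns.iter().any(|(n, _)| n.as_str() == name) {
        return Err(format!("duplicate column name: {name}"));
    }
    let height = columns.first().map_or(0, |(_, v)| v.len());
    let index = (0..height as i64).collect::<Vec<i64>>();
    columns.insert(0, (name.to_string(), index));
    Ok(columns)
}

/// Applies `(from, to)` renames, standing in for the column selectors.
fn apply_renames(columns: Vec<Column>, renames: &[(&str, &str)]) -> Vec<Column> {
    columns
        .into_iter()
        .map(|(name, values)| {
            let new_name = renames
                .iter()
                .find(|(from, _)| *from == name.as_str())
                .map_or(name.clone(), |(_, to)| to.to_string());
            (new_name, values)
        })
        .collect()
}

/// Old order: the row index is attached while the file names are still in place.
fn attach_then_select(file: Vec<Column>) -> Result<Vec<Column>, String> {
    let with_index = attach_row_index(file, "row_index")?;
    Ok(apply_renames(with_index, &[("row_index", "id")]))
}

/// New order: selectors run first, so only output names exist when the index is added.
fn select_then_attach(file: Vec<Column>) -> Result<Vec<Column>, String> {
    let selected = apply_renames(file, &[("row_index", "id")]);
    attach_row_index(selected, "row_index")
}
```

With a file holding a single "row_index" column, `attach_then_select` fails on the duplicate name, while `select_then_attach` returns the generated row_index followed by the renamed "id" column.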

@@ -586,4 +597,194 @@ impl ColumnSelectorBuilder {

mismatch_err("")
}

/// Adds transforms on top of the `input_selector` if necessary.
pub fn attach_iceberg_transforms(
@nameexhaustion nameexhaustion Jul 24, 2025

Similar to the existing attach_transforms, but builds the transform based on the (physical_id: u32, dtype: IcebergColumnType) instead of (name: PlSmallStr, dtype: DataType).
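
The idea of resolving columns by physical ID rather than by name can be sketched as follows. This is a simplified illustration, not the signature or logic of attach_iceberg_transforms: the `IcebergField` struct, the `Selector` enum, and `resolve_by_id` are hypothetical names, and dtype handling (casts) is left out entirely.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified field description: an ID that is stable across renames, plus a name.
#[derive(Debug, Clone, PartialEq)]
struct IcebergField {
    physical_id: u32,
    name: String,
}

/// Hypothetical result of resolving one table column against a file.
#[derive(Debug, Clone, PartialEq)]
enum Selector {
    /// The file column at this index already has the expected name.
    Position(usize),
    /// The file column at this index has the right ID but an outdated name.
    Renamed { position: usize, name: String },
    /// No file column carries this ID; the column has to be filled in.
    Missing { name: String },
}

/// Matches table columns to file columns by physical ID, ignoring file column names.
fn resolve_by_id(table: &[IcebergField], file: &[IcebergField]) -> Vec<Selector> {
    let positions: HashMap<u32, usize> = file
        .iter()
        .enumerate()
        .map(|(i, f)| (f.physical_id, i))
        .collect();

    table
        .iter()
        .map(|want| match positions.get(&want.physical_id) {
            Some(&position) if file[position].name == want.name => {
                Selector::Position(position)
            }
            Some(&position) => Selector::Renamed {
                position,
                name: want.name.clone(),
            },
            None => Selector::Missing {
                name: want.name.clone(),
            },
        })
        .collect()
}
```

Because the lookup key is the ID, a file written before a rename still resolves to the correct column, and a new file column that happens to reuse an old name is not picked up by mistake.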

&mut incoming_schema.iter_names().map(|x| x.as_str()),
)
#[derive(Debug, Clone)]
pub enum ForbidExtraColumns {

Initializes and uses a ForbidExtraColumns instead of passing the ExtraColumnPolicy. For Iceberg, this properly checks for extra columns using the physical ID.
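
To show why the extra-column check benefits from physical IDs, here is a minimal sketch. The variants `ByName` and `ByPhysicalId`, the `FileColumn` struct, and the `check` method are assumptions for this example; the real ForbidExtraColumns enum in the PR may be shaped differently.

```rust
use std::collections::HashSet;

/// Hypothetical sketch of an extra-column check with two matching strategies.
#[derive(Debug, Clone)]
enum ForbidExtraColumns {
    /// Plain scans: a file column is known if its name is in the set.
    ByName(HashSet<String>),
    /// Iceberg scans: a file column is known if its physical ID is in the set.
    ByPhysicalId(HashSet<u32>),
}

/// Simplified file column description (assumption).
#[derive(Debug, Clone)]
struct FileColumn {
    name: String,
    physical_id: Option<u32>,
}

impl ForbidExtraColumns {
    /// Returns an error naming the first file column that the table does not know.
    fn check(&self, file_columns: &[FileColumn]) -> Result<(), String> {
        for col in file_columns {
            let known = match self {
                Self::ByName(names) => names.contains(&col.name),
                Self::ByPhysicalId(ids) => {
                    col.physical_id.map_or(false, |id| ids.contains(&id))
                }
            };
            if !known {
                return Err(format!("extra column in file: {}", col.name));
            }
        }
        Ok(())
    }
}
```

In this model, a file column that was written under an old name is reported as extra by the name-based check, but accepted by the ID-based check as long as its physical ID is part of the table schema.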

@@ -604,58 +605,14 @@ impl ReaderStarter {
..Default::default()
};

let mut extra_ops_post = extra_ops_this_file;
@nameexhaustion nameexhaustion Jul 24, 2025

The code has been moved below into the spawned async fn start_reader_impl to reduce blocking in this loop.

[Upstream Issue]
PyIceberg writes NULL as empty lists into the Parquet file.
* Issue on Polars repo - https://github.com/pola-rs/polars/issues/23715
* Issue on PyIceberg repo - https://github.com/apache/iceberg-python/issues/2246

/// Provides projections for columns that are sourced from the file.
#[derive(Debug, Clone)]
pub enum ProjectionBuilder {

This builds a Projection per file, based on the schema in that file.

codecov bot commented Jul 24, 2025

Codecov Report

❌ Patch coverage is 88.02589% with 74 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.37%. Comparing base (e99abdc) to head (b1d2b47).
⚠️ Report is 16 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ces/multi_file_reader/extra_ops/column_selector.rs | 76.76% | 33 Missing ⚠️ |
| ...s/multi_file_reader/reader_interface/projection.rs | 86.72% | 15 Missing ⚠️ |
| ...ces/multi_file_reader/initialization/projection.rs | 86.86% | 13 Missing ⚠️ |
| ..._sources/multi_file_reader/reader_interface/mod.rs | 11.11% | 8 Missing ⚠️ |
| crates/polars-core/src/schema/iceberg.rs | 81.81% | 2 Missing ⚠️ |
| ...rces/multi_file_reader/reader_pipelines/generic.rs | 98.55% | 2 Missing ⚠️ |
| .../polars-stream/src/nodes/io_sources/parquet/mod.rs | 90.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #23713      +/-   ##
==========================================
+ Coverage   81.28%   81.37%   +0.08%     
==========================================
  Files        1644     1650       +6     
  Lines      223249   224060     +811     
  Branches     2841     2851      +10     
==========================================
+ Hits       181479   182338     +859     
+ Misses      41071    41013      -58     
- Partials      699      709      +10     

@nameexhaustion nameexhaustion force-pushed the iceberg-column-mapping branch from 5cdf788 to 70eb121 Compare July 24, 2025 14:10
@nameexhaustion nameexhaustion marked this pull request as ready for review July 24, 2025 16:17
@ritchie46

Iceberg complexity. :')

@ritchie46 ritchie46 merged commit 2970c00 into pola-rs:main Jul 25, 2025
28 checks passed
gfvioli pushed a commit to gfvioli/polars that referenced this pull request Jul 25, 2025
Washiil pushed a commit to Washiil/polars that referenced this pull request Jul 31, 2025

Successfully merging this pull request may close these issues.

Incorrect results on native Iceberg scans when columns have been renamed