Skip to content

Nested Fields Access on StructArray field not working #14768

@erchirag

Description

@erchirag

Discussed in #11730

Originally posted by erchirag July 30, 2024
Hi

I have a parquet file with following example data:

{
     "a" : "A1",

     "myobjects" : 
      [
              {
                      "name" : "1",
                      "value" : "V"
              },

             {
                       "name" : "2",
                       "value" :  "V2"
             }

      ]
}

When I try to access the subfield "name", none of the following syntaxes works for me using the latest datafusion package (40.*), and I get the error "Nested identifiers not yet supported".

Examples I tried:
SELECT myobjects.element.name FROM my_table;

SELECT myobjects.name FROM my_table;

SELECT myobjects.item.name FROM my_table;

SELECT myobjects["name"] FROM my_table;

From DF I tried
col("myobjects.name")

col("myobjects.item.name")

col("myobjects.element.name");

None of them worked for me. Can someone please help me understand what I am doing wrong?

Here is a sample, self-contained Rust program that reproduces the issue:

use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StringArray, StructArray, ListArray};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{Schema, Field, DataType};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
use std::fs::File;
use std::path::Path;

fn generate_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new(
            "myobjects",
            DataType::List(Arc::new(Field::new(
                "element",
                DataType::Struct(arrow_schema::Fields::from(vec![
                    Field::new("name", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, false),
                ])),
                false,
            ))),
            false,
        ),
    ]));

    let a_col = Arc::new(StringArray::from(vec!["A1"])) as ArrayRef;

    // Define the inner struct arrays
    let names = Arc::new(StringArray::from(vec!["1", "2"])) as ArrayRef;
    let values = Arc::new(StringArray::from(vec!["V", "V2"])) as ArrayRef;

    // Create a struct array
    let struct_array = Arc::new(StructArray::new(
        arrow_schema::Fields::from(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ]),
        vec![names, values],
        None,
    )) as ArrayRef;

    // Create an OffsetBuffer (correct type for ListArray)
    let offsets = OffsetBuffer::new(vec![0, struct_array.len() as i32].into());

    // IMPORTANT: Pass the DataType for the List ("myobjects"), not the entire field
    let list_data_type = Arc::new(schema.field(1).clone());

    // print schema
    println!("Schema: {schema:?}");

    // print list_data_type
    println!("List Data Type: {list_data_type:?}");

        // Create the field for the list array
        let element_field = Arc::new(Field::new(
            "element",
            DataType::Struct(arrow_schema::Fields::from(vec![
                Field::new("name", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, false),
            ])),
            false,
        ));

    // Wrap the struct in a ListArray using ListArray::try_new
    let myobjects_col = Arc::new(ListArray::new(
        element_field,
        offsets,
        struct_array,
        None,
    )) as ArrayRef;

    let batch = RecordBatch::try_new(schema.clone(), vec![a_col, myobjects_col])?;

    let file = File::create(path)?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    Ok(())
}

/// Registers the Parquet file at `path` as table `my_table`, then tries every
/// SQL and DataFrame-API spelling of nested field access on the
/// `myobjects` list-of-structs column, printing either the result batches or
/// the error for each attempt.
///
/// # Errors
/// Returns an error only for setup (registration) or batch-printing failures;
/// per-query failures are reported to stderr and do not abort the loop.
async fn query_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    ctx.register_parquet("my_table", path, ParquetReadOptions::default())
        .await?;

    // SQL spellings tried for the nested access.
    // NOTE(review): the final entry is an exact duplicate of the third,
    // preserved from the original repro.
    let queries = vec![
        "SELECT myobjects.element.name FROM my_table",
        "SELECT myobjects.name FROM my_table",
        "SELECT myobjects.item.name FROM my_table",
        "SELECT myobjects[\"name\"] FROM my_table",
        "SELECT myobjects.item.name FROM my_table",
    ];

    for query in queries {
        match ctx.sql(query).await {
            Ok(df) => match df.collect().await {
                Ok(results) => {
                    println!("Results for query '{}':", query);
                    print_batches(&results)?;
                }
                Err(e) => {
                    eprintln!("Error collecting results for query '{}': {}", query, e);
                }
            },
            Err(e) => {
                eprintln!("Error executing query '{}': {}", query, e);
            }
        }
    }

    // DataFrame-API equivalents of the same nested accesses.
    // (The original also bound an unused `let df = ctx.table(...)` here,
    // which only produced an unused-variable warning; it has been removed.)
    // NOTE(review): the last entry duplicates the first, preserved from the
    // original repro.
    let df_queries = vec![
        ctx.table("my_table").await?.select(vec![col("myobjects.element.name")]),
        ctx.table("my_table").await?.select(vec![col("myobjects.name")]),
        ctx.table("my_table").await?.select(vec![col("myobjects.item.name")]),
        ctx.table("my_table").await?.select(vec![col("myobjects.element.name")]),
    ];

    for (i, df_query) in df_queries.into_iter().enumerate() {
        match df_query {
            Ok(df) => match df.collect().await {
                Ok(results) => {
                    println!("Results for DataFrame query {}:", i + 1);
                    print_batches(&results)?;
                }
                Err(e) => {
                    eprintln!("Error collecting results for DataFrame query {}: {}", i + 1, e);
                }
            },
            Err(e) => {
                eprintln!("Error executing DataFrame query {}: {}", i + 1, e);
            }
        }
    }

    Ok(())
}

/// Entry point: writes the example Parquet file, then runs the repro queries
/// on a dedicated Tokio runtime.
///
/// # Errors
/// Propagates file-generation, runtime-creation, and query errors through
/// `main`'s `Result` instead of panicking (the original called `.unwrap()`
/// inside the async block, defeating its own error-returning signature).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = "example.parquet";

    generate_parquet(path)?;

    let rt = Runtime::new()?;
    rt.block_on(query_parquet(path))?;

    Ok(())
}
# Cargo manifest for the self-contained repro program above.
[package]
name = "trace_store_df_client"
version = "0.1.0"
edition = "2021"

[dependencies]
# NOTE(review): datafusion pins its own arrow major version internally;
# pairing datafusion 43 with standalone arrow-*/parquet 54.2 may pull two
# different arrow majors into the build, which can cause type-mismatch
# compile errors when passing Schema/RecordBatch across the boundary.
# Verify with `cargo tree -d` that only one arrow version is resolved.
arrow-array = "54.2.0"
arrow-buffer = "54.2.0"
arrow-schema = "54.2.0"
datafusion = "43.0.0"
parquet = "54.2.0"
tokio = { version = "1.43.0", features = ["full", "rt-multi-thread"] }

[profile.dev]
# Faster rebuilds during iteration on this repro.
incremental = true

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions