Discussed in #11730
Originally posted by erchirag July 30, 2024
Hi,
I have a Parquet file with the following example data:
```json
{
  "a": "A1",
  "myobjects": [
    { "name": "1", "value": "V" },
    { "name": "2", "value": "V2" }
  ]
}
```
When I try to access the subfield "name", none of the following syntaxes work for me using the latest datafusion package (40.*), and I get the error "Nested identifiers not yet supported".
Examples I tried:

```sql
SELECT myobjects.element.name FROM my_table;
SELECT myobjects.name FROM my_table;
SELECT myobjects.item.name FROM my_table;
SELECT myobjects["name"] FROM my_table;
```
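For comparison, here is a sketch of the nested-access syntax that recent DataFusion releases document: index the list (1-based) and take the struct field with bracket access, or unnest the list first. Both statements are untested assumptions against the schema above:

```rust
// Hedged sketch, not verified against this exact DataFusion version.
// Assumes `ctx` is the SessionContext with "my_table" registered, as in the
// repro further below.

// 1-based list index, then bracket access into the struct field.
let first_name = ctx
    .sql("SELECT myobjects[1]['name'] FROM my_table")
    .await?;

// Unnest in a subquery so each element becomes a row, then take the field.
let all_names = ctx
    .sql("SELECT o['name'] FROM (SELECT unnest(myobjects) AS o FROM my_table) AS t")
    .await?;
```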
From the DataFrame API I tried:

```rust
col("myobjects.name")
col("myobjects.item.name")
col("myobjects.element.name")
```
None of them worked for me. Can someone please help me understand what I am doing wrong?
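One thing worth noting: `col(...)` treats the whole string as a (possibly qualified) column name, so a dotted path never turns into nested field access; it has to be spelled as field/index expressions. A minimal sketch, assuming the `FieldAccessor` and `IndexAccessor` extension traits from datafusion-functions and datafusion-functions-nested (paths as of the 40+ releases, untested here):

```rust
// Assumed extension traits; verify the paths against your DataFusion version.
use datafusion::functions::core::expr_ext::FieldAccessor; // provides `.field()`
use datafusion::functions_nested::expr_ext::IndexAccessor; // provides `.index()`
use datafusion::prelude::*;

// "name" of the first list element (list indexing is 1-based).
let expr = col("myobjects").index(lit(1)).field("name");
```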
Here is a self-contained Rust sample to reproduce the issue:
```rust
use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, ListArray, RecordBatch, StringArray, StructArray};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{DataType, Field, Schema};
use datafusion::arrow::util::pretty::print_batches;
use datafusion::prelude::*;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tokio::runtime::Runtime;

fn generate_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new(
            "myobjects",
            DataType::List(Arc::new(Field::new(
                "element",
                DataType::Struct(arrow_schema::Fields::from(vec![
                    Field::new("name", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, false),
                ])),
                false,
            ))),
            false,
        ),
    ]));

    let a_col = Arc::new(StringArray::from(vec!["A1"])) as ArrayRef;

    // Define the inner struct arrays
    let names = Arc::new(StringArray::from(vec!["1", "2"])) as ArrayRef;
    let values = Arc::new(StringArray::from(vec!["V", "V2"])) as ArrayRef;

    // Create a struct array
    let struct_array = Arc::new(StructArray::new(
        arrow_schema::Fields::from(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ]),
        vec![names, values],
        None,
    )) as ArrayRef;

    // Create an OffsetBuffer (the type ListArray expects); both struct
    // entries belong to the single row
    let offsets = OffsetBuffer::new(vec![0, struct_array.len() as i32].into());

    // Debug output: the full schema and the list field
    println!("Schema: {schema:?}");
    println!("List field: {:?}", schema.field(1));

    // The field describing the list's elements
    let element_field = Arc::new(Field::new(
        "element",
        DataType::Struct(arrow_schema::Fields::from(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ])),
        false,
    ));

    // Wrap the struct array in a ListArray
    let myobjects_col = Arc::new(ListArray::new(
        element_field,
        offsets,
        struct_array,
        None,
    )) as ArrayRef;

    let batch = RecordBatch::try_new(schema.clone(), vec![a_col, myobjects_col])?;
    let file = File::create(path)?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
async fn query_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    ctx.register_parquet("my_table", path, ParquetReadOptions::default())
        .await?;

    let queries = vec![
        "SELECT myobjects.element.name FROM my_table",
        "SELECT myobjects.name FROM my_table",
        "SELECT myobjects.item.name FROM my_table",
        "SELECT myobjects[\"name\"] FROM my_table",
    ];
    for query in queries {
        match ctx.sql(query).await {
            Ok(df) => match df.collect().await {
                Ok(results) => {
                    println!("Results for query '{}':", query);
                    print_batches(&results)?;
                }
                Err(e) => {
                    eprintln!("Error collecting results for query '{}': {}", query, e);
                }
            },
            Err(e) => {
                eprintln!("Error executing query '{}': {}", query, e);
            }
        }
    }

    let df_queries = vec![
        ctx.table("my_table").await?.select(vec![col("myobjects.element.name")]),
        ctx.table("my_table").await?.select(vec![col("myobjects.name")]),
        ctx.table("my_table").await?.select(vec![col("myobjects.item.name")]),
    ];
    for (i, df_query) in df_queries.into_iter().enumerate() {
        match df_query {
            Ok(df) => match df.collect().await {
                Ok(results) => {
                    println!("Results for DataFrame query {}:", i + 1);
                    print_batches(&results)?;
                }
                Err(e) => {
                    eprintln!("Error collecting results for DataFrame query {}: {}", i + 1, e);
                }
            },
            Err(e) => {
                eprintln!("Error executing DataFrame query {}: {}", i + 1, e);
            }
        }
    }
    Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = "example.parquet";
    generate_parquet(path)?;

    let rt = Runtime::new()?;
    rt.block_on(async {
        query_parquet(path).await.unwrap();
    });
    Ok(())
}
```
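For reference, here is a minimal DataFrame-level sketch of the same query via `DataFrame::unnest_columns`, which flattens each list element into its own row before the struct field is projected. It assumes the `FieldAccessor` trait path mentioned above and is untested here:

```rust
// Hedged sketch: unnest the list column, then project the struct field.
// `unnest_columns` and `.field()` usage are assumptions from the 40+ API.
use datafusion::arrow::util::pretty::print_batches;
use datafusion::functions::core::expr_ext::FieldAccessor;
use datafusion::prelude::*;

async fn query_names(ctx: &SessionContext) -> Result<(), Box<dyn std::error::Error>> {
    let df = ctx
        .table("my_table")
        .await?
        .unnest_columns(&["myobjects"])? // one row per list element
        .select(vec![col("myobjects").field("name")])?;
    print_batches(&df.collect().await?)?;
    Ok(())
}
```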
Cargo.toml:

```toml
[package]
name = "trace_store_df_client"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow-array = "54.2.0"
arrow-buffer = "54.2.0"
arrow-schema = "54.2.0"
datafusion = "43.0.0"
parquet = "54.2.0"
tokio = { version = "1.43.0", features = ["full", "rt-multi-thread"] }

[profile.dev]
incremental = true
```