2 changes: 1 addition & 1 deletion datafusion-examples/examples/default_column_values.rs
@@ -263,7 +263,7 @@ impl TableProvider for DefaultValueTableProvider {
.with_projection(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
.with_expr_adapter(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _);
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
2 changes: 1 addition & 1 deletion datafusion-examples/examples/json_shredding.rs
@@ -273,7 +273,7 @@ impl TableProvider for ExampleTableProvider {
.with_limit(limit)
.with_file_group(file_group)
// if the rewriter needs a reference to the table schema you can bind self.schema() here
.with_expr_adapter(Arc::new(ShreddedJsonRewriterFactory) as _);
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));

Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
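The inline comment above notes that a rewriter factory can be given a reference to the table schema via `self.schema()`. A minimal sketch of that pattern follows; `SchemaAwareRewriterFactory` is a hypothetical name that is not part of this example, and its `PhysicalExprAdapterFactory` implementation is omitted because it is application specific.

use datafusion::arrow::datatypes::SchemaRef;

// Hypothetical factory that keeps a copy of the table schema so the adapters it
// creates can consult it when rewriting expressions for individual files.
#[derive(Debug)]
struct SchemaAwareRewriterFactory {
    table_schema: SchemaRef,
}

impl SchemaAwareRewriterFactory {
    fn new(table_schema: SchemaRef) -> Self {
        Self { table_schema }
    }
}

// Inside `TableProvider::scan`, once `PhysicalExprAdapterFactory` is implemented
// for the struct above, the table schema would be bound at the call site:
//
//     .with_expr_adapter(Some(
//         Arc::new(SchemaAwareRewriterFactory::new(self.schema())) as _,
//     ))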
31 changes: 31 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -48,6 +48,7 @@ use datafusion_execution::{
use datafusion_expr::{
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
@@ -99,6 +100,8 @@ pub struct ListingTableConfig {
schema_source: SchemaSource,
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
physical_expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl ListingTableConfig {
Expand Down Expand Up @@ -281,6 +284,7 @@ impl ListingTableConfig {
options: Some(listing_options),
schema_source: self.schema_source,
schema_adapter_factory: self.schema_adapter_factory,
physical_expr_adapter_factory: self.physical_expr_adapter_factory,
})
}

@@ -300,6 +304,7 @@ impl ListingTableConfig {
options: _,
schema_source,
schema_adapter_factory,
physical_expr_adapter_factory,
} = self;

let (schema, new_schema_source) = match file_schema {
@@ -322,6 +327,7 @@ impl ListingTableConfig {
options: Some(options),
schema_source: new_schema_source,
schema_adapter_factory,
physical_expr_adapter_factory,
})
}
None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -364,6 +370,7 @@ impl ListingTableConfig {
options: Some(options),
schema_source: self.schema_source,
schema_adapter_factory: self.schema_adapter_factory,
physical_expr_adapter_factory: self.physical_expr_adapter_factory,
})
}
None => config_err!("No `ListingOptions` set for inferring schema"),
@@ -415,6 +422,26 @@ impl ListingTableConfig {
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
}

/// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`]
///
/// The expression adapter factory is used to create physical expression adapters that
/// handle schema evolution and type conversions when expressions are evaluated
/// against files whose schemas differ from the table schema.
///
/// If not provided, a default physical expression adapter factory will be used unless a custom
/// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
///
/// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
pub fn with_physical_expr_adapter_factory(
self,
physical_expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Self {
Self {
physical_expr_adapter_factory: Some(physical_expr_adapter_factory),
..self
}
}
}

/// Options for creating a [`ListingTable`]
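Beyond the diff itself, a rough usage sketch of the new `with_physical_expr_adapter_factory` builder method may help. It is not part of the PR: the helper function and table name are illustrative assumptions, the import path for `PhysicalExprAdapterFactory` follows the one added in this file, and the factory is taken as a parameter because its concrete type is application specific.

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;

// Hypothetical helper: register a Parquet-backed listing table whose scans
// rewrite expressions with a caller-supplied adapter factory.
async fn register_with_expr_adapter(
    ctx: &SessionContext,
    path: &str,
    expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Result<()> {
    let table_path = ListingTableUrl::parse(path)?;
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .infer_schema(&ctx.state())
        .await?
        // Added by this PR: the factory is stored on the config and forwarded
        // to the file scan in `ListingTable::scan`.
        .with_physical_expr_adapter_factory(expr_adapter_factory);

    let table = ListingTable::try_new(config)?;
    ctx.register_table("example", Arc::new(table))?;
    Ok(())
}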
@@ -911,6 +938,8 @@ pub struct ListingTable {
column_defaults: HashMap<String, Expr>,
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl ListingTable {
@@ -952,6 +981,7 @@ impl ListingTable {
constraints: Constraints::default(),
column_defaults: HashMap::new(),
schema_adapter_factory: config.schema_adapter_factory,
expr_adapter_factory: config.physical_expr_adapter_factory,
};

Ok(table)
@@ -1196,6 +1226,7 @@ impl TableProvider for ListingTable {
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
)
.await
@@ -148,6 +148,70 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
Ok(())
}

#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter() -> Result<()> {
// Create a temporary directory for our test file
let tmp_dir = TempDir::new()?;
let file_path = tmp_dir.path().join("test.parquet");
let file_path_str = file_path.to_str().unwrap();

// Create test data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
],
)?;

// Write test parquet file
let file = std::fs::File::create(file_path_str)?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
writer.write(&batch)?;
writer.close()?;

// Create a session context
let ctx = SessionContext::new();

// Create a ParquetSource with the adapter factory
let source = ParquetSource::default()
.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));

// Create a scan config
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
schema.clone(),
)
.with_source(source)
.build();

// Create a data source executor
let exec = DataSourceExec::from_data_source(config);

// Collect results
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;
let batches = datafusion::physical_plan::common::collect(stream).await?;

// There should be one batch
assert_eq!(batches.len(), 1);

// Verify the schema has uppercase column names
let result_schema = batches[0].schema();
assert_eq!(result_schema.field(0).name(), "ID");
assert_eq!(result_schema.field(1).name(), "NAME");

Ok(())
}


#[tokio::test]
async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
// This test verifies that the same schema adapter factory can be reused
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/mod.rs
@@ -50,6 +50,7 @@ mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;
mod schema;
mod schema_adapter;
mod schema_coercion;
mod utils;
