From 22663c31306e61d9a40ec272b1ab237b11f827b8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 15 Jul 2025 12:12:56 -0500 Subject: [PATCH 01/19] add fallback and docs --- datafusion/datasource-parquet/src/opener.rs | 63 +++---- .../datasource-parquet/src/row_filter.rs | 155 ++++++++++++++++-- datafusion/datasource-parquet/src/source.rs | 21 ++- docs/source/library-user-guide/upgrading.md | 11 ++ 4 files changed, 201 insertions(+), 49 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 15189a9a6423..b1d4fbf8675d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -95,7 +95,7 @@ pub(super) struct ParquetOpener { /// Optional parquet FileDecryptionProperties pub file_decryption_properties: Option>, /// Rewrite expressions in the context of the file schema - pub expr_adapter: Arc, + pub(crate) expr_adapter_factory: Option>, } impl FileOpener for ParquetOpener { @@ -120,6 +120,7 @@ impl FileOpener for ParquetOpener { let projected_schema = SchemaRef::from(self.logical_file_schema.project(&self.projection)?); + let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory); let schema_adapter = self .schema_adapter_factory .create(projected_schema, Arc::clone(&self.logical_file_schema)); @@ -136,7 +137,8 @@ impl FileOpener for ParquetOpener { let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); - let expr_adapter = Arc::clone(&self.expr_adapter); + let expr_adapter_factory = self.expr_adapter_factory.clone(); + let mut predicate_file_schema = Arc::clone(&self.logical_file_schema); let mut enable_page_index = self.enable_page_index; let file_decryption_properties = self.file_decryption_properties.clone(); @@ -242,35 +244,32 @@ impl FileOpener for ParquetOpener { // Adapt the predicate to the physical file schema. // This evaluates missing columns and inserts any necessary casts. - predicate = predicate - .map(|p| { - let partition_values = partition_fields - .iter() - .cloned() - .zip(file.partition_values) - .collect_vec(); - let adapter = expr_adapter - .create( - Arc::clone(&logical_file_schema), - Arc::clone(&physical_file_schema), - ) - .with_partition_values(partition_values); - adapter.rewrite(p).map_err(ArrowError::from).map(|p| { - // After rewriting to the file schema, further simplifications may be possible. - // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` - // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). - PhysicalExprSimplifier::new(&physical_file_schema) - .simplify(p) - .map_err(ArrowError::from) + if let Some(expr_adapter_factory) = expr_adapter_factory { + predicate = predicate + .map(|p| { + let partition_values = partition_fields + .iter() + .cloned() + .zip(file.partition_values) + .collect_vec(); + let expr = expr_adapter_factory + .create( + Arc::clone(&logical_file_schema), + Arc::clone(&physical_file_schema), + ) + .with_partition_values(partition_values) + .rewrite(p)?; + // Now that we've rewritten the predicate, we can simplify it + PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr) }) - }) - .transpose()? 
- .transpose()?; + .transpose()?; + predicate_file_schema = Arc::clone(&physical_file_schema); + } // Build predicates for this specific file let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates( predicate.as_ref(), - &physical_file_schema, + &predicate_file_schema, &predicate_creation_errors, ); @@ -307,9 +306,11 @@ impl FileOpener for ParquetOpener { let row_filter = row_filter::build_row_filter( &predicate, &physical_file_schema, + &predicate_file_schema, builder.metadata(), reorder_predicates, &file_metrics, + &schema_adapter_factory, ); match row_filter { @@ -640,7 +641,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -726,7 +727,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -828,7 +829,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), } }; let make_meta = || FileMeta { @@ -940,7 +941,7 @@ mod test { enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, file_decryption_properties: None, - expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -1053,7 +1054,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), } }; diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 5626f83186e3..70750a75bc61 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -67,7 +67,6 @@ use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; -use itertools::Itertools; use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::ParquetMetaData; @@ -75,8 +74,9 @@ use parquet::file::metadata::ParquetMetaData; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::Result; +use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns}; +use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use datafusion_physical_plan::metrics; @@ -106,6 +106,8 @@ pub(crate) struct DatafusionArrowPredicate { rows_matched: metrics::Count, /// how long was spent evaluating this predicate time: metrics::Time, + /// used to perform type coercion while filtering rows + schema_mapper: Arc, } impl DatafusionArrowPredicate { @@ -130,6 +132,7 @@ impl DatafusionArrowPredicate { rows_pruned, rows_matched, time, + 
schema_mapper: candidate.schema_mapper,
         })
     }
 }
@@ -140,6 +143,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = self.schema_mapper.map_batch(batch)?;
+
         // scoped timer updates on drop
         let mut timer = self.time.timer();
 
@@ -182,6 +187,9 @@ pub(crate) struct FilterCandidate {
     /// required to pass through a `SchemaMapper` to the table schema
     /// upon which we then evaluate the filter expression.
     projection: Vec<usize>,
+    /// A `SchemaMapper` used to map batches read from the file schema to
+    /// the filter's projection of the table schema.
+    schema_mapper: Arc<dyn SchemaMapper>,
     /// The projected table schema that this filter references
     filter_schema: SchemaRef,
 }
@@ -222,11 +230,26 @@ struct FilterCandidateBuilder {
     /// columns in the file schema that are not in the table schema or columns that
     /// are in the table schema that are not in the file schema.
     file_schema: SchemaRef,
+    /// The schema of the table (merged schema) -- the columns may be in a different
+    /// order than in the file, and it may include columns that are not in the file schema
+    table_schema: SchemaRef,
+    /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
+    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }
 
 impl FilterCandidateBuilder {
-    pub fn new(expr: Arc<dyn PhysicalExpr>, file_schema: Arc<Schema>) -> Self {
-        Self { expr, file_schema }
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        file_schema: Arc<Schema>,
+        table_schema: Arc<Schema>,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        Self {
+            expr,
+            file_schema,
+            table_schema,
+            schema_adapter_factory,
+        }
     }
 
     /// Attempt to build a `FilterCandidate` from the expression
@@ -238,21 +261,20 @@ impl FilterCandidateBuilder {
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
         let Some(required_indices_into_table_schema) =
-            pushdown_columns(&self.expr, &self.file_schema)?
+            pushdown_columns(&self.expr, &self.table_schema)?
else { return Ok(None); }; let projected_table_schema = Arc::new( - self.file_schema + self.table_schema .project(&required_indices_into_table_schema)?, ); - let projection_into_file_schema = collect_columns(&self.expr) - .iter() - .map(|c| c.index()) - .sorted_unstable() - .collect_vec(); + let (schema_mapper, projection_into_file_schema) = self + .schema_adapter_factory + .create(Arc::clone(&projected_table_schema), self.table_schema) + .map_schema(&self.file_schema)?; let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?; let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?; @@ -262,6 +284,7 @@ impl FilterCandidateBuilder { required_bytes, can_use_index, projection: projection_into_file_schema, + schema_mapper: Arc::clone(&schema_mapper), filter_schema: Arc::clone(&projected_table_schema), })) } @@ -403,9 +426,11 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result, physical_file_schema: &SchemaRef, + predicate_file_schema: &SchemaRef, metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, + schema_adapter_factory: &Arc, ) -> Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -422,6 +447,8 @@ pub fn build_row_filter( FilterCandidateBuilder::new( Arc::clone(expr), Arc::clone(physical_file_schema), + Arc::clone(predicate_file_schema), + Arc::clone(schema_adapter_factory), ) .build(metadata) }) @@ -465,9 +492,13 @@ mod test { use super::*; use datafusion_common::ScalarValue; + use arrow::datatypes::{Field, TimeUnit::Nanosecond}; + use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_expr::{col, Expr}; use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_plan::metrics::{Count, Time}; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; @@ -489,15 +520,111 @@ mod test { let expr = col("int64_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); let table_schema = Arc::new(table_schema.clone()); - let candidate = FilterCandidateBuilder::new(expr, table_schema.clone()) - .build(metadata) - .expect("building candidate"); + let candidate = FilterCandidateBuilder::new( + expr, + table_schema.clone(), + table_schema, + schema_adapter_factory, + ) + .build(metadata) + .expect("building candidate"); assert!(candidate.is_none()); } + #[test] + fn test_filter_type_coercion() { + let testdata = datafusion_common::test_util::parquet_test_data(); + let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) + .expect("opening file"); + + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); + + // This is the schema we would like to coerce to, + // which is different from the physical schema of the file. 
+ let table_schema = Schema::new(vec![Field::new( + "timestamp_col", + DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))), + false, + )]); + + // Test all should fail + let expr = col("timestamp_col").lt(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), + None, + )); + let expr = logical2physical(&expr, &table_schema); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); + let table_schema = Arc::new(table_schema.clone()); + let candidate = FilterCandidateBuilder::new( + expr, + file_schema.clone(), + table_schema.clone(), + schema_adapter_factory, + ) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); + + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &metadata, + Count::new(), + Count::new(), + Time::new(), + ) + .expect("creating filter predicate"); + + let mut parquet_reader = parquet_reader_builder + .with_projection(row_filter.projection().clone()) + .build() + .expect("building reader"); + + // Parquet file is small, we only need 1 record batch + let first_rb = parquet_reader + .next() + .expect("expected record batch") + .expect("expected error free record batch"); + + let filtered = row_filter.evaluate(first_rb.clone()); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8]))); + + // Test all should pass + let expr = col("timestamp_col").gt(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), + None, + )); + let expr = logical2physical(&expr, &table_schema); + let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory); + let candidate = FilterCandidateBuilder::new( + expr, + file_schema, + table_schema, + schema_adapter_factory, + ) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); + + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &metadata, + Count::new(), + Count::new(), + Time::new(), + ) + .expect("creating filter predicate"); + + let filtered = row_filter.evaluate(first_rb); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8]))); + } + #[test] fn nested_data_structures_prevent_pushdown() { let table_schema = Arc::new(get_lists_table_schema()); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 59681c318a42..67570bb4fd1b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -473,6 +473,22 @@ impl FileSource for ParquetSource { .clone() .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); + let expr_adapter_factory = match ( + self.schema_adapter_factory.as_ref(), + base_config.expr_adapter.as_ref(), + ) { + (Some(_), Some(_)) => { + log::warn!( + "ParquetSource: both schema_adapter_factory and expr_adapter are set. \ + Using schema_adapter_factory only." 
+ ); + None + } + (None, Some(expr_adapter)) => Some(Arc::clone(expr_adapter)), + (Some(_), None) => None, + (None, None) => Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), + }; + let parquet_file_reader_factory = self.parquet_file_reader_factory.clone().unwrap_or_else(|| { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ @@ -511,10 +527,7 @@ impl FileSource for ParquetSource { schema_adapter_factory, coerce_int96, file_decryption_properties, - expr_adapter: base_config - .expr_adapter - .clone() - .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory)), + expr_adapter_factory, }) } diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index a28686e01fc3..21604a859ef4 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -120,6 +120,17 @@ SET datafusion.execution.spill_compression = 'zstd'; For more details about this configuration option, including performance trade-offs between different compression codecs, see the [Configuration Settings](../user-guide/configs.md) documentation. +### Custom `SchemaAdapterFactory` will no longer be used for predicate pushdown + +We are moving away from converting data (using `SchemaAdapter`) to converting the expressions themselves (which is more efficient and flexible). +The first place this change has taken place is in predicate pushdown for Parquet. +By default if you do not use a custom `SchemaAdapterFactory` we will use expression conversion instead. +If you do set a custom `SchemaAdapterFactory` we will continue to use it but emit a warning about that code path being deprecated. + +To resolve this you need to implement a custom `PhysicalExprAdapterFactory` and use that instead of a `SchemaAdapterFactory`. +See the [default values](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs) for an example of how to do this. +Opting into the new APIs will set you up for future changes since we plan to expand use of `PhysicalExprAdapterFactory` to other areas of DataFusion. 
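+
+A rough sketch of the migration (the names `MySchemaAdapterFactory` and `MyExprAdapterFactory` are placeholders, and the factory shown simply delegates to the default implementation):
+
+```rust
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion_physical_expr::schema_rewriter::{
+    PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
+use datafusion_physical_expr::DefaultPhysicalExprAdapter;
+
+// Before: a custom `SchemaAdapterFactory` converted each `RecordBatch` after it
+// was read, e.g. via `.with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory))`.
+
+// After: rewrite the expressions that are pushed down into the scan instead.
+#[derive(Debug)]
+struct MyExprAdapterFactory;
+
+impl PhysicalExprAdapterFactory for MyExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        // Delegating to the default adapter handles missing columns and casts;
+        // a custom implementation can rewrite the expression further here.
+        Arc::new(DefaultPhysicalExprAdapter::new(
+            logical_file_schema,
+            physical_file_schema,
+        ))
+    }
+}
+```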
+ ## DataFusion `48.0.1` ### `datafusion.execution.collect_statistics` now defaults to `true` From 4cfff839b473c8aee515bf72a12396d49c2e84cd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 15 Jul 2025 12:30:56 -0500 Subject: [PATCH 02/19] fix test --- datafusion/datasource-parquet/src/opener.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b1d4fbf8675d..18c993156f60 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -641,7 +641,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), } }; @@ -727,7 +727,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), } }; @@ -829,7 +829,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), } }; let make_meta = || FileMeta { @@ -941,7 +941,7 @@ mod test { enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), } }; @@ -1054,7 +1054,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), } }; From dc49b8e4605461e5cd33566d646031e987bcfcb6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 15 Jul 2025 15:31:14 -0500 Subject: [PATCH 03/19] add test --- datafusion/datasource-parquet/src/opener.rs | 199 +++++++++++++++++++- 1 file changed, 194 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 18c993156f60..61ae46d5dbb2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -522,22 +522,31 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::{ + compute::cast, + datatypes::{DataType, Field, Schema, SchemaRef}, + }; use bytes::{BufMut, BytesMut}; use chrono::Utc; use datafusion_common::{ - record_batch, stats::Precision, ColumnStatistics, ScalarValue, Statistics, + assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, ScalarValue, + Statistics, }; use datafusion_datasource::{ - file_meta::FileMeta, file_stream::FileOpener, - schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, + file_meta::FileMeta, + file_stream::FileOpener, + schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, + SchemaMapper, + }, + PartitionedFile, }; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ 
expressions::DynamicFilterPhysicalExpr, planner::logical2physical, schema_rewriter::DefaultPhysicalExprAdapterFactory, PhysicalExpr, }; - use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use futures::{Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -565,6 +574,25 @@ mod test { (num_batches, num_rows) } + async fn collect_batches( + mut stream: std::pin::Pin< + Box< + dyn Stream< + Item = Result< + arrow::array::RecordBatch, + arrow::error::ArrowError, + >, + > + Send, + >, + >, + ) -> Vec { + let mut batches = vec![]; + while let Some(Ok(batch)) = stream.next().await { + batches.push(batch); + } + batches + } + async fn write_parquet( store: Arc, filename: &str, @@ -1096,4 +1124,165 @@ mod test { assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } + + fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { + match metrics.sum_by_name(metric_name) { + Some(v) => v.as_usize(), + _ => { + panic!( + "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}" + ); + } + } + } + + #[tokio::test] + async fn test_custom_schema_adapter_no_rewriter() { + // Make a hardcoded schema adapter that adds a new column "b" with default value 0.0 + // and converts the first column "a" from Int32 to UInt64. + #[derive(Debug, Clone)] + struct CustomSchemaMapper; + + impl SchemaMapper for CustomSchemaMapper { + fn map_batch( + &self, + batch: arrow::array::RecordBatch, + ) -> datafusion_common::Result { + let a_column = cast(batch.column(0), &DataType::UInt64)?; + // Add in a new column "b" with default value 0.0 + let b_column = + arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]); + let columns = vec![a_column, Arc::new(b_column)]; + let new_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt64, false), + Field::new("b", DataType::Float64, false), + ])); + Ok(arrow::record_batch::RecordBatch::try_new( + new_schema, columns, + )?) 
+            }
+
+            fn map_column_statistics(
+                &self,
+                file_col_statistics: &[ColumnStatistics],
+            ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
+                Ok(vec![
+                    file_col_statistics[0].clone(),
+                    ColumnStatistics::new_unknown(),
+                ])
+            }
+        }
+
+        #[derive(Debug, Clone)]
+        struct CustomSchemaAdapter;
+
+        impl SchemaAdapter for CustomSchemaAdapter {
+            fn map_schema(
+                &self,
+                _file_schema: &Schema,
+            ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
+            {
+                let mapper = Arc::new(CustomSchemaMapper);
+                let projection = vec![0]; // We only need to read the first column "a" from the file
+                Ok((mapper, projection))
+            }
+
+            fn map_column_index(
+                &self,
+                index: usize,
+                file_schema: &Schema,
+            ) -> Option<usize> {
+                if index < file_schema.fields().len() {
+                    Some(index)
+                } else {
+                    None // The new column "b" is not in the original schema
+                }
+            }
+        }
+
+        #[derive(Debug, Clone)]
+        struct CustomSchemaAdapterFactory;
+
+        impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
+            fn create(
+                &self,
+                _projected_table_schema: SchemaRef,
+                _table_schema: SchemaRef,
+            ) -> Box<dyn SchemaAdapter> {
+                Box::new(CustomSchemaAdapter)
+            }
+        }
+
+        // Test that if no expression rewriter is provided we use a SchemaAdapter to adapt the data to the expression
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        // Write out the batch to a Parquet file
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt64, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        let file_meta = FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("test.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        let make_opener = |predicate| ParquetOpener {
+            partition_index: 0,
+            projection: Arc::new([0, 1]),
+            batch_size: 1024,
+            limit: None,
+            predicate: Some(predicate),
+            logical_file_schema: Arc::clone(&table_schema),
+            metadata_size_hint: None,
+            metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+                Arc::clone(&store),
+            )),
+            partition_fields: vec![],
+            pushdown_filters: true,
+            reorder_filters: false,
+            enable_page_index: false,
+            enable_bloom_filter: false,
+            schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
+            enable_row_group_stats_pruning: false,
+            coerce_int96: None,
+            file_decryption_properties: None,
+            expr_adapter_factory: None,
+        };
+
+        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(file_meta.clone(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let batches = collect_batches(stream).await;
+        let expected = vec![
+            "+---+-----+",
+            "| a | b |",
+            "+---+-----+",
+            "| 1 | 0.0 |",
+            "+---+-----+",
+        ];
+        assert_batches_eq!(expected, &batches);
+        let metrics = opener.metrics.clone_inner();
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
+    }
 }

From a817ceac39ac79eae83c5a5e62f54595b90d2650 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 15 Jul 2025 15:34:40 -0500
Subject: [PATCH 04/19] better warns

---
datafusion/datasource-parquet/src/source.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 67570bb4fd1b..89884b684839 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -480,12 +480,18 @@ impl FileSource for ParquetSource { (Some(_), Some(_)) => { log::warn!( "ParquetSource: both schema_adapter_factory and expr_adapter are set. \ - Using schema_adapter_factory only." + Using schema_adapter_factory only. \ + See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details.", ); None } (None, Some(expr_adapter)) => Some(Arc::clone(expr_adapter)), - (Some(_), None) => None, + (Some(_), None) => { + log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ + Use PhysicalExprAdapterFactory API instead. \ + See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details."); + None + } (None, None) => Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), }; From f78c4944045074d4e5b523a04c8056e29ba9d6f8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 15 Jul 2025 22:43:52 -0500 Subject: [PATCH 05/19] clippy --- datafusion/datasource-parquet/src/opener.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 61ae46d5dbb2..ba27feea1bff 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1273,7 +1273,9 @@ mod test { .await .unwrap(); let batches = collect_batches(stream).await; - let expected = vec![ + + #[rustfmt::skip] + let expected = [ "+---+-----+", "| a | b |", "+---+-----+", From 091501f241c7ef33160ba95a529c18ace55720c8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 09:33:56 -0500 Subject: [PATCH 06/19] Update docs/source/library-user-guide/upgrading.md Co-authored-by: Andrew Lamb --- docs/source/library-user-guide/upgrading.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 21604a859ef4..add4cf82cfd9 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -120,7 +120,7 @@ SET datafusion.execution.spill_compression = 'zstd'; For more details about this configuration option, including performance trade-offs between different compression codecs, see the [Configuration Settings](../user-guide/configs.md) documentation. -### Custom `SchemaAdapterFactory` will no longer be used for predicate pushdown +### Deprecating `SchemaAdapterFactory` and `SchemaAdapter` We are moving away from converting data (using `SchemaAdapter`) to converting the expressions themselves (which is more efficient and flexible). The first place this change has taken place is in predicate pushdown for Parquet. 
From 71727936cbfe157a4951f3ba85c55bf788f7487f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 09:34:13 -0500 Subject: [PATCH 07/19] allow both without error --- datafusion/datasource-parquet/src/source.rs | 16 +++++++--------- datafusion/datasource/src/file_scan_config.rs | 8 ++++---- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 89884b684839..c802ebeb9a70 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -475,17 +475,15 @@ impl FileSource for ParquetSource { let expr_adapter_factory = match ( self.schema_adapter_factory.as_ref(), - base_config.expr_adapter.as_ref(), + base_config.expr_adapter_factory.as_ref(), ) { - (Some(_), Some(_)) => { - log::warn!( - "ParquetSource: both schema_adapter_factory and expr_adapter are set. \ - Using schema_adapter_factory only. \ - See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details.", - ); - None + (Some(_), Some(expr_adapter_factory)) => { + log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ + Use PhysicalExprAdapterFactory API instead. \ + See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details."); + Some(Arc::clone(expr_adapter_factory)) } - (None, Some(expr_adapter)) => Some(Arc::clone(expr_adapter)), + (None, Some(expr_adapter_factory)) => Some(Arc::clone(expr_adapter_factory)), (Some(_), None) => { log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ Use PhysicalExprAdapterFactory API instead. \ diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7e13f84ce0a5..eced3123ac74 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -191,7 +191,7 @@ pub struct FileScanConfig { pub batch_size: Option, /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. - pub expr_adapter: Option>, + pub expr_adapter_factory: Option>, } /// A builder for [`FileScanConfig`]'s. 
@@ -467,7 +467,7 @@ impl FileScanConfigBuilder { file_compression_type, new_lines_in_values, batch_size, - expr_adapter, + expr_adapter_factory: expr_adapter, } } } @@ -488,7 +488,7 @@ impl From for FileScanConfigBuilder { table_partition_cols: config.table_partition_cols, constraints: Some(config.constraints), batch_size: config.batch_size, - expr_adapter: config.expr_adapter, + expr_adapter: config.expr_adapter_factory, } } } @@ -702,7 +702,7 @@ impl FileScanConfig { new_lines_in_values: false, file_source: Arc::clone(&file_source), batch_size: None, - expr_adapter: None, + expr_adapter_factory: None, } } From 1c4c90ad2193588cd803a6510a12353fa8531a55 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 09:54:07 -0500 Subject: [PATCH 08/19] add link to issue, add clarifying comments --- datafusion/datasource-parquet/src/source.rs | 47 +++++++++++++-------- docs/source/library-user-guide/upgrading.md | 4 ++ 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index c802ebeb9a70..c7e6e60ea614 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -468,29 +468,40 @@ impl FileSource for ParquetSource { let projection = base_config .file_column_projection_indices() .unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect()); - let schema_adapter_factory = self - .schema_adapter_factory - .clone() - .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); - let expr_adapter_factory = match ( - self.schema_adapter_factory.as_ref(), + if self.schema_adapter_factory.is_some() { + log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ + Use PhysicalExprAdapterFactory API instead. \ + See https://github.com/apache/datafusion/issues/16800 for discussion and https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for upgrade instructions."); + } + + let (expr_adapter_factory, schema_adapter_factory) = match ( base_config.expr_adapter_factory.as_ref(), + self.schema_adapter_factory.as_ref(), ) { - (Some(_), Some(expr_adapter_factory)) => { - log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ - Use PhysicalExprAdapterFactory API instead. \ - See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details."); - Some(Arc::clone(expr_adapter_factory)) + (Some(expr_adapter_factory), Some(schema_adapter_factory)) => { + // Use both the schema adapter factory and the expr adapter factory. + // This results in the the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema) + // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning. + (Some(Arc::clone(expr_adapter_factory)), Some(Arc::clone(schema_adapter_factory))) } - (None, Some(expr_adapter_factory)) => Some(Arc::clone(expr_adapter_factory)), - (Some(_), None) => { - log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \ - Use PhysicalExprAdapterFactory API instead. 
\
-                    See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details.");
-                Some(Arc::clone(expr_adapter_factory))
+            (Some(expr_adapter_factory), Some(schema_adapter_factory)) => {
+                // Use both the schema adapter factory and the expr adapter factory.
+                // This results in the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
+                // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning.
+                (Some(Arc::clone(expr_adapter_factory)), Some(Arc::clone(schema_adapter_factory)))
             }
-            (None, Some(expr_adapter_factory)) => Some(Arc::clone(expr_adapter_factory)),
-            (Some(_), None) => {
-                log::warn!("The SchemaAdapter API will be removed from ParquetSource in a future release. \
-                    Use PhysicalExprAdapterFactory API instead. \
-                    See https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for more details.");
-                None
+            (Some(expr_adapter_factory), None) => {
+                // If no custom schema adapter factory is provided but an expr adapter factory is provided, use the expr adapter factory alongside the default schema adapter factory.
+                // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections.
+                (Some(Arc::clone(expr_adapter_factory)), Arc::new(DefaultSchemaAdapterFactory) as _)
+            },
+            (None, Some(schema_adapter_factory)) => {
+                // If a custom schema adapter factory is provided but no expr adapter factory is provided, use the custom SchemaAdapter for both projections and predicate pushdown.
+                // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API.
+                (None, Some(Arc::clone(schema_adapter_factory)))
             }
-            (None, None) => Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            (None, None) => {
+                // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory.
+                // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
+                // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning.
+                // This is the default behavior with no customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API.
+                (Some(Arc::new(DefaultPhysicalExprAdapterFactory::default())), Arc::new(DefaultSchemaAdapterFactory) as _)
+            },
         };
 
         let parquet_file_reader_factory =
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index add4cf82cfd9..1f6e2ba26b85 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -131,6 +131,10 @@ To resolve this you need to implement a custom `PhysicalExprAdapterFactory` and
 See the [default values](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs) for an example of how to do this.
 Opting into the new APIs will set you up for future changes since we plan to expand use of `PhysicalExprAdapterFactory` to other areas of DataFusion.
 
+See [#16800] for details.
+
+[#16800]: https://github.com/apache/datafusion/issues/16800
+
 ## DataFusion `48.0.1`
 
 ### `datafusion.execution.collect_statistics` now defaults to `true`

From e676bd4771c8356ddcc6bc9ea345a7e138648f36 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 10:33:20 -0500
Subject: [PATCH 09/19] fix types

---
 datafusion/datasource-parquet/src/source.rs | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index c7e6e60ea614..2251c5c20fe5 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -483,25 +483,34 @@ impl FileSource for ParquetSource {
                 // Use both the schema adapter factory and the expr adapter factory.
                 // This results in the SchemaAdapter being used for projections (e.g.
a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                 // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning.
-                (Some(Arc::clone(expr_adapter_factory)), Some(Arc::clone(schema_adapter_factory)))
+                (
+                    Some(Arc::clone(expr_adapter_factory)),
+                    Arc::clone(schema_adapter_factory),
+                )
             }
             (Some(expr_adapter_factory), None) => {
                 // If no custom schema adapter factory is provided but an expr adapter factory is provided, use the expr adapter factory alongside the default schema adapter factory.
                 // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections.
-                (Some(Arc::clone(expr_adapter_factory)), Arc::new(DefaultSchemaAdapterFactory) as _)
-            },
+                (
+                    Some(Arc::clone(expr_adapter_factory)),
+                    Arc::new(DefaultSchemaAdapterFactory) as _,
+                )
+            }
             (None, Some(schema_adapter_factory)) => {
                 // If a custom schema adapter factory is provided but no expr adapter factory is provided, use the custom SchemaAdapter for both projections and predicate pushdown.
                 // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API.
-                (None, Some(Arc::clone(schema_adapter_factory)))
+                (None, Arc::clone(schema_adapter_factory) as _)
             }
             (None, None) => {
                 // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory.
                 // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                 // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning.
                 // This is the default behavior with no customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API.
- (Some(Arc::new(DefaultPhysicalExprAdapterFactory::default())), Arc::new(DefaultSchemaAdapterFactory) as _) - }, + ( + Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), + Arc::new(DefaultSchemaAdapterFactory) as _, + ) + } }; let parquet_file_reader_factory = From 3d2ae4a4dbac12df6be00c79731504c714bf9b80 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 12:26:09 -0500 Subject: [PATCH 10/19] Add test --- .../core/src/datasource/listing/table.rs | 27 +++++++ .../schema_adapter_integration_tests.rs | 64 +++++++++++++++ datafusion/core/tests/parquet/mod.rs | 1 + .../core/tests/parquet/schema_adapter.rs | 77 +++++++++++++++++++ 4 files changed, 169 insertions(+) create mode 100644 datafusion/core/tests/parquet/schema_adapter.rs diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3c079ba4db3e..96d6d6c6f969 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -48,6 +48,7 @@ use datafusion_execution::{ use datafusion_expr::{ dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType, }; +use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; @@ -99,6 +100,8 @@ pub struct ListingTableConfig { schema_source: SchemaSource, /// Optional [`SchemaAdapterFactory`] for creating schema adapters schema_adapter_factory: Option>, + /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters + physical_expr_adapter_factory: Option>, } impl ListingTableConfig { @@ -281,6 +284,7 @@ impl ListingTableConfig { options: Some(listing_options), schema_source: self.schema_source, schema_adapter_factory: self.schema_adapter_factory, + physical_expr_adapter_factory: self.physical_expr_adapter_factory, }) } @@ -300,6 +304,7 @@ impl ListingTableConfig { options: _, schema_source, schema_adapter_factory, + physical_expr_adapter_factory, } = self; let (schema, new_schema_source) = match file_schema { @@ -322,6 +327,7 @@ impl ListingTableConfig { options: Some(options), schema_source: new_schema_source, schema_adapter_factory, + physical_expr_adapter_factory, }) } None => internal_err!("No `ListingOptions` set for inferring schema"), @@ -364,6 +370,7 @@ impl ListingTableConfig { options: Some(options), schema_source: self.schema_source, schema_adapter_factory: self.schema_adapter_factory, + physical_expr_adapter_factory: self.physical_expr_adapter_factory, }) } None => config_err!("No `ListingOptions` set for inferring schema"), @@ -415,6 +422,26 @@ impl ListingTableConfig { pub fn schema_adapter_factory(&self) -> Option<&Arc> { self.schema_adapter_factory.as_ref() } + + /// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`] + /// + /// The expression adapter factory is used to create physical expression adapters that can + /// handle schema evolution and type conversions when evaluating expressions + /// with different schemas than the table schema. + /// + /// If not provided, a default physical expression adapter factory will be used unless a custom + /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. + /// + /// See https://github.com/apache/datafusion/issues/16800 for details on this transition. 
+ pub fn with_physical_expr_adapter_factory( + self, + physical_expr_adapter_factory: Arc, + ) -> Self { + Self { + physical_expr_adapter_factory: Some(physical_expr_adapter_factory), + ..self + } + } } /// Options for creating a [`ListingTable`] diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs index 833af04680db..e3d53a31c549 100644 --- a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs @@ -148,6 +148,70 @@ async fn test_parquet_integration_with_schema_adapter() -> Result<()> { Ok(()) } +#[cfg(feature = "parquet")] +#[tokio::test] +async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter() -> Result<()> { + // Create a temporary directory for our test file + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + let file_path_str = file_path.to_str().unwrap(); + + // Create test data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Write test parquet file + let file = std::fs::File::create(file_path_str)?; + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?; + writer.write(&batch)?; + writer.close()?; + + // Create a session context + let ctx = SessionContext::new(); + + // Create a ParquetSource with the adapter factory + let source = ParquetSource::default() + .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {})); + + // Create a scan config + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?, + schema.clone(), + ) + .with_source(source) + .build(); + + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + + // There should be one batch + assert_eq!(batches.len(), 1); + + // Verify the schema has uppercase column names + let result_schema = batches[0].schema(); + assert_eq!(result_schema.field(0).name(), "ID"); + assert_eq!(result_schema.field(1).name(), "NAME"); + + Ok(()) +} + + #[tokio::test] async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // This test verifies that the same schema adapter factory can be reused diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 94d6d152a384..b8c1a846dd70 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -51,6 +51,7 @@ mod page_pruning; mod row_group_pruning; mod schema; mod schema_coercion; +mod schema_adapter; mod utils; #[cfg(test)] diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs new file mode 100644 index 000000000000..9d13c8e9a50a --- /dev/null +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use arrow::array::{record_batch, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::{BufMut, BytesMut}; +use 
datafusion::assert_batches_eq; +use datafusion::datasource::listing::{ListingTable, ListingTableConfig}; +use datafusion::prelude::SessionContext; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource::{ + schema_adapter::DefaultSchemaAdapterFactory, +}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory; +use object_store::{memory::InMemory, path::Path, ObjectStore}; +use parquet::arrow::ArrowWriter; + +async fn write_parquet(batch: RecordBatch, store: Arc, path: &str) { + let mut out = BytesMut::new().writer(); + { + let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + let data = out.into_inner().freeze(); + store.put(&Path::from(path), data.into()).await.unwrap(); +} + +#[tokio::test] +async fn single_file() { + let batch = + record_batch!(("extra", Int64, [1, 2, 3]), ("c1", Int32, [1, 2, 3])).unwrap(); + + let store = Arc::new(InMemory::new()) as Arc; + let store_url = ObjectStoreUrl::parse("memory://").unwrap(); + let path = "test.parquet"; + write_parquet(batch, store.clone(), path).await; + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int64, false), + Field::new("c2", DataType::Utf8, true), + ])); + + let ctx = SessionContext::new(); + ctx.register_object_store(store_url.as_ref(), Arc::clone(&store)); + + let listing_table_config = + ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(table_schema.clone()) + .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory)) + .with_physical_expr_adapter_factory(Arc::new( + DefaultPhysicalExprAdapterFactory, + )); + + let table = ListingTable::try_new(listing_table_config).unwrap(); + ctx.register_table("t", Arc::new(table)).unwrap(); + + let batches = ctx + .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 IS NULL") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = [ + "+----+----+", + "| c2 | c1 |", + "+----+----+", + "| | 2 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batches); +} From 0e7cfa7fda258ea19b61edb68c25e404ff740559 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 12:31:30 -0500 Subject: [PATCH 11/19] header --- .../core/src/datasource/listing/table.rs | 6 +++--- datafusion/core/tests/parquet/mod.rs | 2 +- .../core/tests/parquet/schema_adapter.rs | 21 ++++++++++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 96d6d6c6f969..fb2f08bfe08d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -424,14 +424,14 @@ impl ListingTableConfig { } /// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`] - /// + /// /// The expression adapter factory is used to create physical expression adapters that can /// handle schema evolution and type conversions when evaluating expressions /// with different schemas than the table schema. - /// + /// /// If not provided, a default physical expression adapter factory will be used unless a custom /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. 
- /// + /// /// See https://github.com/apache/datafusion/issues/16800 for details on this transition. pub fn with_physical_expr_adapter_factory( self, diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index b8c1a846dd70..4f9dde08a692 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -50,8 +50,8 @@ mod filter_pushdown; mod page_pruning; mod row_group_pruning; mod schema; -mod schema_coercion; mod schema_adapter; +mod schema_coercion; mod utils; #[cfg(test)] diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 9d13c8e9a50a..b0e9367c723a 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use arrow::array::{record_batch, RecordBatch}; @@ -6,10 +23,8 @@ use bytes::{BufMut, BytesMut}; use datafusion::assert_batches_eq; use datafusion::datasource::listing::{ListingTable, ListingTableConfig}; use datafusion::prelude::SessionContext; +use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_datasource::ListingTableUrl; -use datafusion_datasource::{ - schema_adapter::DefaultSchemaAdapterFactory, -}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory; use object_store::{memory::InMemory, path::Path, ObjectStore}; From 6bc7fc61888af8b7143d6c0d361642ce57dfd7b7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 12:39:54 -0500 Subject: [PATCH 12/19] docs --- datafusion/core/src/datasource/listing/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index fb2f08bfe08d..faf26c2823eb 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -432,7 +432,7 @@ impl ListingTableConfig { /// If not provided, a default physical expression adapter factory will be used unless a custom /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used. /// - /// See https://github.com/apache/datafusion/issues/16800 for details on this transition. + /// See for details on this transition. 
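+    ///
+    /// A minimal usage sketch (`table_url`, `schema`, and `ctx` are assumed to be
+    /// defined elsewhere; this mirrors the `single_file` test added in this PR):
+    ///
+    /// ```ignore
+    /// let config = ListingTableConfig::new(table_url)
+    ///     .infer_options(&ctx.state())
+    ///     .await?
+    ///     .with_schema(schema)
+    ///     .with_physical_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
+    /// let table = ListingTable::try_new(config)?;
+    /// ```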
pub fn with_physical_expr_adapter_factory( self, physical_expr_adapter_factory: Arc, From a846492631c25c1710b1e0dfb9c79186ce09ae7c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 16 Jul 2025 16:07:20 -0500 Subject: [PATCH 13/19] better test --- .../core/src/datasource/listing/table.rs | 4 + .../core/tests/parquet/schema_adapter.rs | 299 +++++++++++++++++- datafusion/datasource/src/file_scan_config.rs | 12 +- .../physical-expr/src/schema_rewriter.rs | 36 +-- 4 files changed, 316 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index faf26c2823eb..d1cb32113fc4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -938,6 +938,8 @@ pub struct ListingTable { column_defaults: HashMap, /// Optional [`SchemaAdapterFactory`] for creating schema adapters schema_adapter_factory: Option>, + /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters + expr_adapter_factory: Option>, } impl ListingTable { @@ -979,6 +981,7 @@ impl ListingTable { constraints: Constraints::default(), column_defaults: HashMap::new(), schema_adapter_factory: config.schema_adapter_factory, + expr_adapter_factory: config.physical_expr_adapter_factory, }; Ok(table) @@ -1223,6 +1226,7 @@ impl TableProvider for ListingTable { .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) + .with_expr_adapter(self.expr_adapter_factory.clone()) .build(), ) .await diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index b0e9367c723a..12aacf368747 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -17,16 +17,27 @@ use std::sync::Arc; -use arrow::array::{record_batch, RecordBatch}; -use arrow_schema::{DataType, Field, Schema}; +use arrow::array::{record_batch, RecordBatch, RecordBatchOptions}; +use arrow::compute::{cast_with_options, CastOptions}; +use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion::assert_batches_eq; +use datafusion::common::Result; use datafusion::datasource::listing::{ListingTable, ListingTableConfig}; -use datafusion::prelude::SessionContext; -use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{ColumnStatistics, ScalarValue}; +use datafusion_datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, +}; use datafusion_datasource::ListingTableUrl; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory; +use datafusion_physical_expr::expressions::{self, Column}; +use datafusion_physical_expr::schema_rewriter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; +use datafusion_physical_expr::{DefaultPhysicalExprAdapter, PhysicalExpr}; +use itertools::Itertools; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -41,6 +52,180 @@ async fn write_parquet(batch: RecordBatch, store: Arc, path: &s store.put(&Path::from(path), data.into()).await.unwrap(); } 
+#[derive(Debug)]
+struct CustomSchemaAdapterFactory;
+
+impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(CustomSchemaAdapter {
+            logical_file_schema: projected_table_schema,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct CustomSchemaAdapter {
+    logical_file_schema: SchemaRef,
+}
+
+impl SchemaAdapter for CustomSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        for (idx, field) in file_schema.fields().iter().enumerate() {
+            if field.name() == self.logical_file_schema.field(index).name() {
+                return Some(idx);
+            }
+        }
+        None
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let projection = (0..file_schema.fields().len()).collect_vec();
+        Ok((
+            Arc::new(CustomSchemaMapper {
+                logical_file_schema: Arc::clone(&self.logical_file_schema),
+            }),
+            projection,
+        ))
+    }
+}
+
+#[derive(Debug)]
+struct CustomSchemaMapper {
+    logical_file_schema: SchemaRef,
+}
+
+impl SchemaMapper for CustomSchemaMapper {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let mut output_columns =
+            Vec::with_capacity(self.logical_file_schema.fields().len());
+        for field in self.logical_file_schema.fields() {
+            if let Some(array) = batch.column_by_name(field.name()) {
+                output_columns.push(cast_with_options(
+                    array,
+                    field.data_type(),
+                    &CastOptions::default(),
+                )?);
+            } else {
+                // Create a new array with the default value for the field type
+                let default_value = match field.data_type() {
+                    DataType::Int64 => ScalarValue::Int64(Some(0)),
+                    DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())),
+                    _ => unimplemented!("Unsupported data type: {:?}", field.data_type()),
+                };
+                output_columns
+                    .push(default_value.to_array_of_size(batch.num_rows()).unwrap());
+            }
+        }
+        let batch = RecordBatch::try_new_with_options(
+            Arc::clone(&self.logical_file_schema),
+            output_columns,
+            &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
+        )
+        .unwrap();
+        Ok(batch)
+    }
+
+    fn map_column_statistics(
+        &self,
+        _file_col_statistics: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        Ok(vec![
+            ColumnStatistics::new_unknown();
+            self.logical_file_schema.fields().len()
+        ])
+    }
+}
+
+// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with the default value for the field type
+#[derive(Debug)]
+struct CustomPhysicalExprAdapterFactory;
+
+impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        Arc::new(CustomPhysicalExprAdapter {
+            logical_file_schema: Arc::clone(&logical_file_schema),
+            physical_file_schema: Arc::clone(&physical_file_schema),
+            inner: Arc::new(DefaultPhysicalExprAdapter::new(
+                logical_file_schema,
+                physical_file_schema,
+            )),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+struct CustomPhysicalExprAdapter {
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
+    inner: Arc<DefaultPhysicalExprAdapter>,
+}
+
+impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
+    fn rewrite(&self, mut expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        expr = expr
+            .transform(|expr| {
+                if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+                    let field_name = column.name();
+                    if self
+                        .physical_file_schema
+                        .field_with_name(field_name)
+                        .ok()
+                        .is_none()
+                    {
+                        let field = self
+                            .logical_file_schema
+                            .field_with_name(field_name)
+                            .map_err(|_| {
+                                datafusion_common::DataFusionError::Plan(format!(
+                                    "Field '{}' not found in logical file schema",
+                                    field_name
+                                ))
+                            })?;
+                        // If the field does not exist, create a default value expression.
+                        // Note that we use slightly different logic here to create a default value so that we can see different behavior in tests.
+                        let default_value = match field.data_type() {
+                            DataType::Int64 => ScalarValue::Int64(Some(1)),
+                            DataType::Utf8 => ScalarValue::Utf8(Some("b".to_string())),
+                            _ => unimplemented!(
+                                "Unsupported data type: {:?}",
+                                field.data_type()
+                            ),
+                        };
+                        return Ok(Transformed::yes(Arc::new(expressions::Literal::new(
+                            default_value,
+                        ))));
+                    }
+                }
+
+                Ok(Transformed::no(expr))
+            })
+            .data()?;
+        self.inner.rewrite(expr)
+    }
+
+    fn with_partition_values(
+        &self,
+        partition_values: Vec<(FieldRef, ScalarValue)>,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        assert!(
+            partition_values.is_empty(),
+            "Partition values are not supported in this test"
+        );
+        Arc::new(self.clone())
+    }
+}
+
 #[tokio::test]
 async fn single_file() {
     let batch =
@@ -56,8 +241,22 @@ async fn single_file() {
         Field::new("c2", DataType::Utf8, true),
     ]));
 
-    let ctx = SessionContext::new();
+    let mut cfg = SessionConfig::new()
+        // Disable statistics collection for this test, otherwise early pruning makes it hard to demonstrate data adaptation
+        .with_collect_statistics(false)
+        .with_parquet_pruning(false)
+        .with_parquet_page_index_pruning(false);
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
     ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+    assert!(
+        !ctx.state()
+            .config_mut()
+            .options_mut()
+            .execution
+            .collect_statistics
+    );
+    assert!(!ctx.state().config().collect_statistics());
 
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
@@ -89,4 +288,92 @@ async fn single_file() {
         "+----+----+",
     ];
     assert_batches_eq!(expected, &batches);
+
+    // Test using a custom schema adapter and no explicit physical expr adapter.
+    // This should use the custom schema adapter both for projections and predicate pushdown.
+    let listing_table_config =
+        ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
+            .infer_options(&ctx.state())
+            .await
+            .unwrap()
+            .with_schema(table_schema.clone())
+            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+    let table = ListingTable::try_new(listing_table_config).unwrap();
+    ctx.deregister_table("t").unwrap();
+    ctx.register_table("t", Arc::new(table)).unwrap();
+    let batches = ctx
+        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let expected = [
+        "+----+----+",
+        "| c2 | c1 |",
+        "+----+----+",
+        "| a  | 2  |",
+        "+----+----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Do the same test but with a custom physical expr adapter.
+    // Now the default schema adapter will be used for projections, but the custom physical expr adapter will be used for predicate pushdown.
+    let listing_table_config =
+        ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
+            .infer_options(&ctx.state())
+            .await
+            .unwrap()
+            .with_schema(table_schema.clone())
+            .with_physical_expr_adapter_factory(Arc::new(
+                CustomPhysicalExprAdapterFactory,
+            ));
+    let table = ListingTable::try_new(listing_table_config).unwrap();
+    ctx.deregister_table("t").unwrap();
+    ctx.register_table("t", Arc::new(table)).unwrap();
+    let batches = ctx
+        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let expected = [
"+----+----+", + "| c2 | c1 |", + "+----+----+", + "| | 2 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batches); + + // If we use both then the custom physical expr adapter will be used for predicate pushdown and the custom schema adapter will be used for projections + let listing_table_config = + ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap()) + .infer_options(&ctx.state()) + .await + .unwrap() + .with_schema(table_schema.clone()) + .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory)) + .with_physical_expr_adapter_factory(Arc::new( + CustomPhysicalExprAdapterFactory, + )); + let table = ListingTable::try_new(listing_table_config).unwrap(); + ctx.deregister_table("t").unwrap(); + ctx.register_table("t", Arc::new(table)).unwrap(); + let batches = ctx + .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'") + .await + .unwrap() + .collect() + .await + .unwrap(); + let expected = [ + "+----+----+", + "| c2 | c1 |", + "+----+----+", + "| a | 2 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batches); } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index eced3123ac74..cf27f1bddae3 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -269,7 +269,7 @@ pub struct FileScanConfigBuilder { file_compression_type: Option, new_lines_in_values: Option, batch_size: Option, - expr_adapter: Option>, + expr_adapter_factory: Option>, } impl FileScanConfigBuilder { @@ -298,7 +298,7 @@ impl FileScanConfigBuilder { table_partition_cols: vec![], constraints: None, batch_size: None, - expr_adapter: None, + expr_adapter_factory: None, } } @@ -415,9 +415,9 @@ impl FileScanConfigBuilder { /// - Rewriting expression to use pre-computed values or file format specific optimizations pub fn with_expr_adapter( mut self, - expr_adapter: Arc, + expr_adapter: Option>, ) -> Self { - self.expr_adapter = Some(expr_adapter); + self.expr_adapter_factory = expr_adapter; self } @@ -440,7 +440,7 @@ impl FileScanConfigBuilder { file_compression_type, new_lines_in_values, batch_size, - expr_adapter, + expr_adapter_factory: expr_adapter, } = self; let constraints = constraints.unwrap_or_default(); @@ -488,7 +488,7 @@ impl From for FileScanConfigBuilder { table_partition_cols: config.table_partition_cols, constraints: Some(config.constraints), batch_size: config.batch_size, - expr_adapter: config.expr_adapter_factory, + expr_adapter_factory: config.expr_adapter_factory, } } } diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs index ca3e4ffa20c4..d622ce4bc01e 100644 --- a/datafusion/physical-expr/src/schema_rewriter.rs +++ b/datafusion/physical-expr/src/schema_rewriter.rs @@ -173,29 +173,19 @@ pub struct DefaultPhysicalExprAdapter { partition_values: Vec<(FieldRef, ScalarValue)>, } -// impl PhysicalExprAdapter for DefaultPhysicalExprAdapter { -// /// Rewrite the given physical expression to match the target schema -// /// -// /// This method applies the following transformations: -// /// 1. Replaces partition column references with literal values -// /// 2. Handles missing columns by inserting null literals -// /// 3. 
-//     fn rewrite_to_file_schema(
-//         &self,
-//         expr: Arc<dyn PhysicalExpr>,
-//         logical_file_schema: &Schema,
-//         physical_file_schema: &Schema,
-//         partition_values: &[(FieldRef, ScalarValue)],
-//     ) -> Result<Arc<dyn PhysicalExpr>> {
-//         let rewriter = DefaultPhysicalExprAdapterRewriter {
-//             logical_file_schema,
-//             physical_file_schema,
-//             partition_fields: partition_values,
-//         };
-//         expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr)))
-//             .data()
-//     }
-// }
+impl DefaultPhysicalExprAdapter {
+    /// Create a new instance of the default physical expression adapter.
+    ///
+    /// This adapter rewrites expressions to match the physical schema of the file being scanned,
+    /// handling type mismatches and missing columns by filling them with default values.
+    pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self {
+        Self {
+            logical_file_schema,
+            physical_file_schema,
+            partition_values: Vec::new(),
+        }
+    }
+}
 
 impl PhysicalExprAdapter for DefaultPhysicalExprAdapter {
     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {

From 7c314b0d74a381a215933a58ce43f5aa22e6ee69 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 16:08:11 -0500
Subject: [PATCH 14/19] Update docs/source/library-user-guide/upgrading.md

Co-authored-by: Andrew Lamb
---
 docs/source/library-user-guide/upgrading.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 1f6e2ba26b85..b7e74711ccfd 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -123,6 +123,8 @@ For more details about this configuration option, including performance trade-offs
 ### Deprecating `SchemaAdapterFactory` and `SchemaAdapter`
 
 We are moving away from converting data (using `SchemaAdapter`) to converting the expressions themselves (which is more efficient and flexible).
+
+See [issue #16800](https://github.com/apache/datafusion/issues/16800) for more information.
 
 The first place this change has taken place is in predicate pushdown for Parquet. By default, if you do not use a custom `SchemaAdapterFactory`, we will use expression conversion instead. If you do set a custom `SchemaAdapterFactory`, we will continue to use it but emit a warning about that code path being deprecated.
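For readers following the upgrading guide, here is a minimal sketch of opting into the new expression-adapter path on a `ListingTable`. The builder methods match the patches above, but the `infer_options`/`infer_schema` helpers, the table URL, and the table name are illustrative assumptions rather than verbatim from this series:

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
use datafusion::prelude::SessionContext;
use datafusion_datasource::ListingTableUrl;
use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;

async fn register_with_expr_adapter(ctx: &SessionContext) -> Result<()> {
    let config = ListingTableConfig::new(ListingTableUrl::parse("memory:///")?)
        .infer_options(&ctx.state())
        .await?
        .infer_schema(&ctx.state())
        .await?
        // Any PhysicalExprAdapterFactory works here; the default factory mirrors
        // what DataFusion now does automatically when no custom
        // SchemaAdapterFactory is configured.
        .with_physical_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
    let table = ListingTable::try_new(config)?;
    ctx.register_table("t", Arc::new(table))?;
    Ok(())
}
```

Note that the expression adapter only affects how predicates are rewritten for pushdown; projections continue to go through the (default or custom) `SchemaAdapter`, exactly as exercised by the test added in PATCH 13.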
From ac8737a9292f923db1a7e1e4b0a47f2e3434fd42 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 16:10:07 -0500
Subject: [PATCH 15/19] fmt

---
 datafusion/core/tests/parquet/schema_adapter.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 12aacf368747..2c08054a398f 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -202,9 +202,9 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
                                 field.data_type()
                             ),
                         };
-                        return Ok(Transformed::yes(Arc::new(expressions::Literal::new(
-                            default_value,
-                        ))));
+                        return Ok(Transformed::yes(Arc::new(
+                            expressions::Literal::new(default_value),
+                        )));
                     }
                 }
 

From 5ccddcb729ee54cf30681032b466240fec0f9280 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 16:11:08 -0500
Subject: [PATCH 16/19] restore comment

---
 datafusion/datasource-parquet/src/opener.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index ba27feea1bff..7c208d1426ac 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -259,7 +259,9 @@ impl FileOpener for ParquetOpener {
                         )
                         .with_partition_values(partition_values)
                         .rewrite(p)?;
-                    // Now that we've rewritten the predicate, we can simplify it
+                    // After rewriting to the file schema, further simplifications may be possible.
+                    // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE`
+                    // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.).
                     PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr)
                 })
                 .transpose()?;
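To make the restored comment concrete, below is a hedged sketch of the rewrite step in isolation. The adapter calls mirror the factory and test code in the patches above; the function name and schemas are invented for illustration, and since `PhysicalExprSimplifier`'s import path is not visible in this diff, that final call is left commented out rather than asserted as real API:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::schema_rewriter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};
use datafusion_physical_expr::PhysicalExpr;

fn rewrite_missing_column_predicate() -> Result<()> {
    // The table (logical) schema has `c2`, but this particular file does not.
    let logical = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
    ]));
    let physical = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));

    // Predicate `c2 = 'a'` expressed against the logical schema.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("c2", 1)),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Utf8(Some("a".to_string())))),
    ));

    // The rewrite replaces the missing `c2` with a NULL literal...
    let rewritten = DefaultPhysicalExprAdapterFactory
        .create(Arc::clone(&logical), Arc::clone(&physical))
        .rewrite(predicate)?;
    println!("rewritten predicate: {rewritten}");

    // ...so the simplifier can fold the comparison-with-NULL to `false`:
    // let simplified = PhysicalExprSimplifier::new(&physical).simplify(rewritten)?;
    Ok(())
}
```

Because `c2` is absent from the physical file schema, the rewrite yields `NULL = 'a'`, which simplifies to `FALSE`, letting the opener skip bloom filters and the page index for that file entirely.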
From 16ddbae09ee436d27d319bc829e3b8f19fe071fc Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 16:20:08 -0500
Subject: [PATCH 17/19] fix

---
 datafusion-examples/examples/default_column_values.rs | 2 +-
 datafusion-examples/examples/json_shredding.rs        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs
index b0270ffd8a3c..b504ef3aad6f 100644
--- a/datafusion-examples/examples/default_column_values.rs
+++ b/datafusion-examples/examples/default_column_values.rs
@@ -263,7 +263,7 @@ impl TableProvider for DefaultValueTableProvider {
             .with_projection(projection.cloned())
             .with_limit(limit)
             .with_file_group(file_group)
-            .with_expr_adapter(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _);
+            .with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
 
         Ok(Arc::new(DataSourceExec::new(Arc::new(
             file_scan_config.build(),
diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs
index ba9158f6913e..866e4a8a152c 100644
--- a/datafusion-examples/examples/json_shredding.rs
+++ b/datafusion-examples/examples/json_shredding.rs
@@ -273,7 +273,7 @@ impl TableProvider for ExampleTableProvider {
             .with_limit(limit)
             .with_file_group(file_group)
             // if the rewriter needs a reference to the table schema you can bind self.schema() here
-            .with_expr_adapter(Arc::new(ShreddedJsonRewriterFactory) as _);
+            .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
 
         Ok(Arc::new(DataSourceExec::new(Arc::new(
             file_scan_config.build(),

From 4885780c710a3f21419c268984bd20965a666aa3 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 16:51:15 -0500
Subject: [PATCH 18/19] clippy

---
 datafusion/core/tests/parquet/schema_adapter.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 2c08054a398f..f049ac058e1a 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -188,8 +188,7 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
                             .field_with_name(field_name)
                             .map_err(|_| {
                                 datafusion_common::DataFusionError::Plan(format!(
-                                    "Field '{}' not found in logical file schema",
-                                    field_name
+                                    "Field '{field_name}' not found in logical file schema",
                                 ))
                             })?;

From 7a651a0e1fdea16ab995a39a862655335f8fa71b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 16 Jul 2025 21:12:36 -0500
Subject: [PATCH 19/19] rename test

---
 datafusion/core/tests/parquet/schema_adapter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index f049ac058e1a..abc1550b31ca 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -226,7 +226,7 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
 }
 
 #[tokio::test]
-async fn single_file() {
+async fn test_custom_schema_adapter_and_custom_expression_adapter() {
     let batch =
         record_batch!(("extra", Int64, [1, 2, 3]), ("c1", Int32, [1, 2, 3])).unwrap();