**Is your feature request related to a problem or challenge?**
From a conversation with Andrew a couple of days ago, he mentioned this was an open feature request, but I could not find an issue for it. @alamb do you remember who else was asking for this?
We have an implementation of this internally. It is actually more generic, since we use it to generate columns from other columns, but it covers the default-values use case, and it would be easy to simplify the API down to that.
Essentially we declare:
```rust
pub trait MissingColumnGeneratorFactory: Debug + Send + Sync {
    /// Create a [`MissingColumnGenerator`] for the given `field` and `file_schema`.
    /// Returns `None` if the column cannot be generated by this generator.
    /// Otherwise, returns a [`MissingColumnGenerator`] that can generate the missing column.
    fn create(
        &self,
        field: &Field,
        file_schema: &Schema,
    ) -> Option<Arc<dyn MissingColumnGenerator + Send + Sync>>;
}

pub trait MissingColumnGenerator: Debug + Send + Sync {
    /// Generate a missing column for the given `field` from the provided `batch`.
    /// When this method is called, `batch` will contain all of the columns declared in `dependencies`.
    /// If the column cannot be generated, this method should return an error.
    /// Otherwise, it should return the generated column as an `ArrayRef`.
    /// No casting or post-processing is done by this method, so the generated column must match
    /// the data type of the `field` being generated; otherwise an `Err` will be returned upstream.
    /// There is no guarantee about the order of the columns in the provided `RecordBatch`.
    fn generate(&self, batch: RecordBatch) -> datafusion_common::Result<ArrayRef>;

    /// Returns a list of column names that this generator depends on to generate the missing column.
    /// This is used when creating the `RecordBatch` to ensure that all dependencies are present
    /// before calling `generate`. The dependencies do not need to be declared in any particular order.
    fn dependencies(&self) -> Vec<String>;
}
```
You then pass one or more `MissingColumnGeneratorFactory` instances into the `SchemaAdapterFactory`.
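To make the shape of the API concrete, here is a minimal sketch of a default-value generator implementing these two traits. It uses simplified stand-in types (`Field`, `Schema`, `RecordBatch`, and an `i64` in place of a real `ArrayRef`) rather than the actual arrow/DataFusion types, and every name in it is hypothetical, not code from our internal implementation:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Simplified stand-ins for the arrow types in the proposed API; a real
// implementation would use arrow's `Field`, `Schema`, `ArrayRef`, and
// `RecordBatch`.
#[derive(Debug, Clone)]
pub struct Field { pub name: String }
#[derive(Debug)]
pub struct Schema { pub fields: Vec<Field> }
#[derive(Debug)]
pub struct RecordBatch;
pub type ArrayRef = i64; // placeholder for a generated arrow array
pub type Result<T> = std::result::Result<T, String>;

pub trait MissingColumnGenerator: Debug {
    fn generate(&self, batch: RecordBatch) -> Result<ArrayRef>;
    fn dependencies(&self) -> Vec<String>;
}

pub trait MissingColumnGeneratorFactory: Debug {
    fn create(&self, field: &Field, file_schema: &Schema)
        -> Option<Arc<dyn MissingColumnGenerator>>;
}

/// Fills a missing column with a constant default value; it therefore
/// depends on no other columns.
#[derive(Debug)]
pub struct DefaultValueGenerator { default: i64 }

impl MissingColumnGenerator for DefaultValueGenerator {
    fn generate(&self, _batch: RecordBatch) -> Result<ArrayRef> {
        Ok(self.default)
    }
    fn dependencies(&self) -> Vec<String> { Vec::new() }
}

#[derive(Debug)]
pub struct DefaultValueFactory;

impl MissingColumnGeneratorFactory for DefaultValueFactory {
    fn create(&self, field: &Field, file_schema: &Schema)
        -> Option<Arc<dyn MissingColumnGenerator>> {
        // Only offer a generator when the column is truly absent from the file.
        if file_schema.fields.iter().any(|f| f.name == field.name) {
            None
        } else {
            Some(Arc::new(DefaultValueGenerator { default: 0 }))
        }
    }
}

fn main() {
    let file_schema = Schema { fields: vec![Field { name: "a".to_string() }] };
    let missing = Field { name: "b".to_string() };
    let generator = DefaultValueFactory
        .create(&missing, &file_schema)
        .expect("`b` is missing from the file schema");
    assert_eq!(generator.generate(RecordBatch).unwrap(), 0);
    assert!(generator.dependencies().is_empty());
    println!("generated default for missing column `b`");
}
```

A derived-column generator would look the same except that `dependencies` returns the input column names and `generate` reads them out of the batch.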
It took significant effort to figure out how to properly adjust projections to account for the injected dependency columns, but we have already done that work on our end.
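The core of that projection adjustment can be sketched in a few lines: dependency columns that generators need must be added to the file scan's projection even when the query never asked for them, then dropped again after generation. This is a hypothetical illustration of the idea, not our actual code:

```rust
/// Expand a file-scan projection (column indices into the file schema) with
/// the extra columns that missing-column generators depend on. Columns the
/// query already projects are not duplicated; injected columns are appended
/// so they can be stripped again after the missing columns are generated.
fn expand_projection(requested: &[usize], dependency_cols: &[usize]) -> Vec<usize> {
    let mut proj = requested.to_vec();
    for &dep in dependency_cols {
        if !proj.contains(&dep) {
            proj.push(dep); // read the dependency, but remember it is extra
        }
    }
    proj
}

fn main() {
    // The query asks for file columns 0 and 2; a generator needs 1 and 2 as input.
    let proj = expand_projection(&[0, 2], &[1, 2]);
    assert_eq!(proj, vec![0, 2, 1]); // 2 was already projected, only 1 is injected
    println!("{proj:?}");
}
```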
The other thing to note is that adjustments are needed in filter pushdown, specifically here:
`datafusion/datasource-parquet/src/row_filter.rs`, lines 355 to 384 at commit 8061485:
```rust
/// After visiting all children, rewrite column references to nulls if
/// they are not in the file schema.
/// We do this because they won't be relevant if they're not in the file schema, since that's
/// the only thing we're dealing with here as this is only used for the parquet pushdown during
/// scanning
fn f_up(
    &mut self,
    expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
        // if the expression is a column, is it in the file schema?
        if self.file_schema.field_with_name(column.name()).is_err() {
            return self
                .table_schema
                .field_with_name(column.name())
                .and_then(|field| {
                    // Replace the column reference with a NULL (using the type from the table schema)
                    // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'`
                    //
                    // See comments on `FilterCandidateBuilder` for more information
                    let null_value = ScalarValue::try_from(field.data_type())?;
                    Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _))
                })
                // If the column is not in the table schema, should throw the error
                .map_err(|e| arrow_datafusion_err!(e));
        }
    }
    Ok(Transformed::no(expr))
}
```
This last bit applies whether simple defaults or more complex derived columns are being generated.
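The needed change to the `f_up` rewrite can be sketched as a decision: a column absent from the file schema can no longer be unconditionally replaced with NULL, because a generator may materialize real values for it at scan time. This is a hypothetical sketch of that decision only (all names are made up), not a proposed patch:

```rust
/// How a missing column reference might be handled during parquet filter
/// pushdown once missing-column generators exist.
#[derive(Debug, PartialEq)]
enum Rewrite {
    /// Current behavior: the column is absent and no generator covers it,
    /// so `col = 'foo'` can safely become `NULL = 'foo'`.
    ReplaceWithNull,
    /// New case: a generator will produce this column, so rewriting it to
    /// NULL would evaluate the predicate against the wrong values.
    KeepForGenerator,
}

fn rewrite_missing_column(name: &str, file_cols: &[&str], generated_cols: &[&str]) -> Rewrite {
    // Only called for columns that are not in the file schema.
    debug_assert!(!file_cols.contains(&name));
    if generated_cols.contains(&name) {
        Rewrite::KeepForGenerator
    } else {
        Rewrite::ReplaceWithNull
    }
}

fn main() {
    assert_eq!(rewrite_missing_column("c", &["a"], &[]), Rewrite::ReplaceWithNull);
    assert_eq!(rewrite_missing_column("c", &["a"], &["c"]), Rewrite::KeepForGenerator);
    println!("both pushdown cases behave as expected");
}
```

In the `KeepForGenerator` case the filter could either be evaluated after the column is generated or, when the generator's dependencies are all present in the file, pushed down against those dependency columns instead.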