Restore custom SchemaAdapter functionality for Parquet #16791
Conversation
I'd like to add a unit test that confirms the custom schema adapter factory will be used if specified.

done
Thank you @adriangb
I am a bit unclear on how the physical expression rewriter and the schema adapter now interact when doing schema evolution. In particular, I am not sure about the intended behavior when both are present.

I wonder if we could make an example showing how to use the two APIs together; that would make it clearer what is supposed to happen.
> To resolve this you need to implement a custom `PhysicalExprAdapterFactory` and use that instead of a `SchemaAdapterFactory`.
> See the [default values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs) for how to do this.
> Opting into the new APIs will set you up for future changes, since we plan to expand use of `PhysicalExprAdapterFactory` to other areas of DataFusion.
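For reference, here is a minimal sketch of what a custom `PhysicalExprAdapterFactory` might look like, loosely modeled on the linked example. The module paths and trait signatures are my assumptions from around the DataFusion 49 timeframe and may not match the code in this PR exactly:

```rust
use std::sync::Arc;

use arrow::datatypes::{FieldRef, SchemaRef};
use datafusion::common::{Result, ScalarValue};
use datafusion::physical_expr::schema_rewriter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
};
use datafusion::physical_expr::PhysicalExpr;

/// Hypothetical factory: delegates to the default rewriter, where a real
/// implementation would add its own handling (e.g. default column values).
#[derive(Debug)]
struct MyExprAdapterFactory;

impl PhysicalExprAdapterFactory for MyExprAdapterFactory {
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        let inner = DefaultPhysicalExprAdapterFactory
            .create(logical_file_schema, physical_file_schema);
        Arc::new(MyExprAdapter { inner })
    }
}

#[derive(Debug)]
struct MyExprAdapter {
    inner: Arc<dyn PhysicalExprAdapter>,
}

impl PhysicalExprAdapter for MyExprAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        // Custom rewriting (missing columns, renames, defaults) would go
        // here; this sketch just falls through to the default behavior.
        self.inner.rewrite(expr)
    }

    fn with_partition_values(
        &self,
        partition_values: Vec<(FieldRef, ScalarValue)>,
    ) -> Arc<dyn PhysicalExprAdapter> {
        Arc::new(MyExprAdapter {
            inner: self.inner.with_partition_values(partition_values),
        })
    }
}
```

The factory would then be installed on the Parquet source via `with_physical_expr_adapter_factory`, as in the test snippet further down in this conversation.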
A link to some description of the future plan would be super helpful here
```rust
    rows_matched: metrics::Count,
    /// how long was spent evaluating this predicate
    time: metrics::Time,
    /// used to perform type coercion while filtering rows
```
I think it is a bit unclear how the schema mapper and expression rewriter work together -- I think the schema is mapped first and then the simplified physical expression is evaluated against the mapped schema rather than the file schema.

Maybe we can add a comment explaining how this works.
> I think it is a bit unclear how the schema mapper and expression rewriter work together

If you have an expression adapter, you map the expression and the expression is then evaluated against the physical file schema. So there is no longer a need for a SchemaAdapter: it will still be there, but it becomes a no-op because it's adapting between identical schemas.
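A rough sketch of that ordering (variable names are illustrative, not from the actual code):

```rust
// Illustrative only: names are made up for this sketch.
let adapter = expr_adapter_factory
    .create(logical_file_schema.clone(), physical_file_schema.clone());
// After this call the predicate refers to columns of the physical file schema.
let predicate = adapter.rewrite(predicate)?;
// Any SchemaAdapter mapping is then computed from the physical file schema
// to itself, so map_batch() is an identity transformation.
```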
```rust
    }

    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        let batch = self.schema_mapper.map_batch(batch)?;
```
Applying the schema mapper first means the predicate is applied on batches that have been mapped to the table schema, but wasn't the predicate rewritten to be in terms of the file schema?
This code basically becomes a no-op if a predicate rewriter is used. But instead of making it an `Option` or something like that, I think it's easier to leave it be, to minimize the version-on-version diff and the number of code paths.

It's a no-op because the schema mapping is computed from the physical file schema to the physical file schema.
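To restate that as comments on the snippet quoted above (the comments are mine; the body is abbreviated):

```rust
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
    // When a PhysicalExprAdapter rewrote the predicate, schema_mapper was
    // built from the physical file schema to the physical file schema, so
    // this returns the batch unchanged.
    let batch = self.schema_mapper.map_batch(batch)?;
    // ... evaluate the already-rewritten predicate against `batch` ...
}
```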
```rust
}

#[tokio::test]
async fn test_custom_schema_adapter_no_rewriter() {
```
If possible, I think this test should be of the "end to end" variety (in core_integration) that shows how these APIs interact to rewrite predicates / schemas correctly. It is not super clear to me how these low-level APIs would be used by users, so I am not sure this test covers the cases correctly.
If you have an expression adapter you map the expression and the expression is then evaluated against the physical file schema. So there is no longer a need for a SchemaAdapter: it will still be there, but it becomes a no-op because it's adapting between identical schemas. I'm sorry I did not give more detail in the PR description. The idea is that users will fall into one of four scenarios (the combinations of custom vs. default `SchemaAdapter` and `PhysicalExprAdapter`).

This makes it completely backwards compatible: anyone using a custom SchemaAdapter has to make no code changes to continue using it. If they want to opt into the new mechanism, they can either stop setting a custom SchemaAdapter or also set a custom PhysicalExprAdapter. A SchemaAdapter, custom or default, is still used for projections.
Co-authored-by: Andrew Lamb <[email protected]>
I opened #16800 to track the big picture
Thank you @adriangb
```rust
        .unwrap()
        .with_schema(table_schema.clone())
        .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory))
        .with_physical_expr_adapter_factory(Arc::new(
```
is the idea to show the default being used? Or did you mean to also provide a custom factory that changed the schema?
I'm going to follow up with a commit that uses several combinations of custom factories
```rust
        See https://github.com/apache/datafusion/issues/16800 for discussion and https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-49-0-0 for upgrade instructions.");
}

let (expr_adapter_factory, schema_adapter_factory) = match (
```
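For illustration, the selection could look roughly like this; the arm bodies are my reading of the four scenarios described earlier in this conversation, not necessarily the exact code in this PR:

```rust
// Sketch only: which adapters are used for each combination of custom
// factories supplied by the user.
let (expr_adapter_factory, schema_adapter_factory) = match (
    custom_expr_adapter_factory,
    custom_schema_adapter_factory,
) {
    // Neither set: use the new default expression adapter for predicates
    // (the default SchemaAdapter is still used for projections).
    (None, None) => (Some(default_expr_adapter_factory), None),
    // Only a custom SchemaAdapter: keep the legacy path unchanged so
    // existing code keeps working (no expression rewriting).
    (None, Some(schema)) => (None, Some(schema)),
    // A custom expression adapter (with or without a schema adapter):
    // predicates are rewritten to the file schema, so any schema mapping
    // used for filtering becomes an identity.
    (Some(expr), schema) => (Some(expr), schema),
};
```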
This is good.

Should we mark SchemaAdapter as deprecated as well? Maybe as a follow-on PR?
I'm a bit torn: we don't yet have an alternative for projections. We kind of want to mark it as deprecated for predicates but not for projections, which is weird. It may be in a bit of a limbo state for a release or two.
Co-authored-by: Andrew Lamb <[email protected]>
#16235 (comment)
This essentially partially reverts #16461 by keeping backward compatibility with the existing SchemaAdapter.