fix: TrivialValueAccumulators to ignore null states for ignore nulls #16918
Conversation
Thanks @comphead -- the code looks good, but I think we should probably make a test for it before merging
Also, given this change, do you think we should potentially make a new RC for 49.0.0?
I don't think it is covered by existing tests, as they all pass with and without this code change. If we can't make an end-to-end/SQL test, I think at least a unit test to make sure we don't break it in the future would be a good idea
I'm trying to put together a concise unit test based on the batches I see going through
I couldn't understand this, can you explain a bit more?

```rust
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    let is_set = if self.ignore_nulls && self.first.is_null() {
        false
    } else {
        self.is_set
    };
    Ok(vec![self.first.clone(), ScalarValue::from(is_set)])
}
```

does it mean converting
Thanks @berkaysynnada, I tried the code below (which is a more concise version of your snippet), but it didn't help
Please find some more details for this issue.

First file:

Second file:

See the row with key 2 split across 2 files. I'm running the same query

and the output values from DF

Comet

So the values in

which I believe confuses the accumulator, as for DF the state is sent as

and is not applied, as it is false. Hope this explains
@comphead thanks for the detailed explanation. Is the suggestion something like this?

```rust
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    if self.ignore_nulls && self.last.is_null() {
        self.is_set = false;
    }
    Ok(vec![self.last.clone(), ScalarValue::from(self.is_set)])
}
```

If that's the case, the problem seems to originate in Comet, because we shouldn't be receiving such a state:

that's an invalid state, and we shouldn't be trying to fix it here IMO
Thanks @berkaysynnada, the state is taken from TrivialAccumulator. What Comet does is just call physical operations directly.

I suppose if we know this is an invalid state, we should still ignore it like in this PR; if there is a mistake in the planner/optimizer, it can lead to silent data corruption. I'm trying to reproduce this with a unit test, sending batches with exactly the same order and content as Comet does.
I think we should add a

Edit: And if the invalid state originates from within DataFusion, we should fix that
I'm still on that; my Comet test is likely showing some race conditions, because the test fails 90% of the time but sometimes passes. It sounds like, depending on the incoming data order, DF can give an incorrect result, but I'm still trying to create a stable test in DF
I've documented my investigation with a new issue on the Comet repo:
Closing this PR in favor of apache/datafusion-comet#2052. @mbutrovich provided a nice investigation in apache/datafusion-comet#2051, if anyone is interested in FFI boundaries.
Which issue does this PR close?

Related to Release DataFusion 49.0.0 (July 2025) #16235 and chore: migrate to DF 49.0.0 datafusion-comet#2040

Rationale for this change
After introducing TrivialValueAccumulators in #16217 we found an ignore nulls test failed in Comet. I cannot reproduce the same in DataFusion, although I created a similar environment in DF. If the key is spread across 2 files and batch_size = 1:

file1

file2

And run the query

or

The query actually returns

While debugging merge_batch I found Comet sometimes sends a state of null, true, whereas DF never does that and sends nulls, false instead. This is probably explained by Comet calling physical operations directly, without optimizations. So this fix is more of a hack, but it works for Comet and DataFusion and helps to unblock the Comet migration.
Appreciate @ozankabak @alamb @berkaysynnada @mbutrovich for ideas for a better fix (if any)
What changes are included in this PR?
Are these changes tested?
Existing tests; as I cannot reproduce it in DataFusion, I'm not able to add a new test
Are there any user-facing changes?