@comphead comphead commented Jul 25, 2025

Which issue does this PR close?

Rationale for this change

After TrivialValueAccumulators were introduced in #16217, we found that an IGNORE NULLS test fails in Comet. I cannot reproduce the failure in DataFusion, even though I created a similar environment in DF.

Suppose the key is spread across 2 files, with batch_size = 1:

file1

a b
2 1

file2

a b
2 2

And we run the query

 SELECT a, last_value(case when b==1 then null else b end) IGNORE NULLS FROM t1 GROUP BY a order by a;

or

 SELECT a, first_value(case when b==1 then null else b end) IGNORE NULLS FROM t1 GROUP BY a order by a;

In Comet, the query actually returns null for key 2 instead of the expected 2:

a b
2 null

While debugging merge_batch, I found that Comet sometimes sends a state of (null, true), whereas DF never does that and sends (null, false) instead. This is probably because Comet calls the physical operations directly, without optimizations.

So this fix is more of a hack, but it works for both Comet and DataFusion and helps unblock the Comet migration.
I'd appreciate ideas for a better fix (if any) from @ozankabak @alamb @berkaysynnada @mbutrovich.

What changes are included in this PR?

Are these changes tested?

Only by existing tests. Since I cannot reproduce the issue in DataFusion, I'm not able to add a new test.

Are there any user-facing changes?

@github-actions github-actions bot added the functions Changes to functions implementation label Jul 25, 2025
@comphead comphead changed the title from "fix: TrivialValueAccumulators to ignore nulls for ignore nulls" to "fix: TrivialValueAccumulators to ignore null states for ignore nulls" Jul 25, 2025
@comphead comphead changed the title from "fix: TrivialValueAccumulators to ignore null states for ignore nulls" to "fix: TrivialValueAccumulators to ignore null states for ignore nulls" Jul 25, 2025
@comphead comphead requested review from alamb and ozankabak July 25, 2025 20:39
Contributor

@alamb alamb left a comment

Thanks @comphead -- the code looks good, but I think we should probably add a test for it before merging

Also, given this change, do you think we should potentially make a new RC for 49.0.0?

@alamb
Contributor

alamb commented Jul 25, 2025

> Only by existing tests. Since I cannot reproduce the issue in DataFusion, I'm not able to add a new test.

I don't think it is covered by existing tests as they all pass with and without this code change

If we can't make an end-to-end/SQL test, I think at least a unit test to make sure we don't break it in the future would be a good idea

@mbutrovich
Contributor

I'm trying to put together a concise unit test based on the batches I see going through update_batch and merge_batch in the failing Comet test, but I can't make sense of what I'm seeing. A TrivialFirstValueAccumulator has is_set set to false, but by the time its state reaches merge_batch, the second array (which contains is_set as a boolean) has flipped to true, so I'm trying to sort out how that's happening.

@berkaysynnada
Contributor

> Comet sometimes sends a state of (null, true), whereas DF never does that and sends (null, false) instead. This is probably because Comet calls the physical operations directly, without optimizations.

I couldn't understand this, can you explain a bit more?

fn state(&mut self) -> Result<Vec<ScalarValue>> {
    let is_set = if self.ignore_nulls && self.first.is_null() {
        false 
    } else {
        self.is_set
    };
    Ok(vec![self.first.clone(), ScalarValue::from(is_set)])
}

Does it mean that changing the fn state() implementations like this would fix the problem?

@comphead
Contributor Author

comphead commented Jul 26, 2025

> Does it mean that changing the fn state() implementations like this would fix the problem?

Thanks @berkaysynnada. I tried the code below (a more concise version of your snippet), but it didn't help:

    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        // Under IGNORE NULLS, a null value means nothing has been set yet.
        if self.ignore_nulls && self.last.is_null() {
            self.is_set = false;
        }
        Ok(vec![self.last.clone(), ScalarValue::from(self.is_set)])
    }

> I couldn't understand this, can you explain a bit more?

Please find some more details on this issue below.

First file

a b
0 1
0 2
2 1

Second file

a b
1 1
1 2
2 2

Note that the rows with key 2 are split across the 2 files.
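
For reference, the expected per-key result for these six rows can be computed by hand. The sketch below is a plain std-only Rust model, not DataFusion code; `case_expr`, `last_value_ignore_nulls`, and `sample_rows` are hypothetical helper names introduced only for illustration.

```rust
use std::collections::BTreeMap;

/// Mirrors `CASE WHEN b = 1 THEN NULL ELSE b END` from the query.
fn case_expr(b: i32) -> Option<i32> {
    if b == 1 { None } else { Some(b) }
}

/// Last non-null value per key, modelling
/// `last_value(...) IGNORE NULLS ... GROUP BY a` over rows in arrival order.
fn last_value_ignore_nulls(rows: &[(i32, i32)]) -> BTreeMap<i32, Option<i32>> {
    let mut out: BTreeMap<i32, Option<i32>> = BTreeMap::new();
    for &(a, b) in rows {
        // Make sure the key exists even if every value for it is null.
        let entry = out.entry(a).or_insert(None);
        // Nulls are skipped, so they never overwrite an earlier value.
        if let Some(v) = case_expr(b) {
            *entry = Some(v);
        }
    }
    out
}

/// The (a, b) rows from the two files above: first file, then second file.
fn sample_rows() -> Vec<(i32, i32)> {
    vec![(0, 1), (0, 2), (2, 1), (1, 1), (1, 2), (2, 2)]
}
```

Every key has exactly one non-null value after the CASE expression, so the expected answer is 2 for keys 0, 1, and 2 regardless of the order in which the rows arrive.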

I'm running the same query

 SELECT a, last_value(case when b==1 then null else b end) IGNORE NULLS FROM t1 GROUP BY a;

and printing the values seen in update_batch and the states seen in merge_batch of TrivialLastValueAccumulator:

DF

> SELECT a, last_value(case when b==1 then null else b end) IGNORE NULLS FROM t1 GROUP BY a;
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1302:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      false,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      false,
    ],
]
+---+----------------------------------------------------------------------------+
| a | last_value(CASE WHEN t1.b = Int64(1) THEN NULL ELSE t1.b END) IGNORE NULLS |
+---+----------------------------------------------------------------------------+
| 2 | 2                                                                          |
| 1 | 2                                                                          |
| 0 | 2                                                                          |
+---+----------------------------------------------------------------------------+

Comet

25/07/26 10:06:39 INFO core/src/lib.rs: Comet native library version 0.10.0 initialized
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      null,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1301:9] &values = [
    PrimitiveArray<Int32>
    [
      2,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      2,
    ],
    BooleanArray
    [
      true,
    ],
]
[/Users/ovoievodin/.cargo/git/checkouts/datafusion-11a8b534adb6bd68/afb9099/datafusion/functions-aggregate/src/first_last.rs:1327:9] &states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      true,
    ],
]

+---+----------------------------+
|a  |last((IF((b = 1), NULL, b)))|
+---+----------------------------+
|0  |2                           |
|1  |2                           |
|2  |NULL                        |
+---+----------------------------+

So the values in update_batch are the same but their ordering is different; I'm not sure if this matters, tbh.
As for the states in merge_batch, Comet sends

&states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      true,
    ],
]

which I believe confuses the accumulator, because in DF the state is sent as

 &states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      false,
    ],
]

and is not applied because is_set is false.

Hope this explains it.
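
To make the difference concrete, here is a minimal sketch of how a (null, true) state could clobber a good value during a merge. It is a simplified std-only model, not the actual first_last.rs code: `LastValueModel` and both merge methods are hypothetical names, and the merge logic is only an assumption based on the observation above that a state with is_set = false is not applied. The second method shows a guard in the spirit of this PR's title, not the actual diff.

```rust
/// Simplified stand-in for a last-value accumulator (hypothetical type).
#[derive(Debug)]
struct LastValueModel {
    last: Option<i32>,
    is_set: bool,
    ignore_nulls: bool,
}

impl LastValueModel {
    fn new(ignore_nulls: bool) -> Self {
        Self { last: None, is_set: false, ignore_nulls }
    }

    /// Merge partial states given as (value, is_set) pairs, trusting `is_set`.
    /// Assumption: a state is applied whenever its flag is true.
    fn merge_trusting_flag(&mut self, states: &[(Option<i32>, bool)]) {
        for &(value, is_set) in states {
            if is_set {
                self.last = value;
                self.is_set = true;
            }
        }
    }

    /// Same merge, but under IGNORE NULLS a null value is skipped even
    /// when its flag claims it was set.
    fn merge_skipping_null_states(&mut self, states: &[(Option<i32>, bool)]) {
        for &(value, is_set) in states {
            if !is_set || (self.ignore_nulls && value.is_none()) {
                continue;
            }
            self.last = value;
            self.is_set = true;
        }
    }
}
```

With states (2, true) followed by (null, true), the first method ends with a null value and the second keeps 2. With the DF-style (null, false) state, both keep 2.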

@berkaysynnada
Contributor

@comphead thanks for the detailed explanation. The state() outputs are taken from Comet, right? And Comet doesn't actually use my proposed change

    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        if self.ignore_nulls && self.last.is_null() {
            self.is_set = false;
        }
        Ok(vec![self.last.clone(), ScalarValue::from(self.is_set)])
    }

?

If that’s the case, the problem seems to originate in Comet, because we shouldn’t be receiving:

&states = [
    PrimitiveArray<Int32>
    [
      null,
    ],
    BooleanArray
    [
      true,
    ],
]

that’s an invalid state, and we shouldn’t be trying to fix it here IMO

@comphead
Contributor Author

comphead commented Jul 28, 2025

> @comphead thanks for the detailed explanation. The state() outputs are taken from Comet, right?

Thanks @berkaysynnada, the state is taken from the TrivialAccumulator. All Comet does is call the physical LastValue function (https://github.com/apache/datafusion-comet/blob/320ce55eec9f4c846ad7a68bb69c182ae1fcd3ce/native/core/src/execution/planner.rs#L1859), so all internal details, including accumulators, states, and rowhash, come from DF.

> that’s an invalid state, and we shouldn’t be trying to fix it here IMO

I suppose that even if we know this is an invalid state, we should still ignore it as this PR does; if there is a mistake in the planner/optimizer, it can lead to silent data corruption.

I'm trying to reproduce this with a unit test that sends batches with exactly the same order and content as Comet does.

@ozankabak
Contributor

ozankabak commented Jul 28, 2025

> I suppose that even if we know this is an invalid state, we should still ignore it as this PR does; if there is a mistake in the planner/optimizer, it can lead to silent data corruption.

I think we should add a debug_assert! so that people can catch where/how the invalid state emerges. I don't think it should be swept under the rug

Edit: And if the invalid state originates from within DataFusion, we should fix that
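
A rough sketch of what such a check could look like, assuming the invariant is "under IGNORE NULLS, a state flagged as set must carry a non-null value". `is_valid_state` and `check_state` are hypothetical helpers written against plain Rust types, not the real ScalarValue-based accumulator:

```rust
/// Hypothetical invariant: with IGNORE NULLS, `is_set == true` implies a
/// non-null value. Without IGNORE NULLS, (null, true) is a legitimate state.
fn is_valid_state(value: Option<i32>, is_set: bool, ignore_nulls: bool) -> bool {
    !(ignore_nulls && is_set && value.is_none())
}

/// Fails loudly in debug builds when an invalid partial state shows up,
/// so the place where it originates can be tracked down. `debug_assert!`
/// is compiled out in release builds, so it adds no release-time cost.
fn check_state(value: Option<i32>, is_set: bool, ignore_nulls: bool) {
    debug_assert!(
        is_valid_state(value, is_set, ignore_nulls),
        "invalid partial state: value is null but is_set is true under IGNORE NULLS"
    );
}
```

Calling something like `check_state` at the top of merge_batch would surface the (null, true) state in debug builds instead of silently producing a null result.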

@comphead
Contributor Author

comphead commented Jul 29, 2025

I'm still on that. My Comet test is likely showing some race condition, because the test fails about 90% of the time but sometimes passes. It sounds like DF can give an incorrect result depending on the incoming data order, but I'm still trying to create a stable test in DF.
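
As an illustration of why arrival order could matter, here is a minimal sketch under the assumption that a last-value merge keeps the most recent state whose flag is true. `merge_last` is a hypothetical helper, not DataFusion code:

```rust
/// Merge (value, is_set) partial states in arrival order, keeping the
/// value of the last state whose flag is true.
fn merge_last(states: &[(Option<i32>, bool)]) -> Option<i32> {
    let mut last = None;
    for &(value, is_set) in states {
        if is_set {
            last = value;
        }
    }
    last
}
```

With valid states, (2, true) and (null, false) merge to 2 in either order. With the invalid (null, true) state, the result is 2 only if that state happens to arrive first and null otherwise, which would be consistent with a test that fails most of the time but not always.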

@mbutrovich
Contributor

I've documented my investigation with a new issue on the Comet repo:
apache/datafusion-comet#2051

@comphead
Contributor Author

comphead commented Aug 1, 2025

Closing this PR in favor of apache/datafusion-comet#2052.
We suspect an FFI problem that flips the boolean is_set flag from false to true; multithreaded execution makes the problem non-deterministic, depending on the order of the states.

@mbutrovich provided a nice investigation in apache/datafusion-comet#2051, if anyone is interested in FFI boundaries.

@comphead comphead closed this Aug 1, 2025