[SPARK-27207][SQL] : Ensure aggregate buffers are initialized again for So… #24149
Conversation
…rtBasedAggregate

Normally, the aggregate operations invoked on an aggregation buffer for User Defined Aggregate Functions (UDAF) follow the order initialize(), update(), eval() or initialize(), merge(), eval(). However, after the threshold configured by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator, which invokes the merge or update operation without calling initialize() on the aggregate buffer. The fix here is to initialize the aggregate buffers again when the fallback to the SortBasedAggregate operator happens.
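For illustration, here is a minimal, self-contained sketch of the intended call order. It is not Spark's actual TypedImperativeAggregate API; the trait and class names are hypothetical and only model the initialize() → update()/merge() → eval() sequence that the fallback path was skipping.

// Hypothetical model of the buffer lifecycle (not Spark's real API).
trait AggLifecycle[B] {
  def initialize(): B                  // must run before any update/merge
  def update(buffer: B, value: Int): B // fold one input value into the buffer
  def merge(left: B, right: B): B      // combine two partial buffers
  def eval(buffer: B): Int             // produce the final result
}

// A max aggregator whose merge assumes initialize() has already produced the buffer.
class MaxAgg extends AggLifecycle[Array[Int]] {
  def initialize(): Array[Int] = Array(Int.MinValue)
  def update(buffer: Array[Int], value: Int): Array[Int] = {
    buffer(0) = math.max(buffer(0), value); buffer
  }
  def merge(left: Array[Int], right: Array[Int]): Array[Int] = {
    // If a code path calls merge without initialize() (as the fallback did),
    // `left` may be null or stale and this access breaks.
    left(0) = math.max(left(0), right(0)); left
  }
  def eval(buffer: Array[Int]): Int = buffer(0)
}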
cc @cloud-fan This is a bug with SortBasedAggregate that was exposed while testing PR #23778. I have filed a separate JIRA along with this PR. Request you to review it. Thank you.

ok to test

Test build #103691 has finished for PR 24149 at commit
// Hacking the aggregation mode to call AggregateFunction.merge to merge two aggregation buffers
private val mergeAggregationBuffers: (InternalRow, InternalRow) => Unit = {
var (sortBasedAggExpressions, sortBasedAggFunctions): (
why is it var instead of val?
Changed it to val
private def createNewAggregationBuffer(): SpecificInternalRow = {
val bufferFieldTypes = aggregateFunctions.flatMap(_.aggBufferAttributes.map(_.dataType))
private def createNewAggregationBuffer(
functions: Array[AggregateFunction]): SpecificInternalRow = {
4 space indentation here.
Done
}

private def initAggregationBuffer(buffer: SpecificInternalRow): Unit = {
private def initAggregationBuffer(
it's only called once, let's inline it
Done
good catch! Can we add a UT? we can create a …

Adding unit test and refactoring code

@cloud-fan Thank you for reviewing. I have added a unit test which was failing before the code change.

Test build #103773 has started for PR 24149 at commit

Test build #103775 has started for PR 24149 at commit
var maxValueBuffer: MaxValue = null
override def createAggregationBuffer(): MaxValue = {
// Returns Int.MinValue if all inputs are null
maxValueBuffer = new MaxValue(Int.MinValue)
why do we need to save it to a member variable? I think the bug can be exposed even if we just return the buffer here.
@cloud-fan I am still looking more into it, but for some reason, calling merge() without invoking initialize() does not cause any visible exception for normal UDAF functions, yet it fails with a NullPointerException when I test it with the test case described in SPARK-24935 (PR #24144). My guess is that for the above test case, since two different aggregation buffer instances are created (SketchState and UnionState), the exception shows up. Will investigate more and get back to you soon. Thank you.
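A rough sketch of the failure mode being described; SketchState and UnionState below are made-up stand-ins for the classes in the SPARK-24935 test, and the whole snippet is a hypothetical illustration rather than the real code.

// Hypothetical stand-ins for the two buffer classes mentioned above.
final class SketchState(var bytes: Array[Byte])
final class UnionState(var sketch: SketchState)

class UnionAgg {
  def initialize(): UnionState = new UnionState(new SketchState(Array.emptyByteArray))

  def merge(buffer: UnionState, other: UnionState): UnionState = {
    // If initialize() was never called on `buffer` (e.g. it was built as
    // new UnionState(null) by a fallback path), `buffer.sketch` is null and
    // this line throws a NullPointerException, matching the observed failure.
    buffer.sketch.bytes = buffer.sketch.bytes ++ other.sketch.bytes
    buffer
  }
}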
test this please

Test build #103782 has finished for PR 24149 at commit
…RK-27207 [SPARK-27207] : Upmerging with master branch
@cloud-fan Regarding our discussion in PR #24144, I just found out a case where Spark initializes a UDAF, runs … It calls …

But we call …

Turns out that is not true in the case of …

Yeah, it's the same buffer instance, but not the same aggregate function instance, IIUC.

@cloud-fan So after going through the code, I see that we are calling …

Each aggregate function will create its own buffer. When we feed a buffer to an agg func, we are not asking the agg func to replace its own buffer with the new one; we ask it to merge the new buffer into its own buffer.
…te functions and write unit test

Fix SortBasedAggregator to ensure that update and merge are performed with two different sets of aggregate functions, one for update and one for merge, respectively.
@cloud-fan So I realized that the bug was caused because I was creating the aggregate buffer for …

Test build #104090 has finished for PR 24149 at commit

…r different aggregate functions

Test build #104099 has finished for PR 24149 at commit
…izing for different aggregate functions" This reverts commit 6a5ed71. Reverting to previous commit
Since apache#24459 fixes the init-update-merge issue, the fix here is reverted.
…RK-27207 [SPARK-27207] : Upmerging with master branch
Test build #105032 has finished for PR 24149 at commit

@cloud-fan I have updated the PR. Thank you.

Hello @cloud-fan, WDYT about the updated PR? Thank you.
}
}

test("SPARK-27207: customized Hive UDAF with two aggregation buffers for Sort" +
does this test fail without your patch? I think we can write a test with a custom UDAF which fails without initialization, but a Hive UDAF does not fail without initialization.
So it used to fail earlier without my patch, but your latest patch seems to have fixed it. Will come up with another test case. Thank you.
Test build #105167 has finished for PR 24149 at commit

Test build #105168 has finished for PR 24149 at commit
* in aggregation buffer.
*/
private case class TypedMax2(
child: Expression,
4 space indentation.
* Calculate the max value with object aggregation buffer. This stores class MaxValue
* in aggregation buffer.
*/
private case class TypedMax2(
can we simplify it? I think we just need to do some initialization work in createAggregationBuffer.

case class MyUDAF ... {
  var initialized = false
  override def createAggregationBuffer(): MyBuffer = {
    initialized = true
    null
  }
  override def update(buffer: MyBuffer, input: InternalRow): MyBuffer = {
    assert(initialized)
    null
  }
  ...
}
withSQLConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold" -> "5") {
val df = data.toDF("value", "key").coalesce(2)
val query = df.groupBy($"key").agg(typedMax2($"value"), count($"value"), typedMax2($"value"))
query.show(10, false)
nit: we should not use show in the test. Let's use checkAnswer.
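For example, the assertion could look roughly like this; expectedRows is a hypothetical placeholder for whatever the test's input data actually produces, so treat this as a sketch rather than the final test code.

// Sketch: replace query.show(10, false) with a real assertion.
// `expectedRows: Seq[Row]` is assumed to be derived from the test's input data.
val query = df.groupBy($"key").agg(typedMax2($"value"), count($"value"), typedMax2($"value"))
checkAnswer(query, expectedRows)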
(newExpressions, initializeAggregateFunctions(newExpressions, 0))
}

// Hacking the aggregation mode to call AggregateFunction.merge to merge two aggregation buffers
I think this comment should be put before
val newExpressions = aggregateExpressions.map {
case agg @ AggregateExpression(_, Partial, _, _) =>
agg.copy(mode = PartialMerge)
case agg @ AggregateExpression(_, Complete, _, _) =>
agg.copy(mode = Final)
case other => other
}
After some more thought, I have some different ideas now. I checked the … Coming back to this bug, it can only be exposed if the UDAF needs to initialize itself, which should not be allowed. I think we can just add some doc to …

@cloud-fan I do see your point, but don't you still think that, if the fallback to SortBasedAggregate occurs, the initialize method for the new aggregate function should be invoked? As I recall, you said earlier: …

By the above logic, we still need to ensure that the respective aggregate buffer is initialized for the new set of aggregate functions. Or we should just use the same set of aggregate functions again for SortBasedAggregate. Correct me if I am wrong here.

Each aggregate function will create its own buffer, but the aggregate function doesn't hold the buffer; the buffer is managed by Spark. Aggregate functions should be stateless. Let's say we have an aggregate expression … Since …
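As a hedged illustration of the statelessness contract described above (hypothetical code, not Spark's API): all accumulated state lives in the buffer that is passed in, never in fields of the function object, so any buffer Spark hands to the function, including one created by a different instance after the sort-based fallback, can be merged safely.

// Hypothetical: the function keeps no state of its own; the buffer is owned by the caller.
final case class SumBuffer(var total: Long)

class StatelessSumAgg {
  def createAggregationBuffer(): SumBuffer = SumBuffer(0L)

  // `buffer` may have been created by a *different* StatelessSumAgg instance
  // (e.g. after the fallback to sort-based aggregation); that is fine because
  // nothing here depends on fields of `this`.
  def update(buffer: SumBuffer, value: Long): SumBuffer = { buffer.total += value; buffer }
  def merge(buffer: SumBuffer, other: SumBuffer): SumBuffer = { buffer.total += other.total; buffer }
  def eval(buffer: SumBuffer): Long = buffer.total
}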
Test build #105221 has finished for PR 24149 at commit

I see. Sure, I can go ahead and close this PR. Thank you.

@pgandhi999 it would be great if you can update the classdoc of …
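A sketch of the kind of classdoc note being suggested; the target class name is elided above, and the wording below is an assumption based on this thread, not the text that was actually merged.

/**
 * NOTE (sketch): aggregate function implementations should be stateless. All
 * accumulated state must live in the aggregation buffer that Spark passes in;
 * merge(buffer, input) may receive a buffer created by a different function
 * instance, for example when object hash aggregation falls back to sort-based
 * aggregation.
 */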
…RK-27207 [SPARK-27207] : Upmerging with master branch
@cloud-fan I have updated the doc. If it also needs to be updated someplace else, do let me know. Thank you.

Test build #105257 has finished for PR 24149 at commit

test this please.

Test build #105265 has finished for PR 24149 at commit

thanks, merging to master!
…rtBasedAggregate
Normally, the aggregate operations invoked on an aggregation buffer for User Defined Aggregate Functions (UDAF) follow the order initialize(), update(), eval() or initialize(), merge(), eval(). However, after the threshold configured by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator, which invokes the merge or update operation without calling initialize() on the aggregate buffer.
What changes were proposed in this pull request?
The fix here is to initialize the aggregate buffers again when the fallback to the SortBasedAggregate operator happens.
How was this patch tested?
The patch was tested as part of SPARK-24935, as documented in PR #23778.