Skip to content

Conversation

@pgandhi999
Copy link

…rtBasedAggregate

Normally, the aggregate operations that are invoked for an aggregation buffer for User Defined Aggregate Functions(UDAF) follow the order like initialize(), update(), eval() OR initialize(), merge(), eval(). However, after a certain threshold configurable by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator which invokes the merge or update operation without calling initialize() on the aggregate buffer.

What changes were proposed in this pull request?

The fix here is to initialize aggregate buffers again when fallback to SortBasedAggregate operator happens.

How was this patch tested?

The patch was tested as part of SPARK-24935 as documented in PR #23778.

…rtBasedAggregate

Normally, the aggregate operations that are invoked for an aggregation buffer for User Defined Aggregate Functions(UDAF) follow the order like initialize(), update(), eval() OR initialize(), merge(), eval(). However, after a certain threshold configurable by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator which invokes the merge or update operation without calling initialize() on the aggregate buffer.

The fix here is to initialize aggregate buffers again when fallback to SortBasedAggregate operator happens.
@pgandhi999
Copy link
Author

cc @cloud-fan This is a bug with SortBasedAggregate that was exposed while testing PR #23778 . Have filed a separate JIRA alongwith the PR here. Request you to review it. Thank you.

@pgandhi999
Copy link
Author

ok to test

@SparkQA
Copy link

SparkQA commented Mar 20, 2019

Test build #103691 has finished for PR 24149 at commit 400db3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Hacking the aggregation mode to call AggregateFunction.merge to merge two aggregation buffers
private val mergeAggregationBuffers: (InternalRow, InternalRow) => Unit = {
var (sortBasedAggExpressions, sortBasedAggFunctions): (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why it's var instead of val?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to val

private def createNewAggregationBuffer(): SpecificInternalRow = {
val bufferFieldTypes = aggregateFunctions.flatMap(_.aggBufferAttributes.map(_.dataType))
private def createNewAggregationBuffer(
functions: Array[AggregateFunction]): SpecificInternalRow = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 space indentation here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

private def initAggregationBuffer(buffer: SpecificInternalRow): Unit = {
private def initAggregationBuffer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only called once, let's inline it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@cloud-fan
Copy link
Contributor

good catch! Can we add a UT? we can create a TypedImperativeAggregate implementation which fails if initialize is not called.

Adding unit test and refactoring code
@pgandhi999
Copy link
Author

@cloud-fan Thank you for reviewing. Have added a unit test which was failing before the code change.

@SparkQA
Copy link

SparkQA commented Mar 21, 2019

Test build #103773 has started for PR 24149 at commit ea050f7.

@SparkQA
Copy link

SparkQA commented Mar 21, 2019

Test build #103775 has started for PR 24149 at commit 0714876.

var maxValueBuffer: MaxValue = null
override def createAggregationBuffer(): MaxValue = {
// Returns Int.MinValue if all inputs are null
maxValueBuffer = new MaxValue(Int.MinValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to save it to a member variable? I think the bug can be exposed even if we just return the buffer here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I am still looking more into it, but for some reason, calling merge() without invoking initialize() does not cause any visible exception on normal UDAF functions, but it fails with a Null Pointer Exception when I test it with the test case described in SPARK-24935(PR #24144 ). My guess is that for the above test case, since, two different aggregation buffer instances are created(SketchState and UnionState), the exception shows up. Will investigate more on it and get back to you soon. Thank you.

@shaneknapp
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Mar 22, 2019

Test build #103782 has finished for PR 24149 at commit 0714876.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…RK-27207

[SPARK-27207] : Upmerging with master branch
@pgandhi999
Copy link
Author

@cloud-fan Regarding our discussion in PR #24144 , I just found out a case where Spark initializes a UDAF, runs update and then runs merge. It happens in SortBasedAggregator. So, the code blows up in this case. The code in ObjectAggregationIterator.scala is pasted below:

// Two-way merges initialAggBufferIterator and inputIterator
      private def findNextSortedGroup(): Boolean = {
        if (hasNextInput || hasNextAggBuffer) {
          // Find smaller key of the initialAggBufferIterator and initialAggBufferIterator
          groupingKey = findGroupingKey()
          result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)

          // Firstly, update the aggregation buffer with input rows.
          while (hasNextInput &&
            groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
            processRow(result.aggregationBuffer, inputIterator.getValue)
            hasNextInput = inputIterator.next()
          }

          // Secondly, merge the aggregation buffer with existing aggregation buffers.
          // NOTE: the ordering of these two while-block matter, mergeAggregationBuffer() should
          // be called after calling processRow.
          while (hasNextAggBuffer &&
            groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
            mergeAggregationBuffers(result.aggregationBuffer, initialAggBufferIterator.getValue)
            hasNextAggBuffer = initialAggBufferIterator.next()
          }

          true
        } else {
          false
        }
      }

It calls update first and then calls merge on the same buffer. I found out the issue while testing this PR today.

@cloud-fan
Copy link
Contributor

But we call update and merge for different copies of the aggregate functions, don't we?

@pgandhi999
Copy link
Author

Turns out that is not true in the case of SortBasedAggregator. In the code that I have pasted above, processRow() is called on result.aggregationBuffer which performs update and later, mergeAggregationBuffers is called on the same result.aggregationBuffer which performs merge. This is what I could infer from the code as well as debug logs that I added, correct me if I am wrong.

@cloud-fan
Copy link
Contributor

Yea it's the same buffer instance, but not same aggregate function instance, IIUC.

@pgandhi999
Copy link
Author

pgandhi999 commented Mar 27, 2019

@cloud-fan So after going through the code, I see that we are calling update and merge for different copies of aggregatefunctions but are using the buffer created for one copy of aggregatefunction. I am really not an expert with the aggregation framework so was wondering if you could guide me here by elaborating more about how Spark aggregate functions use the aggregation buffer? Thank you once again for your continued guidance and support in this matter.

@cloud-fan
Copy link
Contributor

Each aggregate function will create its own buffer. When we feed a buffer to a agg func, we are not asking the agg func to replace its own buffer with the new one, but we ask it to merge the new buffer to its own buffer.

pgandhi added 2 commits March 29, 2019 13:28
…te functions and write unit test

Fix SortBasedAggregator to ensure that update and merge are performed with two different sets of aggregate functions, one for update and one for merge respectively.
@pgandhi999
Copy link
Author

@cloud-fan So I realized that the bug was caused as I was creating the aggregate buffer for sortBasedMergeAggFunctions, calling processRow on update operation with aggregateFunctions and then calling merge once again with sortBasedMergeAggFunctions. Fixed the bug by having a separate update buffer initialized with aggregateFunctions, perform the update on that buffer, merge it's results to a new aggregate buffer initialized for sortBasedMergeAggFunctions, finally on which merge is called and the final result is returned. It may not be the best solution so your valuable guidance in this matter is really appreciated. Thank you.

@SparkQA
Copy link

SparkQA commented Mar 29, 2019

Test build #104090 has finished for PR 24149 at commit 088cbc6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 30, 2019

Test build #104099 has finished for PR 24149 at commit 6a5ed71.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…izing for different aggregate functions"

This reverts commit 6a5ed71.

Reverting to previous commit
pgandhi added 2 commits April 30, 2019 09:41
Since, apache#24459 fixes the init-update-merge issue, the fix here is reverted.
…RK-27207

[SPARK-27207] : Upmerging with master branch
@SparkQA
Copy link

SparkQA commented Apr 30, 2019

Test build #105032 has finished for PR 24149 at commit db46cf7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999
Copy link
Author

@cloud-fan Have updated the PR. Thank you.

@pgandhi999 pgandhi999 changed the title [SPARK-27207] : Ensure aggregate buffers are initialized again for So… [SPARK-27207][SQL] : Ensure aggregate buffers are initialized again for So… Apr 30, 2019
@pgandhi999
Copy link
Author

Hello @cloud-fan , WDYT about the updated PR? Thank you.

}
}

test("SPARK-27207: customized Hive UDAF with two aggregation buffers for Sort" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this test fail without your patch? I think we can write a test with a custom UDAF which fails without initialization, but Hive UDAF does not fail without initialization.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it used to fail earlier without my patch, but your latest patch seems to have fixed it. Will come up with another test case. Thank you.

@SparkQA
Copy link

SparkQA commented May 6, 2019

Test build #105167 has finished for PR 24149 at commit df330fa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 7, 2019

Test build #105168 has finished for PR 24149 at commit 5bd474c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* in aggregation buffer.
*/
private case class TypedMax2(
child: Expression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 space indentation.

* Calculate the max value with object aggregation buffer. This stores class MaxValue
* in aggregation buffer.
*/
private case class TypedMax2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simplify it? I think we just need to do some initialization work in createAggregationBuffer.

case class MyUDAF ... {
  var initialized = false
  override def createAggregationBuffer(): MyBuffer = {
    initialized = true
    null
  }

  override def update(buffer: MaxValue, input: InternalRow): MyBuffer = {
    assert(initialized)
    null
  }

  ...
}

withSQLConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold" -> "5") {
val df = data.toDF("value", "key").coalesce(2)
val query = df.groupBy($"key").agg(typedMax2($"value"), count($"value"), typedMax2($"value"))
query.show(10, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should not use show in the test. Let's use checkAnswer.

(newExpressions, initializeAggregateFunctions(newExpressions, 0))
}

// Hacking the aggregation mode to call AggregateFunction.merge to merge two aggregation buffers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment should be put before

val newExpressions = aggregateExpressions.map {
      case agg @ AggregateExpression(_, Partial, _, _) =>
        agg.copy(mode = PartialMerge)
      case agg @ AggregateExpression(_, Complete, _, _) =>
        agg.copy(mode = Final)
      case other => other
    }

@cloud-fan
Copy link
Contributor

cloud-fan commented May 7, 2019

After some more thoughts, I have some different ideas now.

I checked the TypedImperativeAggregate implementations, the initialize method is used to initialize the aggregate buffer, not to initialize the TypedImperativeAggregate instance. That said, TypedImperativeAggregate implementations should be stateless.

Come back to this bug, it can only be exposed if the UDAF needs to initialize itself, which should not be allowed. I think we can just add some doc to TypedImperativeAggregate, saying that it must be stateless. Sorry for the back and forth!

@pgandhi999
Copy link
Author

@cloud-fan I do see your point but don't you still think, if fallback to SortBasedAggregate occurs, the initialize method for the new aggregate function should be invoked as I recall you said earlier:

Each aggregate function will create its own buffer. When we feed a buffer to a agg func, we are not asking the agg func to replace its own buffer with the new one, but we ask it to merge the new buffer to its own buffer.

By the above logic, we still need to ensure that the respective aggregate buffer is initialized for the new set of aggregate functions. Or we should just use the same set of aggregate functions again for SortBasedAggregate. Correct me if I am wrong here.

@cloud-fan
Copy link
Contributor

Each aggregate function will create its own buffer, but the aggregate function doesn't hold the buffer, the buffer is managed by Spark. Aggregate function should be stateless.

Let's say we have an aggregate expression expr. expr creates an aggregate function f1 to do the work before sort fallback happens. f1 creates a buffer and starts to accumulate the buffer. When sort fallback happens, expr creates a new aggregate function f2. We still ask f1 to create the buffer, and then f2 starts working and accumulate the buffer.

Since f1 and f2 are the same functions, this should be fine.

@SparkQA
Copy link

SparkQA commented May 7, 2019

Test build #105221 has finished for PR 24149 at commit 006616e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999
Copy link
Author

I see. Sure I can go ahead and close this PR. Thank you.

@cloud-fan
Copy link
Contributor

@pgandhi999 it could be great if you can update the classdoc of AggregateFunction and say that it should be stateless. thanks!

@pgandhi999
Copy link
Author

@cloud-fan I have updated the doc. If it also needs to be updated someplace else, do let me know. Thank you.

@SparkQA
Copy link

SparkQA commented May 8, 2019

Test build #105257 has finished for PR 24149 at commit 28ea0f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pgandhi999
Copy link
Author

test this please.

@SparkQA
Copy link

SparkQA commented May 8, 2019

Test build #105265 has finished for PR 24149 at commit 28ea0f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 0969d7a May 9, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants