-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range() #17122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| var shouldStopRequired: Boolean = false | ||
|
|
||
| def isShouldStopRequired: Boolean = { | ||
| if (shouldStopRequired) return true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just write shouldStopRequired || (this.parent != null && this.parent.isShouldStopRequired)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, it looks better. Done
|
Test build #73689 has finished for PR 17122 at commit
|
|
Test build #73702 has finished for PR 17122 at commit
|
| val range = ctx.freshName("range") | ||
| // we need to place consume() before calling isShouldStopRequired | ||
| val body = consume(ctx, Seq(ev)) | ||
| val shouldStop = if (isShouldStopRequired) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isShouldStopRequired complicates the logic. Is it necessary? How much improvement it brings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that isShouldStopRequired is simple logic. It just checks whether shouldStopRequired or parents shouldStopRequired is true
There are two reasons why isShouldStopRequired is necessary.
- The improvement is largely degraded from 7.6x to 5.5x without
isShouldStopRequired - We may miss some opportunities to enable compiler optimizations since the size of loop body would be increased without
isShouldStopRequired. This is because a JIT compiler has a threshold of loop body size to apply some loop optimizations such as loop unrolling.
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
cnt: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
cnt 247 / 289 4340.6 0.2 1.0X
|
|
||
| // to track the existence of apply() call in the current produce-consume cycle | ||
| // if apply is not called (e.g. in aggregation), we can skip shoudStop in the inner-most loop | ||
| parent.shouldStopRequired = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this? The default value of shouldStopRequired is already false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to ensure produce() starts with parent.shouldStopRequired = false. This is because I am afraid other produce-consume may set true into shouldStopRequired if we have more than one-produce-consume in one parent.
However, in most of cases, it would not happen. For the simplicity, I eliminated this.
| | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { | ||
| | long $value = ((long)$localIdx * ${step}L) + $number; | ||
| | $body | ||
| | $shouldStop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think under most of cases, we need shouldStop check. But currently shouldStopRequired is false by default, so you will consume many additional rows now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh. nvm. the outer-most WholeStageCodegenExec's shouldStopRequired is true.
| override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { | ||
| val stopEarly = ctx.freshName("stopEarly") | ||
| ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;") | ||
| shouldStopRequired = true // loop may break early even without append in loop body |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this Limit is the parent of an Aggregate, the final shouldStopRequired is true. But actually we can skip the check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. This implementation depends on slightly old revision that means there is no stopEarly() method. Removed this line.
|
This optimization looks good and the improvement is great. I have some comments regarding the changes of |
|
Test build #73730 has started for PR 17122 at commit |
| */ | ||
| final def produce(ctx: CodegenContext, parent: CodegenSupport): String = executeQuery { | ||
| this.parent = parent | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra space.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. done.
| throw new UnsupportedOperationException | ||
| } | ||
|
|
||
| /* for optimization */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deserve better comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. done.
| } | ||
|
|
||
| /* for optimization */ | ||
| var shouldStopRequired: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add protected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done
| val localIdx = ctx.freshName("localIdx") | ||
| val localEnd = ctx.freshName("localEnd") | ||
| val range = ctx.freshName("range") | ||
| // we need to place consume() before calling isShouldStopRequired |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to describe the reason that consume() may modify shouldStopRequired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, done.
| /* | ||
| * for optimization to suppress shouldStop() in a loop of WholeStageCodegen | ||
| */ | ||
| // true: require to insert shouldStop() into a loop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
? I do not understand your comment. Do you suggest to remove line 213?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your comment style looks weird. Please put true... in the /*... */
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, the usual style is:
/**
* ....
*
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated comments around here
|
Test build #73734 has started for PR 17122 at commit |
| */ | ||
| // true: require to insert shouldStop() into a loop | ||
| protected var shouldStopRequired: Boolean = false | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a simple comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
Test build #73737 has started for PR 17122 at commit |
|
retest this please. |
| } | ||
|
|
||
| // set true if doConsume() inserts append() method that requires shouldStop() in the loop | ||
| protected var shouldStopRequired: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it is possible to do this without a mutable var? Code generation has way to much mutable state as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell We can do it technically. It looks simple. On the other hand, we have to maintain an immutable var carefully by investigating whether append() is required. In particular, we would add a new CodegenSupport-related class.
In contrast, the current approach is easy to maintain the var. When we would add a new CodegenSupport-related class, it is unnecessary to carefully investigate it.
This is a trade-off between simplicity and maintainability. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We spend quite a bit of time debugging issues caused by poorly managed mutable vars in code generation. So I'd rather avoid it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would +1 for this. That is part of reason why I said it complicates the logic in previous comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I will rewrite this using immutable var.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated to use an immutable variable.
|
Test build #73745 has finished for PR 17122 at commit
|
|
Test build #73781 has finished for PR 17122 at commit
|
| * isShouldStopRequired: require to insert shouldStop() into the loop if true | ||
| */ | ||
| def isShouldStopRequired: Boolean = { | ||
| return shouldStopRequired && !(this.parent != null && !this.parent.isShouldStopRequired) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this better to understand?
def isShouldStopRequired: Boolean = {
assert(this.parent != null)
shouldStopRequired && this.parent.isShouldStopRequired
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your suggestion. However, it caused an assertion failure at "SPARK-7150 range api" in DataFrameRangeSuite.
In the failure case, isShouldStopRequired is called in the class hierarchy by parent.
RangeExec -> FilterExec -> WholeStageCodegenExec
| return shouldStopRequired && !(this.parent != null && !this.parent.isShouldStopRequired) | ||
| } | ||
|
|
||
| // set false if doConsume() does not insert append() that requires shouldStop() in the loop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion for the comment:
/**
* Set to false if this plan consumes all rows produced by children but doesn't output row to buffer
* by calling `append()`, so the children don't require `shouldStop()` in the loop of producing rows.
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look good, done
| throw new UnsupportedOperationException | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion for the comment:
/**
* For optimization to suppress `shouldStop()` in a loop of WholeStageCodegen.
* Returning true means we need to insert `shouldStop()` into the loop producing rows, if any.
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done
|
Have few suggestions about the code comments. Otherwise LGTM. |
|
Test build #73813 has finished for PR 17122 at commit
|
| * to buffer by calling append(), so the children don't require shouldStop() | ||
| * in the loop of producing rows. | ||
| */ | ||
| protected def shouldStopRequired: Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be set to true for all blocking operators? Sort for instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing out this. I overlooked Sort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious about the performance improvement on Sort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot see performance improvement on Sort. I think there are two reasons for this result.
One is that the loop body is too large. At the inner-most loop, a insertRow method is called. I think that the size of this method is too large to facilitate loop optimizations.
The other is that the hotspot method is not here. I guess that the hotspot method may be sort() at line 154.
Here is the generated code.
/* 114 */ while (true) {
/* 115 */ long range_range = range_batchEnd - range_number;
/* 116 */ if (range_range != 0L) {
/* 117 */ int range_localEnd = (int)(range_range / -1L);
/* 118 */ for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
/* 119 */ long range_value = ((long)range_localIdx * -1L) + range_number;
/* 120 */
/* 121 */ range_rowWriter.write(0, range_value);
/* 122 */ sort_sorter.insertRow((UnsafeRow)range_result);
/* 123 */
/* 124 */ // shouldStop check is eliminated
/* 125 */ }
/* 126 */ range_number = range_batchEnd;
/* 127 */ }
/* 128 */
/* 129 */ if (range_taskContext.isInterrupted()) {
/* 130 */ throw new TaskKilledException();
/* 131 */ }
/* 132 */
/* 133 */ long range_nextBatchTodo;
/* 134 */ if (range_numElementsTodo > 1000L) {
/* 135 */ range_nextBatchTodo = 1000L;
/* 136 */ range_numElementsTodo -= 1000L;
/* 137 */ } else {
/* 138 */ range_nextBatchTodo = range_numElementsTodo;
/* 139 */ range_numElementsTodo = 0;
/* 140 */ if (range_nextBatchTodo == 0) break;
/* 141 */ }
/* 142 */ range_numOutputRows.add(range_nextBatchTodo);
/* 143 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 144 */
/* 145 */ range_batchEnd += range_nextBatchTodo * -1L;
/* 146 */ }
/* 147 */
/* 148 */ }
/* 149 */
/* 150 */ protected void processNext() throws java.io.IOException {
/* 151 */ if (sort_needToSort) {
/* 152 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 153 */ sort_addToSorter();
/* 154 */ sort_sortedIter = sort_sorter.sort();
/* 155 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 156 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 157 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 158 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 159 */ sort_needToSort = false;
/* 160 */ }
/* 161 */
/* 162 */ while (sort_sortedIter.hasNext()) {
/* 163 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 164 */
/* 165 */ append(sort_outputRow);
/* 166 */
/* 167 */ if (shouldStop()) return;
/* 168 */ }
/* 169 */ }There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would sort be improved? Sort is very expensive operation, so it dominates the runtime of the job. The only possible improvement here is that you could avoid sorting with range (assuming that we do not overflow).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you. I just confirmed that this PR is not effective for Sort in general for answering an question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason I mentioned sort is that there is no use in stopping early and that it would not be correct to do so. I was not really expecting any improvement.
|
Test build #73898 has finished for PR 17122 at commit
|
|
Test build #73904 has finished for PR 17122 at commit
|
| * Returning true means we need to insert shouldStop() into the loop producing rows, if any. | ||
| */ | ||
| def isShouldStopRequired: Boolean = { | ||
| return shouldStopRequired && !(this.parent != null && !this.parent.isShouldStopRequired) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think shouldStopRequired && (this.parent == null || this.parent.isShouldStopRequired) is better to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, done
|
LGTM, except the comment for |
|
Test build #74094 has finished for PR 17122 at commit
|
|
LGTM cc @hvanhovell |
|
LGTM merging to master |
What changes were proposed in this pull request?
This PR improves performance of operations with
range()by changing Java code generated by Catalyst. This PR is inspired by the blog article.This PR changes generated code in the following two points.
shouldStop()method if this method is unnecessary (e.g.append()is not generated).These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance is improved by 7.6x.
Benchmark program:
Performance result without this PR
Performance result with this PR
Here is a comparison between generated code w/o and with this PR. Only the method
agg_doAggregateWithoutKeyis changed.Generated code without this PR
Generated code with this PR
A part of suppressing
shouldStop()was originally developed by @inouehrsHow was this patch tested?
Add new tests into
DataFrameRangeSuite