-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-19858][SS]Add output mode to flatMapGroupsWithState and disallow invalid cases #17197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #74139 has finished for PR 17197 at commit
|
|
|
||
| private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath | ||
|
|
||
| test("supported strings in outputMode(string)") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to InternalOutputModesSuite
|
Test build #74144 has finished for PR 17197 at commit
|
|
retest this please |
|
Test build #74164 has finished for PR 17197 at commit
|
|
retest this please |
|
Test build #74180 has finished for PR 17197 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good overall, just need to add more comments
| } | ||
|
|
||
| /** Collect all the streaming `FlatMapGroupsWithState`s in a sub plan */ | ||
| def collectFlatMapGroupsWithState(subplan: LogicalPlan): Seq[FlatMapGroupsWithState] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if used only once, why make a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
| } | ||
|
|
||
| /** Collect all the streaming `mapGroupsWithState`s in a sub plan */ | ||
| def collectMapGroupsWithState(subplan: LogicalPlan): Seq[FlatMapGroupsWithState] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if used only once, why make a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
| throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " + | ||
| "streaming DataFrame/Dataset") | ||
| // mapGroupsWithState | ||
| case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some comments explaining in words whats allowed?
This is hard to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " + | ||
| "streaming DataFrame/Dataset") | ||
| // mapGroupsWithState | ||
| case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible define pattern which makes this case looks simply like
case m@MapGroupsWithState(isStreamng=true) => ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No such syntax. I can use something like case m@MapGroupsWithState(_, _, ..., _, outputMode, _) => if outputMode == Update. But doesn't save the codes.
| // flatMapGroupsWithState(Update) with aggregation | ||
| case m: FlatMapGroupsWithState | ||
| if m.isStreaming && | ||
| m.outputMode == InternalOutputModes.Update && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this indentatio seems messy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| assertSupportedInBatchPlan( | ||
| "mapGroupsWithState - mapGroupsWithState on batch relation", | ||
| MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation)) | ||
| // FlatMapGroupsWithState in batch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a bit more verbose comments here ... "e.g." both function modes equivalent and supported in batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it similar to the comments above "// Aggregation: Distinct aggregates not supported on streaming relation"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
Test build #74217 has finished for PR 17197 at commit
|
|
LGTM. Merging to master and 2.1 |
|
I merged it to master but i there are nontrivial conflicts in 2.1. Can you make a new PR? |
What changes were proposed in this pull request?
Add a output mode parameter to
flatMapGroupsWithStateand just definemapGroupsWithStateasflatMapGroupsWithState(Update).UnsupportedOperationCheckeris modified to disallow unsupported cases.How was this patch tested?
The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: