Skip to content

Conversation

@zsxwing
Copy link
Member

@zsxwing zsxwing commented Mar 7, 2017

What changes were proposed in this pull request?

Add a output mode parameter to flatMapGroupsWithState and just define mapGroupsWithState as flatMapGroupsWithState(Update).

UnsupportedOperationChecker is modified to disallow unsupported cases.

  • Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
  • For streaming (map/flatMap)GroupsWithState, see the following table:
Operators Supported Query Output Mode
flatMapGroupsWithState(Update) without aggregation Update
flatMapGroupsWithState(Update) with aggregation None
flatMapGroupsWithState(Append) without aggregation Append
flatMapGroupsWithState(Append) before aggregation Append, Update, Complete
flatMapGroupsWithState(Append) after aggregation None
Multiple flatMapGroupsWithState(Append)s Append
Multiple mapGroupsWithStates None
Mxing mapGroupsWithStates and flatMapGroupsWithStates None
Other cases of multiple flatMapGroupsWithState None

How was this patch tested?

The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState:

[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond)
[info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -  multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds)
[info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds)
[info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds)

@SparkQA
Copy link

SparkQA commented Mar 7, 2017

Test build #74139 has finished for PR 17197 at commit c0c13df.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FlatMapGroupsWithState(
  • case class FlatMapGroupsWithStateExec(

@zsxwing zsxwing changed the title [WIP]Add output mode to flatMapGroupsWithState and disallow invalid cases [SPARK-19858][SS]Add output mode to flatMapGroupsWithState and disallow invalid cases Mar 7, 2017

private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

test("supported strings in outputMode(string)") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to InternalOutputModesSuite

@SparkQA
Copy link

SparkQA commented Mar 8, 2017

Test build #74144 has finished for PR 17197 at commit b572a5f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FlatMapGroupsWithState(
  • case class FlatMapGroupsWithStateExec(

@zsxwing
Copy link
Member Author

zsxwing commented Mar 8, 2017

retest this please

@SparkQA
Copy link

SparkQA commented Mar 8, 2017

Test build #74164 has finished for PR 17197 at commit b572a5f.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FlatMapGroupsWithState(
  • case class FlatMapGroupsWithStateExec(

@zsxwing
Copy link
Member Author

zsxwing commented Mar 8, 2017

retest this please

@SparkQA
Copy link

SparkQA commented Mar 8, 2017

Test build #74180 has finished for PR 17197 at commit b572a5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FlatMapGroupsWithState(
  • case class FlatMapGroupsWithStateExec(

Copy link
Contributor

@tdas tdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good overall, just need to add more comments

}

/** Collect all the streaming `FlatMapGroupsWithState`s in a sub plan */
def collectFlatMapGroupsWithState(subplan: LogicalPlan): Seq[FlatMapGroupsWithState] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if used only once, why make a function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

}

/** Collect all the streaming `mapGroupsWithState`s in a sub plan */
def collectMapGroupsWithState(subplan: LogicalPlan): Seq[FlatMapGroupsWithState] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if used only once, why make a function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " +
"streaming DataFrame/Dataset")
// mapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments explaining in words whats allowed?
This is hard to understand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " +
"streaming DataFrame/Dataset")
// mapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible define pattern which makes this case looks simply like
case m@MapGroupsWithState(isStreamng=true) => ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No such syntax. I can use something like case m@MapGroupsWithState(_, _, ..., _, outputMode, _) => if outputMode == Update. But doesn't save the codes.

// flatMapGroupsWithState(Update) with aggregation
case m: FlatMapGroupsWithState
if m.isStreaming &&
m.outputMode == InternalOutputModes.Update &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this indentatio seems messy

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

assertSupportedInBatchPlan(
"mapGroupsWithState - mapGroupsWithState on batch relation",
MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, Seq(att), batchRelation))
// FlatMapGroupsWithState in batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a bit more verbose comments here ... "e.g." both function modes equivalent and supported in batch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it similar to the comments above "// Aggregation: Distinct aggregates not supported on streaming relation"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@SparkQA
Copy link

SparkQA commented Mar 8, 2017

Test build #74217 has finished for PR 17197 at commit 1303147.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Mar 8, 2017

LGTM. Merging to master and 2.1

@tdas
Copy link
Contributor

tdas commented Mar 8, 2017

I merged it to master but i there are nontrivial conflicts in 2.1. Can you make a new PR?

@asfgit asfgit closed this in 1bf9012 Mar 8, 2017
@zsxwing zsxwing deleted the mapgroups-check branch March 8, 2017 21:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants