Skip to content

Conversation

@jose-torres
Copy link
Contributor

What changes were proposed in this pull request?

The stream-stream join tests add data to multiple sources, and expect it all to show up in the next batch. But there's a race condition; the new batch might trigger when only one of the AddData actions has been reached.

Fortunately, MemoryStream synchronizes batch generation on itself, and StreamExecution won't generate empty batches. So we can resolve this race condition by synchronizing successive AddDataMemory actions against every MemoryStream together. Then we can be sure StreamExecution won't start generating a batch before all the data is present.

How was this patch tested?

existing tests

@jose-torres
Copy link
Contributor Author

@tdas

@SparkQA
Copy link

SparkQA commented Feb 21, 2018

Test build #87571 has finished for PR 20646 at commit 1df90e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Copy link
Contributor Author

java.lang.RuntimeException: [unresolved dependency: com.sun.jersey#jersey-core;1.14: configuration not found in com.sun.jersey#jersey-core;1.14: 'master(compile)'. Missing configuration: 'compile'. It was required from org.apache.hadoop#hadoop-yarn-common;2.6.5 compile]

Surely unrelated to this change.

@jose-torres
Copy link
Contributor Author

retest this please

@jose-torres
Copy link
Contributor Author

(https://issues.apache.org/jira/browse/SPARK-23369 was already filed for previous flake)

addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]])
}
if (addDataMemoryActions.nonEmpty) {
val synchronizeAll = addDataMemoryActions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is some magic-ish code. Can you add a bit more comments on how this compose thing works?

startedTest.foreach { action =>
val actionIterator = startedTest.iterator.buffered
while (actionIterator.hasNext) {
// Synchronize sequential addDataMemory actions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Synchronize --> // Collectively synchronize .... actions so that the data gets added together in a single batch.

@tdas
Copy link
Contributor

tdas commented Feb 21, 2018

Actually, I am having second thoughts about this. This is fundamentally changing how the tests work, especially for stress tests. The stress tests actually test these corner cases (by randomly adding successive AddData) about what if data was being added while the previously added data is being picked up. With this change, we will accidentally not test those race-condition-prone cases.

Second, we are taking multiple locks here in multiple sources, and the StreamExecution is likely to take the same locks. I am really afraid that we are introducing deadlocks by doing this.

I am still thinking what the right approach here. I think it should be

  • Explicit synchronized adding of data to multiple sources.
  • Not holding locks in multiple sources.

@SparkQA
Copy link

SparkQA commented Feb 21, 2018

Test build #87572 has finished for PR 20646 at commit 1df90e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Feb 21, 2018

I opened a new PR to test out an alternate approach. PTAL - https://github.com/apache/spark/pull/20650/files?w=1

(note the w=1, that is to ignore whitespaces diffs in the diff view).

ghost pushed a commit to dbtsai/spark that referenced this pull request Feb 23, 2018
…*JoinSuite

**The best way to review this PR is to ignore whitespace/indent changes. Use this link - https://github.com/apache/spark/pull/20650/files?w=1**

## What changes were proposed in this pull request?

The stream-stream join tests add data to multiple sources and expect it all to show up in the next batch. But there's a race condition; the new batch might trigger when only one of the AddData actions has been reached.

Prior attempt to solve this issue by jose-torres in apache#20646 attempted to simultaneously synchronize on all memory sources together when consecutive AddData was found in the actions. However, this carries the risk of deadlock as well as unintended modification of stress tests (see the above PR for a detailed explanation). Instead, this PR attempts the following.

- A new action called `StreamProgressBlockedActions` that allows multiple actions to be executed while the streaming query is blocked from making progress. This allows data to be added to multiple sources that are made visible simultaneously in the next batch.
- An alias of `StreamProgressBlockedActions` called `MultiAddData` is explicitly used in the `Streaming*JoinSuites` to add data to two memory sources simultaneously.

This should avoid unintentional modification of the stress tests (or any other test for that matter) while making sure that the flaky tests are deterministic.

## How was this patch tested?
Modified test cases in `Streaming*JoinSuites` where there are consecutive `AddData` actions.

Author: Tathagata Das <[email protected]>

Closes apache#20650 from tdas/SPARK-23408.
@jose-torres jose-torres closed this Mar 7, 2018
HeartSaVioR pushed a commit to HeartSaVioR/spark that referenced this pull request Feb 11, 2019
…*JoinSuite

**The best way to review this PR is to ignore whitespace/indent changes. Use this link - https://github.com/apache/spark/pull/20650/files?w=1**

The stream-stream join tests add data to multiple sources and expect it all to show up in the next batch. But there's a race condition; the new batch might trigger when only one of the AddData actions has been reached.

Prior attempt to solve this issue by jose-torres in apache#20646 attempted to simultaneously synchronize on all memory sources together when consecutive AddData was found in the actions. However, this carries the risk of deadlock as well as unintended modification of stress tests (see the above PR for a detailed explanation). Instead, this PR attempts the following.

- A new action called `StreamProgressBlockedActions` that allows multiple actions to be executed while the streaming query is blocked from making progress. This allows data to be added to multiple sources that are made visible simultaneously in the next batch.
- An alias of `StreamProgressBlockedActions` called `MultiAddData` is explicitly used in the `Streaming*JoinSuites` to add data to two memory sources simultaneously.

This should avoid unintentional modification of the stress tests (or any other test for that matter) while making sure that the flaky tests are deterministic.

Modified test cases in `Streaming*JoinSuites` where there are consecutive `AddData` actions.

Author: Tathagata Das <[email protected]>

Closes apache#20650 from tdas/SPARK-23408.

NOTE: Modified a bit to cover DSv2 incompatibility between Spark 2.3 and 2.4 by Jungtaek Lim <[email protected]>
 * StreamingDataSourceV2Relation is a class for 2.3, whereas it is a case class for 2.4
srowen pushed a commit that referenced this pull request Feb 12, 2019
…in Streaming*JoinSuite

## What changes were proposed in this pull request?

**The best way to review this PR is to ignore whitespace/indent changes. Use this link - https://github.com/apache/spark/pull/20650/files?w=1**

The stream-stream join tests add data to multiple sources and expect it all to show up in the next batch. But there's a race condition; the new batch might trigger when only one of the AddData actions has been reached.

Prior attempt to solve this issue by jose-torres in #20646 attempted to simultaneously synchronize on all memory sources together when consecutive AddData was found in the actions. However, this carries the risk of deadlock as well as unintended modification of stress tests (see the above PR for a detailed explanation). Instead, this PR attempts the following.

- A new action called `StreamProgressBlockedActions` that allows multiple actions to be executed while the streaming query is blocked from making progress. This allows data to be added to multiple sources that are made visible simultaneously in the next batch.
- An alias of `StreamProgressBlockedActions` called `MultiAddData` is explicitly used in the `Streaming*JoinSuites` to add data to two memory sources simultaneously.

This should avoid unintentional modification of the stress tests (or any other test for that matter) while making sure that the flaky tests are deterministic.

NOTE: This patch is modified a bit from origin PR (#20650) to cover DSv2 incompatibility between Spark 2.3 and 2.4: StreamingDataSourceV2Relation is a class for 2.3, whereas it is a case class for 2.4

## How was this patch tested?

Modified test cases in `Streaming*JoinSuites` where there are consecutive `AddData` actions.

Closes #23757 from HeartSaVioR/fix-streaming-join-test-flakiness-branch-2.3.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Co-authored-by: Tathagata Das <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants