Skip to content

Conversation

tdas
Copy link
Contributor

@tdas tdas commented May 22, 2015

KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.

@tdas
Copy link
Contributor Author

tdas commented May 22, 2015

@cfregly please take a look

@SparkQA
Copy link

SparkQA commented May 22, 2015

Test build #33334 has finished for PR 6348 at commit 6cf1cd4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented May 22, 2015

test this please.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a need to distinguish the thread names by receiver Id or some such?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prolly a good idea.

@cfregly
Copy link
Contributor

cfregly commented May 22, 2015

otherwise, looks good

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Knowing nothing about Kinesis. Just a small question: if we call worker.run twice, what will happen? I'm not sure if worker is reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not. And the ReceiverSupevisor guarnatees (assumption) that onStart() is not going to be called twice, so this wont be reused.

@SparkQA
Copy link

SparkQA commented May 22, 2015

Test build #33362 has finished for PR 6348 at commit 6cf1cd4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented May 23, 2015

I am merging this because no Kinesis test is affected, and Jenkins has successfully compiled it.
Thanks guys for reviewing.

asfgit pushed a commit that referenced this pull request May 23, 2015
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.

Author: Tathagata Das <[email protected]>

Closes #6348 from tdas/SPARK-7788 and squashes the following commits:

2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking

(cherry picked from commit 1c388a9)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 1c388a9 May 23, 2015
@SparkQA
Copy link

SparkQA commented May 23, 2015

Test build #33382 has finished for PR 6348 at commit 2584683.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cfregly
Copy link
Contributor

cfregly commented May 23, 2015

looks good

jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.

Author: Tathagata Das <[email protected]>

Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits:

2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.

Author: Tathagata Das <[email protected]>

Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits:

2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.

Author: Tathagata Das <[email protected]>

Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits:

2584683 [Tathagata Das] Added receiver id in thread name
6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants