-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-7788] Made KinesisReceiver.onStart() non-blocking #6348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@cfregly please take a look |
Test build #33334 has finished for PR 6348 at commit
|
test this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a need to distinguish the thread names by receiver Id or some such?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prolly a good idea.
otherwise, looks good |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing nothing about Kinesis. Just a small question: if we call worker.run
twice, what will happen? I'm not sure if worker
is reusable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its not. And the ReceiverSupevisor guarnatees (assumption) that onStart() is not going to be called twice, so this wont be reused.
Test build #33362 has finished for PR 6348 at commit
|
I am merging this because no Kinesis test is affected, and Jenkins has successfully compiled it. |
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes #6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking (cherry picked from commit 1c388a9) Signed-off-by: Tathagata Das <[email protected]>
Test build #33382 has finished for PR 6348 at commit
|
looks good |
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false. Author: Tathagata Das <[email protected]> Closes apache#6348 from tdas/SPARK-7788 and squashes the following commits: 2584683 [Tathagata Das] Added receiver id in thread name 6cf1cd4 [Tathagata Das] Made KinesisReceiver.onStart non-blocking
KinesisReceiver calls worker.run() which is a blocking call (while loop) as per source code of kinesis-client library - https://github.com/awslabs/amazon-kinesis-client/blob/v1.2.1/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java.
This results in infinite loop while calling sparkStreamingContext.stop(stopSparkContext = false, stopGracefully = true) perhaps because ReceiverTracker is never able to register the receiver (it's receiverInfo field is a empty map) causing it to be stuck in infinite loop while waiting for running flag to be set to false.