@ilayaperumalg
Contributor

  • The ReceiverTracker receives RegisterReceiver messages twice:
    1. When preStart is invoked on the actor in ReceiverSupervisorImpl
    2. After the receiver is started on the executor, from onReceiverStart() in ReceiverSupervisorImpl

Although each RegisterReceiver message uses the same streamId, and the receiverInfo is simply updated every time
the message is processed at the ReceiverTracker, it makes sense to register the receiver only after the
receiver is started.
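
The double registration described above can be illustrated with a minimal sketch in plain Scala. `ToyReceiverTracker`, `ToySupervisor`, the `host` field, and the `registerInPreStart` flag are hypothetical stand-ins made up for illustration; they are not Spark's actual classes, which are actor-based. The sketch only shows that two messages with the same streamId leave a single `receiverInfo` entry while being processed twice.

```scala
import scala.collection.mutable

// Hypothetical message type; only the name and streamId come from the PR description.
final case class RegisterReceiver(streamId: Int, host: String)

// Toy tracker: counts every registration message and keeps one entry per streamId.
class ToyReceiverTracker {
  val receiverInfo = mutable.Map.empty[Int, String]
  var registerCount = 0

  def handle(msg: RegisterReceiver): Unit = {
    registerCount += 1
    // Same streamId, so a second message overwrites the first entry.
    receiverInfo(msg.streamId) = msg.host
  }
}

// Toy supervisor: registerInPreStart = true models the behavior before the patch.
class ToySupervisor(streamId: Int, tracker: ToyReceiverTracker, registerInPreStart: Boolean) {
  def preStart(): Unit =
    if (registerInPreStart) tracker.handle(RegisterReceiver(streamId, "executor-host"))

  def onReceiverStart(): Unit =
    tracker.handle(RegisterReceiver(streamId, "executor-host"))

  def start(): Unit = {
    preStart()
    onReceiverStart()
  }
}

object DuplicateRegistrationDemo {
  // Returns (number of messages processed, number of receiverInfo entries).
  def run(registerInPreStart: Boolean): (Int, Int) = {
    val tracker = new ToyReceiverTracker
    new ToySupervisor(0, tracker, registerInPreStart).start()
    (tracker.registerCount, tracker.receiverInfo.size)
  }
}
```

In this toy model the state looks the same either way, which is why the duplicate is easy to miss; only the message count differs.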

@AmplabJenkins

Can one of the admins verify this patch?

@ilayaperumalg changed the title from "[SPARK-4803] Remove duplicate RegisterReceiver message" to "[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message" on Dec 9, 2014
@JoshRosen
Contributor

Jenkins, this is ok to test.

@JoshRosen
Contributor

This change sounds reasonable to me, but I'd like to have @tdas take a quick look at this to see if I'm missing something obvious.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24508 has started for PR 3648 at commit 634abde.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 16, 2014

Test build #24508 has finished for PR 3648 at commit 634abde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24508/

@tdas
Contributor

tdas commented Dec 23, 2014

Actually, can you add a unit test for this? This change looks reasonable to me, but I am afraid that this bug could be reintroduced later and would be very hard to catch.

For the unit test, you can create a new test suite called ReceiverTrackerSuite, start a context with a fake receiver (as in ReceiverSuite), and then check the internal state of receiverInfo after context.start() and after context.stop(). You could also use a StreamingListener to count the number of times "RegisterReceiver" is received. Also, could you add a test for SPARK-4802 as well?
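
A rough sketch of the listener-counting idea, in plain Scala so that it runs without Spark on the classpath. `ToyEvent`, `ToyListener`, `StartCollector`, and `ToyContext` are hypothetical names; they only imitate the shape of a listener that records receiver start and stop events and are not Spark's `StreamingListener` API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical event types, made up for this sketch.
sealed trait ToyEvent
final case class ReceiverStarted(streamId: Int) extends ToyEvent
final case class ReceiverStopped(streamId: Int) extends ToyEvent

// Hypothetical listener interface.
trait ToyListener {
  def onEvent(event: ToyEvent): Unit
}

// Collects the stream ids seen in start and stop events so a test can inspect them.
class StartCollector extends ToyListener {
  val startedReceiverStreamIds = ArrayBuffer.empty[Int]
  val stoppedReceiverStreamIds = ArrayBuffer.empty[Int]

  def onEvent(event: ToyEvent): Unit = synchronized {
    event match {
      case ReceiverStarted(id) => startedReceiverStreamIds += id
      case ReceiverStopped(id) => stoppedReceiverStreamIds += id
    }
  }
}

// Toy context with a single fake receiver on stream 0.
class ToyContext(listener: ToyListener) {
  def start(): Unit = listener.onEvent(ReceiverStarted(0))
  def stop(): Unit = listener.onEvent(ReceiverStopped(0))
}
```

A test along these lines would start the context, check that exactly one start event was collected, then stop it and check the stop event in the same way.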

@tdas
Contributor

tdas commented Jan 5, 2015

Ping, any thoughts?

@tdas
Contributor

tdas commented Jan 9, 2015

Ping, once again.

@ilayaperumalg
Contributor Author

@tdas sorry, I haven't had a chance to work on this yet. I will try to get something in within a few days.

@tdas
Contributor

tdas commented Jan 9, 2015

Alright.

@SparkQA

SparkQA commented Jan 12, 2015

Test build #25389 has started for PR 3648 at commit 3118e5e.

  • This patch merges cleanly.

@ilayaperumalg
Contributor Author

@tdas I realized that the issue that is fixed in this PR can be tested via StreamingListenerSuite's receiverInfo reporting test case. Hence, I fixed that test case to:

collector.startedReceiverStreamIds.size should equal (1)

What do you think?

@SparkQA

SparkQA commented Jan 12, 2015

Test build #25389 has finished for PR 3648 at commit 3118e5e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25389/

@JoshRosen
Contributor

It looks like the modified test failed?

Error Message

The code passed to eventually never returned normally. Attempted 8 times over 1.879575722 seconds. Last failure message: 0 did not equal 1.

Stacktrace

sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 8 times over 1.879575722 seconds. Last failure message: 0 did not equal 1.
    at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
    at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
    at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply$mcV$sp(StreamingListenerSuite.scala:76)
    at org.apache.spark.streaming.StreamingListenerSuite$$anonfun$2.apply(StreamingListenerSuite.scala:66)

@ilayaperumalg
Contributor Author

Interesting, since the test passed before I modified it (which means the value checked by the failing condition was greater than or equal to 1 at that point). Could there be timing issues?

Also, I don't understand why the condition was checking for greater than or equal to 1 (which I believe is the reason for the issue addressed in this PR) instead of equal to 1.
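
To make the difference between the two conditions concrete, here is a small sketch in plain Scala. It assumes that a duplicate registration would show up as a second entry in the collected stream ids, which this thread does not confirm; the helper names are hypothetical.

```scala
object StartedCountChecks {
  // Loose form: any count of one or more passes, so a duplicate goes unnoticed.
  def atLeastOne(startedStreamIds: Seq[Int]): Boolean = startedStreamIds.size >= 1

  // Strict form: only a single start event passes.
  def exactlyOne(startedStreamIds: Seq[Int]): Boolean = startedStreamIds.size == 1
}
```

With a collected sequence such as `Seq(0, 0)`, `atLeastOne` returns true while `exactlyOne` returns false.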

@ilayaperumalg
Contributor Author

Any suggestions?

@tdas
Contributor

tdas commented Jan 14, 2015

This could be a timing issue. Could you increase the timeout (to, say, 2 seconds) and then run it in Jenkins 3-4 times?
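
As a rough illustration of why the timeout matters, the sketch below uses a hypothetical retry helper in plain Scala. It is a stand-in for the idea behind ScalaTest's `eventually`, not its implementation, and the delays are made-up values. If the receiver-started event arrives after the polling deadline, the last failed assertion is what gets reported, much like the "0 did not equal 1" failure above.

```scala
import java.util.concurrent.atomic.AtomicInteger

object Retry {
  // Hypothetical helper: re-runs `check` until it stops throwing AssertionError
  // or the timeout elapses, then rethrows the last failure.
  def eventually[T](timeoutMs: Long, intervalMs: Long = 15L)(check: => T): T = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @annotation.tailrec
    def attempt(): T = {
      val result: Either[AssertionError, T] =
        try Right(check) catch { case e: AssertionError => Left(e) }
      result match {
        case Right(value) => value
        case Left(error) if System.nanoTime() >= deadline => throw error
        case Left(_) =>
          Thread.sleep(intervalMs)
          attempt()
      }
    }

    attempt()
  }
}

object TimingDemo {
  // Simulates a receiver whose "started" event arrives after delayMs.
  def receiverStartsAfter(delayMs: Long): AtomicInteger = {
    val started = new AtomicInteger(0)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(delayMs)
        started.incrementAndGet()
      }
    })
    worker.setDaemon(true)
    worker.start()
    started
  }
}
```

For example, `Retry.eventually(timeoutMs = 2000) { assert(started.get == 1) }` gives a receiver that starts after a few hundred milliseconds enough time to report in, whereas a timeout shorter than the startup delay fails even though nothing is wrong with the registration logic.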

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25554 has started for PR 3648 at commit 868efab.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25554 has finished for PR 3648 at commit 868efab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25554/

@ilayaperumalg
Contributor Author

The test passes now (after increasing the timeout value). Can someone re-run the test to see if the result is consistent?

@tdas
Contributor

tdas commented Jan 14, 2015

Jenkins, test this please.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25577 has started for PR 3648 at commit 868efab.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25577 has finished for PR 3648 at commit 868efab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25577/

@ilayaperumalg
Contributor Author

@tdas do you think this is ok to merge now?

@tdas
Contributor

tdas commented Jan 20, 2015

I think so. Merging this.

@asfgit closed this in 4afad9c on Jan 20, 2015
asfgit pushed a commit that referenced this pull request Jan 20, 2015
  - The ReceiverTracker receives `RegisterReceiver` messages twice:
     1) When `preStart` is invoked on the actor in `ReceiverSupervisorImpl`
     2) After the receiver is started on the executor, from `onReceiverStart()` in `ReceiverSupervisorImpl`

Although each `RegisterReceiver` message uses the same streamId, and the receiverInfo is simply updated every time
the message is processed at the `ReceiverTracker`, it makes sense to register the receiver only after the
receiver is started.

Author: Ilayaperumal Gopinathan <[email protected]>

Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits:

868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs
3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size
634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message

(cherry picked from commit 4afad9c)
Signed-off-by: Tathagata Das <[email protected]>
bomeng pushed a commit to Huawei-Spark/spark that referenced this pull request Jan 21, 2015