-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-6077] Remove streaming tab while stopping StreamingContext #4828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Can one of the admins verify this patch? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have some style issues here, like spaces around operators, braces, and parentheses. Have a look at other code. Also we usually write e.g. foreach { tab => not foreach(tab = {
I think the code here can be more concise and avoid return with a construction built around `getTabs.find(_.isInstanceOf[StreamingTab]).map(...).getOrElse(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srowen , Thanks for the advice. I've just tested and refactored the code accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wrapping is wrong here; Unit should be on the previous line and no space before the brace. No space before :. Also, System.currentTimeMillis() is much more conventional for getting the time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
This looks OK to me. I'd like to give @tdas at least a day to look at it. |
|
Hey this is a decent fix, but I think this is not the right one. With this fix, the new ssc will be reflected in the new streaming tab, however it will be still visible even after the earlier ssc has been stopped. The right solution is that the tab should be removed when a stream ing context is stopped. Since only one streaming context can be active on the same spark context at the same time, attach-on-start-and-remove-on-stop will fix the multiple tab problem in the right way. Does that make sense? |
|
@tdas do you think we should add a Selenium test for this? |
|
Will that be stable? Not flaky? In the past we had simple web ui tests that On Mon, Mar 2, 2015 at 1:55 PM, Josh Rosen [email protected] wrote:
|
|
Poorly-written Selenium tests can be flaky if they don't account for things like asynchrony (e.g. when testing Javascript interactions), but I don't think that will be a problem here. We now have a bunch of Selenium tests for the Spark core UI and I don't think I've ever seen them fail in Jenkins: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala |
|
Then we should go for it! On Mon, Mar 2, 2015 at 2:28 PM, Josh Rosen [email protected] wrote:
|
7694813 to
193c542
Compare
|
@tdas Just updated the code with the solution you mentioned(attach-on-start-and-remove-on-stop) by adding the detachTab and detachPage function. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the import ordering.
Java import
Scala import
Third party import
Spark import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We dont use the idiom "to --> 2" in our code. pageToHandlers is better.
|
Can you add unit tests for detaching pages and tabs. @JoshRosen will be able to help you out. |
|
BTW, please update the title and JIRA as we are not updating existing tab any more. |
|
Spark core's
In order to test the tab attach / detach functionality, I think we should re-enable and extend this test to check that we're able to remove the tab that we just attached. It looks like this test may have been disabled due to port contention issues (#466), but I think those issues should be resolved now. If it will be easier, you might rewriting this test using the new UISeleniumSuite framework, which will let you use Selenium CSS selectors, etc. in your test. |
|
ok to test |
|
Test build #28500 has started for PR 4828 at commit
|
|
LGTM, assuming the tests pass. |
|
Oh wait. I just realized that there is a already a unit test in org.apace.spark.streaming.UISuite which tested whether the streaming tab works. But thats ignored too. Could you update that testsuite to use selenium and test whether the streaming tab is attached on and detached correctly. I just realized that the unit tests in the PR tests detaching for tabs, but does not test whether it works specifically for streaming (suppose the line to detach gets deleted accidentally in future, no unit test would fail). So please add the streaming's specific unit test - rename spark.streaming.UISuite to UISeleniumSuite, and then update test to use selenium. Other than that, existing changes look good. |
|
Test build #28500 has finished for PR 4828 at commit
|
|
Test PASSed. |
|
Test build #28502 has started for PR 4828 at commit
|
|
Test build #28503 has started for PR 4828 at commit
|
|
@tdas UISeleniumSuite has been added for streaming and the LocalStreamingContext utilities is borrowed from LocalSparkContext . |
|
Test build #28504 has started for PR 4828 at commit
|
|
Test build #28502 has finished for PR 4828 at commit
|
|
Test PASSed. |
|
Test build #28503 has finished for PR 4828 at commit
|
|
Test PASSed. |
|
Test build #28504 has finished for PR 4828 at commit
|
|
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already defined in streaming.TestSuiteBase. Rather than creating this new file, please use that.
|
Test build #28556 has started for PR 4828 at commit
|
|
Test build #28556 has finished for PR 4828 at commit
|
|
Test PASSed. |
|
LGTM! Will merge asap. |
Currently we would create a new streaming tab for each streamingContext even if there's already one on the same sparkContext which would cause duplicate StreamingTab created and none of them is taking effect. snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0 How to reproduce: 1) import org.apache.spark.SparkConf import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ..... 2) ssc.stop(false) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() Author: lisurprise <[email protected]> Closes #4828 from zhichao-li/master and squashes the following commits: c329806 [lisurprise] add test for attaching/detaching streaming tab 51e6c7f [lisurprise] move detach method into StreamingTab 31a44fa [lisurprise] add unit test for attaching and detaching new tab db25ed2 [lisurprise] clean code 8281bcb [lisurprise] clean code 193c542 [lisurprise] remove streaming tab while closing streaming context (cherry picked from commit f149b8b) Signed-off-by: Tathagata Das <[email protected]>
|
Merged this in master and branch-1.3 |
Currently we would create a new streaming tab for each streamingContext even if there's already one on the same sparkContext which would cause duplicate StreamingTab created and none of them is taking effect.
snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0
How to reproduce:
1)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.
{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey( + )
wordCounts.print()
ssc.start()
.....
2)
ssc.stop(false)
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()