Skip to content

Commit 47cce98

Browse files
zhichao-li authored and tdas committed
[SPARK-6077] Remove streaming tab while stopping StreamingContext
Currently we create a new streaming tab for each StreamingContext, even if one already exists on the same SparkContext. This leaves duplicate StreamingTabs attached to the UI, and none of them takes effect.

Snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0

How to reproduce:

1)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
.....

2)
ssc.stop(false)
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

Author: lisurprise <[email protected]>

Closes #4828 from zhichao-li/master and squashes the following commits:

c329806 [lisurprise] add test for attaching/detaching streaming tab
51e6c7f [lisurprise] move detach method into StreamingTab
31a44fa [lisurprise] add unit test for attaching and detaching new tab
db25ed2 [lisurprise] clean code
8281bcb [lisurprise] clean code
193c542 [lisurprise] remove streaming tab while closing streaming context

(cherry picked from commit f149b8b)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 67fa6d1 commit 47cce98

File tree

9 files changed

+179
-101
lines changed

9 files changed

+179
-101
lines changed

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ package org.apache.spark.ui
2020
import javax.servlet.http.HttpServletRequest
2121

2222
import scala.collection.mutable.ArrayBuffer
23+
import scala.collection.mutable.HashMap
2324
import scala.xml.Node
2425

2526
import org.eclipse.jetty.servlet.ServletContextHandler
2627
import org.json4s.JsonAST.{JNothing, JValue}
2728

28-
import org.apache.spark.{Logging, SecurityManager, SparkConf}
2929
import org.apache.spark.ui.JettyUtils._
3030
import org.apache.spark.util.Utils
31+
import org.apache.spark.{Logging, SecurityManager, SparkConf}
3132

3233
/**
3334
* The top level component of the UI hierarchy that contains the server.
@@ -45,6 +46,7 @@ private[spark] abstract class WebUI(
4546

4647
protected val tabs = ArrayBuffer[WebUITab]()
4748
protected val handlers = ArrayBuffer[ServletContextHandler]()
49+
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
4850
protected var serverInfo: Option[ServerInfo] = None
4951
protected val localHostName = Utils.localHostName()
5052
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
@@ -60,14 +62,30 @@ private[spark] abstract class WebUI(
6062
tab.pages.foreach(attachPage)
6163
tabs += tab
6264
}
65+
66+
def detachTab(tab: WebUITab) {
67+
tab.pages.foreach(detachPage)
68+
tabs -= tab
69+
}
70+
71+
def detachPage(page: WebUIPage) {
72+
pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
73+
}
6374

6475
/** Attach a page to this UI. */
6576
def attachPage(page: WebUIPage) {
6677
val pagePath = "/" + page.prefix
67-
attachHandler(createServletHandler(pagePath,
68-
(request: HttpServletRequest) => page.render(request), securityManager, basePath))
69-
attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
70-
(request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
78+
val renderHandler = createServletHandler(pagePath,
79+
(request: HttpServletRequest) => page.render(request), securityManager, basePath)
80+
val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
81+
(request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
82+
attachHandler(renderHandler)
83+
attachHandler(renderJsonHandler)
84+
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
85+
.append(renderHandler)
86+
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
87+
.append(renderJsonHandler)
88+
7189
}
7290

7391
/** Attach a handler to this UI. */

core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,24 @@
1717

1818
package org.apache.spark.ui
1919

20+
import javax.servlet.http.HttpServletRequest
21+
2022
import scala.collection.JavaConversions._
23+
import scala.xml.Node
2124

22-
import org.openqa.selenium.{By, WebDriver}
2325
import org.openqa.selenium.htmlunit.HtmlUnitDriver
26+
import org.openqa.selenium.{By, WebDriver}
2427
import org.scalatest._
2528
import org.scalatest.concurrent.Eventually._
2629
import org.scalatest.selenium.WebBrowser
2730
import org.scalatest.time.SpanSugar._
2831

29-
import org.apache.spark._
3032
import org.apache.spark.LocalSparkContext._
33+
import org.apache.spark._
3134
import org.apache.spark.api.java.StorageLevels
3235
import org.apache.spark.shuffle.FetchFailedException
3336

37+
3438
/**
3539
* Selenium tests for the Spark Web UI.
3640
*/
@@ -310,4 +314,46 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
310314
}
311315
}
312316
}
317+
318+
test("attaching and detaching a new tab") {
319+
withSpark(newSparkContext()) { sc =>
320+
val sparkUI = sc.ui.get
321+
322+
val newTab = new WebUITab(sparkUI, "foo") {
323+
attachPage(new WebUIPage("") {
324+
def render(request: HttpServletRequest): Seq[Node] = {
325+
<b>"html magic"</b>
326+
}
327+
})
328+
}
329+
sparkUI.attachTab(newTab)
330+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
331+
go to (sc.ui.get.appUIAddress.stripSuffix("/"))
332+
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
333+
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
334+
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
335+
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
336+
find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
337+
}
338+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
339+
// check whether new page exists
340+
go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
341+
find(cssSelector("b")).get.text should include ("html magic")
342+
}
343+
sparkUI.detachTab(newTab)
344+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
345+
go to (sc.ui.get.appUIAddress.stripSuffix("/"))
346+
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
347+
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
348+
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
349+
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
350+
find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
351+
}
352+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
353+
// check that the new page no longer exists
354+
go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
355+
find(cssSelector("b")) should be(None)
356+
}
357+
}
358+
}
313359
}

core/src/test/scala/org/apache/spark/ui/UISuite.scala

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.ui
1919

2020
import java.net.ServerSocket
21-
import javax.servlet.http.HttpServletRequest
2221

2322
import scala.io.Source
2423
import scala.util.{Failure, Success, Try}
@@ -28,9 +27,8 @@ import org.scalatest.FunSuite
2827
import org.scalatest.concurrent.Eventually._
2928
import org.scalatest.time.SpanSugar._
3029

31-
import org.apache.spark.{SparkContext, SparkConf}
3230
import org.apache.spark.LocalSparkContext._
33-
import scala.xml.Node
31+
import org.apache.spark.{SparkConf, SparkContext}
3432

3533
class UISuite extends FunSuite {
3634

@@ -72,40 +70,6 @@ class UISuite extends FunSuite {
7270
}
7371
}
7472

75-
ignore("attaching a new tab") {
76-
withSpark(newSparkContext()) { sc =>
77-
val sparkUI = sc.ui.get
78-
79-
val newTab = new WebUITab(sparkUI, "foo") {
80-
attachPage(new WebUIPage("") {
81-
def render(request: HttpServletRequest): Seq[Node] = {
82-
<b>"html magic"</b>
83-
}
84-
})
85-
}
86-
sparkUI.attachTab(newTab)
87-
eventually(timeout(10 seconds), interval(50 milliseconds)) {
88-
val html = Source.fromURL(sparkUI.appUIAddress).mkString
89-
assert(!html.contains("random data that should not be present"))
90-
91-
// check whether new page exists
92-
assert(html.toLowerCase.contains("foo"))
93-
94-
// check whether other pages still exist
95-
assert(html.toLowerCase.contains("stages"))
96-
assert(html.toLowerCase.contains("storage"))
97-
assert(html.toLowerCase.contains("environment"))
98-
assert(html.toLowerCase.contains("executors"))
99-
}
100-
101-
eventually(timeout(10 seconds), interval(50 milliseconds)) {
102-
val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
103-
// check whether new page exists
104-
assert(html.contains("magic"))
105-
}
106-
}
107-
}
108-
10973
test("jetty selects different port under contention") {
11074
val server = new ServerSocket(0)
11175
val startPort = server.getLocalPort

streaming/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@
8282
<artifactId>junit</artifactId>
8383
<scope>test</scope>
8484
</dependency>
85+
<dependency>
86+
<groupId>org.seleniumhq.selenium</groupId>
87+
<artifactId>selenium-java</artifactId>
88+
<scope>test</scope>
89+
</dependency>
8590
<dependency>
8691
<groupId>com.novocode</groupId>
8792
<artifactId>junit-interface</artifactId>

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ class StreamingContext private[streaming] (
578578
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
579579
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
580580
if (stopSparkContext) sc.stop()
581+
uiTab.foreach(_.detach())
581582
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
582583
state = Stopped
583584
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
3232
extends WebUIPage("") with Logging {
3333

3434
private val listener = parent.listener
35-
private val startTime = Calendar.getInstance().getTime()
35+
private val startTime = System.currentTimeMillis()
3636
private val emptyCell = "-"
3737

3838
/** Render the page */
@@ -47,7 +47,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
4747

4848
/** Generate basic stats of the streaming program */
4949
private def generateBasicStats(): Seq[Node] = {
50-
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
50+
val timeSinceStart = System.currentTimeMillis() - startTime
5151
<ul class ="unstyled">
5252
<li>
5353
<strong>Started at: </strong> {startTime.toString}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ private[spark] class StreamingTab(ssc: StreamingContext)
3636
ssc.addStreamingListener(listener)
3737
attachPage(new StreamingPage(this))
3838
parent.attachTab(this)
39+
40+
def detach() {
41+
getSparkUI(ssc).detachTab(this)
42+
}
3943
}
4044

4145
private object StreamingTab {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming
19+
20+
import org.openqa.selenium.WebDriver
21+
import org.openqa.selenium.htmlunit.HtmlUnitDriver
22+
import org.scalatest._
23+
import org.scalatest.concurrent.Eventually._
24+
import org.scalatest.selenium.WebBrowser
25+
import org.scalatest.time.SpanSugar._
26+
27+
import org.apache.spark._
28+
29+
30+
31+
32+
/**
33+
* Selenium tests for the Spark Streaming Web UI.
34+
*/
35+
class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
36+
37+
implicit var webDriver: WebDriver = _
38+
39+
override def beforeAll(): Unit = {
40+
webDriver = new HtmlUnitDriver
41+
}
42+
43+
override def afterAll(): Unit = {
44+
if (webDriver != null) {
45+
webDriver.quit()
46+
}
47+
}
48+
49+
/**
50+
* Create a test StreamingContext with the SparkUI enabled.
51+
*/
52+
private def newSparkStreamingContext(): StreamingContext = {
53+
val conf = new SparkConf()
54+
.setMaster("local")
55+
.setAppName("test")
56+
.set("spark.ui.enabled", "true")
57+
val ssc = new StreamingContext(conf, Seconds(1))
58+
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
59+
ssc
60+
}
61+
62+
test("attaching and detaching a Streaming tab") {
63+
withStreamingContext(newSparkStreamingContext()) { ssc =>
64+
val sparkUI = ssc.sparkContext.ui.get
65+
66+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
67+
go to (sparkUI.appUIAddress.stripSuffix("/"))
68+
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
69+
}
70+
71+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
72+
// check whether streaming page exists
73+
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
74+
val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
75+
statisticText should contain("Network receivers:")
76+
statisticText should contain("Batch interval:")
77+
}
78+
79+
ssc.stop(false)
80+
81+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
82+
go to (sparkUI.appUIAddress.stripSuffix("/"))
83+
find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
84+
}
85+
86+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
87+
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
88+
val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
89+
statisticText should not contain ("Network receivers:")
90+
statisticText should not contain ("Batch interval:")
91+
}
92+
}
93+
}
94+
}
95+

streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)