Skip to content

Commit 4d86e98

Browse files
committed
Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
1 parent 93f1c69 commit 4d86e98

File tree

3 files changed

+147
-50
lines changed

3 files changed

+147
-50
lines changed

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala

Lines changed: 138 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@ import org.apache.spark.streaming.scheduler._
2929
import org.apache.spark.ui.{ServerInfo, SparkUI}
3030
import org.apache.spark.ui.JettyUtils._
3131
import org.apache.spark.util.{Distribution, Utils}
32-
import java.util.Locale
32+
import java.util.{Calendar, Locale}
3333

34-
private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener {
34+
private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener {
3535

3636
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
3737
private val runningBatchInfos = new HashMap[Time, BatchInfo]
3838
private val completedaBatchInfos = new Queue[BatchInfo]
39-
private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100)
39+
private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
40+
private var totalBatchesCompleted = 0L
41+
42+
val batchDuration = ssc.graph.batchDuration.milliseconds
4043

4144
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
4245
runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
@@ -52,6 +55,11 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
5255
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
5356
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
5457
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
58+
totalBatchesCompleted += 1L
59+
}
60+
61+
def numTotalBatchesCompleted: Long = synchronized {
62+
totalBatchesCompleted
5563
}
5664

5765
def numNetworkReceivers: Int = synchronized {
@@ -89,7 +97,8 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
8997
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
9098
(0 until numNetworkReceivers).map { receiverId =>
9199
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
92-
val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble))
100+
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
101+
val distributionOption = Distribution(recordsOfParticularReceiver)
93102
(receiverId, distributionOption)
94103
}.toMap
95104
}
@@ -99,44 +108,42 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
99108
}
100109
}
101110

102-
private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
103111

104-
private val sc = ssc.sparkContext
105-
private val conf = sc.conf
106-
private val appName = sc.appName
107-
private val bindHost = Utils.localHostName()
108-
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
109-
private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
110-
private val securityManager = sc.env.securityManager
111-
private val listener = new StreamingUIListener(conf)
112-
private val handlers: Seq[ServletContextHandler] = {
113-
Seq(
114-
createServletHandler("/",
115-
(request: HttpServletRequest) => render(request), securityManager),
116-
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
117-
)
118-
}
112+
private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
119113

120-
private var serverInfo: Option[ServerInfo] = None
114+
private val listener = parent.listener
115+
private val calendar = Calendar.getInstance()
116+
private val startTime = calendar.getTime()
121117

122-
ssc.addStreamingListener(listener)
123118

124-
def bind() {
125-
try {
126-
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
127-
logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
128-
} catch {
129-
case e: Exception =>
130-
logError("Failed to create Spark JettyUtils", e)
131-
System.exit(1)
132-
}
133-
}
119+
def render(request: HttpServletRequest): Seq[Node] = {
134120

135-
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
121+
val content =
122+
generateBasicStats() ++
123+
<h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
124+
generateNetworkStatsTable() ++
125+
generateBatchStatsTable()
126+
UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
127+
}
136128

137-
private def render(request: HttpServletRequest): Seq[Node] = {
138-
val content = generateBatchStatsTable() ++ generateNetworkStatsTable()
139-
UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview")
129+
private def generateBasicStats(): Seq[Node] = {
130+
131+
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
132+
<ul class ="unstyled">
133+
<li>
134+
<strong>Started at: </strong> {startTime.toString}
135+
</li>
136+
<li>
137+
<strong>Time since start: </strong>{msDurationToString(timeSinceStart)}
138+
</li>
139+
<li>
140+
<strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}
141+
</li>
142+
<li>
143+
<strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}
144+
</li>
145+
<li></li>
146+
</ul>
140147
}
141148

142149
private def generateBatchStatsTable(): Seq[Node] = {
@@ -173,18 +180,12 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
173180

174181
val batchStats =
175182
<ul class="unstyled">
176-
<li>
177-
<h5>Statistics over last {numBatches} processed batches</h5>
178-
</li>
179-
<li>
180-
{table.getOrElse("No statistics have been generated yet.")}
181-
</li>
183+
{table.getOrElse("No statistics have been generated yet.")}
182184
</ul>
183185

184186
val content =
185-
<h4>Batch Processing Statistics</h4> ++
186-
<div>{batchCounts}</div> ++
187-
<div>{batchStats}</div>
187+
<h5>Batch Processing Statistics</h5> ++
188+
<div>{batchStats}</div>
188189

189190
content
190191
}
@@ -198,7 +199,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
198199
val dataRows = (0 until numNetworkReceivers).map { receiverId =>
199200
val receiverName = s"Receiver-$receiverId"
200201
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
201-
d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch")
202+
d.getQuantiles().map(r => numberToString(r.toLong) + " records/second")
202203
}.getOrElse {
203204
Seq("-", "-", "-", "-", "-")
204205
}
@@ -210,8 +211,8 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
210211
}
211212

212213
val content =
213-
<h4>Network Input Statistics</h4> ++
214-
<div>{table.getOrElse("No network receivers")}</div>
214+
<h5>Network Input Statistics</h5> ++
215+
<div>{table.getOrElse("No network receivers")}</div>
215216

216217
content
217218
}
@@ -241,6 +242,95 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
241242
}
242243
"%.1f%s".formatLocal(Locale.US, value, unit)
243244
}
245+
246+
/**
247+
* Returns a human-readable string representing a duration, such as "5 seconds 35 ms"
248+
*/
249+
private def msDurationToString(ms: Long): String = {
250+
try {
251+
val second = 1000L
252+
val minute = 60 * second
253+
val hour = 60 * minute
254+
val day = 24 * hour
255+
val week = 7 * day
256+
val year = 365 * day
257+
258+
def toString(num: Long, unit: String): String = {
259+
if (num == 0) {
260+
""
261+
} else if (num == 1) {
262+
s"$num $unit"
263+
} else {
264+
s"$num ${unit}s"
265+
}
266+
}
267+
268+
val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms"
269+
val secondString = toString((ms % minute) / second, "second")
270+
val minuteString = toString((ms % hour) / minute, "minute")
271+
val hourString = toString((ms % day) / hour, "hour")
272+
val dayString = toString((ms % week) / day, "day")
273+
val weekString = toString((ms % year) / week, "week")
274+
val yearString = toString(ms / year, "year")
275+
276+
Seq(
277+
second -> millisecondsString,
278+
minute -> s"$secondString $millisecondsString",
279+
hour -> s"$minuteString $secondString",
280+
day -> s"$hourString $minuteString $secondString",
281+
week -> s"$dayString $hourString $minuteString",
282+
year -> s"$weekString $dayString $hourString"
283+
).foreach {
284+
case (durationLimit, durationString) if (ms < durationLimit) =>
285+
return durationString
286+
case e: Any => // matcherror is thrown without this
287+
}
288+
return s"$yearString $weekString $dayString"
289+
} catch {
290+
case e: Exception =>
291+
logError("Error converting time to string", e)
292+
return ""
293+
}
294+
}
295+
}
296+
297+
298+
private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging {
299+
300+
val sc = ssc.sparkContext
301+
val conf = sc.conf
302+
val appName = sc.appName
303+
val listener = new StreamingUIListener(ssc)
304+
val overviewPage = new StreamingPage(this)
305+
306+
private val bindHost = Utils.localHostName()
307+
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
308+
private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
309+
private val securityManager = sc.env.securityManager
310+
private val handlers: Seq[ServletContextHandler] = {
311+
Seq(
312+
createServletHandler("/",
313+
(request: HttpServletRequest) => overviewPage.render(request), securityManager),
314+
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
315+
)
316+
}
317+
318+
private var serverInfo: Option[ServerInfo] = None
319+
320+
ssc.addStreamingListener(listener)
321+
322+
def bind() {
323+
try {
324+
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
325+
logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
326+
} catch {
327+
case e: Exception =>
328+
logError("Failed to create Spark JettyUtils", e)
329+
System.exit(1)
330+
}
331+
}
332+
333+
private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
244334
}
245335

246336
object StreamingUI {

streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,15 @@ private[spark] object UIUtils {
2525
type="text/css" />
2626
<script src={prependBaseUri("/static/sorttable.js")} ></script>
2727
<title>{appName} - {title}</title>
28+
<script type="text/JavaScript">
29+
<!--
30+
function timedRefresh(timeoutPeriod) {
31+
setTimeout("location.reload(true);",timeoutPeriod);
32+
}
33+
// -->
34+
</script>
2835
</head>
29-
<body>
36+
<body onload="JavaScript:timedRefresh(1000);">
3037
<div class="navbar navbar-static-top">
3138
<div class="navbar-inner">
3239
<a href={prependBaseUri(basePath, "/")} class="brand">

streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class UISuite extends FunSuite with BeforeAndAfterAll {
2424
val startTime = System.currentTimeMillis()
2525
while (System.currentTimeMillis() - startTime < duration) {
2626
servers.map(_.send(Random.nextString(10) + "\n"))
27-
Thread.sleep(1)
27+
//Thread.sleep(1)
2828
}
2929
ssc.stop()
3030
servers.foreach(_.stop())

0 commit comments

Comments
 (0)