
Commit 1500deb

Changes in Spark Streaming UI
1 parent 9d38d3c commit 1500deb

21 files changed: 115 additions & 31 deletions

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 8 deletions
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
@@ -124,7 +124,7 @@ abstract class RDD[T: ClassTag](
   val id: Int = sc.newRddId()

   /** A friendly name for this RDD */
-  @transient var name: String = null
+  @transient var name: String = sc.getLocalProperty("rddName")

   /** Assign a name to this RDD */
   def setName(_name: String): this.type = {
@@ -1214,14 +1214,11 @@ abstract class RDD[T: ClassTag](

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = {
-    val short: String = sc.getLocalProperty("spark.job.callSiteShort")
+    val short: String = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)
     if (short != null) {
-      CallSite(short, sc.getLocalProperty("spark.job.callSiteLong"))
+      CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
     } else {
-      val callSite: CallSite = Utils.getCallSite
-      //sc.setLocalProperty("spark.job.callSiteShort", callSite.short)
-      //sc.setLocalProperty("spark.job.callSiteLong", callSite.long)
-      callSite
+      Utils.getCallSite
     }
   }
   private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
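
In effect, an RDD constructed while a named DStream is generating a batch now resolves its friendly name and creation call site from SparkContext local properties, and falls back to Utils.getCallSite when nothing was published. A rough sketch of the lookup this diff implements, assuming an active SparkContext sc; the "FilteredRDD" value and the call-site string are hypothetical examples, not output of this commit:

    // Illustrative only: mirrors the new creationSite logic in RDD.scala.
    val name = sc.getLocalProperty("rddName")                       // e.g. "FilteredRDD"; null outside streaming
    val short = sc.getLocalProperty(name + Utils.CALL_SITE_SHORT)   // key "FilteredRDD.callSite.short"
    val creationSite =
      if (short != null) CallSite(short, sc.getLocalProperty(name + Utils.CALL_SITE_LONG))
      else Utils.getCallSite                                        // non-streaming fallback, as before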

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 1 deletion
@@ -52,6 +52,9 @@ private[spark] case class CallSite(val short: String, val long: String)
 private[spark] object Utils extends Logging {
   val random = new Random()

+  private[spark] val CALL_SITE_SHORT: String = ".callSite.short"
+  private[spark] val CALL_SITE_LONG: String = ".callSite.long"
+
   def sparkBin(sparkHome: String, which: String): File = {
     val suffix = if (isWindows) ".cmd" else ""
     new File(sparkHome + File.separator + "bin", which + suffix)
@@ -800,7 +803,7 @@ private[spark] object Utils extends Logging {
   * A regular expression to match classes of the "core" Spark API that we want to skip when
   * finding the call site of a method.
   */
-  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?\.[A-Z]""".r
+  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.streaming)?(\.streaming\.dstream)?(\.streaming\.scheduler)?(\.streaming\.twitter)?(\.streaming\.kafka)?(\.streaming\.flume)?(\.streaming\.mqtt)?(\.streaming\.zeromq)?\.[A-Z]""".r
   private val SCALA_CLASS_REGEX = """^scala(\.util)?(\.collection)?(\.collection\.mutable)?(\.collection\.immutable)?(\.concurrent\.forkjoin)?\.[A-Z]""".r
   private val AKKA_CLASS_REGEX = """^akka(\.actor)?(\.dispatch)?\.[A-Z]""".r
   private val JAVA_CLASS_REGEX = """^java(\.util\.concurrent)?(\.lang)?\.[A-Z]""".r
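
The two new constants are suffixes rather than complete property keys: the per-DStream name is prepended when the properties are set and read, so each operation gets its own pair of keys. For example, with the "FilteredRDD" name introduced by FilteredDStream later in this commit:

    "FilteredRDD" + Utils.CALL_SITE_SHORT   // "FilteredRDD.callSite.short"
    "FilteredRDD" + Utils.CALL_SITE_LONG    // "FilteredRDD.callSite.long"

The widened SPARK_CLASS_REGEX serves the same goal: getCallSite now also skips the external streaming connector packages (twitter, kafka, flume, mqtt, zeromq), so the recorded call site points at user code rather than at the receiver DStream classes.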

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 19 additions & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.{Utils, MetadataCleaner}
+import org.apache.spark.util.{CallSite, Utils, MetadataCleaner}

 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -106,6 +106,24 @@ abstract class DStream[T: ClassTag] (
   /** Return the StreamingContext associated with this DStream */
   def context = ssc

+  private[streaming] val RDD_NAME: String = "rddName";
+
+  @transient var name: String = null
+
+  /** Assign a name to this DStream */
+  def setName(_name: String) = {
+    name = _name
+  }
+
+  /* Find the creation callSite */
+  val creationSite = Utils.getCallSite
+
+  /* Store the creation callSite in threadlocal */
+  private[streaming] def setCallSite = {
+    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_SHORT, creationSite.short)
+    ssc.sparkContext.setLocalProperty(name + Utils.CALL_SITE_LONG, creationSite.long)
+  }
+
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(level: StorageLevel): DStream[T] = {
     if (this.isInitialized) {
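
These additions form the producer side of the scheme: a concrete DStream names itself after the kind of RDD it produces, captures its creation call site once at construction time, and republishes both as SparkContext local properties on every batch via setCallSite, just before materializing the batch RDD. A minimal sketch of a subclass following this pattern (the class and its transformation are hypothetical; the calls mirror the FilteredDStream-style changes later in this commit):

    package org.apache.spark.streaming.dstream

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Duration, Time}

    // Hypothetical example, shown only to illustrate the setName/setCallSite pattern.
    private[streaming]
    class UpperCaseDStream(parent: DStream[String]) extends DStream[String](parent.ssc) {

      setName("MappedRDD")  // friendly name the produced RDDs pick up in the UI

      override def dependencies = List(parent)

      override def slideDuration: Duration = parent.slideDuration

      override def compute(validTime: Time): Option[RDD[String]] = {
        setCallSite                                        // publishes <name>.callSite.short / .long
        val rdd = parent.getOrCompute(validTime).map(_.map(_.toUpperCase))
        ssc.sparkContext.setLocalProperty(RDD_NAME, name)  // lets the new RDD read its friendly name
        rdd
      }
    }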

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 3 additions & 0 deletions
@@ -57,6 +57,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   override def start() { }

   override def stop() { }
+  setName("UnionRDD")

   /**
    * Finds the files that were modified since the last time this method was called and makes
@@ -71,6 +72,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     assert(validTime.milliseconds >= ignoreTime,
       "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

+    setCallSite
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
     // Find new files
     val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))

streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -27,12 +27,17 @@ class FilteredDStream[T: ClassTag](
     filterFunc: T => Boolean
   ) extends DStream[T](parent.ssc) {

+  setName("FilteredRDD")
+
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[T]] = {
-    parent.getOrCompute(validTime).map(_.filter(filterFunc))
+    setCallSite
+    val rdd: Option[RDD[T]] = parent.getOrCompute(validTime).map(_.filter(filterFunc))
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    return rdd
   }
 }


streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -28,11 +28,16 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     flatMapValueFunc: V => TraversableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {

+  setName("FlatMappedValuesRDD")
+
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
-    parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+    setCallSite
+    val rdd: Option[RDD[(K, U)]] = parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    return rdd
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -27,12 +27,17 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
     flatMapFunc: T => Traversable[U]
   ) extends DStream[U](parent.ssc) {

+  setName("FlatMappedRDD")
+
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+    setCallSite
+    val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    return rdd
   }
 }


streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala

Lines changed: 2 additions & 3 deletions
@@ -34,12 +34,11 @@ class ForEachDStream[T: ClassTag] (

   override def compute(validTime: Time): Option[RDD[Unit]] = None

-  //TODO: where to clear up the threadlocal values?
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
-        parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
+        //parent.ssc.sc.setLocalProperty("spark.job.callSiteShort", rdd.creationSite.short)
+        //parent.ssc.sc.setLocalProperty("spark.job.callSiteLong", rdd.creationSite.long)
         val jobFunc = () => {
           foreachFunc(rdd, time)
         }

streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -25,11 +25,16 @@ private[streaming]
 class GlommedDStream[T: ClassTag](parent: DStream[T])
   extends DStream[Array[T]](parent.ssc) {

+  setName("GlommedRDD")
+
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
-    parent.getOrCompute(validTime).map(_.glom())
+    setCallSite
+    val rdd: Option[RDD[Array[T]]] = parent.getOrCompute(validTime).map(_.glom())
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    return rdd
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -28,12 +28,17 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
     preservePartitioning: Boolean
   ) extends DStream[U](parent.ssc) {

+  setName("MapPartitionsRDD")
+
   override def dependencies = List(parent)

   override def slideDuration: Duration = parent.slideDuration

   override def compute(validTime: Time): Option[RDD[U]] = {
-    parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+    setCallSite
+    val rdd: Option[RDD[U]] = parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+    ssc.sparkContext.setLocalProperty(RDD_NAME, name)
+    return rdd
   }
 }
