Skip to content

Commit 2a51617

Browse files
committed
SPARK-1205: Clean up callSite/origin/generator.
This patch removes the `generator` field and simplifies + documents the tracking of callsites. There are two places where we care about call sites, when a job is run and when an RDD is created. This patch retains both of those features but does a slight refactoring and renaming to make things less confusing. There was another feature of an rdd called the `generator` which was by default the user class that in which the RDD was created. This is used exclusively in the JobLogger. It been subsumed by the ability to name a job group. The job logger can later be refectored to read the job group directly (will require some work) but for now this just preserves the default logged value of the user class. I'm not sure any users ever used the ability to override this. Author: Patrick Wendell <[email protected]> Closes #106 from pwendell/callsite and squashes the following commits: fc1d009 [Patrick Wendell] Compile fix e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite 62e77ef [Patrick Wendell] Review feedback 576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.
1 parent a59419c commit 2a51617

File tree

8 files changed

+16
-38
lines changed

8 files changed

+16
-38
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -832,13 +832,12 @@ class SparkContext(
832832
setLocalProperty("externalCallSite", null)
833833
}
834834

835+
/**
836+
* Capture the current user callsite and return a formatted version for printing. If the user
837+
* has overridden the call site, this will return the user's version.
838+
*/
835839
private[spark] def getCallSite(): String = {
836-
val callSite = getLocalProperty("externalCallSite")
837-
if (callSite == null) {
838-
Utils.formatSparkCallSite
839-
} else {
840-
callSite
841-
}
840+
Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
842841
}
843842

844843
/**

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
135135
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
136136
wrapRDD(rdd.subtract(other, p))
137137

138-
def generator: String = rdd.generator
139-
140138
override def toString = rdd.toString
141139

142140
/** Assign a name to this RDD */

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
1919

2020
import java.util.{Comparator, List => JList}
2121

22-
import scala.Tuple2
2322
import scala.collection.JavaConversions._
2423
import scala.reflect.ClassTag
2524

@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
500499

501500
def name(): String = rdd.name
502501

503-
/** Reset generator */
504-
def setGenerator(_generator: String) = {
505-
rdd.setGenerator(_generator)
506-
}
507502
}

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
126126
this
127127
}
128128

129-
/** User-defined generator of this RDD*/
130-
@transient var generator = Utils.getCallSiteInfo.firstUserClass
131-
132-
/** Reset generator*/
133-
def setGenerator(_generator: String) = {
134-
generator = _generator
135-
}
136-
137129
/**
138130
* Set this RDD's storage level to persist its values across operations after the first time
139131
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
10311023

10321024
private var storageLevel: StorageLevel = StorageLevel.NONE
10331025

1034-
/** Record user function generating this RDD. */
1035-
@transient private[spark] val origin = sc.getCallSite()
1026+
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
1027+
@transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
1028+
private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
10361029

10371030
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
10381031

@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
10951088
}
10961089

10971090
override def toString: String = "%s%s[%d] at %s".format(
1098-
Option(name).map(_ + " ").getOrElse(""),
1099-
getClass.getSimpleName,
1100-
id,
1101-
origin)
1091+
Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
11021092

11031093
def toJavaRDD() : JavaRDD[T] = {
11041094
new JavaRDD(this)(elementClassTag)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ class DAGScheduler(
279279
} else {
280280
// Kind of ugly: need to register RDDs with the cache and map output tracker here
281281
// since we can't do it in the RDD constructor because # of partitions is unknown
282-
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
282+
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
283283
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
284284
}
285285
stage

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
213213
* @param indent Indent number before info
214214
*/
215215
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
216+
val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
216217
val rddInfo =
217-
if (rdd.getStorageLevel != StorageLevel.NONE) {
218-
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
219-
rdd.origin + " " + rdd.generator
220-
} else {
221-
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
222-
rdd.origin + " " + rdd.generator
223-
}
218+
s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
219+
s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
224220
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
225221
rdd.dependencies.foreach {
226222
case shufDep: ShuffleDependency[_, _] =>

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private[spark] class Stage(
100100
id
101101
}
102102

103-
val name = callSite.getOrElse(rdd.origin)
103+
val name = callSite.getOrElse(rdd.getCreationSite)
104104

105105
override def toString = "Stage " + id
106106

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
719719
new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
720720
}
721721

722-
def formatSparkCallSite = {
723-
val callSiteInfo = getCallSiteInfo
722+
/** Returns a printable version of the call site info suitable for logs. */
723+
def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
724724
"%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
725725
callSiteInfo.firstUserLine)
726726
}

0 commit comments

Comments
 (0)