@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
8383 */
8484 @ throws(classOf [Exception ])
8585 def get (): T = Await .result(this , Duration .Inf )
86+
87+ /**
88+ * Returns the job IDs run by the underlying async operation.
89+ *
90+ * This returns the current snapshot of the job list. Certain operations may run multiple
91+ * jobs, so multiple calls to this method may return different lists.
92+ */
93+ def jobIds : Seq [Int ]
94+
8695}
8796
8897
@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
150159 }
151160 }
152161
153- /** Get the corresponding job id for this action. */
154- def jobId = jobWaiter.jobId
162+ def jobIds = Seq (jobWaiter.jobId)
155163}
156164
157165
@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
171179 // is cancelled before the action was even run (and thus we have no thread to interrupt).
172180 @ volatile private var _cancelled : Boolean = false
173181
182+ @ volatile private var jobs : Seq [Int ] = Nil
183+
174184 // A promise used to signal the future.
175185 private val p = promise[T ]()
176186
@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
219229 }
220230 }
221231
232+ this .jobs = jobs ++ job.jobIds
233+
222234 // Wait for the job to complete. If the action is cancelled (with an interrupt),
223235 // cancel the job and stop the execution. This is not in a synchronized block because
224236 // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
255267 override def isCompleted : Boolean = p.isCompleted
256268
257269 override def value : Option [Try [T ]] = p.future.value
270+
271+ def jobIds = jobs
272+
258273}
0 commit comments