Commit 4ea23db

SPARK-1019: pyspark RDD take() throws an NPE
Author: Patrick Wendell <[email protected]>

Closes #112 from pwendell/pyspark-take and squashes the following commits:

daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
1 parent 6bd2eaa commit 4ea23db

File tree

2 files changed: 10 additions, 1 deletion


core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 2 additions & 1 deletion

@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }
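The reversal makes completion run in LIFO order: a callback registered later (which may depend on resources set up by earlier callbacks) now fires first. A minimal sketch of that ordering, using a hypothetical `CallbackRegistry` stand-in rather than Spark's actual `TaskContext`:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for TaskContext's callback list (not Spark code).
class CallbackRegistry {
  private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

  def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

  // As in the patch: run callbacks in reverse order of registration,
  // so the most recently registered callback fires first.
  def executeOnCompleteCallbacks(): Unit =
    onCompleteCallbacks.reverse.foreach(_())
}

val order = new ArrayBuffer[String]
val ctx = new CallbackRegistry
ctx.addOnCompleteCallback(() => order += "registered first")
ctx.addOnCompleteCallback(() => order += "registered second")
ctx.executeOnCompleteCallbacks()
// order now lists "registered second" before "registered first"
```

LIFO teardown mirrors how destructors and `defer`-style mechanisms unwind: the last thing set up is the first thing torn down.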

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 8 additions & 0 deletions

@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
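The flag-based shutdown can be sketched as follows. `StreamReader` is a hypothetical illustration, not Spark code: a `@volatile` `interrupted` flag, flipped by a completion callback, tells the reader loop to stop touching the stream. Because the flag is only checked between reads, an in-flight read can still race with the stream becoming invalid, which is the residual window the commit comment describes.

```scala
// Hypothetical sketch (not Spark's PythonRDD) of interruption via a
// volatile flag checked by a reader loop.
class StreamReader(lines: Iterator[String]) {
  // @volatile ensures the reader thread sees a flip made by another thread.
  @volatile var interrupted = false

  def readAll(): List[String] = {
    val out = scala.collection.mutable.ListBuffer[String]()
    // The flag is checked only between reads: a read already in progress
    // when the flag flips is not interrupted.
    while (!interrupted && lines.hasNext) out += lines.next()
    out.toList
  }
}

val reader = new StreamReader(Iterator("a", "b", "c"))
reader.interrupted = true // simulate the completion callback firing first
val remaining = reader.readAll() // loop exits before reading anything
```

This is why the commit calls itself a "partial fix": setting a flag requests a stop but cannot cancel a blocked or in-flight read synchronously.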
