@@ -53,15 +53,17 @@ private[spark] class HashShuffleReader[K, C](
5353 val recordIterator = wrappedStreams.flatMap { wrappedStream =>
5454 val kvIter = serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
5555 CompletionIterator [(Any , Any ), Iterator [(Any , Any )]](kvIter, {
56- // Close the stream once all the records have been read from it
56+ // Close the stream once all the records have been read from it to free underlying
57+ // ManagedBuffer as soon as possible. Note that in case of task failure, the task's
58+ // TaskCompletionListener will make sure this is released.
5759 wrappedStream.close()
5860 })
5961 }
6062
6163 // Update read metrics for each record materialized
62- val iter = new InterruptibleIterator [Any ](context, recordIterator) {
64+ val iter = new InterruptibleIterator [( Any , Any ) ](context, recordIterator) {
6365 val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
64- override def next (): Any = {
66+ override def next (): ( Any , Any ) = {
6567 readMetrics.incRecordsRead(1 )
6668 delegate.next()
6769 }
@@ -70,14 +72,14 @@ private[spark] class HashShuffleReader[K, C](
7072 val aggregatedIter : Iterator [Product2 [K , C ]] = if (dep.aggregator.isDefined) {
7173 if (dep.mapSideCombine) {
7274 // We are reading values that are already combined
73- val combinedKeyValuesIterator = iter.asInstanceOf [Iterator [(K ,C )]]
75+ val combinedKeyValuesIterator = iter.asInstanceOf [Iterator [(K , C )]]
7476 new InterruptibleIterator (context,
7577 dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context))
7678 } else {
7779 // We don't know the value type, but also don't care -- the dependency *should*
7880 // have made sure its compatible w/ this aggregator, which will convert the value
7981 // type to the combined type C
80- val keyValuesIterator = iter.asInstanceOf [Iterator [(K ,Nothing )]]
82+ val keyValuesIterator = iter.asInstanceOf [Iterator [(K , Nothing )]]
8183 new InterruptibleIterator (context,
8284 dep.aggregator.get.combineValuesByKey(keyValuesIterator, context))
8385 }
0 commit comments