@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -184,7 +184,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         }
       }
 
-      dataRows.mapPartitions { iterator =>
+      // Since we know for sure that this closure is serializable, we can avoid the overhead
+      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
+      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
         val dataTypes = requiredColumns.map(schema(_).dataType)
         val mutableRow = new SpecificMutableRow(dataTypes)
         iterator.map { dataRow =>
@@ -196,6 +199,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
           mutableRow.asInstanceOf[expressions.Row]
         }
       }
+      new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+
     } else {
       dataRows
     }
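
For context (not part of the commit itself): `RDD.mapPartitions` runs the supplied closure through `SparkContext.clean` (the ClosureCleaner) before wrapping it in a `MapPartitionsRDD`, so constructing the `MapPartitionsRDD` directly skips that per-RDD reflection pass. Below is a minimal sketch of the equivalence the added comment describes; the package and object names and the toy data are illustrative, and the file must live under `org.apache.spark` because `MapPartitionsRDD` is `private[spark]` in this era of Spark.

```scala
// Minimal sketch only; the package name is illustrative, but it must sit under
// org.apache.spark because MapPartitionsRDD is private[spark].
package org.apache.spark.sketch

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.{MapPartitionsRDD, RDD}

object ClosureCleaningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val dataRows: RDD[Int] = sc.parallelize(1 to 100, 4)

    // Public API: mapPartitions passes the closure through sc.clean (the
    // ClosureCleaner) before building a MapPartitionsRDD.
    val cleaned = dataRows.mapPartitions(iter => iter.map(_ * 2))

    // Direct route used by the patch: supply the (TaskContext, partitionIndex,
    // iterator) function that MapPartitionsRDD expects and skip closure cleaning.
    // Only safe when the closure is known to be serializable.
    val func = (_: TaskContext, _: Int, iter: Iterator[Int]) => iter.map(_ * 2)
    val direct = new MapPartitionsRDD(dataRows, func, preservesPartitioning = false)

    // Both paths compute the same result.
    assert(cleaned.collect().sameElements(direct.collect()))
    sc.stop()
  }
}
```

The trade-off is that skipping `sc.clean` also skips the serializability check the cleaner performs, which is why the added comment stresses that the closure is known to be serializable.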