Commit 6cd7366

Fix SPARK-7858 by using output types for conversion.

1 parent 5a00e66

File tree

3 files changed (+13 / -19 lines):

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
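A note on the one-line change above: `StructType` extends `Seq[StructField]`, so `schema.map(_.dataType)` extracts the per-column Catalyst types that the new `productToRowRdd` signature expects. A minimal sketch, with a hypothetical schema for illustration only:

import org.apache.spark.sql.types._

// Hypothetical two-column schema.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

// StructType is a Seq[StructField]; mapping over it yields the column types.
val outputTypes: Seq[DataType] = schema.map(_.dataType)
// outputTypes == Seq(IntegerType, StringType)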

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 11 additions & 17 deletions
@@ -23,27 +23,24 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}

 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
@@ -60,19 +57,16 @@ object RDDConversions {
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
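Both rewritten methods now share the same per-partition pattern: the converters are derived once from `outputTypes`, and a single `SpecificMutableRow` buffer is reused for every record. Below is a minimal, Spark-free sketch of that pattern; `toConverter` and the string type tags are simplified stand-ins, not Spark's internal API:

type Converter = Any => Any

// Stand-in for CatalystTypeConverters.createToCatalystConverter.
def toConverter(typeTag: String): Converter = typeTag match {
  case "string" => (v: Any) => String.valueOf(v)
  case _        => (v: Any) => v
}

def convertPartition(
    rows: Iterator[Seq[Any]],
    outputTypes: Seq[String]): Iterator[Array[Any]] = {
  if (rows.isEmpty) {
    Iterator.empty
  } else {
    // Converters are built once per partition, not once per row.
    val converters = outputTypes.map(toConverter)
    // One mutable buffer is reused across records; each converted row must
    // be consumed before the next is produced, the same contract the
    // RDDConversions methods rely on.
    val mutableRow = new Array[Any](outputTypes.length)
    rows.map { r =>
      var i = 0
      while (i < mutableRow.length) {
        mutableRow(i) = converters(i)(r(i))
        i += 1
      }
      mutableRow
    }
  }
}

// e.g. convertPartition(Iterator(Seq(1, "a")), Seq("int", "string"))

The up-front `iterator.isEmpty` check keeps empty partitions from paying for converter and buffer setup at all.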

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }
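For context on this last change: the rows handed to `rowToRowRdd` come from the data source scan, whose columns match the pruned `output` attributes rather than the full `relation.schema`, so the converters must be derived from `output`. A hypothetical sketch of the mismatch the old code allowed:

import org.apache.spark.sql.types._

// Hypothetical three-column relation.
val relationSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("age", IntegerType)))

// A pruned scan (say, SELECT name, age) produces two-column rows, so the
// output attributes describe them, not the relation schema.
val outputTypes: Seq[DataType] = Seq(StringType, IntegerType)

// Converters built from the relation schema neither match the row width
// (3 vs 2) nor line up by position (IntegerType vs StringType).
assert(relationSchema.map(_.dataType) != outputTypes)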
