From 9968fba9979287aaa1f141ba18bfb9d4c116a3b3 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 26 May 2015 23:44:39 +0800
Subject: [PATCH 1/6] Tests the data type conversion code paths

---
 .../org/apache/spark/sql/sources/SimpleTextRelation.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index de907846b9180..0f959b3d0b86d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputForma
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
@@ -108,7 +109,10 @@ class SimpleTextRelation(
     sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
       Row(record.split(",").zip(fields).map { case (value, dataType) =>
-        Cast(Literal(value), dataType).eval()
+        // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.)
+        val catalystValue = Cast(Literal(value), dataType).eval()
+        // Here we're converting Catalyst values to Scala values to test `needsConversion`
+        CatalystTypeConverters.convertToScala(catalystValue, dataType)
       }: _*)
     }
   }

From 5a00e66fe0e94fa35e1bab5ec3f1986d60914b02 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 19:29:07 -0700
Subject: [PATCH 2/6] Add assertions in order to reproduce SPARK-7858

---
 .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index a500269f3cdcf..33c8fd6fec266 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -38,7 +38,9 @@
       } else {
         val bufferedIterator = iterator.buffered
         val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields.toArray
+        val schemaFields = schema.fields
+        assert(mutableRow.length == schemaFields.length,
+          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
         val converters = schemaFields.map {
           f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
         }
@@ -65,7 +67,9 @@
       } else {
         val bufferedIterator = iterator.buffered
         val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields.toArray
+        val schemaFields = schema.fields
+        assert(mutableRow.length == schemaFields.length,
+          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
         val converters = schemaFields.map {
           f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
         }
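
The round trip that PATCH 1 exercises, shown as a standalone sketch (not part of the patch series) against the same Spark 1.4-era internal APIs the patch touches; the literal string and the choice of `StringType` are illustrative assumptions:

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.StringType

    // Cast.eval() produces a Catalyst-internal value (e.g. UTF8String rather than java.lang.String).
    val catalystValue = Cast(Literal("val_1"), StringType).eval()

    // convertToScala maps that value back to the external Scala representation, which is what
    // SimpleTextRelation now returns so the `needConversion` code path actually gets exercised.
    val scalaValue = CatalystTypeConverters.convertToScala(catalystValue, StringType)
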
From 6cd73666d82f4d1ab90289d3ae1156f2ea0e0c10 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 19:39:43 -0700
Subject: [PATCH 3/6] Fix SPARK-7858 by using output types for conversion.

---
 .../org/apache/spark/sql/SQLContext.scala     |  2 +-
 .../spark/sql/execution/ExistingRDD.scala     | 28 ++++++++-----------
 .../sql/sources/DataSourceStrategy.scala      |  2 +-
 3 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1ea596dddff02..3935f7b321b85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 33c8fd6fec266..04878ace07a41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
@@ -31,19 +31,16 @@ import org.apache.spark.sql.{Row, SQLContext}
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
@@ -60,19 +57,16 @@ object RDDConversions {
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
       if (iterator.isEmpty) {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields
-        assert(mutableRow.length == schemaFields.length,
-          s"Input row has ${mutableRow.length} fields but schema has ${schemaFields.length}")
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
+        val mutableRow = new SpecificMutableRow(outputTypes)
+        assert(mutableRow.length == outputTypes.length,
+          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
+        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index dacd967cff856..c6a4dabbab05e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }
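
Why PATCH 3 drives the conversion from the plan's output types rather than the relation schema, as a rough sketch with made-up column names (the real trigger is the duplicated-column projection added by the regression test in PATCH 5):

    import org.apache.spark.sql.types._

    // The relation describes each column exactly once ...
    val relationSchema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("p1", IntegerType)))

    // ... but a plan that projects 'b twice emits two-column rows, both strings.
    val outputTypes: Seq[DataType] = Seq(StringType, StringType)

Driving the converters from `relationSchema` pairs row positions with the wrong data types (and the wrong field count) as soon as the projection duplicates or reorders columns, while `output.map(_.dataType)` keeps converters and row columns aligned.
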
From 2169a0f5a25a6f857cad025360f1a4dc59c31222 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 20:03:35 -0700
Subject: [PATCH 4/6] Remove use of SpecificMutableRow and BufferedIterator

---
 .../spark/sql/execution/ExistingRDD.scala | 60 ++++++++-----------
 1 file changed, 26 insertions(+), 34 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 04878ace07a41..9749702392cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -21,9 +21,9 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
@@ -33,23 +33,19 @@ import org.apache.spark.sql.{Row, SQLContext}
 object RDDConversions {
   def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(outputTypes)
-        assert(mutableRow.length == outputTypes.length,
-          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
-        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r.productElement(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        assert (r.productArity == numColumns,
+          s"Expected row with $numColumns but got ${r.productArity} instead")
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r.productElement(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }
@@ -59,23 +55,19 @@
    */
   def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(outputTypes)
-        assert(mutableRow.length == outputTypes.length,
-          s"Input row has ${mutableRow.length} fields but outputTypes has ${outputTypes.length}")
-        val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        assert (r.length == numColumns,
+          s"Expected row with $numColumns but got ${r.length} instead")
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }
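
The per-partition pattern that PATCH 4 keeps, as a plain-Scala sketch with a hypothetical helper (no Spark classes): one converter per column is built up front, and a single mutable buffer is reused while a `while` loop walks each row.

    // Hypothetical stand-in for the RDDConversions loop; Array[Any] plays the role of
    // GenericMutableRow, and `converters` plays the role of createToCatalystConverter results.
    def convertPartition(
        rows: Iterator[Seq[Any]],
        converters: Array[Any => Any]): Iterator[Array[Any]] = {
      val numColumns = converters.length
      val mutableRow = new Array[Any](numColumns) // reused for every row
      rows.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))
          i += 1
        }
        mutableRow
      }
    }

The empty-iterator special case and the BufferedIterator disappear because nothing needs to peek at the first row any more: the mutable row is sized from `outputTypes` instead of from `bufferedIterator.head`.
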
From 56b13e58b4020c73440d920df077b5eaa20e07eb Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 23:00:47 -0700
Subject: [PATCH 5/6] Add regression test to hadoopFsRelationSuites

---
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 4 ----
 .../org/apache/spark/sql/sources/DataSourceStrategy.scala  | 2 +-
 .../apache/spark/sql/sources/hadoopFsRelationSuites.scala  | 6 ++++++
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9749702392cb3..f931dc95ef575 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -37,8 +37,6 @@ object RDDConversions {
       val mutableRow = new GenericMutableRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
-        assert (r.productArity == numColumns,
-          s"Expected row with $numColumns but got ${r.productArity} instead")
         var i = 0
         while (i < numColumns) {
           mutableRow(i) = converters(i)(r.productElement(i))
@@ -59,8 +57,6 @@ object RDDConversions {
       val mutableRow = new GenericMutableRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
-        assert (r.length == numColumns,
-          s"Expected row with $numColumns but got ${r.length} instead")
         var i = 0
         while (i < numColumns) {
           mutableRow(i) = converters(i)(r(i))

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c6a4dabbab05e..a147e1491223d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+      execution.RDDConversions.rowToRowRdd(rdd, relation.schema.map(_.dataType))
     } else {
       rdd
     }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 70328e1ef810d..7c02d563f8d9a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -76,6 +76,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
       for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
 
+    // Project many copies of columns with different types (reproduction for SPARK-7858)
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar"))
+        yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1))
+
     // Self-join
     df.registerTempTable("t")
     withTempTable("t") {
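
Besides adding the regression test, PATCH 5 also switches the DataSourceStrategy call back to `relation.schema.map(_.dataType)`; with the new test in place, that mismatch surfaces as soon as a query projects a column more than once, and PATCH 6 below restores the output-based types. A rough reproduction outside the suite (assumed names: a SQLContext `sqlContext` and a registered data-source table `t` with a string column `b` and an integer partition column `p1`):

    val df = sqlContext.table("t")

    // With schema-driven conversion the scan emits more columns than the relation schema
    // describes, so the per-column converters go out of sync with the row contents.
    df.select(df("b"), df("b"), df("p1"), df("p1")).collect()
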
From e71c8664f298e62ffbaeb04eb374d758b62696cd Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 23:01:48 -0700
Subject: [PATCH 6/6] Re-fix bug so that the tests pass again

---
 .../scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index a147e1491223d..c6a4dabbab05e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema.map(_.dataType))
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }
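
With the series applied, both RDDConversions entry points are driven by output data types. A small usage sketch (assuming an active SparkContext named `sc`; RDDConversions is an internal `@DeveloperApi` object, so this is illustration rather than a recommended public API):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.RDDConversions
    import org.apache.spark.sql.types.{IntegerType, StringType}

    case class Record(id: Int, name: String)

    val products: RDD[Record] = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))

    // One DataType per output column, in output order; values are converted to
    // Catalyst representations column by column.
    val rows: RDD[Row] = RDDConversions.productToRowRdd(products, Seq(IntegerType, StringType))
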