diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index b4248b74f50ab..a627e5265723f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -210,7 +210,7 @@ private[sql] trait SQLTestUtilsBase
    *
    * @todo Probably this method should be moved to a more general place
    */
-  protected def withTempDir(f: File => Unit): Unit = {
+  protected def withTempDir[A](f: File => A): A = {
     val dir = Utils.createTempDir().getCanonicalFile
     try f(dir) finally {
       // wait for all tasks to finish before deleting files
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 2a086be57f517..249a299646f26 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -18,18 +18,26 @@
 package org.apache.spark.sql.hive.orc
 
 import java.io.File
-import java.util.Locale
+import java.util.{Locale, Properties}
 
+import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcStruct}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.orc.OrcConf.COMPRESS
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, Matchers}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.hive.HadoopTableReader
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 case class OrcData(intField: Int, stringField: String)
 
@@ -225,7 +233,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 }
 
-class OrcSourceSuite extends OrcSuite {
+class OrcSourceSuite extends OrcSuite with SQLTestUtils with Matchers {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -278,4 +286,93 @@ class OrcSourceSuite extends OrcSuite {
       )).get.toString
     }
   }
+
+  test("SPARK-22267 Spark SQL correctly reads parquet files when column order is different") {
+    checkReadDataFrameFromFile("parquet", spark.read.parquet,
+      HiveUtils.CONVERT_METASTORE_PARQUET.key)
+  }
+
+  test("SPARK-22267 Spark SQL incorrectly reads ORC files when column order is different") {
+    checkReadDataFrameFromFile("orc", spark.read.orc, HiveUtils.CONVERT_METASTORE_ORC.key)
+  }
+
+  def checkReadDataFrameFromFile(format: String, read: String => DataFrame,
+      convertConfigKey: String): Unit = {
+    withTempDir { dir =>
+      import spark.implicits._
+
+      val path = dir.getCanonicalPath
+
+      Seq(1 -> 2).toDF("c1", "c2").write.format(format).mode("overwrite").save(path)
+      checkAnswer(read(path), Row(1, 2))
+
+      Seq("true", "false").foreach { value =>
+        withTable("t") {
+          withSQLConf(convertConfigKey -> value) {
+            sql(s"CREATE EXTERNAL TABLE t(c2 INT, c1 INT) STORED AS $format LOCATION '$path'")
+            checkAnswer(spark.table("t"), Row(2, 1))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-22267 HadoopRDD incorrectly reads ORC files when column order is different") {
+    for {
+      serializationDdl <- Seq("struct t { i32 c2, i32 c1}", "struct t { i32 c1, i32 c2}")
+      (columns, expected) <- Seq("c2,c1" -> "{2, 1}", "c1,c2" -> "{1, 2}")
+    } yield {
+      readOrcFile(serializationDdl, columns) shouldBe expected
+    }
+  }
+
+  def readOrcFile(serializationDdl: String, columns: String): String = {
+    withTempDir { dir =>
+      import spark.implicits._
+      val path = dir.getCanonicalPath
+      Seq(1 -> 2).toDF("c1", "c2").write.format("orc").mode("overwrite").save(path)
+
+      val properties = new Properties
+      // scalastyle:off ensure.single.space.before.token
+      Map(
+        "name" -> "default.t",
+        "columns.types" -> "int:int",
+        "serialization.ddl" -> serializationDdl,
+        "serialization.format" -> "1",
+        "columns" -> columns,
+        "columns.comments" -> "",
+        "bucket_count" -> "-1",
+        "EXTERNAL" -> "TRUE",
+        "serialization.lib" -> "org.apache.hadoop.hive.ql.io.orc.OrcSerde",
+        "file.inputformat" -> "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        "file.outputformat" -> "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+        "location" -> path,
+        "transient_lastDdlTime" -> "1510594468"
+      ).foreach(Function.tupled(properties.setProperty))
+      // scalastyle:on ensure.single.space.before.token
+
+      val (inputFormatClass, outputFormatClass) =
+        (classOf[OrcInputFormat], classOf[OrcOutputFormat])
+      val tableDesc1 = new TableDesc(inputFormatClass, outputFormatClass, properties)
+
+      val initializeJobConfFunc = { jobConf: JobConf =>
+        HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc1)(jobConf)
+      }
+
+      val broadCastedConf =
+        sparkContext.broadcast(new SerializableConfiguration(sparkContext.hadoopConfiguration))
+
+      val hadoopRDD = new HadoopRDD[NullWritable, OrcStruct](
+        sparkContext,
+        broadCastedConf,
+        Some(initializeJobConfFunc),
+        inputFormatClass,
+        classOf[NullWritable],
+        classOf[OrcStruct],
+        0)
+
+      hadoopRDD.map(_._2).map(_.toString).collect().mkString(", ")
+    }
+  }
 }
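
Note on the SQLTestUtils hunk above: withTempDir is widened from "f: File => Unit" to "[A](f: File => A): A" solely so that a helper such as readOrcFile can return a value (the collected row string) computed inside the temp-directory block. The snippet below is a minimal standalone sketch of that idea, not Spark's actual SQLTestUtils code; it substitutes java.nio.file.Files for Spark's Utils.createTempDir and omits the wait for running tasks.

object WithTempDirSketch {
  import java.io.File
  import java.nio.file.Files

  // Simplified stand-in for SQLTestUtils.withTempDir: run the block against a fresh
  // temp directory, clean up afterwards, and return the block's result instead of Unit.
  def withTempDir[A](f: File => A): A = {
    val dir = Files.createTempDirectory("spark-test").toFile
    try f(dir) finally {
      Option(dir.listFiles()).foreach(_.foreach(_.delete()))
      dir.delete()
    }
  }

  def main(args: Array[String]): Unit = {
    // With the old File => Unit signature this value could not escape the block;
    // readOrcFile in the diff relies on the generic return type in exactly this way.
    val contents: String = withTempDir { dir =>
      val file = new File(dir, "data.txt")
      Files.write(file.toPath, "1,2".getBytes("UTF-8"))
      new String(Files.readAllBytes(file.toPath), "UTF-8")
    }
    println(contents) // prints "1,2"
  }
}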