
Commit b52ac0e

ash211 authored and rxin committed
SPARK-1757 Failing test for saving null primitives with .saveAsParquetFile()
https://issues.apache.org/jira/browse/SPARK-1757

The first test succeeds, but the second test fails with the following exception:

```
[info] - save and load case class RDD with Nones as parquet *** FAILED *** (14 milliseconds)
[info]   java.lang.RuntimeException: Unsupported datatype StructType(List())
[info]   at scala.sys.package$.error(package.scala:27)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetRelation.scala:201)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetRelation.scala:234)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetRelation.scala:267)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:143)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:122)
[info]   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:139)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:66)
[info]   at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:98)
```

Author: Andrew Ash <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes apache#690 from ash211/rdd-parquet-save and squashes the following commits:

747a0b9 [Andrew Ash] Merge pull request #1 from marmbrus/pr/690
54bd00e [Michael Armbrust] Need to put Option first since Option <: Seq.
8f3f281 [Andrew Ash] SPARK-1757 Add failing test for saving SparkSQL Schemas with Option[?] fields as parquet

(cherry picked from commit 156df87)

Signed-off-by: Reynold Xin <[email protected]>
1 parent 89b56d7 commit b52ac0e
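
Why the case order matters: in Scala, `Option` extends `Product`, so the `t <:< typeOf[Product]` branch in `schemaFor` matched `Option[_]` fields before the `Option` case was ever reached, which is consistent with the empty `StructType(List())` in the stack trace above. A minimal standalone sketch (plain scala-reflect, not Spark code; `MatchOrderDemo` is an illustrative name) of the subtype relationship:

```scala
import scala.reflect.runtime.universe._

object MatchOrderDemo extends App {
  // Option extends Product, so a Product subtype check also matches Option.
  println(typeOf[Option[Int]] <:< typeOf[Product])   // true
  println(typeOf[Option[Int]] <:< typeOf[Option[_]]) // true
  // Pattern matching takes the first case that applies, so the Option case
  // must appear before the Product case to get a chance to run.
}
```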

File tree: 2 files changed, +47 -3 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -41,6 +41,9 @@ object ScalaReflection {

   /** Returns a catalyst DataType for the given Scala Type using reflection. */
   def schemaFor(tpe: `Type`): DataType = tpe match {
+    case t if t <:< typeOf[Option[_]] =>
+      val TypeRef(_, _, Seq(optType)) = t
+      schemaFor(optType)
     case t if t <:< typeOf[Product] =>
       val params = t.member("<init>": TermName).asMethod.paramss
       StructType(
@@ -59,9 +62,6 @@ object ScalaReflection {
     case t if t <:< typeOf[String] => StringType
     case t if t <:< typeOf[Timestamp] => TimestampType
     case t if t <:< typeOf[BigDecimal] => DecimalType
-    case t if t <:< typeOf[Option[_]] =>
-      val TypeRef(_, _, Seq(optType)) = t
-      schemaFor(optType)
     case t if t <:< typeOf[java.lang.Integer] => IntegerType
     case t if t <:< typeOf[java.lang.Long] => LongType
     case t if t <:< typeOf[java.lang.Double] => DoubleType
```
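
For illustration, a self-contained sketch of the `TypeRef` destructuring the moved case uses to unwrap `Option[T]` and recurse on `T` (plain scala-reflect; `unwrapOption` is a made-up name, not Spark's):

```scala
import scala.reflect.runtime.universe._

object UnwrapOptionDemo extends App {
  // Pull the type argument T out of Option[T], recursing through nesting,
  // mirroring how the patched schemaFor recurses with schemaFor(optType).
  def unwrapOption(tpe: Type): Type = tpe match {
    case t if t <:< typeOf[Option[_]] =>
      val TypeRef(_, _, Seq(optType)) = t // optType is the T in Option[T]
      unwrapOption(optType)
    case t => t
  }

  println(unwrapOption(typeOf[Option[Int]]))            // Int
  println(unwrapOption(typeOf[Option[Option[String]]])) // String
  println(unwrapOption(typeOf[String]))                 // String
}
```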

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 44 additions & 0 deletions
```diff
@@ -42,6 +42,20 @@ import org.apache.spark.sql.test.TestSQLContext._

 case class TestRDDEntry(key: Int, value: String)

+case class NullReflectData(
+    intField: java.lang.Integer,
+    longField: java.lang.Long,
+    floatField: java.lang.Float,
+    doubleField: java.lang.Double,
+    booleanField: java.lang.Boolean)
+
+case class OptionalReflectData(
+    intField: Option[Int],
+    longField: Option[Long],
+    floatField: Option[Float],
+    doubleField: Option[Double],
+    booleanField: Option[Boolean])
+
 class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   import TestData._
   TestData // Load test data tables.
@@ -195,5 +209,35 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
     Utils.deleteRecursively(ParquetTestData.testDir)
     ParquetTestData.writeFile()
   }
+
+  test("save and load case class RDD with nulls as parquet") {
+    val data = NullReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
+
+  test("save and load case class RDD with Nones as parquet") {
+    val data = OptionalReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
 }
```
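
With the case moved, `Option` fields map to their element type's `DataType` and round-trip through Parquet as nulls. A minimal usage sketch, assuming the Spark 1.0-era API the test exercises (`createSchemaRDD`, `saveAsParquetFile`, `parquetFile`); the case class and output path are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative schema; any case class with Option fields behaves the same.
case class Record(id: Option[Int], name: Option[String])

object OptionParquetRoundTrip extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.createSchemaRDD // implicitly lifts RDD[Record] to a SchemaRDD

  val rdd = sc.parallelize(Seq(Record(Some(1), None), Record(None, Some("a"))))
  rdd.saveAsParquetFile("/tmp/records.parquet") // Nones are stored as nulls
  sqlContext.parquetFile("/tmp/records.parquet").collect().foreach(println)
  sc.stop()
}
```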
