@@ -36,6 +36,20 @@ case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
 
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
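
The two new case classes nest a `StructContainer` and a `Seq[Int]`, which is what exercises struct and array handling in the tests below. A minimal sketch of how one record hangs together (the values are illustrative, not taken from the suite):

```scala
// Illustrative only; assumes just the case classes defined above.
val row = ParquetDataWithKeyAndComplexTypes(
  p = 1,                                   // partitioning key
  intField = 7,
  stringField = "part-1",
  structField = StructContainer(7, "7_string"),
  arrayField = 1 to 7)                     // a Range conforms to Seq[Int]

// Field access mirrors the SQL dot path structField.intStructField:
assert(row.structField.intStructField == 7)
```
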
@@ -86,6 +100,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
         location '${new File(normalTableDir, "normal").getCanonicalPath}'
       """)
 
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     (1 to 10).foreach { p =>
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
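
These `CREATE EXTERNAL TABLE` statements point Hive at Parquet directories the test writes itself, so no data is moved; the `ALTER TABLE ... ADD PARTITION` loops (here and in the next hunk) then register each `p=<n>` subdirectory as a partition. Once that is done, nested fields resolve with dot syntax. A hedged sketch of the kind of query this setup enables, assuming the suite's `sql` entry point on its HiveContext:

```scala
// Assumes the tables and partitions registered in beforeAll above.
val nested = sql("""
  SELECT p, structField.stringStructField
  FROM partitioned_parquet_with_complextypes
  WHERE p = 1""")
nested.collect().foreach(println)
```
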
@@ -94,7 +140,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a": $i, "b":"str${i}"}"""))
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a": $i, "b":"str$i"}"""))
     jsonRDD(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     jsonRDD(rdd2).registerTempTable("jt_array")
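
The only non-additive change in this hunk is `"str${i}"` becoming `"str$i"`, which is cosmetic: both interpolations emit identical JSON. For reference, a sketch of what the registered `jt` table holds afterwards (assuming the suite's `jsonRDD` helper and its schema inference):

```scala
// Each element of rdd1 is a one-line JSON document, e.g. {"a": 3, "b":"str3"},
// so jt is inferred as (a: bigint, b: string).
sql("SELECT a, b FROM jt WHERE a = 3").collect()
// expected (a inferred as bigint): Array(Row(3, "str3"))
```
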
@@ -105,6 +159,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
@@ -409,6 +465,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 
   test("SPARK-6016 make sure to use the latest footers") {
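
Unlike the Hive DDL earlier in the file, these temporary tables go through the data source API (`USING org.apache.spark.sql.parquet`), with partitions discovered from the `p=<n>` directory names rather than from `ADD PARTITION` statements. A sketch of the roughly equivalent programmatic registration, assuming the Spark 1.3-era `SQLContext.parquetFile` is in scope:

```scala
// Hypothetical equivalent of the CREATE TEMPORARY TABLE above;
// parquetFile and registerTempTable are the 1.3-era DataFrame API.
val df = parquetFile(partitionedTableDirWithComplexTypes.getCanonicalPath)
df.registerTempTable("partitioned_parquet_with_complextypes")
```
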
@@ -473,7 +545,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
-
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -509,9 +582,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
   test(s"ordering of the partitioning columns $table") {
     checkAnswer(
       sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
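
Both code paths key off the directory names written in beforeAll: `saveAsParquetFile` is called once per `p=<n>` subdirectory, and that name is what partition discovery (and Hive's `ADD PARTITION`) interprets as the partition value. A quick sanity-check sketch of the resulting on-disk layout:

```scala
// One directory per partition value; the file names inside each
// directory are whatever Parquet writes and are not asserted on.
val partDirs = partitionedTableDirWithComplexTypes.listFiles().map(_.getName).sorted
// expected (lexicographic order): Array("p=1", "p=10", "p=2", ..., "p=9")
```
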
@@ -601,6 +710,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),
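
For partition p = 1, row i was written with `arrayField = 1 to i`, so once SPARK-5508 is fixed and the ignored test is re-enabled, it should see rows shaped like the sketch below (a restatement of the test's expectation, not new behavior):

```scala
import org.apache.spark.sql.Row

// Expected answer for the ignored array test at p = 1:
// Row(1 to 1, 1), Row(1 to 2, 1), ..., Row(1 to 10, 1)
val expected = (1 to 10).map(i => Row(1 to i, 1))
```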