From 88f7635fd783629ea9e3b712d52460aa5323b6e8 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 20 Nov 2020 22:06:06 +0300
Subject: [PATCH 1/5] Add a test

---
 .../sql/connector/DataSourceV2SQLSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ddafa1bb5070a..b13039a3876fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 class DataSourceV2SQLSuite
@@ -2513,6 +2514,21 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33505: insert into partitioned table") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"""
+        |CREATE TABLE $t (id bigint, name string, data string)
+        |USING foo
+        |PARTITIONED BY (id, name)""".stripMargin)
+      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'abc'")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1, UTF8String.fromString("NY")))))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")

From ec15f2aba12104832c7feb85cf9f4d54d2fbb30c Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 20 Nov 2020 22:10:36 +0300
Subject: [PATCH 2/5] Fix the test

---
 .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b13039a3876fd..1dbfc80d0fed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2518,9 +2518,9 @@ class DataSourceV2SQLSuite
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
       sql(s"""
-        |CREATE TABLE $t (id bigint, name string, data string)
+        |CREATE TABLE $t (id bigint, city string, data string)
         |USING foo
-        |PARTITIONED BY (id, name)""".stripMargin)
+        |PARTITIONED BY (id, city)""".stripMargin)
       sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'abc'")
 
       val partTable = catalog("testpart").asTableCatalog

From dc0c6d8f2a00ca094cb192cd3b0b22436edbf149 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 20 Nov 2020 22:11:04 +0300
Subject: [PATCH 3/5] Fix InMemoryPartitionTable

---
 .../apache/spark/sql/connector/InMemoryPartitionTable.scala   | 4 ++++
 .../scala/org/apache/spark/sql/connector/InMemoryTable.scala  | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
index 1c96bdf3afa20..23987e909aa70 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
@@ -92,4 +92,8 @@ class InMemoryPartitionTable(
 
   override def partitionExists(ident: InternalRow): Boolean =
     memoryTablePartitions.containsKey(ident)
+
+  override protected def addPartitionKey(key: Seq[Any]): Unit = {
+    memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 3b47271a114e2..c93053abc550a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -160,12 +160,15 @@ class InMemoryTable(
     }
   }
 
+  protected def addPartitionKey(key: Seq[Any]): Unit = {}
+
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
       val key = getKey(row)
       dataMap += dataMap.get(key)
         .map(key -> _.withRow(row))
         .getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
+      addPartitionKey(key)
     })
     this
   }

From ff0f1f8496cf00243ff90a8132a62467aa80f217 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 20 Nov 2020 23:34:45 +0300
Subject: [PATCH 4/5] import InternalRow

---
 .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index dd127e15bf5b1..80e4785bd639d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog._

From 362e8b86ae18d5833c281717924a778e1108085a Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sat, 21 Nov 2020 00:00:43 +0300
Subject: [PATCH 5/5] Check that insert into the existing partition does not
 fail

---
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 80e4785bd639d..89f97fe5be6a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2469,11 +2469,15 @@ class DataSourceV2SQLSuite
         |CREATE TABLE $t (id bigint, city string, data string)
         |USING foo
         |PARTITIONED BY (id, city)""".stripMargin)
-      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'abc'")
-
       val partTable = catalog("testpart").asTableCatalog
         .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1, UTF8String.fromString("NY")))))
+      val expectedPartitionIdent = InternalRow.fromSeq(Seq(1, UTF8String.fromString("NY")))
+      assert(!partTable.partitionExists(expectedPartitionIdent))
+      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'abc'")
+      assert(partTable.partitionExists(expectedPartitionIdent))
+      // Insert into the existing partition must not fail
+      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'def'")
+      assert(partTable.partitionExists(expectedPartitionIdent))
     }
   }
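
Editor's note (not part of the patches): PATCH 3/5 fixes SPARK-33505 with a template-method hook. InMemoryTable.withData calls a protected no-op addPartitionKey for every row's partition key, and InMemoryPartitionTable overrides it to register the key, so partitions created implicitly by INSERT INTO become visible to partitionExists. The following is a minimal self-contained Scala sketch of that pattern only; Table, PartitionedTable, and the plain Seq[Any] keys are simplified stand-ins for illustration, not Spark's real classes.

import scala.collection.mutable

// Base table: stores rows keyed by partition values. The protected hook is a
// no-op here, mirroring InMemoryTable.addPartitionKey in PATCH 3/5.
class Table {
  protected val dataMap = mutable.Map.empty[Seq[Any], Vector[String]]

  protected def addPartitionKey(key: Seq[Any]): Unit = {}

  def withData(rows: Seq[(Seq[Any], String)]): this.type = {
    rows.foreach { case (key, value) =>
      dataMap += key -> (dataMap.getOrElse(key, Vector.empty) :+ value)
      addPartitionKey(key) // notify the subclass about every key seen
    }
    this
  }
}

// Partition-aware subclass: overrides the hook to track partitions, the way
// InMemoryPartitionTable fills memoryTablePartitions.
class PartitionedTable extends Table {
  private val partitions = mutable.Set.empty[Seq[Any]]

  override protected def addPartitionKey(key: Seq[Any]): Unit =
    partitions += key // idempotent: re-adding an existing partition is harmless

  def partitionExists(key: Seq[Any]): Boolean = partitions.contains(key)
}

object Demo extends App {
  val t = new PartitionedTable
  assert(!t.partitionExists(Seq(1, "NY")))
  t.withData(Seq((Seq(1, "NY"), "abc")))
  assert(t.partitionExists(Seq(1, "NY")))  // partition was registered
  t.withData(Seq((Seq(1, "NY"), "def")))   // existing partition: must not fail
  assert(t.partitionExists(Seq(1, "NY")))
}

Because the override is idempotent, this also covers the scenario PATCH 5/5 asserts in the test: inserting into an already-existing partition succeeds and the partition remains visible.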