Commit d9b33ea
Apply code review comments

- Use SparkUnsupportedOperationException
- Remove unused string interpolation
- Fix indentation

1 parent: a6f8ed8

File tree: 4 files changed, +18 -10 lines

common/utils/src/main/resources/error/error-conditions.json
Lines changed: 5 additions & 0 deletions

@@ -6046,6 +6046,11 @@
           "Update column nullability for MySQL and MS SQL Server."
         ]
       },
+      "UPSERT" : {
+        "message" : [
+          "Upsert not supported by JDBC dialect <class>."
+        ]
+      },
       "WRITE_FOR_BINARY_SOURCE" : {
         "message" : [
           "Write for the binary file data source."

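For context, a minimal Scala sketch of how this new condition is meant to surface at runtime; MyDialect is a placeholder name, the constructor is Spark-internal (usable only from Spark's own code), and the rendered text is approximate:

import org.apache.spark.SparkUnsupportedOperationException

// Mirrors the new default in JdbcDialect.getUpsertStatement (see the
// JdbcDialects.scala hunk below): the "class" entry fills the <class>
// placeholder declared above in error-conditions.json.
val e = new SparkUnsupportedOperationException(
  errorClass = "UNSUPPORTED_FEATURE.UPSERT",
  messageParameters = Map("class" -> "MyDialect"))

// Renders the message template with <class> substituted, roughly:
//   [UNSUPPORTED_FEATURE.UPSERT] ... Upsert not supported by JDBC dialect MyDialect.
println(e.getMessage)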
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/UpsertTests.scala
Lines changed: 3 additions & 3 deletions

@@ -31,8 +31,8 @@ trait UpsertTests {
   def createTableOption: String
   def upsertTestOptions: Map[String, String] = Map("createTableOptions" -> createTableOption)
 
-  test(s"Upsert existing table") { doTestUpsert(tableExists = true) }
-  test(s"Upsert non-existing table") { doTestUpsert(tableExists = false) }
+  test("Upsert existing table") { doTestUpsert(tableExists = true) }
+  test("Upsert non-existing table") { doTestUpsert(tableExists = false) }
 
   Seq(
     Seq("ts", "id", "v1", "v2"),
@@ -80,7 +80,7 @@
     assert(actual === expected)
   }
 
-  test(s"Upsert concurrency") {
+  test("Upsert concurrency") {
     // create a table with 100k rows
     val init =
       spark.range(100000)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
Lines changed: 6 additions & 6 deletions

@@ -145,12 +145,12 @@ object JdbcUtils extends Logging with SQLConfHelper {
   }
 
   def getUpsertStatement(
-    table: String,
-    rddSchema: StructType,
-    tableSchema: Option[StructType],
-    isCaseSensitive: Boolean,
-    dialect: JdbcDialect,
-    options: JDBCOptions): String = {
+      table: String,
+      rddSchema: StructType,
+      tableSchema: Option[StructType],
+      isCaseSensitive: Boolean,
+      dialect: JdbcDialect,
+      options: JDBCOptions): String = {
     val columns = getInsertColumns(table, rddSchema, tableSchema, isCaseSensitive, dialect)
     dialect.getUpsertStatement(table, columns, isCaseSensitive, options)
   }
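As a rough, illustrative sketch (not taken from this commit), a caller in the JDBC write path could combine this helper with the dialect resolved from the connection URL; df, tableSchema, conn, options, and the literal table name below are placeholders assumed to be in scope:

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects

// Illustrative caller only; every value referenced here is a stand-in.
val dialect = JdbcDialects.get(options.url)
val upsertSql = JdbcUtils.getUpsertStatement(
  table = "target_table",
  rddSchema = df.schema,        // schema of the DataFrame being written
  tableSchema = tableSchema,    // schema of the existing table, if known
  isCaseSensitive = SQLConf.get.caseSensitiveAnalysis,
  dialect = dialect,
  options = options)

// The dialect-specific SQL would typically back a PreparedStatement that is
// executed per batch of rows, analogous to the existing insert path.
val stmt = conn.prepareStatement(upsertSql)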

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
Lines changed: 4 additions & 1 deletion

@@ -345,7 +345,10 @@ abstract class JdbcDialect extends Serializable with Logging {
       columns: Array[StructField],
       isCaseSensitive: Boolean,
       options: JDBCOptions): String =
-    throw new UnsupportedOperationException("upserts are not supported")
+    throw new SparkUnsupportedOperationException(
+      errorClass = "UNSUPPORTED_FEATURE.UPSERT",
+      messageParameters = Map(
+        "class" -> this.getClass.getSimpleName))
 
   /**
    * Override connection specific properties to run before a select is made. This is in place to
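To make the new default concrete, here is a sketch of what a dialect-specific override could look like; the MySQL-style INSERT ... ON DUPLICATE KEY UPDATE form and the ExampleMySQLLikeDialect name are illustrative assumptions, not part of this commit:

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.StructField

// Illustration only: a dialect that renders the upsert as a MySQL-style statement.
case object ExampleMySQLLikeDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

  override def getUpsertStatement(
      table: String,
      columns: Array[StructField],
      isCaseSensitive: Boolean,    // ignored in this sketch
      options: JDBCOptions): String = {
    val cols = columns.map(f => quoteIdentifier(f.name))
    val placeholders = columns.map(_ => "?").mkString(", ")
    val updates = cols.map(c => s"$c = VALUES($c)").mkString(", ")
    s"INSERT INTO $table (${cols.mkString(", ")}) VALUES ($placeholders) " +
      s"ON DUPLICATE KEY UPDATE $updates"
  }
}

A dialect that leaves getUpsertStatement un-overridden now raises the UNSUPPORTED_FEATURE.UPSERT condition added above rather than a bare UnsupportedOperationException.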
