sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, DriverManager}
 import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
@@ -41,10 +39,23 @@ class JDBCOptions(
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
+  /**
+   * Returns a `Properties` object with all options.
+   */
+  val asProperties: Properties = {
+    val properties = new Properties()
+    parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+    properties
+  }
+
+  /**
+   * Returns a `Properties` object with all options except Spark-internal options like `url`,
+   * `dbtable`, and `numPartitions`. Use it when invoking a JDBC API like `Driver.connect`,
+   * because each DBMS vendor has its own property list for its JDBC driver. See SPARK-17776.
+   */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    // We should avoid to pass the options into properties. See SPARK-17776.
-    parameters.filterKeys(!jdbcOptionNames.contains(_))
+    parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -126,10 +137,10 @@ class JDBCOptions(
 }
 
 object JDBCOptions {
-  private val jdbcOptionNames = ArrayBuffer.empty[String]
+  private val jdbcOptionNames = collection.mutable.Set[String]()
 
   private def newOption(name: String): String = {
-    jdbcOptionNames += name
+    jdbcOptionNames += name.toLowerCase
     name
   }
 
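Taken together, the two hunks above implement the fix: option names are lowercased once at registration, and each incoming key is lowercased before the membership test, so the filter matches regardless of the caller's capitalization. A minimal standalone sketch of the same pattern (not Spark's code; the object and helper names below are invented for illustration):

import java.util.Properties

object CaseInsensitiveFilterSketch {
  // Internal option names, stored lowercased (mirrors jdbcOptionNames).
  private val internalNames = collection.mutable.Set[String]()

  private def newOption(name: String): String = {
    internalNames += name.toLowerCase  // normalize once, at registration
    name
  }

  val URL = newOption("url")
  val DBTABLE = newOption("dbtable")
  val NUM_PARTITIONS = newOption("numPartitions")

  // Keep only keys that are not internal, whatever their capitalization.
  def connectionProperties(parameters: Map[String, String]): Properties = {
    val props = new Properties()
    parameters.filterKeys(key => !internalNames(key.toLowerCase))
      .foreach { case (k, v) => props.setProperty(k, v) }
    props
  }

  def main(args: Array[String]): Unit = {
    val params = Map("URL" -> "jdbc:h2:mem:db", "NumPartitions" -> "10", "user" -> "sa")
    println(connectionProperties(params))  // prints {user=sa}
  }
}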
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
   def resolveTable(options: JDBCOptions): StructType = {
     val url = options.url
     val table = options.table
-    val properties = options.asConnectionProperties
     val dialect = JdbcDialects.get(url)
     val conn: Connection = JdbcUtils.createConnectionFactory(options)()
     try {

Member Author (review comment on the removed line): This is unused.
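For context, the connection properties are consumed where the connection is actually opened, inside the connection factory. A rough sketch of that idea, assuming nothing beyond java.sql — a hypothetical simplification, not JdbcUtils' real implementation:

import java.sql.{Connection, DriverManager}

// Build a reusable () => Connection from the options, handing the driver
// only vendor-facing properties (internal options filtered out, per SPARK-17776).
def createConnectionFactory(options: JDBCOptions): () => Connection = {
  val url = options.url
  val properties = options.asConnectionProperties
  () => DriverManager.getConnection(url, properties)
}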
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

@@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val url = jdbcOptions.url
     val table = jdbcOptions.table
-    val properties = jdbcOptions.asConnectionProperties
+    val properties = jdbcOptions.asProperties
     data.write
       .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .jdbc(url, table, properties)

Member Author (review comment): Also, this should not use asConnectionProperties; the writer needs options like numPartitions.
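The distinction matters on the write path: DataFrameWriter.jdbc folds the passed Properties back into data source options, so stripping numPartitions here would silently drop the requested write parallelism. A hedged usage sketch (the URL, table name, and writeOrders helper are made up):

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// `df` stands for any DataFrame to be written out over JDBC.
def writeOrders(df: DataFrame): Unit = {
  val props = new Properties()
  props.setProperty("user", "sa")
  props.setProperty("numPartitions", "10")  // must reach the writer, not be filtered away

  df.write
    .mode(SaveMode.Append)
    .jdbc("jdbc:postgresql://db.example.com/sales", "orders", props)
}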
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
     assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
     assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
   }
+
+  test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
+    val parameters = Map(
+      "url" -> "jdbc:mysql://localhost:3306/temp",
+      "dbtable" -> "t1",
+      "numPartitions" -> "10")
+    assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
+    assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+  }
 }
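To see why the second assertion would have failed before the patch: CaseInsensitiveMap stores its keys lowercased, while the old ArrayBuffer held the mixed-case "numPartitions", so the exact-match filter let the option leak into the connection properties. The mismatch, reduced to plain collections as a sketch:

// Pre-fix check: exact match against mixed-case registered names.
val registered = Seq("url", "dbtable", "numPartitions")
val incomingKey = "numpartitions"  // as handed back by CaseInsensitiveMap
assert(!registered.contains(incomingKey))  // leaks through the old filter

// Post-fix check: both sides normalized, so the key is filtered out.
val registeredLower = registered.map(_.toLowerCase).toSet
assert(registeredLower(incomingKey))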