From cd06c0f661e4beb6691ffa54e3c92104695bb65c Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 12 Nov 2016 05:44:47 -0800
Subject: [PATCH 1/3] [SPARK-18419][SQL] Fix JDBCOptions.asConnectionProperties to be case-insensitive

---
 .../sql/execution/datasources/jdbc/JDBCOptions.scala   |  4 ++--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala |  1 -
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    | 10 ++++++++++
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 56cd17816f7bd..6d4d0a80fedaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -44,7 +44,7 @@ class JDBCOptions(
   val asConnectionProperties: Properties = {
     val properties = new Properties()
     // We should avoid to pass the options into properties. See SPARK-17776.
-    parameters.filterKeys(!jdbcOptionNames.contains(_))
+    parameters.filterKeys(key => !jdbcOptionNames.contains(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -129,7 +129,7 @@ object JDBCOptions {
   private val jdbcOptionNames = ArrayBuffer.empty[String]
 
   private def newOption(name: String): String = {
-    jdbcOptionNames += name
+    jdbcOptionNames += name.toLowerCase
     name
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 37df283a9e5b2..d5b11e7bec0bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
   def resolveTable(options: JDBCOptions): StructType = {
     val url = options.url
     val table = options.table
-    val properties = options.asConnectionProperties
     val dialect = JdbcDialects.get(url)
     val conn: Connection = JdbcUtils.createConnectionFactory(options)()
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index aa1ab141a4ec8..4c964bf1b3ac4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
     assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
     assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
   }
+
+  test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
+    val parameters = Map(
+      "url" -> "jdbc:mysql://localhost:3306/temp",
+      "dbtable" -> "t1",
+      "numPartitions" -> "10")
+    assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
+    assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+  }
 }
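
Why the one-line filter change in PATCH 1/3 matters: option names are registered with their original casing (for example `numPartitions`), while `CaseInsensitiveMap` hands keys back lower-cased, so the old exact-match filter let Spark-internal options leak into the JDBC driver's connection properties. The following is a minimal, self-contained sketch of the before/after behavior; it is plain Scala rather than Spark code, and the option names, keys, and JDBC URL are illustrative only:

    object CaseInsensitiveFilterSketch {
      // Registered as written, the way the old ArrayBuffer-based registry did it.
      val jdbcOptionNames = Set("url", "dbtable", "numPartitions")

      // Old behavior: case-sensitive membership test.
      def oldFilter(parameters: Map[String, String]): Map[String, String] =
        parameters.filterKeys(!jdbcOptionNames.contains(_)).toMap

      // Patched behavior: compare lower-cased on both sides.
      def newFilter(parameters: Map[String, String]): Map[String, String] = {
        val lowerCased = jdbcOptionNames.map(_.toLowerCase)
        parameters.filterKeys(key => !lowerCased.contains(key.toLowerCase)).toMap
      }

      def main(args: Array[String]): Unit = {
        // Keys as CaseInsensitiveMap presents them: already lower-cased.
        val parameters = Map(
          "url" -> "jdbc:mysql://localhost:3306/temp",
          "dbtable" -> "t1",
          "numpartitions" -> "10")
        println(oldFilter(parameters)) // Map(numpartitions -> 10): leaks to the driver
        println(newFilter(parameters)) // Map(): every internal option is dropped
      }
    }

This is exactly the case the new JDBCSuite test pins down: the same parameters must produce an empty `asConnectionProperties` whether they arrive as a plain `Map` or a `CaseInsensitiveMap`.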
From 950cab0e11fdcd5c354430a83c158211fb25ca42 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 1 Dec 2016 12:07:31 -0800
Subject: [PATCH 2/3] JDBCRelation.insert should not use `asConnectionProperties`.

---
 .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala  | 6 ++++++
 .../spark/sql/execution/datasources/jdbc/JDBCRelation.scala | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6d4d0a80fedaa..519a656e03f2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -41,6 +41,12 @@ class JDBCOptions(
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
+  val asProperties: Properties = {
+    val properties = new Properties()
+    parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+    properties
+  }
+
   val asConnectionProperties: Properties = {
     val properties = new Properties()
     // We should avoid to pass the options into properties. See SPARK-17776.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 5ca1c7543cfa7..8b45dba04d29e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val url = jdbcOptions.url
     val table = jdbcOptions.table
-    val properties = jdbcOptions.asConnectionProperties
+    val properties = jdbcOptions.asProperties
     data.write
       .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .jdbc(url, table, properties)
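
The switch to `asProperties` in `insert` matters because `DataFrameWriter.jdbc` folds the `Properties` it receives back into data source options; handing it the filtered `asConnectionProperties` would drop any registered write-side option the user had set (`url` and `dbtable` are re-supplied as arguments, but an option such as `createTableOptions` would silently vanish), which is presumably why this patch keeps the unfiltered form for the write path. The sketch below contrasts the two forms; it is standalone Scala, and the registered-name set and option values are an illustrative subset, not the real registry:

    import java.util.Properties

    object InsertPropertiesSketch {
      // Illustrative subset of registered option names, stored lower-cased.
      val jdbcOptionNames = Set("url", "dbtable", "numpartitions", "createtableoptions")

      // Mirrors asProperties: keep every option.
      def asProperties(parameters: Map[String, String]): Properties = {
        val properties = new Properties()
        parameters.foreach { case (k, v) => properties.setProperty(k, v) }
        properties
      }

      // Mirrors asConnectionProperties: vendor JDBC options only.
      def asConnectionProperties(parameters: Map[String, String]): Properties = {
        val properties = new Properties()
        parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
          .foreach { case (k, v) => properties.setProperty(k, v) }
        properties
      }

      def main(args: Array[String]): Unit = {
        val parameters = Map(
          "url" -> "jdbc:mysql://localhost:3306/temp",
          "dbtable" -> "t1",
          "createTableOptions" -> "ENGINE=InnoDB")
        // The write path must still see createTableOptions, hence asProperties:
        println(asProperties(parameters).containsKey("createTableOptions"))           // true
        println(asConnectionProperties(parameters).containsKey("createTableOptions")) // false
      }
    }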
From 2375f7fc8d6e3ad879c129dc007c37eeeca3990e Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 1 Dec 2016 21:03:16 -0800
Subject: [PATCH 3/3] Use `Set` and add docs.

---
 .../sql/execution/datasources/jdbc/JDBCOptions.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 519a656e03f2d..6fd2e0d24112b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, DriverManager}
 import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
@@ -41,16 +39,23 @@ class JDBCOptions(
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
+  /**
+   * Returns a `Properties` object with all options.
+   */
   val asProperties: Properties = {
     val properties = new Properties()
     parameters.foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
 
+  /**
+   * Returns a `Properties` object with all options except Spark internal data source options
+   * like `url`, `dbtable`, and `numPartitions`. This should be used when invoking a JDBC API
+   * like `Driver.connect`, because each DBMS vendor has its own driver properties. See SPARK-17776.
+   */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    // We should avoid to pass the options into properties. See SPARK-17776.
-    parameters.filterKeys(key => !jdbcOptionNames.contains(key.toLowerCase))
+    parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -132,7 +137,7 @@ class JDBCOptions(
 }
 
 object JDBCOptions {
-  private val jdbcOptionNames = ArrayBuffer.empty[String]
+  private val jdbcOptionNames = collection.mutable.Set[String]()
 
   private def newOption(name: String): String = {
     jdbcOptionNames += name.toLowerCase
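
A mutable `Set` fits the registry better than `ArrayBuffer`: membership tests are constant time, duplicate registrations collapse to a single entry, and `Set.apply` is `contains`, which is exactly what the new `filterKeys` predicate relies on. A standalone Scala sketch of the lower-cased registry pattern (the constant names follow the patch context but are illustrative here):

    import scala.collection.mutable

    object OptionRegistrySketch {
      // Names are stored lower-cased, so lookups are case-insensitive by convention.
      private val jdbcOptionNames = mutable.Set[String]()

      private def newOption(name: String): String = {
        jdbcOptionNames += name.toLowerCase // register lower-cased
        name // callers keep the original casing for display
      }

      val JDBC_TABLE_NAME = newOption("dbtable")
      val JDBC_NUM_PARTITIONS = newOption("numPartitions")

      def main(args: Array[String]): Unit = {
        // Set.apply is `contains`, matching `!jdbcOptionNames(key.toLowerCase)`:
        println(jdbcOptionNames("numpartitions"))             // true
        println(jdbcOptionNames("NumPartitions".toLowerCase)) // true for any casing
        println(jdbcOptionNames("fetchsize"))                 // false: not registered here
      }
    }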