From 49606c8acf183ee5f657c09a76399546c92ab28f Mon Sep 17 00:00:00 2001
From: shivsood
Date: Sat, 13 Jul 2019 12:18:01 -0700
Subject: [PATCH 1/2] Fix for SPARK-28152: [JDBC Connector] ShortType and FloatType are not mapped correctly for read/write of SQLServer tables

ShortType and FloatType are not mapped to the correct JDBC types when
using the JDBC connector. This results in tables and Spark data frames
being created with unintended types. The issue was observed when
validating against SQLServer.

Example issues:
- A write from a data frame with a ShortType column produces a SQL
  table whose column type is INTEGER rather than SMALLINT, i.e. a
  larger table than expected.
- A read produces a data frame column of type INTEGER rather than
  ShortType.

FloatType has an issue on the read path only. On the write path the
Spark data type 'FloatType' is correctly mapped to the equivalent JDBC
type 'REAL'. But on the read path, when JDBC data types are converted
to Catalyst data types (getCatalystType), 'REAL' is incorrectly mapped
to 'DoubleType' rather than 'FloatType'.

Post fix, ShortType is correctly mapped to SMALLINT and FloatType is
mapped to REAL.
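A minimal sketch of the post-fix mappings, runnable against a Spark
build that includes this change (the object name Spark28152Check and
the connection URL are illustrative, not part of the patch):

  import java.sql.Types

  import org.apache.spark.sql.jdbc.JdbcDialects
  import org.apache.spark.sql.types.{FloatType, MetadataBuilder, ShortType}

  object Spark28152Check {
    def main(args: Array[String]): Unit = {
      // The SQLServer dialect is selected purely from the URL prefix.
      val dialect = JdbcDialects.get("jdbc:sqlserver://localhost;databaseName=test")

      // Write path: ShortType now maps to SMALLINT instead of INTEGER.
      assert(dialect.getJDBCType(ShortType)
        .map(_.databaseTypeDefinition).contains("SMALLINT"))

      // Read path: SMALLINT and REAL now come back as ShortType and
      // FloatType instead of IntegerType and DoubleType.
      val md = new MetadataBuilder()
      assert(dialect.getCatalystType(Types.SMALLINT, "SMALLINT", 1, md)
        .contains(ShortType))
      assert(dialect.getCatalystType(Types.REAL, "REAL", 1, md)
        .contains(FloatType))
    }
  }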
---
 .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala | 12 ++++++------
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala   |  7 ++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  | 12 ++++++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 82ce16c2b7e5a..efd7ca74c796b 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -120,24 +120,24 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types.length == 12)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Integer"))
-    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Long"))
     assert(types(5).equals("class java.lang.Double"))
-    assert(types(6).equals("class java.lang.Double"))
-    assert(types(7).equals("class java.lang.Double"))
+    assert(types(6).equals("class java.lang.Float"))
+    assert(types(7).equals("class java.lang.Float"))
     assert(types(8).equals("class java.math.BigDecimal"))
     assert(types(9).equals("class java.math.BigDecimal"))
     assert(types(10).equals("class java.math.BigDecimal"))
     assert(types(11).equals("class java.math.BigDecimal"))
     assert(row.getBoolean(0) == false)
     assert(row.getInt(1) == 255)
-    assert(row.getInt(2) == 32767)
+    assert(row.getShort(2) == 32767)
     assert(row.getInt(3) == 2147483647)
     assert(row.getLong(4) == 9223372036854775807L)
     assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
-    assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
-    assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
+    assert(row.getFloat(6) == 1.23456788103168E14) // float(24) has 7-digits precision
+    assert(row.getFloat(7) == 1.23456788103168E14) // real = float(24)
     assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
     assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
     assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 29500cf2afbc0..805f73dee141b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -30,7 +30,11 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-      None
+      sqlType match {
+        case java.sql.Types.SMALLINT => Some(ShortType)
+        case java.sql.Types.REAL => Some(FloatType)
+        case _ => None
+      }
     }
   }
 
@@ -39,6 +43,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
+    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a28f4e49ffe1a..0d9684cae3341 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -895,8 +895,20 @@ class JDBCSuite extends QueryTest
       "BIT")
     assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
       "VARBINARY(MAX)")
+    assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
+      "SMALLINT")
   }
 
+  test("SPARK-28152 MsSqlServerDialect catalyst type mapping") {
+    val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
+    val metadata = new MetadataBuilder().putLong("scale", 1)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
+      metadata).get == ShortType)
+    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
+      metadata).get == FloatType)
+  }
+
+
   test("table exists query by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")

From b25007ea329941801cba84f196b72fe34807947b Mon Sep 17 00:00:00 2001
From: shivsood
Date: Mon, 15 Jul 2019 09:51:39 -0700
Subject: [PATCH 2/2] Hygiene fix: removing the extra line

---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0d9684cae3341..a47fc18cce9cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -908,7 +908,6 @@
       metadata).get == FloatType)
   }
 
-
   test("table exists query by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")