
Commit 2349175

sadikovi authored and gengliangwang committed
[SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source
### What changes were proposed in this pull request?

This PR adds support for TimestampNTZ (TIMESTAMP WITHOUT TIME ZONE) in the JDBC data source. It also introduces a new configuration option, `inferTimestampNTZType`, which allows reading written timestamps back as timestamp without time zone. By default it is set to `false`, i.e. all timestamps are read as the legacy timestamp type.

Here is the state of timestamp without time zone support in the built-in dialects:
- H2: timestamp without time zone, seems to map to timestamp type
- Derby: only has timestamp type
- MySQL: only has timestamp type
- Postgres: has timestamp without time zone, which maps to timestamp
- SQL Server: only datetime/datetime2, neither is time zone aware
- Oracle: seems to only have timestamp and timestamp with time zone
- Teradata: similar to Oracle, but I could not verify
- DB2: has TIMESTAMP WITHOUT TIME ZONE, but I could not make this type work in my test; only TIMESTAMP seems to work

### Why are the changes needed?

Adds support for the new TimestampNTZ type, see https://issues.apache.org/jira/browse/SPARK-35662.

### Does this PR introduce _any_ user-facing change?

Yes. The JDBC data source is now capable of writing and reading TimestampNTZ types. When reading timestamp values, the configuration option `inferTimestampNTZType` allows inferring those values as TIMESTAMP WITHOUT TIME ZONE. By default the option is set to `false`, so the behaviour is unchanged and all timestamps are read as TIMESTAMP WITH LOCAL TIME ZONE.

### How was this patch tested?

I added a unit test to ensure the general functionality works. I also manually verified the write/read round trip for TimestampNTZ in the following databases (all I could get access to):
- H2: `jdbc:h2:mem:testdb0`
- Derby: `jdbc:derby:<filepath>`
- MySQL: `docker run --name mysql -e MYSQL_ROOT_PASSWORD=secret -e MYSQL_DATABASE=db -e MYSQL_USER=user -e MYSQL_PASSWORD=secret -p 3306:3306 -d mysql:5.7`, then `jdbc:mysql://127.0.0.1:3306/db?user=user&password=secret`
- PostgreSQL: `docker run -d --name postgres -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=user -e POSTGRES_DB=db -p 5432:5432 postgres:12.11`, then `jdbc:postgresql://127.0.0.1:5432/db?user=user&password=secret`
- SQL Server: `docker run -e "ACCEPT_EULA=Y" -e SA_PASSWORD='yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-CU15-ubuntu-20.04`, then `jdbc:sqlserver://127.0.0.1:1433;user=sa;password=yourStrong(!)Password`
- DB2: `docker run -itd --name mydb2 --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=secret -e DBNAME=db ibmcom/db2`, then `jdbc:db2://127.0.0.1:50000/db:user=db2inst1;password=secret;`

Closes #36726 from sadikovi/timestamp_ntz_jdbc.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
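As a user-facing illustration, a minimal sketch of the round trip this change enables (the in-memory H2 URL and table name are hypothetical; assumes an active `SparkSession` named `spark` with the H2 driver on the classpath):

```scala
import java.time.LocalDateTime

import spark.implicits._

val url = "jdbc:h2:mem:testdb0" // hypothetical database
val table = "ts_table"          // hypothetical table

// Write a TIMESTAMP_NTZ column; it is stored as a plain SQL TIMESTAMP.
Seq(LocalDateTime.parse("2019-01-20T12:00:00")).toDF("ts")
  .write.format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .save()

// Read it back. With inferTimestampNTZType=true the column is inferred as
// TIMESTAMP_NTZ; with the default (false) it comes back as the legacy type.
val res = spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("inferTimestampNTZType", "true")
  .load()
res.printSchema()
```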
1 parent 0594cad commit 2349175

File tree: 6 files changed (+98 −12 lines)


docs/sql-data-sources-jdbc.md (12 additions, 3 deletions)

```diff
@@ -103,7 +103,7 @@ logging into the data sources.
     <td>(none)</td>
     <td>
       A prefix that will form the final query together with <code>query</code>.
-      As the specified <code>query</code> will be parenthesized as a subquery in the <code>FROM</code> clause and some databases do not
+      As the specified <code>query</code> will be parenthesized as a subquery in the <code>FROM</code> clause and some databases do not
       support all clauses in subqueries, the <code>prepareQuery</code> property offers a way to run such complex queries.
       As an example, spark will issue a query of the following form to the JDBC Source.<br><br>
       <code>&lt;prepareQuery&gt; SELECT &lt;columns&gt; FROM (&lt;user_specified_query&gt;) spark_gen_alias</code><br><br>
@@ -340,10 +340,19 @@ logging into the data sources.
     <td>
       The name of the JDBC connection provider to use to connect to this URL, e.g. <code>db2</code>, <code>mssql</code>.
       Must be one of the providers loaded with the JDBC data source. Used to disambiguate when more than one provider can handle
-      the specified driver and options. The selected provider must not be disabled by <code>spark.sql.sources.disabledJdbcConnProviderList</code>.
+      the specified driver and options. The selected provider must not be disabled by <code>spark.sql.sources.disabledJdbcConnProviderList</code>.
     </td>
     <td>read/write</td>
-  </tr>
+  </tr>
+  <tr>
+    <td><code>inferTimestampNTZType</code></td>
+    <td>false</td>
+    <td>
+      When the option is set to <code>true</code>, all timestamps are inferred as TIMESTAMP WITHOUT TIME ZONE.
+      Otherwise, timestamps are read as TIMESTAMP with local time zone.
+    </td>
+    <td>read</td>
+  </tr>
 </table>

 Note that kerberos authentication with keytab is not always supported by the JDBC driver.<br>
```
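As an illustration of the documented option, a sketch of supplying it through the `Properties` overload of `spark.read.jdbc` (URL and table name are hypothetical; connection properties are merged into the JDBC options on read):

```scala
import java.util.Properties

// inferTimestampNTZType applies on read only; writes are unaffected.
val props = new Properties()
props.setProperty("inferTimestampNTZType", "true")

val df = spark.read.jdbc("jdbc:postgresql://127.0.0.1:5432/db", "ts_table", props)
```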

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala (4 additions, 0 deletions)

```diff
@@ -226,6 +226,9 @@ class JDBCOptions(
   // The prefix that is added to the query sent to the JDBC database.
   // This is required to support some complex queries with some JDBC databases.
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
+
+  // Infers timestamp values as TimestampNTZ type when reading data.
+  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
 }

 class JdbcOptionsInWrite(
@@ -287,4 +290,5 @@ object JDBCOptions {
   val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
   val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
   val JDBC_PREPARE_QUERY = newOption("prepareQuery")
+  val JDBC_INFER_TIMESTAMP_NTZ = newOption("inferTimestampNTZType")
 }
```
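The new option follows the existing `JDBCOptions` pattern: a string parameter looked up with a default and coerced to `Boolean`. A standalone sketch of that pattern in plain Scala (the map contents are illustrative):

```scala
// Option keys arrive as strings; absent keys fall back to the default "false".
val parameters = Map("url" -> "jdbc:h2:mem:db", "inferTimestampNTZType" -> "true")
val inferTimestampNTZType =
  parameters.getOrElse("inferTimestampNTZType", "false").toBoolean // true
```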

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala (2 additions, 1 deletion)

```diff
@@ -67,7 +67,8 @@ object JDBCRDD extends Logging {
       statement.setQueryTimeout(options.queryTimeout)
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true,
+          isTimestampNTZ = options.inferTimestampNTZType)
       } finally {
         rs.close()
       }
```
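`resolveTable` is the path behind schema inference, so the flag must be threaded through to `getSchema`. A plain-JDBC sketch of the disambiguation this enables, outside Spark (assumes the H2 driver on the classpath; most drivers report both timestamp flavours as `java.sql.Types.TIMESTAMP`, which is why a flag is needed at all):

```scala
import java.sql.{DriverManager, Types}

val inferTimestampNTZType = true
val conn = DriverManager.getConnection("jdbc:h2:mem:testdb0")
try {
  val rs = conn.createStatement()
    .executeQuery("SELECT CAST('2019-01-20 12:00:00' AS TIMESTAMP) AS ts")
  try {
    val md = rs.getMetaData
    // Mirrors the new branch in JdbcUtils.getCatalystType: the same JDBC type
    // code resolves to TimestampNTZType only when the flag is set.
    val inferred = md.getColumnType(1) match {
      case Types.TIMESTAMP if inferTimestampNTZType => "TimestampNTZType"
      case Types.TIMESTAMP => "TimestampType"
      case other => s"unhandled($other)"
    }
    println(s"${md.getColumnName(1)}: $inferred")
  } finally {
    rs.close()
  }
} finally {
  conn.close()
}
```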

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala (28 additions, 6 deletions)

```diff
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -150,6 +150,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
     case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
     case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+    // This is a common case of timestamp without time zone. Most of the databases either only
+    // support TIMESTAMP type or use TIMESTAMP as an alias for TIMESTAMP WITHOUT TIME ZONE.
+    // Note that some dialects override this setting, e.g. as SQL Server.
+    case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
     case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
     case t: DecimalType => Option(
       JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
@@ -173,7 +177,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       sqlType: Int,
       precision: Int,
       scale: Int,
-      signed: Boolean): DataType = {
+      signed: Boolean,
+      isTimestampNTZ: Boolean): DataType = {
     val answer = sqlType match {
       // scalastyle:off
       case java.sql.Types.ARRAY => null
@@ -215,6 +220,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       case java.sql.Types.TIME => TimestampType
       case java.sql.Types.TIME_WITH_TIMEZONE
                                              => null
+      case java.sql.Types.TIMESTAMP
+        if isTimestampNTZ => TimestampNTZType
       case java.sql.Types.TIMESTAMP => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                              => null
@@ -243,7 +250,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       conn.prepareStatement(options.prepareQuery + dialect.getSchemaQuery(options.tableOrQuery))
     try {
       statement.setQueryTimeout(options.queryTimeout)
-      Some(getSchema(statement.executeQuery(), dialect))
+      Some(getSchema(statement.executeQuery(), dialect,
+        isTimestampNTZ = options.inferTimestampNTZType))
     } catch {
       case _: SQLException => None
     } finally {
@@ -258,13 +266,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
    * @param alwaysNullable If true, all the columns are nullable.
+   * @param isTimestampNTZ If true, all timestamp columns are interpreted as TIMESTAMP_NTZ.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
   def getSchema(
       resultSet: ResultSet,
       dialect: JdbcDialect,
-      alwaysNullable: Boolean = false): StructType = {
+      alwaysNullable: Boolean = false,
+      isTimestampNTZ: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -306,7 +316,7 @@ object JdbcUtils extends Logging with SQLConfHelper {

       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
-          getCatalystType(dataType, fieldSize, fieldScale, isSigned))
+          getCatalystType(dataType, fieldSize, fieldScale, isSigned, isTimestampNTZ))
       fields(i) = StructField(columnName, columnType, nullable, metadata.build())
       i = i + 1
     }
@@ -463,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
       }
     }

-    case TimestampType =>
+    case TimestampType | TimestampNTZType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -583,6 +593,18 @@ object JdbcUtils extends Logging with SQLConfHelper {
         stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
       }

+    case TimestampNTZType =>
+      if (conf.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(
+            pos + 1,
+            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
+          )
+      }
+
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
```
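The new `TimestampNTZType` setter binds the external value (`java.time.LocalDateTime`, or `Instant` when the Java 8 datetime API flag is on) as a `java.sql.Timestamp` through Spark's internal `DateTimeUtils` helpers. A standalone sketch of the equivalent plain-JDBC binding, which likewise preserves the wall-clock fields:

```scala
import java.sql.{PreparedStatement, Timestamp}
import java.time.LocalDateTime

// Sketch only: Spark goes through localDateTimeToMicros/toJavaTimestamp;
// outside Spark, Timestamp.valueOf keeps the same local date-time fields.
def setTimestampNtz(stmt: PreparedStatement, pos: Int, value: LocalDateTime): Unit =
  stmt.setTimestamp(pos + 1, Timestamp.valueOf(value))
```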

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala (1 addition, 0 deletions)

```diff
@@ -98,6 +98,7 @@ private object MsSqlServerDialect extends JdbcDialect {

   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
+    case TimestampNTZType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
```
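This is the dialect extension point: a dialect overrides the generic TIMESTAMP mapping when the database spells the type differently. For illustration, a hypothetical user-registered dialect that maps `TimestampNTZType` to `DATETIME2` instead (an illustrative choice, not what this commit does):

```scala
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, TimestampNTZType}

// Hypothetical dialect: overrides only the TimestampNTZType mapping and
// defers all other types to the default mappings by returning None.
object MyMsSqlDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampNTZType => Some(JdbcType("DATETIME2", java.sql.Types.TIMESTAMP))
    case _ => None
  }
}

JdbcDialects.registerDialect(MyMsSqlDialect)
```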

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala (51 additions, 2 deletions)

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc

 import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
-import java.time.{Instant, LocalDate}
+import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}

 import scala.collection.JavaConverters._
@@ -1230,6 +1230,7 @@ class JDBCSuite extends QueryTest
     assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
     assert(getJdbcType(oracleDialect, DateType) == "DATE")
     assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP")
   }

   private def assertEmptyQuery(sqlString: String): Unit = {
@@ -1879,5 +1880,53 @@ class JDBCSuite extends QueryTest
     val fields = schema.fields
     assert(fields.length === 1)
     assert(fields(0).dataType === StringType)
-  }
+  }
+
+  test("SPARK-39339: Handle TimestampNTZType null values") {
+    val tableName = "timestamp_ntz_null_table"
+
+    val df = Seq(null.asInstanceOf[LocalDateTime]).toDF("col1")
+
+    df.write.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", tableName).save()
+
+    val res = spark.read.format("jdbc")
+      .option("inferTimestampNTZType", "true")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", tableName)
+      .load()
+
+    checkAnswer(res, Seq(Row(null)))
+  }
+
+  test("SPARK-39339: TimestampNTZType with different local time zones") {
+    val tableName = "timestamp_ntz_diff_tz_support_table"
+
+    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+        Seq(
+          "1972-07-04 03:30:00",
+          "2019-01-20 12:00:00.502",
+          "2019-01-20T00:00:00.123456",
+          "1500-01-20T00:00:00.123456"
+        ).foreach { case datetime =>
+          val df = spark.sql(s"select timestamp_ntz '$datetime'")
+          df.write.format("jdbc")
+            .mode("overwrite")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", tableName)
+            .save()
+
+          val res = spark.read.format("jdbc")
+            .option("inferTimestampNTZType", "true")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", tableName)
+            .load()
+
+          checkAnswer(res, df)
+        }
+      }
+    }
+  }
 }
```
