Closed
24 changes: 14 additions & 10 deletions docs/sql-programming-guide.md
@@ -1061,17 +1061,28 @@ the following case-sensitive options:
</tr>

<tr>
<td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
<td><code>partitionColumn, lowerBound, upperBound</code></td>
Member

We should revert this change. These four parameters are related.

These options must all be specified if any of them is specified

Member Author

@dongjoon-hyun dongjoon-hyun Nov 21, 2016

That is incorrect now. numPartitions can be used alone. The other three are optional parameters that apply only to reading, while numPartitions is a general optional parameter.

Member

In the read path, they are still related.

Member Author

For that, I'll add a description that explains how these options are related.
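To make the read-path relation in this thread concrete, here is a minimal sketch of how a partition stride could be derived from lowerBound, upperBound, and numPartitions. It is a simplified illustration, not a copy of Spark's own partitioning code; the `StrideSketch` object and the `whereClauses` helper are hypothetical names introduced only for this example.

```scala
// Hypothetical, simplified model of stride-based partitioning.
// It is NOT Spark's implementation; it only illustrates the idea.
object StrideSketch {
  /** Returns one WHERE predicate per partition for the given column. */
  def whereClauses(
      column: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): Seq[String] = {
    require(numPartitions > 0, "numPartitions must be at least 1")
    require(lowerBound < upperBound, "lowerBound must be less than upperBound")
    // Never use more partitions than there are distinct values in the range.
    val parts = math.min(numPartitions.toLong, upperBound - lowerBound).toInt
    val stride = (upperBound - lowerBound) / parts
    if (parts == 1) {
      // A single partition reads the whole table.
      Seq("1 = 1")
    } else {
      (0 until parts).map { i =>
        val lo = lowerBound + i * stride
        val hi = lo + stride
        // The first and last partitions are open-ended, so rows outside
        // [lowerBound, upperBound) are still returned, not filtered out.
        if (i == 0) s"$column < $hi OR $column IS NULL"
        else if (i == parts - 1) s"$column >= $lo"
        else s"$column >= $lo AND $column < $hi"
      }
    }
  }
}
```

With `StrideSketch.whereClauses("id", 0L, 100L, 4)` the stride is 25, and a row with `id = 1000` still matches the last predicate. In this model the bounds only shape the partitions; they do not restrict which rows are read.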

<td>
These options must all be specified if any of them is specified. They describe how to
partition the table when reading in parallel from multiple workers.
These options must all be specified if any of them is specified. In addition,
<code>numPartitions</code> must be specified. They describe how to partition the table when
reading in parallel from multiple workers.
<code>partitionColumn</code> must be a numeric column from the table in question. Notice
that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
partition stride, not for filtering the rows in table. So all rows in the table will be
partitioned and returned. This option applies only to reading.
</td>
</tr>

<tr>
<td><code>numPartitions</code></td>
<td>
The maximum number of partitions that can be used for parallelism in table reading and
writing. This also determines the maximum number of concurrent JDBC connections.
If the number of partitions to write exceeds this limit, we decrease it to this limit by
calling <code>coalesce(numPartitions)</code> before writing.
</td>
</tr>

<tr>
<td><code>fetchsize</code></td>
<td>
@@ -1086,13 +1097,6 @@ the following case-sensitive options:
</td>
</tr>

<tr>
<td><code>maxConnections</code></td>
<td>
The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
</td>
</tr>

<tr>
<td><code>isolationLevel</code></td>
<td>
@@ -74,19 +74,23 @@ class JDBCOptions(
}
}

// the number of partitions
val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
require(numPartitions.isEmpty || numPartitions.get > 0,
s"Invalid value `${numPartitions.get}` for parameter `$JDBC_NUM_PARTITIONS`. " +
"The minimum value is 1.")

// ------------------------------------------------------------
// Optional parameters only for reading
// ------------------------------------------------------------
// the column used to partition
val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null)
val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN)
// the lower bound of partition column
val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null)
val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
// the upper bound of the partition column
val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null)
// the number of partitions
val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
require(partitionColumn == null ||
(lowerBound != null && upperBound != null && numPartitions != null),
val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
require(partitionColumn.isEmpty ||
(lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
s" and '$JDBC_NUM_PARTITIONS' are required.")
Member

We need to update this error message too.

Member Author

To me, this error message looks correct. JDBC_NUM_PARTITIONS is independent of the others, but JDBC_PARTITION_COLUMN requires the others (including JDBC_NUM_PARTITIONS), doesn't it?

Member

I have not tried it. What happens if we specify JDBC_PARTITION_COLUMN when writing to JDBC tables?

Member Author

@dongjoon-hyun dongjoon-hyun Nov 21, 2016

It will raise an exception because of the require. I think that matches the previous behavior. Ah, let me check.

Member Author

Since it is used in the declaration of the view, users cannot reach the write path.

CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2', partitionColumn 'value')
java.lang.IllegalArgumentException: requirement failed: If 'partitionColumn' is specified then 'lowerBound', 'upperBound', and 'numPartitions' are required.

Member

Have you tried the DataFrameWriter's JDBC/write() APIs?
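The validation rules discussed in this thread can be explored without a database by modelling them in plain Scala. The sketch below is a stand-in for the option checks shown in the diff, assuming the same checks run whenever the options are parsed; `JdbcOptionsSketch`, `ParsedOptions`, and `parse` are hypothetical names, and the real class may differ (for example in how it treats key case).

```scala
// Hypothetical stand-in for the option validation in this diff.
// It mirrors the two require checks but is not Spark's JDBCOptions class.
object JdbcOptionsSketch {
  final case class ParsedOptions(
      numPartitions: Option[Int],
      partitionColumn: Option[String],
      lowerBound: Option[Long],
      upperBound: Option[Long])

  def parse(parameters: Map[String, String]): ParsedOptions = {
    // numPartitions is optional on its own, but must be positive when set.
    val numPartitions = parameters.get("numPartitions").map(_.toInt)
    require(numPartitions.isEmpty || numPartitions.get > 0,
      s"Invalid value `${numPartitions.get}` for parameter `numPartitions`. " +
        "The minimum value is 1.")

    // The three read-only options are parsed as Options instead of nulls.
    val partitionColumn = parameters.get("partitionColumn")
    val lowerBound = parameters.get("lowerBound").map(_.toLong)
    val upperBound = parameters.get("upperBound").map(_.toLong)
    require(partitionColumn.isEmpty ||
      (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
      "If 'partitionColumn' is specified then 'lowerBound', 'upperBound', " +
        "and 'numPartitions' are required.")

    ParsedOptions(numPartitions, partitionColumn, lowerBound, upperBound)
  }
}
```

In this model, `parse(Map("numPartitions" -> "2"))` succeeds, while `parse(Map("partitionColumn" -> "value"))` throws `IllegalArgumentException` regardless of whether the caller intends to read or write. Whether the real write path behaves the same way is exactly the question raised above, so treat this only as a way to reason about the checks.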

val fetchSize = {
@@ -122,11 +126,6 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
// the maximum number of connections
val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
require(maxConnections.isEmpty || maxConnections.get > 0,
s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
"The minimum value is 1.")
}

object JDBCOptions {
@@ -149,5 +148,4 @@ object JDBCOptions {
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
}
@@ -35,11 +35,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
val upperBound = jdbcOptions.upperBound
val numPartitions = jdbcOptions.numPartitions

val partitionInfo = if (partitionColumn == null) {
val partitionInfo = if (partitionColumn.isEmpty) {
assert(lowerBound.isEmpty && upperBound.isEmpty)
null
} else {
assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
JDBCPartitioningInfo(
partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession)
@@ -667,10 +667,10 @@ object JdbcUtils extends Logging {
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val isolationLevel = options.isolationLevel
val maxConnections = options.maxConnections
val numPartitions = options.numPartitions
val repartitionedDF =
if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
df.coalesce(maxConnections.get)
if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) {
df.coalesce(numPartitions.get)
} else {
df
}
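The write-side rule in this hunk reduces to a small pure function: the partition count only shrinks, and only when a limit is set. A minimal sketch follows, where `WritePartitionSketch` and `partitionsForWrite` are hypothetical names used for illustration and no DataFrame is involved.

```scala
// Hypothetical helper that models the coalesce decision in this hunk.
object WritePartitionSketch {
  /** Number of partitions, and hence concurrent connections, used for a write. */
  def partitionsForWrite(current: Int, numPartitions: Option[Int]): Int =
    numPartitions match {
      // Only reduce the partition count; never increase it.
      case Some(limit) if limit < current => limit
      // No limit set, or the input is already within the limit.
      case _ => current
    }
}
```

For example, an input with 8 partitions and a limit of 2 is written with 2 partitions, while an input with 2 partitions and a limit of 8 keeps its 2 partitions.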
@@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
.save()
}

test("SPARK-18413: Add `maxConnections` JDBCOption") {
test("SPARK-18413: Use `numPartitions` JDBCOption") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val e = intercept[IllegalArgumentException] {
df.write.format("jdbc")
.option("dbtable", "TEST.SAVETEST")
.option("url", url1)
.option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
.option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0")
.save()
}.getMessage
assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1"))
}
}