[SPARK-18413][SQL][FOLLOW-UP] Use numPartitions instead of maxConnections
#15966
Conversation
…ections` JDBCOption
docs/sql-programming-guide.md
Outdated
<td><code>numPartitions</code></td>
<td>
  The number of partitions that can be used, if set. It works by limiting both read and write
  operations' parallelism. If the number of partitions to write exceeds this limit, the
Could you also document the impact on the number of JDBC connections here?
Sure. I'll add that, too.
docs/sql-programming-guide.md
Outdated
<td><code>numPartitions</code></td>
<td>
  The number of partitions that can be used, if set. It works by limiting both read and write
  operations' parallelism. If the number of partitions to write exceeds this limit, the
`fewer partitions` is obscure. :)
Thank you for the review again. Yep, I'll revise it. :)
How about this?
The number of partitions that can be used, if set. It works by limiting both read and write operations' parallelism. If the number of partitions to write exceeds this limit, the operation will coalesce the data set with this value before writing. In other words, this determines the maximum number of concurrent JDBC connections.
In the read path, we might not generate the exact number of partitions.
Found a bug in the read path in #15499.
Oh, really? The number of `JDBCPartition`s is different?
Conceptually, if the column values are exactly identical, only one partition will be generated.
The bug in #15499 is not related to the above fact.
Yeah. Actually, the above statement is describing `coalesce` and its parameter. So, shall we keep this?
Maybe we need to rephrase it and document both behaviors (read and write paths). Also emphasize this is the max number.
Like this?
The number of partitions that can be used, if set. It works by limiting both read and write operations' parallelism. In other words, this determines the maximum number of concurrent JDBC connections. For reading, it will make partitions less than or equal to this maximum. For writing, if the number of partitions to write exceeds this limit, the operation will coalesce the data set with this maximum before writing.
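For illustration only, a minimal Scala sketch of the read-path behavior described above. This is not Spark's actual JDBCRelation.columnPartition code; the function name and the splitting rule are simplified assumptions, but it shows why the resulting partition count never exceeds the maximum.

def partitionPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  // Never generate more ranges than requested, and never more than the width of the bounds.
  val count = math.max(1, math.min(numPartitions.toLong, upper - lower).toInt)
  val stride = math.max(1L, (upper - lower) / count)
  (0 until count).map { i =>
    val start = lower + i * stride
    val end = if (i == count - 1) upper else start + stride
    s"$start <= $column AND $column < $end"  // one WHERE clause, hence one JDBC connection, per range
  }
}

For example, partitionPredicates("id", 0L, 10L, 4) yields four ranges, while asking for 20 partitions over the same bounds yields only 10, matching the "less than or equal to this maximum" wording.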
(I am sorry I goofed in the PR.)
</tr>

<tr>
  <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
We should revert it back. These four parameters are related:
"These options must all be specified if any of them is specified."
That is incorrect now. `numPartitions` can be used alone. Those three are read-only optional parameters, while `numPartitions` is a general optional parameter.
In the read path, they are still related.
For that, I'll add a separate description mentioning the relationship.
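To make the distinction concrete, a hedged usage sketch (the connection details and table names are hypothetical, and a SparkSession `spark` is assumed): the three bound options only make sense together with partitionColumn on the read path, while numPartitions can appear alone and still caps the write-side connections.

val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")  // hypothetical connection details
  .option("dbtable", "data")
  .option("user", "root")
  .option("partitionColumn", "id")                 // requires the three options below
  .option("lowerBound", "0")
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  .load()

df.write.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")
  .option("dbtable", "t1")
  .option("user", "root")
  .option("numPartitions", "2")                    // used alone: only caps write parallelism
  .save()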
Test build #68951 has finished for PR 15966 at commit
-      (lowerBound != null && upperBound != null && numPartitions != null),
+      (lowerBound != null && upperBound != null && numPartitions.isDefined),
       s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
       s" and '$JDBC_NUM_PARTITIONS' are required.")
We need to update this error message too.
Ur, for me, this error message looks correct. `JDBC_NUM_PARTITIONS` is independent of the others, but `JDBC_PARTITION_COLUMN` requires the others (including `JDBC_NUM_PARTITIONS`), doesn't it?
I did not try it. What happens if we input `JDBC_PARTITION_COLUMN` when writing JDBC tables?
Ah, let me check. It will raise an exception due to the `require`. I think it's the correct previous behavior.
Since it's used in the declaration of a view, users cannot go into the write path.

CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2', partitionColumn 'value')

java.lang.IllegalArgumentException: requirement failed: If 'partitionColumn' is specified then 'lowerBound', 'upperBound', and 'numPartitions' are required.
Have you tried the DataFrameWriter's JDBC/write() APIs?
For further review, I updated the doc. We can proceed with the updated content.
Test build #68955 has finished for PR 15966 at commit
Is it possible that users read a table through JDBC and then write data to a table through JDBC, and want the read and write to have different parallelism?
Test build #68963 has finished for PR 15966 at commit
Thank you for review, @cloud-fan . With the same parameter name, a single table definition uses one value for both reading and writing. To use different parallelisms for read/write, we are able to do that with different view names. In the following example, t1 is written with numPartitions '1' and t2 is written with numPartitions '2'.

sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '1')")
sql("CREATE OR REPLACE TEMPORARY VIEW t2 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '2')")
sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
sql("INSERT OVERWRITE TABLE t2 SELECT a FROM data GROUP BY a")
I found a way to verify the coalesce logic of JDBC writing; see my PR: #15975. With minor code changes, you can see the adjusted plan from:

sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE").explain(true)
docs/sql-programming-guide.md
Outdated
The number of partitions that can be used, if set. It works by limiting both read and write
operations' parallelism. In other words, this determines the maximum number of concurrent
JDBC connections. For reading, it will make partitions less than or equal to this maximum.
For writing, if the number of partitions to write exceeds this limit, the operation will
Note: I am not good at writing tech documents. Below is my revision.

The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
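A minimal Scala sketch of the behavior that sentence describes (this is not the actual JdbcUtils code; the helper name and parameters are assumptions):

import java.util.Properties
import org.apache.spark.sql.DataFrame

def writeWithCap(df: DataFrame, numPartitions: Option[Int],
    url: String, table: String, props: Properties): Unit = {
  val current = df.rdd.getNumPartitions
  // The option is a maximum: only coalesce down, never repartition up.
  val toWrite = numPartitions match {
    case Some(n) if n < current => df.coalesce(n)
    case _ => df
  }
  toWrite.write.jdbc(url, table, props)
}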
Great! Thank you for #15975.
LGTM pending test. Also cc @rxin
  } else {
    JDBCPartitioningInfo(
-     partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
+     partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.get)
Is this safe to call `get` on? Calling `get` on an `Option` is always dangerous and not future-proof, especially when there is no `if (x.isDefined)` check surrounding this.
Thank you for review, @rxin . Yes, it implicitly depends on the earlier check that `partitionColumn` requires all the other options.
I'll change it here like this. Is this better?
- val partitionInfo = if (partitionColumn == null) {
+ val partitionInfo = if (partitionColumn == null || lowerBound == null || upperBound == null ||
+ numPartitions.isEmpty) {
If you are doing this, how about changing `partitionColumn`, `lowerBound`, `upperBound` to `Option` types as well?
Sure! No problem. I'll update them tonight.
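Just to illustrate what that change could look like, a sketch of Option-typed accessors (assuming `parameters` is the Map[String, String] of options; this is not the final JDBCOptions code):

val partitionColumn: Option[String] = parameters.get(JDBC_PARTITION_COLUMN)
val lowerBound: Option[Long] = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
val upperBound: Option[Long] = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
val numPartitions: Option[Int] = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)

// The existing requirement keeps its meaning: partitionColumn pulls in the other three.
require(partitionColumn.isEmpty ||
  (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
  s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
    s" and '$JDBC_NUM_PARTITIONS' are required.")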
Test build #69019 has finished for PR 15966 at commit
The only failure seems to be irrelevant to this.
Test build #69024 has finished for PR 15966 at commit
Hi, @rxin .
Hi, @rxin , @cloud-fan , @gatorsmile .
Test build #69104 has finished for PR 15966 at commit
The only failure is irrelevant to this PR.
Retest this please.
Test build #69112 has started for PR 15966 at commit
Maybe some internal errors.
Retest this please.
Test build #69125 has finished for PR 15966 at commit
Test build #69126 has finished for PR 15966 at commit
LGTM
Thank you, @cloud-fan !
  val numPartitions = jdbcOptions.numPartitions

- val partitionInfo = if (partitionColumn == null) {
+ val partitionInfo = if (partitionColumn.isEmpty || lowerBound.isEmpty || upperBound.isEmpty ||
I'd change this to
if (partitionColumn == null) {
assert(lowerBound.isEmpty && upperBound.isEmpty && numPartitions.isEmpty)
null
} else {
...
}
to be future proof.
Thanks. I'll update soon.
I assumed the following.
if (partitionColumn.isEmpty) {
assert(lowerBound.isEmpty && upperBound.isEmpty && numPartitions.isEmpty)
null
} else {
...
}
Ur, @rxin .
We are using numPartitions for both writing and reading, and numPartitions can be used alone.
I'll use the following.
val partitionInfo = if (partitionColumn.isEmpty) {
assert(lowerBound.isEmpty && upperBound.isEmpty)
null
} else {
assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
JDBCPartitioningInfo(
partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
}
LGTM other than that one change.
Test build #69143 has finished for PR 15966 at commit
Thanks - merging in master.
Thank you for review and merging, @rxin , @gatorsmile , @cloud-fan !
What changes were proposed in this pull request?

This is a follow-up PR of #15868 to merge the `maxConnections` option into the `numPartitions` option.

How was this patch tested?

Pass the existing tests.
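As a usage note, the temporary-view declaration quoted earlier in this thread would now express the connection cap with numPartitions instead of the removed maxConnections option (connection details as in that example):

sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc " +
  "OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '2')")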