Conversation

@dongjoon-hyun
Member

What changes were proposed in this pull request?

This is a follow-up PR of #15868 to merge the maxConnections option into the numPartitions option.

How was this patch tested?

Pass the existing tests.

<td><code>numPartitions</code></td>
<td>
The number of partitions that can be used, if set. It works by limiting both read and write
operations' parallelism. If the number of partitions to write exceeds this limit, the
Member

Could you also document the impact on the number of JDBC connections here?

Member Author

Sure. I'll add that, too.

<td><code>numPartitions</code></td>
<td>
The number of partitions that can be used, if set. It works by limiting both read and write
operations' parallelism. If the number of partitions to write exceeds this limit, the
@gatorsmile (Member) Nov 21, 2016

"fewer partitions" is obscure. :)

Member Author

Thank you for the review again. Yep, I'll revise it. :)

@dongjoon-hyun (Member Author) Nov 21, 2016

How about this?

The number of partitions that can be used, if set. It works by limiting both read and write operations' parallelism. If the number of partitions to write exceeds this limit, the operation will coalesce the data set with this value before writing. In other words, this determines the maximum number of concurrent JDBC connections.

@gatorsmile (Member) Nov 21, 2016

In the read path, we might not generate the exact number of partitions.

Found a bug in the read path in #15499.

Member Author

Oh really? Is the number of JDBCPartitions different?

Member

Conceptually, if the column values are exactly identical, only one partition will be generated.

The bug in #15499 is not related to the above fact.

Member Author

Ya. Actually, the above statement describes coalesce and its parameter. So, shall we keep it?

Member

Maybe we need to rephrase it and document both behaviors (read and write paths). Also emphasize that this is the maximum number.

@dongjoon-hyun (Member Author) Nov 21, 2016

Like this?

The number of partitions that can be used, if set. It works by limiting both read and write operations' parallelism. In other words, this determines the maximum number of concurrent JDBC connections. For reading, it will make partitions less than or equal to this maximum. For writing, if the number of partitions to write exceeds this limit, the operation will coalesce the data set with this maximum before writing.
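For illustration only, here is a minimal sketch of the write-side behavior described above, with placeholder connection values (not code from this PR): a DataFrame with more partitions than numPartitions is coalesced before the JDBC write, so at most numPartitions connections are opened.

import org.apache.spark.sql.SparkSession

// Assumes a local SparkSession; the JDBC URL, table, and credentials are placeholders.
val spark = SparkSession.builder().appName("jdbc-numPartitions-sketch").master("local[*]").getOrCreate()

// A DataFrame with 10 in-memory partitions.
val df = spark.range(0, 1000).toDF("value").repartition(10)

// With numPartitions = 2, the write path coalesces the data set to at most 2 partitions,
// so at most 2 concurrent JDBC connections are used for this write.
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")
  .option("dbtable", "t1")
  .option("user", "root")
  .option("password", "")
  .option("numPartitions", "2")
  .mode("append")
  .save()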

Member

(I am sorry I goofed in the PR.)

</tr>

<tr>
<td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
Member

We should revert this. These four parameters are related:

"These options must all be specified if any of them is specified."

@dongjoon-hyun (Member Author) Nov 21, 2016

That is incorrect now; numPartitions can be used alone. The other three are read-only optional parameters, while numPartitions is a general optional parameter.

Member

In the read path, they are still related.

Member Author

For that, I'll add another description mentioning the relationship.

@SparkQA

SparkQA commented Nov 21, 2016

Test build #68951 has finished for PR 15966 at commit 7df41a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

-    (lowerBound != null && upperBound != null && numPartitions != null),
+    (lowerBound != null && upperBound != null && numPartitions.isDefined),
     s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
     s" and '$JDBC_NUM_PARTITIONS' are required.")
Member

We need to update this error message too.

Member Author

Hmm, to me this error message looks correct. JDBC_NUM_PARTITIONS is independent of the others, but JDBC_PARTITION_COLUMN requires the others (including JDBC_NUM_PARTITIONS), doesn't it?

Member

I did not try it. What happens if we pass JDBC_PARTITION_COLUMN when writing JDBC tables?

@dongjoon-hyun (Member Author) Nov 21, 2016

It will raise an exception due to the require. I think that is the correct existing behavior. Ah, let me check.

Member Author

Since it is used in the declaration of a view, users cannot reach the write path.

CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2', partitionColumn 'value')
java.lang.IllegalArgumentException: requirement failed: If 'partitionColumn' is specified then 'lowerBound', 'upperBound', and 'numPartitions' are required.

Member

Have you tried the DataFrameWriter's JDBC/write() APIs?
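(For reference, a hedged sketch of one way the DataFrameWriter path could be exercised; the table and connection values are placeholders, and this is not code from the PR. If the same JDBCOptions require also runs on the write path, this should fail with the same message as the view example above.)

import java.util.Properties

// Assumes an existing SparkSession named `spark` (e.g. in spark-shell).
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "")
// partitionColumn is specified without lowerBound/upperBound/numPartitions,
// which is exactly what the require above rejects.
props.setProperty("partitionColumn", "value")

spark.range(10).toDF("value")
  .write
  .mode("append")
  .jdbc("jdbc:mysql://localhost:3306/t", "t1", props)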

@dongjoon-hyun
Member Author

For further review, I updated the doc. We can proceed with the updated content.

@SparkQA

SparkQA commented Nov 22, 2016

Test build #68955 has finished for PR 15966 at commit 48b6d25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Is it possible that users read a table through JDBC and then write data to a table through JDBC, and want the read and the write to have different parallelism?

@SparkQA

SparkQA commented Nov 22, 2016

Test build #68963 has finished for PR 15966 at commit f8c67ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Nov 22, 2016

Thank you for the review, @cloud-fan.

With the same parameter name numPartitions for both read and write, both paths share the same maximum parallelism by default, which is easy to use.

To use different parallelism for read and write, we can define separate views. In the following example, t1 has numPartitions=1 and t2 has numPartitions=2. The example is for writing, but I think the situation is the same for the read operation. (For the read operation, partitionColumn, lowerBound, and upperBound are also required.)

sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '1')")
sql("CREATE OR REPLACE TEMPORARY VIEW t2 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '2')")
sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
sql("INSERT OVERWRITE TABLE t2 SELECT a FROM data GROUP BY a")

@gatorsmile
Member

I found a way to verify the coalesce logic of JDBC writing. See my PR #15975; it added numPartition into JDBCRelation.

With minor code changes, you can see the adjusted numPartition in the output of EXPLAIN.

sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE").explain(true)

The number of partitions that can be used, if set. It works by limiting both read and write
operations' parallelism. In other words, this determines the maximum number of concurrent
JDBC connections. For reading, it will make partitions less than or equal to this maximum.
For writing, if the number of partitions to write exceeds this limit, the operation will
Member

Note: I am not good at writing tech documentation. Below is my revision.

The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
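As a hedged sketch of the coalesce step this wording describes (an approximation for illustration, not the actual Spark source):

import org.apache.spark.sql.DataFrame

// Cap write parallelism: if the DataFrame has more partitions than numPartitions,
// coalesce it down so at most numPartitions concurrent JDBC connections are opened.
def limitWritePartitions(df: DataFrame, numPartitions: Option[Int]): DataFrame =
  numPartitions match {
    case Some(n) if n > 0 && df.rdd.getNumPartitions > n => df.coalesce(n)
    case _ => df
  }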

@dongjoon-hyun
Member Author

Great! Thank you for #15975.

@gatorsmile
Member

LGTM pending test.

Also cc @rxin

     } else {
       JDBCPartitioningInfo(
-        partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
+        partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.get)
@rxin (Contributor) Nov 22, 2016

Is this safe to call get on? Calling get on an Option is always dangerous and not future-proof, especially when there is no if (x.isDefined) check surrounding this.

@dongjoon-hyun (Member Author) Nov 22, 2016

Thank you for the review, @rxin. Yes, it implicitly relies on the earlier require that partitionColumn implies all of the other options.
I'll change it here like this. Is this better?

-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (partitionColumn == null || lowerBound == null || upperBound == null ||
+        numPartitions.isEmpty) {

Contributor

If you are doing this, how about changing partitionColumn, lowerBound, upperBound to Option types as well?

Member Author

Sure! No problem. I'll update them tonight.

@SparkQA

SparkQA commented Nov 22, 2016

Test build #69019 has finished for PR 15966 at commit ba8be46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The only failure seems to be unrelated to this PR.

- SPARK-8020: set sql conf in spark conf *** FAILED *** (17 seconds, 602 milliseconds)

@SparkQA

SparkQA commented Nov 22, 2016

Test build #69024 has finished for PR 15966 at commit d45467e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Hi, @rxin.
Could you review this again?

@dongjoon-hyun
Member Author

Hi, @rxin, @cloud-fan, @gatorsmile.
Please let me know if there is anything more to do.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69104 has finished for PR 15966 at commit f9db374.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The only failure is unrelated to this PR.

[info] KafkaSourceStressForDontFailOnDataLossSuite:
[info] - stress test for failOnDataLoss=false *** FAILED *** (1 minute, 58 seconds)

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69112 has started for PR 15966 at commit f9db374.

@dongjoon-hyun
Member Author

Maybe it was some internal error.

Traceback (most recent call last):
  File "./dev/run-tests-jenkins.py", line 232, in <module>
    main()
  File "./dev/run-tests-jenkins.py", line 219, in main
    test_result_code, test_result_note = run_tests(tests_timeout)
  File "./dev/run-tests-jenkins.py", line 140, in run_tests
    test_result_note = ' * This patch **fails %s**.' % failure_note_by_errcode[test_result_code]
KeyError: -9

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69125 has finished for PR 15966 at commit f9db374.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69126 has finished for PR 15966 at commit f9db374.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM

@dongjoon-hyun
Member Author

Thank you, @cloud-fan!

     val numPartitions = jdbcOptions.numPartitions

-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (partitionColumn.isEmpty || lowerBound.isEmpty || upperBound.isEmpty ||
Contributor

I'd change this to:

if (partitionColumn == null) {
  assert(lowerBound.isEmpty && upperBound.isEmpty && numPartitions.isEmpty)
  null
} else {
  ...
}

to be future-proof.

Member Author

Thanks. I'll update soon.

@dongjoon-hyun (Member Author) Nov 25, 2016

I assumed the following:

if (partitionColumn.isEmpty) {
  assert(lowerBound.isEmpty && upperBound.isEmpty && numPartitions.isEmpty)
  null
} else {
  ...
}

@dongjoon-hyun (Member Author) Nov 25, 2016

Hmm, @rxin, we are using numPartitions for both writing and reading, and numPartitions can be used alone.

Member Author

I'll use the following:

    val partitionInfo = if (partitionColumn.isEmpty) {
      assert(lowerBound.isEmpty && upperBound.isEmpty)
      null
    } else {
      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
      JDBCPartitioningInfo(
        partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
    }

@rxin
Contributor

rxin commented Nov 25, 2016

LGTM other than that one change.

@SparkQA

SparkQA commented Nov 25, 2016

Test build #69143 has finished for PR 15966 at commit 4a957e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Nov 25, 2016

Thanks - merging into master.

@asfgit closed this in fb07bbe on Nov 25, 2016
@dongjoon-hyun
Member Author

Thank you for the review and the merge, @rxin, @gatorsmile, @cloud-fan!

@dongjoon-hyun deleted the SPARK-18413-2 branch on November 27, 2016 07:20
robert3005 pushed a commit to palantir/spark that referenced this pull request on Dec 2, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request on Jan 27, 2017