[SPARK-19335] Introduce insert, update, and upsert commands to the JdbcUtils class #16685
Conversation
Test build #71890 has finished for PR 16685 at commit

retest this please

Test build #71891 has finished for PR 16685 at commit
Review comment on `package org.apache.spark.sql.execution.datasources.jdbc`:
To my knowledge, this is not a public API (see execution/package.scala#L21-L23).
Oh heck - this did seem like the appropriate place to put this though. Any thoughts on where it could live instead?
If this is worth being added (I am not supposed to decide this), I think we should make this work within the current Spark APIs or new Spark APIs. cc @gatorsmile, who I guess knows this area better.
Test build #71892 has finished for PR 16685 at commit

Test build #71895 has finished for PR 16685 at commit
The biggest challenge is how to resolve the constraints at the JDBC target (e.g., unique constraints). We make a new JDBC connection for each partition. The final results are non-deterministic.
@gatorsmile That's exactly right - in our testing, writing to databases with unique constraints proved to be extremely difficult to do efficiently. That's why this approach moves the maintenance of the unique constraint into the application (in this case, the upsert function).
This is obviously suboptimal because it weakens the database's guarantees to some degree, but I think there's still enough utility in allowing an update operation. I know there's certainly been enough demand for it online, and it's a common use case. I think that with proper documentation of the challenges of doing an upsert and the necessary considerations for the table being updated, this can be a very welcome addition to Spark. I would love to understand if there's a way to add this solution to what is a pretty difficult problem so that it's available to the community at large.
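To make the idea concrete, here is a minimal sketch of what such an application-side split could look like, assuming a single key column; the names (`keyCol`, `targetTable`, `connProps`) and the key-only subquery are illustrative, not code from this PR:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: split an incoming DataFrame into "update" and "insert" sets by
// checking which key values already exist in the JDBC target. Column and
// table names are placeholders.
object UpsertSplitSketch {
  def split(spark: SparkSession, incoming: DataFrame, url: String,
            targetTable: String, keyCol: String,
            connProps: Properties): (DataFrame, DataFrame) = {
    // Fetch only the key column from the target, not the whole table.
    val existingKeys = spark.read
      .jdbc(url, s"(SELECT $keyCol FROM $targetTable) existing_keys", connProps)

    // Rows whose key already exists become UPDATEs; the rest become INSERTs.
    val toUpdate = incoming.join(existingKeys, Seq(keyCol), "left_semi")
    val toInsert = incoming.join(existingKeys, Seq(keyCol), "left_anti")
    (toUpdate, toInsert)
  }
}
```

The "insert" half can then go through the existing JDBC writer, while the "update" half is applied with UPDATE statements keyed on the key column.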
Test build #71903 has finished for PR 16685 at commit
Review comment on `private val DEFAULT_BATCH_SIZE: Int = 200` and the adjacent comment ("Limit the number of database connections. Some DBs suffer when there are many open connections."):
Well, since Spark 2.1, we already provide a parameter for limiting the maximum number of concurrent JDBC connections when inserting data into JDBC tables. The parameter is numPartitions.
A large DataFrame may have hundreds or thousands of partitions. Having a separate value not tied to partitions can help when working with databases that allow fewer connections, or whose performance degrades when hundreds or thousands of connections are open, e.g. Postgres.
If you're suggesting purely changing the default to be numPartitions, I think that's wholly valid, but I'm not aware of how to access that value in a static context.
Please see the logic here. We can already do it for Insert by using coalesce.
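For reference, a sketch of the coalesce-based cap on concurrent connections that this exchange refers to; the cap value, URL, and table name are placeholders supplied by the caller:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: cap the number of concurrent JDBC connections by reducing the
// number of write partitions before the insert.
object ConnectionCappedInsert {
  def insert(df: DataFrame, url: String, table: String,
             connProps: Properties, maxConnections: Int): Unit = {
    df.coalesce(maxConnections)   // at most one connection per partition
      .write
      .mode(SaveMode.Append)
      .jdbc(url, table, connProps)
  }
}
```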
Got it, thanks!
I still have a concern about Update, even if we do the update based on key values. With this approach to update statements, we still face non-deterministic results. For example, you are unable to do a key update. For non-key updates, we also face another issue when the target table has a secondary unique index.
@gatorsmile What is a "key" update, and in what context would that sort of operation be needed? I don't think a secondary index on the table prevents this method from working; the primary issue is that it makes the operation more expensive. The database still enforces any existing constraints. If the ask is to support a "uniqueness" constraint on multiple columns, that is already supported. The update uses the "id" column not as a uniqueness constraint, but as a simple and efficient way to identify a given row to update. A future improvement would be to support using multiple columns to identify the row to update.
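For illustration only (this is not the PR's implementation), a per-partition batched UPDATE keyed on an "id" column could look roughly like this; the table name, value columns, and batch size are assumptions:

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}

// Illustrative sketch: batched per-partition UPDATE keyed on an "id" column.
object UpdateByIdSketch {
  def update(df: DataFrame, url: String, table: String,
             valueCols: Seq[String], batchSize: Int = 200): Unit = {
    val setClause = valueCols.map(c => s"$c = ?").mkString(", ")
    val sql = s"UPDATE $table SET $setClause WHERE id = ?"

    df.foreachPartition { rows: Iterator[Row] =>
      val conn = DriverManager.getConnection(url)
      val stmt = conn.prepareStatement(sql)
      try {
        var pending = 0
        rows.foreach { row =>
          // Bind the non-key columns, then the key used to locate the row.
          valueCols.zipWithIndex.foreach { case (col, i) =>
            stmt.setObject(i + 1, row.getAs[AnyRef](col))
          }
          stmt.setObject(valueCols.size + 1, row.getAs[AnyRef]("id"))
          stmt.addBatch()
          pending += 1
          if (pending >= batchSize) { stmt.executeBatch(); pending = 0 }
        }
        if (pending > 0) stmt.executeBatch()
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}
```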
Test build #71943 has finished for PR 16685 at commit
To support UPSERT, this PR basically implements it by using SELECT, UPDATE, and INSERT. It has to read the whole table from the JDBC-connected database and process it in Spark. It does not perform well when the target table is huge. We are still facing the same issue caused by the UPDATE.
I recognize that this is not an optimal solution, but Spark has historically contained multiple sub-optimal operations that are nonetheless useful in certain contexts, and it's left to the user to understand and use them correctly. A few examples off the top of my head include collectPartitions, zipWithIndex, and repartition - all of which may be expensive operations but are nonetheless useful when used appropriately. I believe there's value in introducing this as a starting point which works in most scenarios and is more efficient than relying on the database to handle the uniqueness constraint and be responsible for a mass update, with the expectation of future improvement. For extremely large tables we could provide an alternative implementation that instead relies on the database to do the update. I have existing code I can share that does this, which may perform better when the number of updates is small and the table is large. In our testing, however, we found that as the size of the table grows, even small operations may prove very slow when relying exclusively on the database to do the update.
Currently, I do not have a solution for supporting a parallel mass UPDATE, because the rows in the DataFrame might be out of order and global transaction support is missing. The solution posted in this PR makes many assumptions. If users need to do it, the current suggested workaround is to let Spark SQL insert the results into a table and use a separate RDBMS application to do the update (outside Spark SQL). I fully understand the challenges. I can post a solution which I did in the database replication area: https://www.google.com/patents/US20050193041. Although this patent still has a hole, it generally explains how to do it. In that use case, we can do parallel update/insert/delete by using the maintained transaction dependencies and retry logic with spill queues. Unfortunately, it is not applicable to Spark SQL.
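A rough sketch of that suggested workaround, staging the rows from Spark and then letting the database apply them; the staging/target table names and the PostgreSQL-style UPDATE ... FROM statement are assumptions:

```scala
import java.sql.DriverManager
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch of the workaround: write to a staging table with the normal JDBC
// writer, then apply the staged rows to the target inside the database.
object StageThenUpdateSketch {
  def run(df: DataFrame, url: String, connProps: Properties): Unit = {
    // 1. Insert the results into a staging table from Spark.
    df.write.mode(SaveMode.Overwrite).jdbc(url, "target_staging", connProps)

    // 2. Apply the staged rows to the target table in a single statement.
    val conn = DriverManager.getConnection(url, connProps)
    try {
      val stmt = conn.createStatement()
      stmt.executeUpdate(
        """UPDATE target t
          |SET value = s.value
          |FROM target_staging s
          |WHERE t.id = s.id""".stripMargin)
      stmt.close()
    } finally {
      conn.close()
    }
  }
}
```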
It sounds like you consider there to be too many caveats and assumptions made in this patch for it to be a worthwhile code contribution. Given the numerous assumptions made in this PR, how would you instead feel about converting this into a documentation patch and somehow providing this as example code for users? I'm not sure if there is currently any official documentation around doing UPDATE in Spark, so maybe this instead becomes a source of helpful information for others.
A few comments:

PR #16692 actually minimizes the issues above. Please take a look to compare.
@xwu0226 Thanks for the comments; I've reviewed your submission and commented on #16692. Specifically, in response to your comments:
@ilganeli Thanks for replying to my comments! Please correct me if I am wrong. My understanding of your assumption is that the target table does not have or maintain any unique constraints; mostly the target table is created and maintained solely by the Spark application, right? If this is the assumption, I do believe that simple INSERT and UPDATE may perform better than UPSERT. But if the target table has a unique constraint to start with, the INSERT/UPDATE vs. UPSERT/MERGE comparison may be the slight horse race you described, since in either case index lookup and validation are required, where UPSERT/MERGE may have a bit more
@ilganeli Regarding the support of UPDATE, your idea is pretty good. Please submit a separate PR and improve the documentation; I will review it. Regarding the support of UPSERT, we need to measure the performance. We can try both solutions and measure the performance difference. I have a very basic question: if we are doing the UPSERT for a small data set (which is a common case), it sounds like fetching the whole table from the source is pretty expensive. Normally, the performance of JDBC is notoriously bad when the table size is large. Thus, IMHO, we need to avoid fetching the source table for supporting UPSERT.
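One hedged sketch of how that fetch could be narrowed, assuming a numeric `id` key and a table named `target` (both placeholders): pull back only the key column, restricted to the key range of the incoming batch, so the whole table never crosses the JDBC link:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{max, min}

// Sketch: fetch only candidate keys instead of the whole target table.
object CandidateKeysSketch {
  def fetch(spark: SparkSession, incoming: DataFrame,
            url: String, connProps: Properties): DataFrame = {
    val Row(minId: Long, maxId: Long) =
      incoming.agg(min("id"), max("id")).head()

    // The WHERE clause lives in the pushed-down subquery, so only candidate
    // keys travel over the wire.
    spark.read.jdbc(
      url,
      s"(SELECT id FROM target WHERE id BETWEEN $minId AND $maxId) candidate_keys",
      connProps)
  }
}
```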
@xwu0226 The target table may be created and maintained outside of the Spark application. The only restriction is that, in order to do efficient inserts, the table must not enforce a uniqueness constraint.
@gatorsmile I'll submit a PR with just the UPDATE functionality. How do you suggest proceeding on the UPSERT front?
Thank you! I prefer pushing down the UPSERT workloads into the underlying DBMS, but not all the JDBC sources support it. Thus, maybe we can provide users two solutions at the same time and let them choose based on their usage scenarios. Also cc @srowen @JoshRosen. BTW, I did not carefully review the solution; it might still have some holes. Doing it in
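To show what such a pushdown could look like where the source supports it, here is a PostgreSQL-flavored sketch (MySQL would use ON DUPLICATE KEY UPDATE, other engines MERGE); the table, columns, and types are placeholders:

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}

// Sketch: push the upsert into the database itself using vendor-specific SQL.
object PushDownUpsertSketch {
  def upsert(df: DataFrame, url: String): Unit = {
    val sql =
      """INSERT INTO target (id, value)
        |VALUES (?, ?)
        |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin

    df.foreachPartition { rows: Iterator[Row] =>
      val conn = DriverManager.getConnection(url)
      val stmt = conn.prepareStatement(sql)
      try {
        rows.foreach { row =>
          stmt.setLong(1, row.getAs[Long]("id"))
          stmt.setString(2, row.getAs[String]("value"))
          stmt.addBatch()
        }
        stmt.executeBatch()
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}
```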
@ilganeli How about closing this PR for now and revisiting it later?
We are closing it due to inactivity. Please do reopen if you want to push it forward. Thanks!
What changes were proposed in this pull request?
Adds the ability to perform an insert, update, or upsert command via the JdbcUtils class, which supports writing DataFrames to databases via JDBC.
This functionality has not existed heretofore within Spark, and doing an upsert efficiently is generally difficult. The method presented here strikes a reasonable balance between simplicity and performance and has shown reasonably efficient scaling. The insert operation, while it already exists, is implemented slightly differently in this approach to be consistent with how update is implemented.
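For context, the insert-only path that already exists in Spark (DataFrameWriter.jdbc) looks roughly like this; the URL and table name are placeholders, and the update/upsert variants proposed here would sit alongside it:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// The existing insert-only JDBC write path that JdbcUtils backs today.
object ExistingInsertPath {
  def append(df: DataFrame, url: String, table: String,
             connProps: Properties): Unit = {
    df.write
      .mode(SaveMode.Append)   // plain INSERTs; no update or upsert semantics
      .jdbc(url, table, connProps)
  }
}
```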
This resolves https://issues.apache.org/jira/browse/SPARK-19335
How was this patch tested?
This functionality has been tested through extensive manual testing and tuning while developing this patch. If the committers believe that this is a valuable addition, I will be happy to add additional unit tests around this feature.