[SPARK-19335] Introduce UPSERT feature to SPARK #16692
Conversation
Can one of the admins verify this patch?
Hi all - thanks for this submission. Overall it's a very clean implementation and I like it a lot; there's obviously a large amount of effort that went into developing this. The main issue with this approach, however, is that the upsert statement itself is an extremely expensive operation. Depending on how your uniqueness condition is defined, validating against the uniqueness constraint proves to be the most expensive part of this whole sequence.

In #16685 I chose to implement this by reading in the existing table and doing a join operation to identify conflicts, because that join is easily distributed across the entire dataset. In contrast, the implementation as it stands in this PR depends entirely on the database to enforce the uniqueness constraint, something that in general does not parallelize well and requires a full traversal of the index created on the uniqueness constraint. Furthermore, in both MySQL and Postgres (the examples you've provided) this index cannot be implemented as a hash index. Unless the owner of the database manually computes and enforces hashes on individual rows, this approach relies on B-tree indices for the lookup. That is a marginal cost when the B-tree is on a single field, but if the uniqueness constraint spans multiple columns the index is implemented as nested B-trees, which makes the update extraordinarily costly, with non-linear performance degradation as both the size of the database and the size of the table being upserted increase.

This mirrors our initial approach to the problem, but we ultimately moved away from it in favor of the one in #16685 for performance reasons. We were able to achieve a more than 10x performance increase, even taking into account the cost of the additional joins. Our tests were not massive - roughly a 10 GB Postgres database with approximately 10 million rows, on a fairly mid-range machine. I would love to know whether you have done any performance benchmarks with this approach, and whether you could try out the approach in #16685 and let me know how it performs. Thanks!
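For reference, a minimal sketch of the join-based conflict detection described above, as one reading of the #16685 approach; the names `incoming`, `url`, `props`, `mytable2`, and `c1` are placeholders, not the actual API of either PR:

```scala
// Sketch only: split the incoming batch into rows that conflict with the existing
// table on the unique key "c1" and rows that can be appended as-is. The join runs
// as a normal distributed Spark job instead of relying on the database's index.
val existingKeys = spark.read.jdbc(url, "mytable2", props).select("c1")

val conflicting = incoming.join(existingKeys, Seq("c1"), "left_semi")  // key already present
val fresh       = incoming.join(existingKeys, Seq("c1"), "left_anti")  // key not present yet

fresh.write.mode(org.apache.spark.sql.SaveMode.Append)
  .jdbc(url, "mytable2", props)
// `conflicting` would then be applied as UPDATE statements (e.g. via a staging table),
// which is the part where this PR and #16685 take different routes.
```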
@kevinyu98 How about closing this PR for now and revisiting it later?
We are closing it due to inactivity. Please do reopen it if you want to push it forward. Thanks!
Such a feature is quite needed - please proceed. The biggest pain point is that a single duplicate entry cancels the whole batch insert when the database table has unique constraints. Performance (i.e. the cost) should not be the blocker here, as results are often written to traditional RDBMSs and are usually quite small (compared to the typical Spark scenario of lots of "big data" for analysis jobs). The uniqueness constraint check should be done entirely by the RDBMS; the solution should be very lightweight from the Spark perspective - an extra option for write.jdbc() should be enough. Therefore I like this PR/approach a lot. Maybe Oracle can be added as well, using its "MERGE INTO" syntax. An even more lightweight alternative: the following should already be enough, at least for MariaDB/MySQL databases:
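A rough illustration of such a database-side alternative (an assumption on this editor's part, using MySQL/MariaDB's INSERT ... ON DUPLICATE KEY UPDATE clause; table and column names are placeholders, and this is not the original snippet from the comment):

```scala
// Illustration only: with a statement like this, MySQL/MariaDB resolves the key
// conflict itself, so Spark merely has to emit a slightly different INSERT per batch.
val upsertSql =
  """INSERT INTO mytable2 (c1, c2) VALUES (?, ?)
    |ON DUPLICATE KEY UPDATE c2 = VALUES(c2)""".stripMargin
```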
What changes were proposed in this pull request?
This PR proposes to add UPSERT support to Spark through DataFrameWriter's JDBC data source options.
For example:
If mytable2 in the MySQL database has a unique constraint on column c1 and the user tries to save the DataFrame into it, the write fails with a unique constraint violation.
```scala
val df = Seq((1, 4)).toDF("c1", "c2")
val url = "jdbc:mysql://9.30.167.220:3306/mydb"
df.write.mode(org.apache.spark.sql.SaveMode.Append)
  .option("user", "kevin").option("password", "kevin")
  .jdbc(url, "mytable2", new java.util.Properties())
```

With this feature, the user can set the upsert options to write the DataFrame into the MySQL table:

```scala
df.write.mode(org.apache.spark.sql.SaveMode.Append)
  .option("upsert", true).option("upsertUpdateColumn", "c1")
  .option("user", "kevin").option("password", "kevin")
  .jdbc(url, "mytable2", new java.util.Properties())
```

Here is the design doc:
UPSERT DESIGN DOC
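For a concrete picture, a hedged sketch of the kind of statement an upsert option like this could generate for PostgreSQL (an illustration of standard ON CONFLICT syntax, not quoted from the design doc; table and column names are placeholders, and the unique constraint is assumed to be on c1):

```scala
// Illustration only: PostgreSQL 9.5+ upsert form. The conflict target must match
// the unique constraint on the target table.
val pgUpsertSql =
  """INSERT INTO mytable2 (c1, c2) VALUES (?, ?)
    |ON CONFLICT (c1) DO UPDATE SET c2 = EXCLUDED.c2""".stripMargin
```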
How was this patch tested?
Local test: ran the test cases from spark-shell, connecting to MySQL and PostgreSQL databases (a rough sketch of such a check is shown below).
Test cases: added test cases to the existing suites, including the docker integration suite.
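A minimal sketch of the kind of spark-shell check used locally, assuming the upsert / upsertUpdateColumn options proposed in this PR and placeholder connection details; this is not the actual test code from the suites:

```scala
// Sketch only: first insert a row, then write a conflicting key with the upsert
// options and verify that the row is updated rather than the whole batch failing.
val url = "jdbc:mysql://localhost:3306/mydb"
val props = new java.util.Properties()
props.setProperty("user", "kevin")
props.setProperty("password", "kevin")

Seq((1, 2)).toDF("c1", "c2").write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .jdbc(url, "mytable2", props)

Seq((1, 4)).toDF("c1", "c2").write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("upsert", true).option("upsertUpdateColumn", "c1")
  .jdbc(url, "mytable2", props)

val updated = spark.read.jdbc(url, "mytable2", props).where("c1 = 1").collect()
assert(updated.head.getInt(1) == 4)  // c2 should now hold the upserted value
```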
Please review http://spark.apache.org/contributing.html before opening a pull request.