-
Couldn't load subscription status.
- Fork 28.9k
[SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC #49528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
Outdated
Show resolved
Hide resolved
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/UpsertTests.scala
Outdated
Show resolved
Hide resolved
| truncateTable(conn, options) | ||
| val tableSchema = JdbcUtils.getSchemaOption(conn, options) | ||
| saveTable(df, tableSchema, isCaseSensitive, options) | ||
| saveTable(df, tableSchema, isCaseSensitive, upsert, options) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks strange that apply upsert if SaveMode is Overwrite here.
| dropTable(conn, options.table, options) | ||
| createTable(conn, options.table, df.schema, isCaseSensitive, options) | ||
| saveTable(df, Some(df.schema), isCaseSensitive, options) | ||
| saveTable(df, Some(df.schema), isCaseSensitive, upsert, options) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no idea. It seems we should add a new SaveMode Upsert.
- Use SparkUnsupportedOperationException - Remove unused string interpolation - Fix indentation
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
This is a follow-up on #16685 and #16692.
Implements upsert mode for
SaveMode.Appendof the MySql, MsSql, and Postgres JDBC source.See #41611 for an alternative using the
MERGE INTOcommand (not supported by MySql).Why are the changes needed?
The JDBC writer only supports either truncating the existing table or inserting. Duplicates, i.e. rows with identical values in the primary or unique index columns, cause an exception, permitting updating existing and inserting new rows.
Re-evaluating a partition due to executor loss will insert rows that have been inserted in an earlier attempt, which kills the entier Spark job.
Does this PR introduce any user-facing change?
This adds
upsertandupsertKeyColumnsoptions forSaveMode.Appendof the JDBC source.How was this patch tested?
Tests in
JdbcSuiteand integration suites.