
Conversation

@ilganeli

@ilganeli ilganeli commented Jan 24, 2017

What changes were proposed in this pull request?

Adds the ability to perform an insert, update, or upsert command to the JdbcUtils class, which supports writing DataFrames to databases via JDBC.

This functionality has not previously existed in Spark, and performing an upsert efficiently is generally difficult. The method presented here strikes a reasonable balance between simplicity and performance and has shown reasonably efficient scaling. The insert operation, while it already exists, is implemented slightly differently in this approach to be consistent with how update is implemented.

This resolves https://issues.apache.org/jira/browse/SPARK-19335

How was this patch tested?

This functionality has been tested through extensive manual testing and tuning while developing this patch. If the committers believe that this is a valuable addition, I will be happy to add additional unit tests around this feature.

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71890 has finished for PR 16685 at commit ca494eb.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ilganeli
Author

retest this please

@ilganeli ilganeli changed the title [SPARK-19935] Introduce insert, update, and upsert commands to the JdbcUtils class [SPARK-19335] Introduce insert, update, and upsert commands to the JdbcUtils class Jan 24, 2017
@SparkQA

SparkQA commented Jan 24, 2017

Test build #71891 has finished for PR 16685 at commit 6a2cb05.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


package org.apache.spark.sql.execution.datasources.jdbc
Member

To my knowledge, this is not a public API (see execution/package.scala#L21-L23).

Author

Oh heck - this did seem like the appropriate place to put this though. Any thoughts on where it could live instead?

Member

If this is worth adding (I am not supposed to decide this), I think we should make it work within the current Spark APIs or new Spark APIs. cc @gatorsmile, who I guess knows this area better.

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71892 has finished for PR 16685 at commit 69f6939.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71895 has finished for PR 16685 at commit 7938277.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

The biggest challenge is how to resolve the constraints at the JDBC target (e.g., unique constraints). We make a new JDBC connection for each partition. The final results are non-deterministic.

@ilganeli
Author

ilganeli commented Jan 24, 2017

@gatorsmile That's exactly right - in our testing, writing to databases with unique constraints proved to be extremely difficult to do efficiently. That's why this approach moves the maintenance of the unique constraint into the application (in this case the upsert function).
The uniqueness is enforced by the Spark function as follows:

  1. Defining the primary key set
  2. Decomposing the DataFrame to be written into two subsets: rows whose keys conflict with existing rows in the target table, and rows that do not overlap
  3. Performing an insert for the non-conflicting rows and an update for the conflicting ones (sketched below)
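
A minimal sketch of this decomposition, assuming the existing key columns have already been read back from the target table; the names splitForUpsert, existingKeys, insertRows, and updateRows are illustrative and not taken from this patch:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative sketch of the split described above; not the patch's actual code.
// `existingKeys` holds the primary-key columns read back from the target table.
def splitForUpsert(
    df: DataFrame,
    existingKeys: DataFrame,
    primaryKeys: Seq[String]): (DataFrame, DataFrame) = {
  // Rows whose keys already exist in the target table become UPDATEs.
  val updateRows = df.join(existingKeys, primaryKeys, "left_semi")
  // Rows whose keys are absent from the target table become INSERTs.
  val insertRows = df.join(existingKeys, primaryKeys, "left_anti")
  (insertRows, updateRows)
}
```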

This is obviously suboptimal because it weakens the database's constraint enforcement to some degree, but I think there's still enough utility in allowing an update operation. I know there's certainly been enough demand for it online, and it's a common use case. I think with proper documentation of the challenges of doing an upsert and the necessary considerations for the table being updated, this can be a very welcome addition to Spark.

I would love to understand if there's a way to add this solution to what is a pretty difficult problem so that it's available to the community at large.

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71903 has finished for PR 16685 at commit 56545ed.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val DEFAULT_BATCH_SIZE: Int = 200

// Limit the number of database connections. Some DBs suffer when there are many open
// connections.
Member

Well, since Spark 2.1, we already provide a parameter for limiting the maximum number of concurrent JDBC connections when inserting data into JDBC tables. The parameter is numPartitions.

Author

A large DataFrame may have hundreds or thousands of partitions. Having a separate value not tied to partitions can help when working with databases that allow only a limited number of connections, or whose performance degrades when hundreds or thousands of connections are opened, e.g. Postgres.

If you're suggesting purely changing the default to be numPartitions, I think that's wholly valid but I'm not aware of how to access that value in a static context.

Member

Please see the logic here. We can already do this for insert by using coalesce.
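
For illustration, a minimal sketch of what capping connections by coalescing before a JDBC write looks like; the helper name and its parameters are made up, not part of Spark's API:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative sketch: at most `maxConnections` partitions write concurrently,
// so at most that many JDBC connections are opened for the save.
def writeWithConnectionCap(
    df: DataFrame,
    url: String,
    table: String,
    props: Properties,
    maxConnections: Int): Unit = {
  df.coalesce(maxConnections)
    .write
    .mode(SaveMode.Append)
    .jdbc(url, table, props)
}
```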

Author

Got it, thanks!

@gatorsmile
Member

I still have a concern about Update, even if we do the update based on key values. With this approach to update statements, we still face non-deterministic results. For example, you are unable to do a key update. For non-key updates, we also face another issue when the target table has a secondary unique index.

@ilganeli
Author

ilganeli commented Jan 24, 2017

@gatorsmile What is a "key" update and in what context would that sort of operation be needed?

I don't think a secondary index on the table prevents this method from working; the primary issue is that it makes the operation more expensive. The database still enforces any existing constraints.

If the ask is to support a "uniqueness" constraint on multiple columns, that is already supported via the primaryKeys passed to upsert().

The update uses the "id" column not as a uniqueness constraint, but as a simple and efficient way to identify a given row to update. A future improvement would be to support using multiple columns to identify the row to update.
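
For illustration, a minimal sketch of an id-addressed, per-partition batched update over JDBC; the helper, table, and column names are made up and not taken from this patch:

```scala
import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

// Illustrative sketch only. Assumes a target table `target(id INT, value TEXT)`
// and a DataFrame with columns `id` and `value`; these names are hypothetical.
def updateById(df: DataFrame, url: String): Unit = {
  df.foreachPartition { rows =>
    val conn = DriverManager.getConnection(url) // one connection per partition
    try {
      val stmt = conn.prepareStatement("UPDATE target SET value = ? WHERE id = ?")
      try {
        rows.foreach { row =>
          stmt.setString(1, row.getAs[String]("value")) // new value
          stmt.setInt(2, row.getAs[Int]("id"))          // row selector
          stmt.addBatch()
        }
        stmt.executeBatch() // execute the accumulated batch
      } finally {
        stmt.close()
      }
    } finally {
      conn.close()
    }
  }
}
```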

@SparkQA

SparkQA commented Jan 24, 2017

Test build #71943 has finished for PR 16685 at commit c6af861.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

To support UPSERT, this PR basically implements it using SELECT, UPDATE, and INSERT. It has to read the whole table from the JDBC-connected database and process it in Spark, which does not perform well when the target table is huge. We still face the same issue as with UPDATE.

@ilganeli
Author

ilganeli commented Jan 24, 2017

I recognize that this is not an optimal solution, but Spark has historically contained multiple sub-optimal operations that are nonetheless useful in certain contexts, and it's left to the user to understand and use them correctly. A few examples off the top of my head include collectPartitions, zipWithIndex, and repartition - all of which may be expensive operations but are useful when used appropriately.

I believe there's value in introducing this as a starting point which works in most scenarios and is more efficient than relying on the database to handle the uniqueness constraint and be responsible for a mass update, with the expectation of future improvement.

For extremely large tables we could provide an alternative implementation that instead relies on the database to do the update. I have existing code I can share that does this which may perform better when the number of updates is small and the table is large.

In our testing, however, we found that as the size of the table grows, even small operations may prove very slow when relying exclusively on the database to do the update.

@gatorsmile
Member

gatorsmile commented Jan 24, 2017

Currently, I do not have a solution for supporting a parallel mass UPDATE, because the rows in the DataFrame might be out of order and global transaction support is missing. The solution posted in this PR makes many assumptions. If users need to do it, the current suggested workaround is to let Spark SQL insert the results into a table and use a separate RDBMS application to do the update (outside Spark SQL).

I fully understand the challenges. I can point to a solution I did in the database replication area: https://www.google.com/patents/US20050193041 Although this patent still has a hole, it generally explains how to do it. In that use case, we can do parallel update/insert/delete by using the maintained transaction dependencies and retry logic with spill queues. Unfortunately, it is not applicable to Spark SQL.

UPSERT is pretty useful to Spark SQL users. Since most RDBMSs have native support, I prefer using the capability provided by the RDBMS directly, instead of implementing it in Spark SQL. Then we can avoid fetching/joining the data from the JDBC tables. More importantly, we can ensure each individual UPSERT works correctly even if the target tables are being inserted into or updated by other applications at the same time.

@ilganeli
Author

It sounds like you consider there to be too many caveats and assumptions in this patch for it to be a worthwhile code contribution. Given the numerous assumptions made in this PR, how would you feel about converting it into a documentation patch and somehow providing this as example code for users? I'm not sure there is currently any official documentation around doing UPDATE in Spark, so maybe this instead becomes a source of helpful information for others.

@xwu0226
Contributor

xwu0226 commented Jan 24, 2017

A few comments:

  1. The major concern is that this solution needs to pull in the whole target table and do a join between the source DataFrame and the target table to determine which rows to update and which to insert. I am worried that this join itself adds a lot of performance overhead to the upsert operation. Moreover, while this decision is being made, the target table may have advanced considerably, which can invalidate the insert/update decisions.

  2. The primary key set provided may not exactly match the unique constraints on the target table, which can cause inserts or updates to fail, because some columns that are part of a unique constraint may be outside of the provided primary key set.

  3. The insert is a batch execution of as many statements as there are rows to insert, and the same applies to updates, so many statements are sent over JDBC to the target database. Would it perform better if column values were bound as host variables in a prepared statement and executed once per batch of batch-size rows?

  4. Most database systems provide an UPSERT capability, such as INSERT ... ON DUPLICATE KEY UPDATE in MySQL, INSERT ... ON CONFLICT ... DO UPDATE SET in PostgreSQL, and the MERGE statement in DB2, Oracle, etc., where the database decides whether to insert or update. Maybe we can take advantage of this by extending the different JDBCDialects? (See the sketch just below.)
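
To illustrate point 4, a minimal sketch of a database-side upsert using PostgreSQL's INSERT ... ON CONFLICT with a batched PreparedStatement; the table and column names are made up, and this is not code from either PR:

```scala
import java.sql.DriverManager

// Illustrative sketch only: the database resolves insert-vs-update per row.
// Assumes a table `target(id INT PRIMARY KEY, value TEXT)`; names are hypothetical.
def upsertBatch(url: String, rows: Seq[(Int, String)]): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    val sql =
      """INSERT INTO target (id, value) VALUES (?, ?)
        |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin
    val stmt = conn.prepareStatement(sql)
    try {
      rows.foreach { case (id, value) =>
        stmt.setInt(1, id)
        stmt.setString(2, value)
        stmt.addBatch() // one parameterized statement, many bindings
      }
      stmt.executeBatch() // execute the accumulated batch
    } finally {
      stmt.close()
    }
  } finally {
    conn.close()
  }
}
```

A JDBCDialect extension, as proposed in PR #16692, would generate the dialect-specific SQL text (ON DUPLICATE KEY UPDATE, ON CONFLICT, MERGE) while keeping the batching logic shared.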

PR #16692 actually minimizes the issues above. Please take a look and compare.

@ilganeli
Author

ilganeli commented Jan 25, 2017

@xwu0226 Thanks for the comments, I've reviewed your submission and commented here #16692.

Specifically in response to your comments:

  1. We did not find the join to be a limiting factor in our tests. Granted, this is very dataset-specific, but conceptually Spark can do distributed joins very effectively, and extracting the data from the database is an O(n) operation plus a network copy plus output to HDFS after transformation. The main cost of this approach is the additional copy of data out of the database and then back in as an INSERT + UPDATE. However, an UPSERT operation is equivalent to a DELETE plus an INSERT. I think there may be a slight horse race between copy-out-of-DB/INSERT/UPDATE and UPSERT, but I'm not convinced there's a dramatic performance cost in this step, particularly considering the dramatic cost of enforcing the uniqueness constraint for UPSERT.

  2. This is indeed a valid concern. This approach requires the Spark programmer to enforce and maintain the uniqueness constraints on the table, rather than the other way around. This is a conceptual shift from how things are usually implemented (where the DB Admin is king) but in our case this choice was justified by massive performance improvements.

  3. I agree that using a PreparedStatement would be better. I tried initially with a PreparedStatement and ran into issues with certain data types (particularly timestamps). I haven't yet tried the placeholder style currently used by the JdbcUtils insert statement; I think it's definitely doable that way. This might also help boost performance.

  4. I like the approach you took to extend JDBCDialect in [SPARK-19335] Introduce UPSERT feature to SPARK #16692. It's a well-modularized approach. I agree that something similar could be done here.

@xwu0226
Contributor

xwu0226 commented Jan 25, 2017

@ilganeli Thanks for replying to my comments! Please correct me if I am wrong. My understanding of your assumption is that the target table does not have or maintain any unique constraints, and that the target table is mostly created and maintained solely by the Spark application, right?

If this is the assumption, I do believe that simple INSERT and UPDATE may perform better than UPSERT. But if the target table has a unique constraint to start with, the INSERT/UPDATE vs. UPSERT/MERGE comparison may be the slight horse race you described, since in either case an index lookup and validation are required, and UPSERT/MERGE may involve a bit more if/else logic depending on the database implementation. A benchmark of the two approaches would tell.

@gatorsmile
Member

@ilganeli Regarding the support of UPDATE, your idea is pretty good. Please submit a separate PR and improve the documentation. I will review it.

Regarding the support of UPSERT, we need to measure the performance. We can try both solutions and measure the performance difference.

I have a very basic question. If we are doing the UPSERT for a small data set (which is a common case), it sounds like fetching the whole table from the source is pretty expensive. Normally, JDBC performance is notoriously bad when the table size is large. Thus, IMHO, we need to avoid fetching the source table when supporting UPSERT.

@ilganeli
Author

@xwu0226 The target table may be created and maintained outside of the Spark application. The only restriction is that, in order to do efficient inserts, the table must not enforce a uniqueness constraint.

@ilganeli
Author

@gatorsmile I'll submit a PR with just the UPDATE functionality. How do you suggest proceeding on the UPSERT front?

@gatorsmile
Member

gatorsmile commented Jan 26, 2017

Thank you!

I prefer pushing the UPSERT workloads down into the underlying DBMS, but not all JDBC sources support it. Thus, maybe we can provide users with both solutions at the same time and let them choose based on their usage scenarios. Also cc @srowen @JoshRosen

BTW, I did not carefully review the solution. It might still have some holes. Doing it in JdbcUtils.scala is not consistent with our JDBC interface. We also need to think about the API design and make the implementation consistent with the rest of the Spark code.

@gatorsmile
Member

@ilganeli How about closing this PR for now and revisiting it later?

@gatorsmile
Member

We are closing it due to inactivity. Please do reopen it if you want to push it forward. Thanks!
