@@ -70,9 +70,10 @@ use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWi
7070
7171use crate :: error:: Result ;
7272use crate :: spec:: {
73- COMMIT_MAX_RETRY_WAIT_MS , COMMIT_MAX_RETRY_WAIT_MS_DEFAULT , COMMIT_MIN_RETRY_WAIT_MS ,
74- COMMIT_MIN_RETRY_WAIT_MS_DEFAULT , COMMIT_NUM_RETRIES , COMMIT_NUM_RETRIES_DEFAULT ,
75- COMMIT_TOTAL_RETRY_TIME_MS , COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
73+ PROPERTY_COMMIT_MAX_RETRY_WAIT_MS , PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ,
74+ PROPERTY_COMMIT_MIN_RETRY_WAIT_MS , PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ,
75+ PROPERTY_COMMIT_NUM_RETRIES , PROPERTY_COMMIT_NUM_RETRIES_DEFAULT ,
76+ PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS , PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
7677} ;
7778use crate :: table:: Table ;
7879use crate :: transaction:: action:: BoxedTransactionAction ;
@@ -82,7 +83,7 @@ use crate::transaction::update_location::UpdateLocationAction;
8283use crate :: transaction:: update_properties:: UpdatePropertiesAction ;
8384use crate :: transaction:: update_statistics:: UpdateStatisticsAction ;
8485use crate :: transaction:: upgrade_format_version:: UpgradeFormatVersionAction ;
85- use crate :: { Catalog , TableCommit , TableRequirement , TableUpdate } ;
86+ use crate :: { Catalog , Error , ErrorKind , TableCommit , TableRequirement , TableUpdate } ;
8687
8788/// Table transaction.
8889#[ derive( Clone ) ]
@@ -169,7 +170,7 @@ impl Transaction {
169170 return Ok ( self . table ) ;
170171 }
171172
172- let backoff = Self :: build_backoff ( self . table . metadata ( ) . properties ( ) ) ;
173+ let backoff = Self :: build_backoff ( self . table . metadata ( ) . properties ( ) ) ? ;
173174 let tx = self ;
174175
175176 ( |mut tx : Transaction | async {
@@ -184,38 +185,55 @@ impl Transaction {
184185 . 1
185186 }
186187
187- fn build_backoff ( props : & HashMap < String , String > ) -> ExponentialBackoff {
188- ExponentialBuilder :: new ( )
189- . with_min_delay ( Duration :: from_millis (
190- props
191- . get ( COMMIT_MIN_RETRY_WAIT_MS )
192- . map ( |s| s. parse ( ) )
193- . unwrap_or_else ( || Ok ( COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ) )
194- . expect ( "Invalid value for commit.retry.min-wait-ms" ) ,
195- ) )
196- . with_max_delay ( Duration :: from_millis (
197- props
198- . get ( COMMIT_MAX_RETRY_WAIT_MS )
199- . map ( |s| s. parse ( ) )
200- . unwrap_or_else ( || Ok ( COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ) )
201- . expect ( "Invalid value for commit.retry.max-wait-ms" ) ,
202- ) )
203- . with_total_delay ( Some ( Duration :: from_millis (
204- props
205- . get ( COMMIT_TOTAL_RETRY_TIME_MS )
206- . map ( |s| s. parse ( ) )
207- . unwrap_or_else ( || Ok ( COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ) )
208- . expect ( "Invalid value for commit.retry.total-timeout-ms" ) ,
209- ) ) )
210- . with_max_times (
211- props
212- . get ( COMMIT_NUM_RETRIES )
213- . map ( |s| s. parse ( ) )
214- . unwrap_or_else ( || Ok ( COMMIT_NUM_RETRIES_DEFAULT ) )
215- . expect ( "Invalid value for commit.retry.num-retries" ) ,
216- )
188+ fn build_backoff ( props : & HashMap < String , String > ) -> Result < ExponentialBackoff > {
189+ let min_delay = match props. get ( PROPERTY_COMMIT_MIN_RETRY_WAIT_MS ) {
190+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
191+ Error :: new (
192+ ErrorKind :: DataInvalid ,
193+ "Invalid value for commit.retry.min-wait-ms" ,
194+ )
195+ . with_source ( e)
196+ } ) ?,
197+ None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ,
198+ } ;
199+ let max_delay = match props. get ( PROPERTY_COMMIT_MAX_RETRY_WAIT_MS ) {
200+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
201+ Error :: new (
202+ ErrorKind :: DataInvalid ,
203+ "Invalid value for commit.retry.max-wait-ms" ,
204+ )
205+ . with_source ( e)
206+ } ) ?,
207+ None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ,
208+ } ;
209+ let total_delay = match props. get ( PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS ) {
210+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
211+ Error :: new (
212+ ErrorKind :: DataInvalid ,
213+ "Invalid value for commit.retry.total-timeout-ms" ,
214+ )
215+ . with_source ( e)
216+ } ) ?,
217+ None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
218+ } ;
219+ let max_times = match props. get ( PROPERTY_COMMIT_NUM_RETRIES ) {
220+ Some ( value_str) => value_str. parse :: < usize > ( ) . map_err ( |e| {
221+ Error :: new (
222+ ErrorKind :: DataInvalid ,
223+ "Invalid value for commit.retry.num-retries" ,
224+ )
225+ . with_source ( e)
226+ } ) ?,
227+ None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT ,
228+ } ;
229+
230+ Ok ( ExponentialBuilder :: new ( )
231+ . with_min_delay ( Duration :: from_millis ( min_delay) )
232+ . with_max_delay ( Duration :: from_millis ( max_delay) )
233+ . with_total_delay ( Some ( Duration :: from_millis ( total_delay) ) )
234+ . with_max_times ( max_times)
217235 . with_factor ( 2.0 )
218- . build ( )
236+ . build ( ) )
219237 }
220238
221239 async fn do_commit ( & mut self , catalog : & dyn Catalog ) -> Result < Table > {
@@ -259,6 +277,7 @@ mod tests {
259277 use std:: fs:: File ;
260278 use std:: io:: BufReader ;
261279 use std:: sync:: Arc ;
280+ use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
262281
263282 use crate :: catalog:: MockCatalog ;
264283 use crate :: io:: FileIOBuilder ;
@@ -375,25 +394,22 @@ mod tests {
375394 . expect_load_table ( )
376395 . returning_st ( |_| Box :: pin ( async move { Ok ( make_v2_table ( ) ) } ) ) ;
377396
397+ let attempts = AtomicU32 :: new ( 0 ) ;
378398 mock_catalog
379399 . expect_update_table ( )
380400 . times ( expected_calls)
381401 . returning_st ( move |_| {
382- if let Some ( attempts) = success_after_attempts {
383- static mut ATTEMPTS : u32 = 0 ;
384- unsafe {
385- ATTEMPTS += 1 ;
386- if ATTEMPTS <= attempts {
387- Box :: pin ( async move {
388- Err ( Error :: new (
389- ErrorKind :: CatalogCommitConflicts ,
390- "Commit conflict" ,
391- )
392- . with_retryable ( true ) )
393- } )
394- } else {
395- Box :: pin ( async move { Ok ( make_v2_table ( ) ) } )
396- }
402+ if let Some ( success_after_attempts) = success_after_attempts {
403+ attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
404+ if attempts. load ( Ordering :: SeqCst ) <= success_after_attempts {
405+ Box :: pin ( async move {
406+ Err (
407+ Error :: new ( ErrorKind :: CatalogCommitConflicts , "Commit conflict" )
408+ . with_retryable ( true ) ,
409+ )
410+ } )
411+ } else {
412+ Box :: pin ( async move { Ok ( make_v2_table ( ) ) } )
397413 }
398414 } else {
399415 // Always fail with retryable error
0 commit comments