1- use std:: sync:: { Arc , Mutex } ;
1+ use std:: {
2+ sync:: { Arc , Mutex } ,
3+ time:: Duration ,
4+ } ;
25
36use anyhow:: { Context , Result } ;
47use deadpool_postgres:: { Config , ManagerConfig , Pool , PoolConfig , RecyclingMethod , Runtime } ;
8+ use tokio:: task:: JoinHandle ;
59use tokio_postgres:: NoTls ;
610
711use crate :: {
@@ -14,9 +18,13 @@ use crate::{
1418
1519use super :: transaction:: PostgresTransactionDriver ;
1620
21+ const TXN_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
22+ const GC_INTERVAL : Duration = Duration :: from_secs ( 5 ) ;
23+
1724pub struct PostgresDatabaseDriver {
1825 pool : Arc < Pool > ,
1926 max_retries : Arc < Mutex < i32 > > ,
27+ gc_handle : JoinHandle < ( ) > ,
2028}
2129
2230impl PostgresDatabaseDriver {
@@ -53,7 +61,7 @@ impl PostgresDatabaseDriver {
5361 . context ( "failed to create btree_gist extension" ) ?;
5462
5563 conn. execute (
56- "CREATE SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1" ,
64+ "CREATE UNLOGGED SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1" ,
5765 & [ ] ,
5866 )
5967 . await
@@ -123,12 +131,39 @@ impl PostgresDatabaseDriver {
123131 . await
124132 . context ( "failed to create conflict_ranges table" ) ?;
125133
126- // Connection is automatically returned to the pool when dropped
127- drop ( conn) ;
134+ // Create index on ts column for efficient garbage collection
135+ conn. execute (
136+ "CREATE INDEX IF NOT EXISTS idx_conflict_ranges_ts ON conflict_ranges (ts)" ,
137+ & [ ] ,
138+ )
139+ . await
140+ . context ( "failed to create index on conflict_ranges ts column" ) ?;
141+
142+ let gc_handle = tokio:: spawn ( async move {
143+ let mut interval = tokio:: time:: interval ( GC_INTERVAL ) ;
144+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
145+
146+ loop {
147+ interval. tick ( ) . await ;
148+
149+ // NOTE: Transactions have a max limit of 5 seconds, we delete after 10 seconds for extra padding
150+ // Delete old conflict ranges
151+ if let Err ( err) = conn
152+ . execute (
153+ "DELETE FROM conflict_ranges where ts < now() - interval '10 seconds'" ,
154+ & [ ] ,
155+ )
156+ . await
157+ {
158+ tracing:: error!( ?err, "failed postgres gc task" ) ;
159+ }
160+ }
161+ } ) ;
128162
129163 Ok ( PostgresDatabaseDriver {
130164 pool : Arc :: new ( pool) ,
131165 max_retries : Arc :: new ( Mutex :: new ( 100 ) ) ,
166+ gc_handle,
132167 } )
133168 }
134169}
@@ -155,13 +190,15 @@ impl DatabaseDriver for PostgresDatabaseDriver {
155190 retryable. maybe_committed = maybe_committed;
156191
157192 // Execute transaction
158- let error = match closure ( retryable. clone ( ) ) . await {
159- Ok ( res) => match retryable. inner . driver . commit_ref ( ) . await {
160- Ok ( _) => return Ok ( res) ,
161- Err ( e) => e,
162- } ,
163- Err ( e) => e,
164- } ;
193+ let error =
194+ match tokio:: time:: timeout ( TXN_TIMEOUT , closure ( retryable. clone ( ) ) ) . await {
195+ Ok ( Ok ( res) ) => match retryable. inner . driver . commit_ref ( ) . await {
196+ Ok ( _) => return Ok ( res) ,
197+ Err ( e) => e,
198+ } ,
199+ Ok ( Err ( e) ) => e,
200+ Err ( e) => anyhow:: Error :: from ( DatabaseError :: TransactionTooOld ) ,
201+ } ;
165202
166203 let chain = error
167204 . chain ( )
@@ -196,3 +233,9 @@ impl DatabaseDriver for PostgresDatabaseDriver {
196233 }
197234 }
198235}
236+
237+ impl Drop for PostgresDatabaseDriver {
238+ fn drop ( & mut self ) {
239+ self . gc_handle . abort ( ) ;
240+ }
241+ }
0 commit comments