7676use diesel:: backend:: Backend ;
7777use diesel:: connection:: { CacheSize , Instrumentation } ;
7878use diesel:: query_builder:: { AsQuery , QueryFragment , QueryId } ;
79- use diesel:: result:: Error ;
8079use diesel:: row:: Row ;
8180use diesel:: { ConnectionResult , QueryResult } ;
82- use futures_util:: { Future , Stream } ;
81+ use futures_util:: future:: BoxFuture ;
82+ use futures_util:: { Future , FutureExt , Stream } ;
8383use std:: fmt:: Debug ;
8484
8585pub use scoped_futures;
@@ -115,21 +115,19 @@ pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
115115/// Perform simple operations on a backend.
116116///
117117/// You should likely use [`AsyncConnection`] instead.
118- #[ async_trait:: async_trait]
119118pub trait SimpleAsyncConnection {
120119 /// Execute multiple SQL statements within the same string.
121120 ///
122121 /// This function is used to execute migrations,
123122 /// which may contain more than one SQL statement.
124- async fn batch_execute ( & mut self , query : & str ) -> QueryResult < ( ) > ;
123+ fn batch_execute ( & mut self , query : & str ) -> impl Future < Output = QueryResult < ( ) > > + Send ;
125124}
126125
127126/// An async connection to a database
128127///
129128/// This trait represents a n async database connection. It can be used to query the database through
130129/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
131130/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
132- #[ async_trait:: async_trait]
133131pub trait AsyncConnection : SimpleAsyncConnection + Sized + Send {
134132 /// The future returned by `AsyncConnection::execute`
135133 type ExecuteFuture < ' conn , ' query > : Future < Output = QueryResult < usize > > + Send ;
@@ -151,7 +149,7 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
151149 /// The argument to this method and the method's behavior varies by backend.
152150 /// See the documentation for that backend's connection class
153151 /// for details about what it accepts and how it behaves.
154- async fn establish ( database_url : & str ) -> ConnectionResult < Self > ;
152+ fn establish ( database_url : & str ) -> impl Future < Output = ConnectionResult < Self > > + Send ;
155153
156154 /// Executes the given function inside of a database transaction
157155 ///
@@ -230,34 +228,44 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
230228 /// # Ok(())
231229 /// # }
232230 /// ```
233- async fn transaction < ' a , R , E , F > ( & mut self , callback : F ) -> Result < R , E >
231+ fn transaction < ' a , ' conn , R , E , F > (
232+ & ' conn mut self ,
233+ callback : F ,
234+ ) -> BoxFuture < ' conn , Result < R , E > >
235+ // we cannot use `impl Trait` here due to bugs in rustc
236+ // https://github.com/rust-lang/rust/issues/100013
237+ //impl Future<Output = Result<R, E>> + Send + 'async_trait
234238 where
235239 F : for < ' r > FnOnce ( & ' r mut Self ) -> ScopedBoxFuture < ' a , ' r , Result < R , E > > + Send + ' a ,
236240 E : From < diesel:: result:: Error > + Send + ' a ,
237241 R : Send + ' a ,
242+ ' a : ' conn ,
238243 {
239- Self :: TransactionManager :: transaction ( self , callback) . await
244+ Self :: TransactionManager :: transaction ( self , callback) . boxed ( )
240245 }
241246
242247 /// Creates a transaction that will never be committed. This is useful for
243248 /// tests. Panics if called while inside of a transaction or
244249 /// if called with a connection containing a broken transaction
245- async fn begin_test_transaction ( & mut self ) -> QueryResult < ( ) > {
250+ fn begin_test_transaction ( & mut self ) -> impl Future < Output = QueryResult < ( ) > > + Send {
246251 use diesel:: connection:: TransactionManagerStatus ;
247252
248- match Self :: TransactionManager :: transaction_manager_status_mut ( self ) {
249- TransactionManagerStatus :: Valid ( valid_status) => {
250- assert_eq ! ( None , valid_status. transaction_depth( ) )
251- }
252- TransactionManagerStatus :: InError => panic ! ( "Transaction manager in error" ) ,
253- } ;
254- Self :: TransactionManager :: begin_transaction ( self ) . await ?;
255- // set the test transaction flag
256- // to prevent that this connection gets dropped in connection pools
257- // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
258- // to prevent modifications to the schema
259- Self :: TransactionManager :: transaction_manager_status_mut ( self ) . set_test_transaction_flag ( ) ;
260- Ok ( ( ) )
253+ async {
254+ match Self :: TransactionManager :: transaction_manager_status_mut ( self ) {
255+ TransactionManagerStatus :: Valid ( valid_status) => {
256+ assert_eq ! ( None , valid_status. transaction_depth( ) )
257+ }
258+ TransactionManagerStatus :: InError => panic ! ( "Transaction manager in error" ) ,
259+ } ;
260+ Self :: TransactionManager :: begin_transaction ( self ) . await ?;
261+ // set the test transaction flag
262+ // to prevent that this connection gets dropped in connection pools
263+ // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
264+ // to prevent modifications to the schema
265+ Self :: TransactionManager :: transaction_manager_status_mut ( self )
266+ . set_test_transaction_flag ( ) ;
267+ Ok ( ( ) )
268+ }
261269 }
262270
263271 /// Executes the given function inside a transaction, but does not commit
@@ -297,27 +305,33 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
297305 /// # Ok(())
298306 /// # }
299307 /// ```
300- async fn test_transaction < ' a , R , E , F > ( & ' a mut self , f : F ) -> R
308+ fn test_transaction < ' conn , ' a , R , E , F > (
309+ & ' conn mut self ,
310+ f : F ,
311+ ) -> impl Future < Output = R > + Send + ' conn
301312 where
302313 F : for < ' r > FnOnce ( & ' r mut Self ) -> ScopedBoxFuture < ' a , ' r , Result < R , E > > + Send + ' a ,
303314 E : Debug + Send + ' a ,
304315 R : Send + ' a ,
305- Self : ' a ,
316+ ' a : ' conn ,
306317 {
307318 use futures_util:: TryFutureExt ;
308-
309- let mut user_result = None ;
310- let _ = self
311- . transaction :: < R , _ , _ > ( |c| {
312- f ( c) . map_err ( |_| Error :: RollbackTransaction )
313- . and_then ( |r| {
314- user_result = Some ( r) ;
315- futures_util:: future:: ready ( Err ( Error :: RollbackTransaction ) )
316- } )
317- . scope_boxed ( )
318- } )
319- . await ;
320- user_result. expect ( "Transaction did not succeed" )
319+ let ( user_result_tx, user_result_rx) = std:: sync:: mpsc:: channel ( ) ;
320+ self . transaction :: < R , _ , _ > ( move |conn| {
321+ f ( conn)
322+ . map_err ( |_| diesel:: result:: Error :: RollbackTransaction )
323+ . and_then ( move |r| {
324+ let _ = user_result_tx. send ( r) ;
325+ futures_util:: future:: ready ( Err ( diesel:: result:: Error :: RollbackTransaction ) )
326+ } )
327+ . scope_boxed ( )
328+ } )
329+ . then ( move |_r| {
330+ let r = user_result_rx
331+ . try_recv ( )
332+ . expect ( "Transaction did not succeed" ) ;
333+ futures_util:: future:: ready ( r)
334+ } )
321335 }
322336
323337 #[ doc( hidden) ]
0 commit comments