From edace1bd9be95187ed69e50d1729ec110c41fdc2 Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Wed, 17 Sep 2025 13:11:43 +0200 Subject: [PATCH] Add support for running migrations via `diesel_migrations` This commit provides an `AsyncMigrationHarness` which enables running migrations via `diesel_migrations` with any `AsyncConnection` This commit also provides some additional documenation and other minor fixes. --- .github/workflows/ci.yml | 36 ++-- CHANGELOG.md | 2 + Cargo.toml | 9 +- .../Cargo.toml | 2 +- .../src/main.rs | 15 +- src/async_connection_wrapper.rs | 2 +- src/deref_connection.rs | 150 +++++++++++++++++ src/lib.rs | 38 ++++- src/migrations.rs | 159 ++++++++++++++++++ src/pg/mod.rs | 14 +- src/pooled_connection/mod.rs | 147 +--------------- src/sync_connection_wrapper/mod.rs | 2 +- tests/instrumentation.rs | 24 +-- tests/lib.rs | 4 +- tests/migrations.rs | 31 ++++ tests/transactions.rs | 10 ++ 16 files changed, 441 insertions(+), 204 deletions(-) create mode 100644 src/deref_connection.rs create mode 100644 src/migrations.rs create mode 100644 tests/migrations.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3ce4467..3145cc4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,9 @@ on: push: branches: - main + - 0.7.x + - 0.6.x + - 0.5.x - 0.3.x - 0.4.x - 0.2.x @@ -25,7 +28,13 @@ jobs: rust: ["stable"] backend: ["postgres", "mysql", "sqlite"] os: - [ubuntu-latest, macos-13, macos-15, windows-latest, ubuntu-22.04-arm] + [ + ubuntu-latest, + macos-15-intel, + macos-15, + windows-latest, + ubuntu-22.04-arm, + ] include: - rust: "beta" backend: "postgres" @@ -100,22 +109,13 @@ jobs: run: | sudo apt-get update sudo apt-get install libsqlite3-dev - echo "DATABASE_URL=/tmp/test.db" >> $GITHUB_ENV + echo "DATABASE_URL=:memory:" >> $GITHUB_ENV - name: Install postgres (MacOS) - if: matrix.os == 'macos-13' && matrix.backend == 'postgres' + if: runner.os == 'macOS' && matrix.backend == 'postgres' run: | brew 
install postgresql@14 - brew services start postgresql@14 - sleep 3 - createuser -s postgres - echo "DATABASE_URL=postgres://postgres@localhost/" >> $GITHUB_ENV - - - name: Install postgres (MacOS M1) - if: matrix.os == 'macos-15' && matrix.backend == 'postgres' - run: | - brew install postgresql@14 - brew services start postgresql@14 + brew services restart postgresql@14 sleep 3 createuser -s postgres echo "DATABASE_URL=postgres://postgres@localhost/" >> $GITHUB_ENV @@ -124,10 +124,10 @@ jobs: if: runner.os == 'macOS' && matrix.backend == 'sqlite' run: | brew install sqlite - echo "DATABASE_URL=/tmp/test.db" >> $GITHUB_ENV + echo "DATABASE_URL=:memory:" >> $GITHUB_ENV - - name: Install mysql (MacOS) - if: matrix.os == 'macos-13' && matrix.backend == 'mysql' + - name: Install mysql (MacOS Intel) + if: matrix.os == 'macos-15-intel' && matrix.backend == 'mysql' run: | brew install mariadb@11.4 /usr/local/opt/mariadb@11.4/bin/mysql_install_db @@ -184,7 +184,7 @@ jobs: run: | echo "C:\ProgramData\chocolatey\lib\SQLite\tools" >> $GITHUB_PATH echo "SQLITE3_LIB_DIR=C:\ProgramData\chocolatey\lib\SQLite\tools" >> $GITHUB_ENV - echo "DATABASE_URL=C:\test.db" >> $GITHUB_ENV + echo "DATABASE_URL=:memory:" >> $GITHUB_ENV - name: Install rust toolchain uses: dtolnay/rust-toolchain@master @@ -194,7 +194,7 @@ jobs: run: cargo +${{ matrix.rust }} version - name: Test diesel_async - run: cargo +${{ matrix.rust }} test --manifest-path Cargo.toml --no-default-features --features "${{ matrix.backend }} deadpool bb8 mobc async-connection-wrapper" + run: cargo +${{ matrix.rust }} test --manifest-path Cargo.toml --no-default-features --features "${{ matrix.backend }} deadpool bb8 mobc async-connection-wrapper migrations" - name: Run examples (Postgres) if: matrix.backend == 'postgres' diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b08a08..ec3986e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ for Rust libraries in [RFC 
#1105](https://github.com/rust-lang/rfcs/blob/master/ ## [Unreleased] +* Added support for running migrations via `AsyncMigrationHarness` + ## [0.6.1] - 2025-07-03 * Fix features for some dependencies diff --git a/Cargo.toml b/Cargo.toml index ed2fccf..e79f77f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,10 @@ features = [ "i-implement-a-third-party-backend-and-opt-into-breaking-changes", ] +[dependencies.diesel_migrations] +version = "~2.3.0" +optional = true + [dev-dependencies] tokio = { version = "1.12.0", features = ["rt", "macros", "rt-multi-thread"] } cfg-if = "1" @@ -73,6 +77,7 @@ postgres = ["diesel/postgres_backend", "tokio-postgres", "tokio", "tokio/rt"] sqlite = ["diesel/sqlite", "sync-connection-wrapper"] sync-connection-wrapper = ["tokio/rt"] async-connection-wrapper = ["tokio/net", "tokio/rt"] +migrations = ["diesel_migrations", "async-connection-wrapper", "tokio/rt-multi-thread"] pool = [] r2d2 = ["pool", "diesel/r2d2"] bb8 = ["pool", "dep:bb8"] @@ -95,10 +100,12 @@ features = [ "async-connection-wrapper", "sync-connection-wrapper", "r2d2", + "migrations", + "tokio/macros", ] no-default-features = true rustc-args = ["--cfg", "docsrs"] -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = ["--cfg", "docsrs", "-Z", "unstable-options", "--generate-link-to-definition"] [workspace] members = [ diff --git a/examples/postgres/run-pending-migrations-with-rustls/Cargo.toml b/examples/postgres/run-pending-migrations-with-rustls/Cargo.toml index 93e0f2d..fde3837 100644 --- a/examples/postgres/run-pending-migrations-with-rustls/Cargo.toml +++ b/examples/postgres/run-pending-migrations-with-rustls/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -diesel-async = { version = "0.6.0", path = "../../../", features = ["bb8", "postgres", "async-connection-wrapper"] } +diesel-async = { version = "0.6.0", path = "../../../", features = ["bb8", 
"postgres", "migrations"] } futures-util = "0.3.21" rustls = "0.23.8" rustls-platform-verifier = "0.5.0" diff --git a/examples/postgres/run-pending-migrations-with-rustls/src/main.rs b/examples/postgres/run-pending-migrations-with-rustls/src/main.rs index 6c0781c..624fbd9 100644 --- a/examples/postgres/run-pending-migrations-with-rustls/src/main.rs +++ b/examples/postgres/run-pending-migrations-with-rustls/src/main.rs @@ -1,6 +1,5 @@ use diesel::{ConnectionError, ConnectionResult}; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::AsyncPgConnection; +use diesel_async::{AsyncMigrationHarness, AsyncPgConnection}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::future::BoxFuture; use futures_util::FutureExt; @@ -10,19 +9,15 @@ use rustls_platform_verifier::ConfigVerifierExt; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { // Should be in the form of postgres://user:password@localhost/database?sslmode=require let db_url = std::env::var("DATABASE_URL").expect("Env var `DATABASE_URL` not set"); let async_connection = establish_connection(db_url.as_str()).await?; - let mut async_wrapper: AsyncConnectionWrapper = - AsyncConnectionWrapper::from(async_connection); - - tokio::task::spawn_blocking(move || { - async_wrapper.run_pending_migrations(MIGRATIONS).unwrap(); - }) - .await?; + let mut harness = AsyncMigrationHarness::new(async_connection); + harness.run_pending_migrations(MIGRATIONS)?; + let _async_connection = harness.into_inner(); Ok(()) } diff --git a/src/async_connection_wrapper.rs b/src/async_connection_wrapper.rs index 4e11078..cec3828 100644 --- a/src/async_connection_wrapper.rs +++ b/src/async_connection_wrapper.rs @@ -99,7 +99,7 @@ pub type AsyncConnectionWrapper = #[cfg(not(feature = "tokio"))] pub use self::implementation::AsyncConnectionWrapper; -mod implementation 
{ +pub(crate) mod implementation { use diesel::connection::{CacheSize, Instrumentation, SimpleConnection}; use std::ops::{Deref, DerefMut}; diff --git a/src/deref_connection.rs b/src/deref_connection.rs new file mode 100644 index 0000000..eaba348 --- /dev/null +++ b/src/deref_connection.rs @@ -0,0 +1,150 @@ +use crate::UpdateAndFetchResults; +use crate::{AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection, TransactionManager}; +use diesel::associations::HasTable; +use diesel::connection::CacheSize; +use diesel::connection::Instrumentation; +use diesel::QueryResult; +use futures_util::future::BoxFuture; +use std::ops::DerefMut; + +impl SimpleAsyncConnection for C +where + C: DerefMut + Send, + C::Target: SimpleAsyncConnection + Send, +{ + async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> { + let conn = self.deref_mut(); + conn.batch_execute(query).await + } +} + +impl AsyncConnectionCore for C +where + C: DerefMut + Send, + C::Target: AsyncConnectionCore, +{ + type ExecuteFuture<'conn, 'query> = + ::ExecuteFuture<'conn, 'query>; + type LoadFuture<'conn, 'query> = ::LoadFuture<'conn, 'query>; + type Stream<'conn, 'query> = ::Stream<'conn, 'query>; + type Row<'conn, 'query> = ::Row<'conn, 'query>; + + type Backend = ::Backend; + + fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> + where + T: diesel::query_builder::AsQuery + 'query, + T::Query: diesel::query_builder::QueryFragment + + diesel::query_builder::QueryId + + 'query, + { + let conn = self.deref_mut(); + conn.load(source) + } + + fn execute_returning_count<'conn, 'query, T>( + &'conn mut self, + source: T, + ) -> Self::ExecuteFuture<'conn, 'query> + where + T: diesel::query_builder::QueryFragment + + diesel::query_builder::QueryId + + 'query, + { + let conn = self.deref_mut(); + conn.execute_returning_count(source) + } +} + +#[diagnostic::do_not_recommend] +impl AsyncConnection for C +where + C: DerefMut + Send, + C::Target: 
AsyncConnection, +{ + type TransactionManager = + PoolTransactionManager<::TransactionManager>; + + async fn establish(_database_url: &str) -> diesel::ConnectionResult { + Err(diesel::result::ConnectionError::BadConnection( + String::from("Cannot directly establish a pooled connection"), + )) + } + + fn transaction_state( + &mut self, + ) -> &mut >::TransactionStateData{ + let conn = self.deref_mut(); + conn.transaction_state() + } + + async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> { + self.deref_mut().begin_test_transaction().await + } + + fn instrumentation(&mut self) -> &mut dyn Instrumentation { + self.deref_mut().instrumentation() + } + + fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { + self.deref_mut().set_instrumentation(instrumentation); + } + + fn set_prepared_statement_cache_size(&mut self, size: CacheSize) { + self.deref_mut().set_prepared_statement_cache_size(size); + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct PoolTransactionManager(std::marker::PhantomData); + +impl TransactionManager for PoolTransactionManager +where + C: DerefMut + Send, + C::Target: AsyncConnection, + TM: TransactionManager, +{ + type TransactionStateData = TM::TransactionStateData; + + async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> { + TM::begin_transaction(&mut **conn).await + } + + async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> { + TM::rollback_transaction(&mut **conn).await + } + + async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> { + TM::commit_transaction(&mut **conn).await + } + + fn transaction_manager_status_mut( + conn: &mut C, + ) -> &mut diesel::connection::TransactionManagerStatus { + TM::transaction_manager_status_mut(&mut **conn) + } + + fn is_broken_transaction_manager(conn: &mut C) -> bool { + TM::is_broken_transaction_manager(&mut **conn) + } +} + +impl UpdateAndFetchResults for Conn +where + Conn: DerefMut + Send, + 
Changes: diesel::prelude::Identifiable + HasTable + Send, + Conn::Target: UpdateAndFetchResults, +{ + fn update_and_fetch<'conn, 'changes>( + &'conn mut self, + changeset: Changes, + ) -> BoxFuture<'changes, QueryResult> + where + Changes: 'changes, + 'conn: 'changes, + Self: 'changes, + { + self.deref_mut().update_and_fetch(changeset) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8102312..943e19f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,12 +5,12 @@ //! with the main diesel crate. It only provides async variants of core diesel traits, //! that perform actual io-work. //! This includes async counterparts the following traits: -//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.0.x/diesel/prelude/trait.RunQueryDsl.html) -//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl) -//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html) -//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection) -//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.0.x/diesel/query_dsl/trait.UpdateAndFetchResults.html) -//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults) +//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.3.x/diesel/prelude/trait.RunQueryDsl.html) +//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl) +//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.3.x/diesel/connection/trait.Connection.html) +//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection) +//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.3.x/diesel/query_dsl/trait.UpdateAndFetchResults.html) +//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults) //! //! These traits closely mirror their diesel counter parts while providing async functionality. //! @@ -65,6 +65,26 @@ //! # Ok(()) //! # } //! ``` +//! +//! ## Crate features: +//! +//! 
* `postgres`: Enables the [`AsyncPgConnection`] implementation +//! * `mysql`: Enables the [`AsyncMysqlConnection`] implementation +//! * `sqlite`: Enables the [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper) +//! and everything required to work with SQLite +//! * `sync-connection-wrapper`: Enables the +//! [`SyncConnectionWrapper`](crate::sync_connection_wrapper::SyncConnectionWrapper) which allows to +//! wrap sync connections from [`diesel`] into async connection wrapper +//! * `async-connection-wrapper`: Enables the [`AsyncConnectionWrapper`](crate::async_connection_wrapper::AsyncConnectionWrapper) +//! which allows +//! to use connection implementations from this crate as sync [`diesel::Connection`] +//! * `migrations`: Enables the [`AsyncMigrationHarness`] to execute migrations via +//! [`diesel_migrations`] +//! * `pool`: Enables general support for connection pools +//! * `r2d2`: Enables support for pooling via the [`r2d2`] crate +//! * `bb8`: Enables support for pooling via the [`bb8`] crate +//! * `mobc`: Enables support for pooling via the [`mobc`] crate +//! 
* `deadpool`: Enables support for pooling via the [`deadpool`] crate #![warn( missing_docs, @@ -89,6 +109,9 @@ use scoped_futures::{ScopedBoxFuture, ScopedFutureExt}; #[cfg(feature = "async-connection-wrapper")] pub mod async_connection_wrapper; +mod deref_connection; +#[cfg(feature = "migrations")] +mod migrations; #[cfg(feature = "mysql")] mod mysql; #[cfg(feature = "postgres")] @@ -111,6 +134,9 @@ pub use self::pg::AsyncPgConnection; #[doc(inline)] pub use self::run_query_dsl::*; +#[doc(inline)] +#[cfg(feature = "migrations")] +pub use self::migrations::AsyncMigrationHarness; #[doc(inline)] pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager}; diff --git a/src/migrations.rs b/src/migrations.rs new file mode 100644 index 0000000..0afa7f9 --- /dev/null +++ b/src/migrations.rs @@ -0,0 +1,159 @@ +use diesel::migration::{Migration, MigrationVersion, Result}; + +use crate::async_connection_wrapper::AsyncConnectionWrapper; +use crate::AsyncConnection; + +/// A diesel-migration [`MigrationHarness`](diesel_migrations::MigrationHarness) to run migrations +/// via an [`AsyncConnection`](crate::AsyncConnection) +/// +/// Internally this harness is using [`tokio::task::block_in_place`] and [`AsyncConnectionWrapper`] +/// to utilize sync Diesel's migration infrastructure. For most applications this shouldn't +/// be problematic as migrations are usually run at application startup and most applications +/// default to use the multithreaded tokio runtime. In turn this also means that you cannot use +/// this migration harness if you use the current thread variant of the tokio runtime or if +/// you run migrations in a very special setup (e.g by using [`tokio::select!`] or [`tokio::join!`] +/// on a future produced by running the migrations). Consider manually construct a blocking task via +/// [`tokio::task::spawn_blocking`] instead. 
+/// +/// ## Example +/// +/// ```no_run +/// # include!("doctest_setup.rs"); +/// # async fn run_test() -> Result<(), Box>{ +/// use diesel_async::AsyncMigrationHarness; +/// use diesel_migrations::{FileBasedMigrations, MigrationHarness}; +/// +/// let mut connection = connection_no_data().await; +/// +/// // Alternativly use `diesel_migrations::embed_migrations!()` +/// // to get a list of migrations +/// let migrations = FileBasedMigrations::find_migrations_directory()?; +/// +/// let mut harness = AsyncMigrationHarness::new(connection); +/// harness.run_pending_migrations(migrations)?; +/// // get back the connection from the harness +/// let connection = harness.into_inner(); +/// # Ok(()) +/// # } +/// # #[tokio::main] +/// # async fn main() -> Result<(), Box> { +/// # run_test().await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// ## Example with pool +/// +/// ```no_run +/// # include!("doctest_setup.rs"); +/// # #[cfg(feature = "deadpool")] +/// # use diesel_async::pooled_connection::AsyncDieselConnectionManager; +/// # +/// # #[cfg(all(feature = "postgres", feature = "deadpool"))] +/// # fn get_config() -> AsyncDieselConnectionManager { +/// # let db_url = database_url_from_env("PG_DATABASE_URL"); +/// let config = AsyncDieselConnectionManager::::new(db_url); +/// # config +/// # } +/// # +/// # #[cfg(all(feature = "mysql", feature = "deadpool"))] +/// # fn get_config() -> AsyncDieselConnectionManager { +/// # let db_url = database_url_from_env("MYSQL_DATABASE_URL"); +/// # let config = AsyncDieselConnectionManager::::new(db_url); +/// # config +/// # } +/// # +/// # #[cfg(all(feature = "sqlite", feature = "deadpool"))] +/// # fn get_config() -> AsyncDieselConnectionManager> { +/// # let db_url = database_url_from_env("SQLITE_DATABASE_URL"); +/// # let config = AsyncDieselConnectionManager::>::new(db_url); +/// # config +/// # } +/// # #[cfg(feature = "deadpool")] +/// # async fn run_test() -> Result<(), Box>{ +/// use 
diesel_async::pooled_connection::deadpool::Pool; +/// use diesel_async::AsyncMigrationHarness; +/// use diesel_migrations::{FileBasedMigrations, MigrationHarness}; +/// +/// // Alternativly use `diesel_migrations::embed_migrations!()` +/// // to get a list of migrations +/// let migrations = FileBasedMigrations::find_migrations_directory()?; +/// +/// let pool = Pool::builder(get_config()).build()?; +/// let mut harness = AsyncMigrationHarness::new(pool.get().await?); +/// harness.run_pending_migrations(migrations)?; +/// # Ok(()) +/// # } +/// +/// # #[cfg(not(feature = "deadpool"))] +/// # async fn run_test() -> Result<(), Box> +/// # { +/// # Ok(()) +/// # } +/// # +/// # #[tokio::main] +/// # async fn main() -> Result<(), Box> { +/// # run_test().await?; +/// # Ok(()) +/// # } +/// ``` +pub struct AsyncMigrationHarness { + conn: AsyncConnectionWrapper, +} + +impl AsyncMigrationHarness +where + C: AsyncConnection, +{ + /// Construct a new `AsyncMigrationHarness` from a given connection + pub fn new(connection: C) -> Self { + Self { + conn: AsyncConnectionWrapper::from(connection), + } + } + + /// Return the connection stored inside this instance of `AsyncMigrationHarness` + pub fn into_inner(self) -> C { + self.conn.into_inner() + } +} + +impl From for AsyncMigrationHarness +where + C: AsyncConnection, +{ + fn from(value: C) -> Self { + AsyncMigrationHarness::new(value) + } +} + +impl diesel_migrations::MigrationHarness for AsyncMigrationHarness +where + C: AsyncConnection, + AsyncConnectionWrapper: + diesel::Connection + diesel_migrations::MigrationHarness, +{ + fn run_migration( + &mut self, + migration: &dyn Migration, + ) -> Result> { + tokio::task::block_in_place(|| { + diesel_migrations::MigrationHarness::run_migration(&mut self.conn, migration) + }) + } + + fn revert_migration( + &mut self, + migration: &dyn Migration, + ) -> Result> { + tokio::task::block_in_place(|| { + diesel_migrations::MigrationHarness::revert_migration(&mut self.conn, migration) + 
}) + } + + fn applied_migrations(&mut self) -> Result>> { + tokio::task::block_in_place(|| { + diesel_migrations::MigrationHarness::applied_migrations(&mut self.conn) + }) + } +} diff --git a/src/pg/mod.rs b/src/pg/mod.rs index 03e50ec..1c5bb46 100644 --- a/src/pg/mod.rs +++ b/src/pg/mod.rs @@ -1100,7 +1100,6 @@ mod tests { use diesel::sql_types::Integer; use diesel::IntoSql; use futures_util::future::try_join; - use futures_util::try_join; use scoped_futures::ScopedFutureExt; #[tokio::test] @@ -1118,7 +1117,7 @@ mod tests { let f1 = q1.get_result::(&mut conn); let f2 = q2.get_result::(&mut conn); - let (r1, r2) = try_join!(f1, f2).unwrap(); + let (r1, r2) = try_join(f1, f2).await.unwrap(); assert_eq!(r1, 1); assert_eq!(r2, 2); @@ -1137,20 +1136,20 @@ mod tests { let f1 = diesel::select(1_i32.into_sql::()).get_result::(&mut conn); let f2 = diesel::select(2_i32.into_sql::()).get_result::(&mut conn); - try_join!(f1, f2) + try_join(f1, f2).await } async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> { let f3 = diesel::select(3_i32.into_sql::()).get_result::(&mut conn); let f4 = diesel::select(4_i32.into_sql::()).get_result::(&mut conn); - try_join!(f3, f4) + try_join(f3, f4).await } let f12 = fn12(&conn); let f34 = fn34(&conn); - let ((r1, r2), (r3, r4)) = try_join!(f12, f34).unwrap(); + let ((r1, r2), (r3, r4)) = try_join(f12, f34).await.unwrap(); assert_eq!(r1, 1); assert_eq!(r2, 2); @@ -1189,7 +1188,7 @@ mod tests { let f5 = diesel::select(5_i32.into_sql::()).get_result::(&mut conn); let f6 = diesel::select(6_i32.into_sql::()).get_result::(&mut conn); - try_join!(f5.boxed(), f6.boxed()) + try_join(f5.boxed(), f6.boxed()).await } conn.transaction(|conn| { @@ -1198,7 +1197,8 @@ mod tests { let f34 = fn34(conn); let f56 = fn56(conn); - let ((r1, r2), (r3, r4), (r5, r6)) = try_join!(f12, f34, f56).unwrap(); + let ((r1, r2), ((r3, r4), (r5, r6))) = + try_join(f12, try_join(f34, f56)).await.unwrap(); assert_eq!(r1, 1); assert_eq!(r2, 2); diff --git 
a/src/pooled_connection/mod.rs b/src/pooled_connection/mod.rs index a3c30ab..cdc4b03 100644 --- a/src/pooled_connection/mod.rs +++ b/src/pooled_connection/mod.rs @@ -5,17 +5,13 @@ //! * [deadpool](self::deadpool) //! * [bb8](self::bb8) //! * [mobc](self::mobc) -use crate::{AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection}; -use crate::{TransactionManager, UpdateAndFetchResults}; -use diesel::associations::HasTable; -use diesel::connection::{CacheSize, Instrumentation}; +use crate::{AsyncConnection, TransactionManager}; use diesel::QueryResult; use futures_core::future::BoxFuture; use futures_util::FutureExt; use std::borrow::Cow; use std::fmt; use std::future::Future; -use std::ops::DerefMut; #[cfg(feature = "bb8")] pub mod bb8; @@ -165,147 +161,6 @@ where } } -impl SimpleAsyncConnection for C -where - C: DerefMut + Send, - C::Target: SimpleAsyncConnection + Send, -{ - async fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> { - let conn = self.deref_mut(); - conn.batch_execute(query).await - } -} - -impl AsyncConnectionCore for C -where - C: DerefMut + Send, - C::Target: AsyncConnectionCore, -{ - type ExecuteFuture<'conn, 'query> = - ::ExecuteFuture<'conn, 'query>; - type LoadFuture<'conn, 'query> = ::LoadFuture<'conn, 'query>; - type Stream<'conn, 'query> = ::Stream<'conn, 'query>; - type Row<'conn, 'query> = ::Row<'conn, 'query>; - - type Backend = ::Backend; - - fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query> - where - T: diesel::query_builder::AsQuery + 'query, - T::Query: diesel::query_builder::QueryFragment - + diesel::query_builder::QueryId - + 'query, - { - let conn = self.deref_mut(); - conn.load(source) - } - - fn execute_returning_count<'conn, 'query, T>( - &'conn mut self, - source: T, - ) -> Self::ExecuteFuture<'conn, 'query> - where - T: diesel::query_builder::QueryFragment - + diesel::query_builder::QueryId - + 'query, - { - let conn = self.deref_mut(); - 
conn.execute_returning_count(source) - } -} - -impl AsyncConnection for C -where - C: DerefMut + Send, - C::Target: AsyncConnection, -{ - type TransactionManager = - PoolTransactionManager<::TransactionManager>; - - async fn establish(_database_url: &str) -> diesel::ConnectionResult { - Err(diesel::result::ConnectionError::BadConnection( - String::from("Cannot directly establish a pooled connection"), - )) - } - - fn transaction_state( - &mut self, - ) -> &mut >::TransactionStateData{ - let conn = self.deref_mut(); - conn.transaction_state() - } - - async fn begin_test_transaction(&mut self) -> diesel::QueryResult<()> { - self.deref_mut().begin_test_transaction().await - } - - fn instrumentation(&mut self) -> &mut dyn Instrumentation { - self.deref_mut().instrumentation() - } - - fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) { - self.deref_mut().set_instrumentation(instrumentation); - } - - fn set_prepared_statement_cache_size(&mut self, size: CacheSize) { - self.deref_mut().set_prepared_statement_cache_size(size); - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct PoolTransactionManager(std::marker::PhantomData); - -impl TransactionManager for PoolTransactionManager -where - C: DerefMut + Send, - C::Target: AsyncConnection, - TM: TransactionManager, -{ - type TransactionStateData = TM::TransactionStateData; - - async fn begin_transaction(conn: &mut C) -> diesel::QueryResult<()> { - TM::begin_transaction(&mut **conn).await - } - - async fn rollback_transaction(conn: &mut C) -> diesel::QueryResult<()> { - TM::rollback_transaction(&mut **conn).await - } - - async fn commit_transaction(conn: &mut C) -> diesel::QueryResult<()> { - TM::commit_transaction(&mut **conn).await - } - - fn transaction_manager_status_mut( - conn: &mut C, - ) -> &mut diesel::connection::TransactionManagerStatus { - TM::transaction_manager_status_mut(&mut **conn) - } - - fn is_broken_transaction_manager(conn: &mut C) -> bool { - 
TM::is_broken_transaction_manager(&mut **conn) - } -} - -impl UpdateAndFetchResults for Conn -where - Conn: DerefMut + Send, - Changes: diesel::prelude::Identifiable + HasTable + Send, - Conn::Target: UpdateAndFetchResults, -{ - fn update_and_fetch<'conn, 'changes>( - &'conn mut self, - changeset: Changes, - ) -> BoxFuture<'changes, QueryResult> - where - Changes: 'changes, - 'conn: 'changes, - Self: 'changes, - { - self.deref_mut().update_and_fetch(changeset) - } -} - #[doc(hidden)] pub trait PoolableConnection: AsyncConnection { /// Check if a connection is still valid diff --git a/src/sync_connection_wrapper/mod.rs b/src/sync_connection_wrapper/mod.rs index cbb8436..3c7434f 100644 --- a/src/sync_connection_wrapper/mod.rs +++ b/src/sync_connection_wrapper/mod.rs @@ -499,7 +499,7 @@ mod implementation { Tokio::Runtime(runtime) => runtime.spawn_blocking(task), }; - fut.map_err(|err| Box::from(err)).boxed() + fut.map_err(Box::from).boxed() } fn get_runtime() -> Self { diff --git a/tests/instrumentation.rs b/tests/instrumentation.rs index 899189d..bad982c 100644 --- a/tests/instrumentation.rs +++ b/tests/instrumentation.rs @@ -99,7 +99,7 @@ async fn check_events_are_emitted_for_execute_returning_count() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 3, "{:?}", events); + assert_eq!(events.len(), 3, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::CacheQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -112,7 +112,7 @@ async fn check_events_are_emitted_for_load() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 3, "{:?}", events); + assert_eq!(events.len(), 3, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::CacheQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. 
}); @@ -126,7 +126,7 @@ async fn check_events_are_emitted_for_execute_returning_count_does_not_contain_c .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 2, "{:?}", events); + assert_eq!(events.len(), 2, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::FinishQuery { .. }); } @@ -138,7 +138,7 @@ async fn check_events_are_emitted_for_load_does_not_contain_cache_for_uncached_q .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 2, "{:?}", events); + assert_eq!(events.len(), 2, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::FinishQuery { .. }); } @@ -150,7 +150,7 @@ async fn check_events_are_emitted_for_execute_returning_count_does_contain_error .execute_returning_count(diesel::sql_query("invalid")) .await; let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 2, "{:?}", events); + assert_eq!(events.len(), 2, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); } @@ -160,7 +160,7 @@ async fn check_events_are_emitted_for_load_does_contain_error_for_failures() { let (events_to_check, mut conn) = setup_test_case().await; let _ = AsyncConnectionCore::load(&mut conn, diesel::sql_query("invalid")).await; let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 2, "{:?}", events); + assert_eq!(events.len(), 2, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::FinishQuery { error: Some(_), .. }); } @@ -175,7 +175,7 @@ async fn check_events_are_emitted_for_execute_returning_count_repeat_does_not_re .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 5, "{:?}", events); + assert_eq!(events.len(), 5, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. 
}); assert_matches!(events[1], Event::CacheQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -193,7 +193,7 @@ async fn check_events_are_emitted_for_load_repeat_does_not_repeat_cache() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 5, "{:?}", events); + assert_eq!(events.len(), 5, "{events:?}"); assert_matches!(events[0], Event::StartQuery { .. }); assert_matches!(events[1], Event::CacheQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -208,7 +208,7 @@ async fn check_events_transaction() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 6, "{:?}", events); + assert_eq!(events.len(), 6, "{events:?}"); assert_matches!(events[0], Event::BeginTransaction { .. }); assert_matches!(events[1], Event::StartQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -226,7 +226,7 @@ async fn check_events_transaction_error() { }) .await; let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 6, "{:?}", events); + assert_eq!(events.len(), 6, "{events:?}"); assert_matches!(events[0], Event::BeginTransaction { .. }); assert_matches!(events[1], Event::StartQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -247,7 +247,7 @@ async fn check_events_transaction_nested() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 12, "{:?}", events); + assert_eq!(events.len(), 12, "{events:?}"); assert_matches!(events[0], Event::BeginTransaction { .. }); assert_matches!(events[1], Event::StartQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); @@ -276,7 +276,7 @@ async fn check_events_transaction_builder() { .await .unwrap(); let events = events_to_check.lock().unwrap(); - assert_eq!(events.len(), 6, "{:?}", events); + assert_eq!(events.len(), 6, "{events:?}"); assert_matches!(events[0], Event::BeginTransaction { .. 
}); assert_matches!(events[1], Event::StartQuery { .. }); assert_matches!(events[2], Event::FinishQuery { .. }); diff --git a/tests/lib.rs b/tests/lib.rs index 5125e28..67b0c27 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -7,6 +7,8 @@ use std::fmt::Debug; #[cfg(feature = "postgres")] mod custom_types; mod instrumentation; +#[cfg(feature = "migrations")] +mod migrations; mod notifications; #[cfg(any(feature = "bb8", feature = "deadpool", feature = "mobc"))] mod pooling; @@ -161,7 +163,7 @@ async fn postgres_cancel_token() { match err { Error::DatabaseError(DatabaseErrorKind::Unknown, v) if v.message() == "canceling statement due to user request" => {} - _ => panic!("unexpected error: {:?}", err), + _ => panic!("unexpected error: {err:?}"), } } diff --git a/tests/migrations.rs b/tests/migrations.rs new file mode 100644 index 0000000..75b6750 --- /dev/null +++ b/tests/migrations.rs @@ -0,0 +1,31 @@ +use diesel_async::AsyncMigrationHarness; +use diesel_migrations::MigrationHarness; + +static SEQUENTIAL: std::sync::Mutex<()> = std::sync::Mutex::new(()); + +// These two tests are mostly smoke tests to verify +// that the `AsyncMigrationHarness` actually implements +// the necessary traits + +#[tokio::test(flavor = "multi_thread")] +async fn plain_connection() { + let _guard = SEQUENTIAL.lock().unwrap(); + let conn = super::connection().await; + let mut harness = AsyncMigrationHarness::from(conn); + harness.applied_migrations().unwrap(); +} + +#[cfg(feature = "deadpool")] +#[tokio::test(flavor = "multi_thread")] +async fn pool_connection() { + use diesel_async::pooled_connection::deadpool::Pool; + use diesel_async::pooled_connection::AsyncDieselConnectionManager; + let _guard = SEQUENTIAL.lock().unwrap(); + + let db_url = std::env::var("DATABASE_URL").unwrap(); + let config = AsyncDieselConnectionManager::<super::TestConnection>::new(db_url); + let pool = Pool::builder(config).build().unwrap(); + let conn = pool.get().await.unwrap(); + let mut harness = AsyncMigrationHarness::from(conn); + 
harness.applied_migrations().unwrap(); +} diff --git a/tests/transactions.rs b/tests/transactions.rs index 6de782c..6ec46c9 100644 --- a/tests/transactions.rs +++ b/tests/transactions.rs @@ -24,10 +24,13 @@ async fn concurrent_serializable_transactions_behave_correctly() { let barrier_1 = Arc::new(Barrier::new(2)); let barrier_2 = Arc::new(Barrier::new(2)); + let barrier_3 = Arc::new(Barrier::new(2)); let barrier_1_for_tx1 = barrier_1.clone(); let barrier_1_for_tx2 = barrier_1.clone(); let barrier_2_for_tx1 = barrier_2.clone(); let barrier_2_for_tx2 = barrier_2.clone(); + let barrier_3_for_tx1 = barrier_3.clone(); + let barrier_3_for_tx2 = barrier_3.clone(); let mut tx = conn.build_transaction().serializable().read_write(); @@ -40,6 +43,7 @@ async fn concurrent_serializable_transactions_behave_correctly() { .values(users3::id.eq(1)) .execute(conn) .await?; + barrier_3_for_tx1.wait().await; barrier_2_for_tx1.wait().await; Ok::<_, diesel::result::Error>(()) @@ -59,6 +63,7 @@ async fn concurrent_serializable_transactions_behave_correctly() { .values(users3::id.eq(1)) .execute(conn) .await?; + barrier_3_for_tx2.wait().await; Ok::<_, diesel::result::Error>(()) }) @@ -146,10 +151,13 @@ async fn commit_with_serialization_failure_already_ends_transaction() { let barrier_1 = Arc::new(Barrier::new(2)); let barrier_2 = Arc::new(Barrier::new(2)); + let barrier_3 = Arc::new(Barrier::new(2)); let barrier_1_for_tx1 = barrier_1.clone(); let barrier_1_for_tx2 = barrier_1.clone(); let barrier_2_for_tx1 = barrier_2.clone(); let barrier_2_for_tx2 = barrier_2.clone(); + let barrier_3_for_tx1 = barrier_3.clone(); + let barrier_3_for_tx2 = barrier_3.clone(); let mut tx = conn.build_transaction().serializable().read_write(); @@ -162,6 +170,7 @@ async fn commit_with_serialization_failure_already_ends_transaction() { .values(users4::id.eq(1)) .execute(conn) .await?; + barrier_3_for_tx1.wait().await; barrier_2_for_tx1.wait().await; Ok::<_, diesel::result::Error>(()) @@ -181,6 +190,7 @@ async 
fn commit_with_serialization_failure_already_ends_transaction() { .values(users4::id.eq(1)) .execute(conn) .await?; + barrier_3_for_tx2.wait().await; Ok::<_, diesel::result::Error>(()) })