Skip to content

Commit 2be39ef

Browse files
committed
store: Use an async connection pool
We use an async connnection pool, but only with a synchronous wrapper so that most uses of connections are not affected by the change yet. Over time, we will change code to use AsyncPgConn and make it truly async. Unfortunatley, this change also involves a lot of line noise because of the replacement of diesel's PgConnection with our own type definition that uses a AsyncConnectionWrapper
1 parent 63352af commit 2be39ef

30 files changed

+183
-189
lines changed

core/graphman/src/deployment.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
use anyhow::anyhow;
22
use diesel::dsl::sql;
3-
use diesel::prelude::*;
43
use diesel::sql_types::Text;
4+
use diesel::BoolExpressionMethods;
5+
use diesel::ExpressionMethods;
6+
use diesel::JoinOnDsl;
7+
use diesel::NullableExpressionMethods;
8+
use diesel::PgTextExpressionMethods;
9+
use diesel::QueryDsl;
10+
use diesel::Queryable;
11+
use diesel::RunQueryDsl;
512
use graph::components::store::DeploymentId;
613
use graph::components::store::DeploymentLocator;
714
use graph::data::subgraph::DeploymentHash;
815
use graph_store_postgres::command_support::catalog;
16+
use graph_store_postgres::PgConnection;
917
use itertools::Itertools;
1018

1119
use crate::GraphmanError;

node/src/manager/commands/stats.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,22 @@ use std::sync::Arc;
44

55
use crate::manager::deployment::DeploymentSearch;
66
use crate::manager::fmt;
7-
use diesel::r2d2::ConnectionManager;
8-
use diesel::r2d2::PooledConnection;
9-
use diesel::PgConnection;
107
use graph::components::store::DeploymentLocator;
118
use graph::components::store::VersionStats;
129
use graph::prelude::anyhow;
1310
use graph::prelude::CheapClone as _;
1411
use graph_store_postgres::command_support::catalog as store_catalog;
1512
use graph_store_postgres::command_support::catalog::Site;
1613
use graph_store_postgres::ConnectionPool;
14+
use graph_store_postgres::PgConnection;
1715
use graph_store_postgres::Shard;
1816
use graph_store_postgres::SubgraphStore;
1917
use graph_store_postgres::PRIMARY_SHARD;
2018

2119
async fn site_and_conn(
2220
pools: HashMap<Shard, ConnectionPool>,
2321
search: &DeploymentSearch,
24-
) -> Result<(Arc<Site>, PooledConnection<ConnectionManager<PgConnection>>), anyhow::Error> {
22+
) -> Result<(Arc<Site>, PgConnection), anyhow::Error> {
2523
let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap();
2624
let locator = search.locate_unique(primary_pool).await?;
2725

node/src/manager/commands/txn_speed.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use diesel::PgConnection;
21
use std::{collections::HashMap, thread::sleep, time::Duration};
32

43
use graph::prelude::anyhow;
5-
use graph_store_postgres::ConnectionPool;
4+
use graph_store_postgres::{ConnectionPool, PgConnection};
65

76
use crate::manager::catalog;
87

node/src/manager/deployment.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@ use std::collections::HashSet;
22
use std::fmt;
33
use std::str::FromStr;
44

5-
use diesel::{dsl::sql, prelude::*};
6-
use diesel::{sql_types::Text, PgConnection};
5+
use diesel::dsl::sql;
6+
use diesel::sql_types::Text;
7+
use diesel::{
8+
ExpressionMethods, JoinOnDsl, NullableExpressionMethods, PgTextExpressionMethods, QueryDsl,
9+
RunQueryDsl,
10+
};
711

812
use graph::components::store::DeploymentId;
913
use graph::{
1014
components::store::DeploymentLocator,
1115
prelude::{anyhow, lazy_static, regex::Regex, DeploymentHash},
1216
};
1317
use graph_store_postgres::command_support::catalog as store_catalog;
14-
use graph_store_postgres::unused;
1518
use graph_store_postgres::ConnectionPool;
19+
use graph_store_postgres::{unused, PgConnection};
1620

1721
lazy_static! {
1822
// `Qm...` optionally follow by `:$shard`

store/postgres/src/advisory_lock.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
//! happens to it
1616
1717
use diesel::sql_types::Bool;
18-
use diesel::{sql_query, PgConnection, RunQueryDsl};
18+
use diesel::{sql_query, RunQueryDsl};
1919
use graph::prelude::StoreError;
2020

2121
use crate::command_support::catalog::Site;
22+
use crate::pool::PgConnection;
2223
use crate::primary::DeploymentId;
2324

2425
/// A locking scope for a particular deployment. We use different scopes for

store/postgres/src/block_store.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ use std::{
66

77
use anyhow::anyhow;
88
use async_trait::async_trait;
9-
use diesel::{
10-
query_dsl::methods::FilterDsl as _, sql_query, ExpressionMethods as _, PgConnection,
11-
RunQueryDsl,
12-
};
9+
use diesel::{query_dsl::methods::FilterDsl as _, sql_query, ExpressionMethods as _, RunQueryDsl};
1310
use diesel_async::scoped_futures::ScopedFutureExt;
1411
use graph::{
1512
blockchain::ChainIdentifier,
@@ -28,7 +25,7 @@ use graph::{prelude::StoreError, util::timed_cache::TimedCache};
2825
use crate::{
2926
chain_head_listener::ChainHeadUpdateSender,
3027
chain_store::{ChainStoreMetrics, Storage},
31-
pool::ConnectionPool,
28+
pool::{ConnectionPool, PgConnection},
3229
primary::Mirror as PrimaryMirror,
3330
ChainStore, NotificationSender, Shard, PRIMARY_SHARD,
3431
};
@@ -54,16 +51,15 @@ pub mod primary {
5451
use std::convert::TryFrom;
5552

5653
use diesel::{
57-
delete, insert_into, update, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
58-
RunQueryDsl,
54+
delete, insert_into, update, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
5955
};
6056
use graph::{
6157
blockchain::{BlockHash, ChainIdentifier},
6258
internal_error,
6359
prelude::StoreError,
6460
};
6561

66-
use crate::chain_store::Storage;
62+
use crate::{chain_store::Storage, pool::PgConnection};
6763
use crate::{ConnectionPool, Shard};
6864

6965
table! {

store/postgres/src/catalog.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use diesel::sql_query;
12
use diesel::sql_types::{Bool, Integer};
23
use diesel::{connection::SimpleConnection, prelude::RunQueryDsl, select};
34
use diesel::{insert_into, OptionalExtension};
4-
use diesel::{pg::PgConnection, sql_query};
55
use diesel::{
66
sql_types::{Array, BigInt, Double, Nullable, Text},
77
ExpressionMethods, QueryDsl,
@@ -22,6 +22,7 @@ use graph::{
2222
prelude::{lazy_static, StoreError, BLOCK_NUMBER_MAX},
2323
};
2424

25+
use crate::pool::PgConnection;
2526
use crate::{
2627
block_range::BLOCK_RANGE_COLUMN,
2728
pool::ForeignServer,

store/postgres/src/chain_store.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use anyhow::anyhow;
22
use async_trait::async_trait;
3-
use diesel::pg::PgConnection;
4-
use diesel::prelude::*;
5-
use diesel::r2d2::{ConnectionManager, PooledConnection};
63
use diesel::sql_types::Text;
7-
use diesel::{insert_into, update};
4+
use diesel::{insert_into, update, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
85
use diesel_async::scoped_futures::ScopedFutureExt;
96

107
use graph::components::store::ChainHeadStore;
@@ -37,7 +34,7 @@ use graph::prelude::{
3734
use graph::{ensure, internal_error};
3835

3936
use self::recent_blocks_cache::RecentBlocksCache;
40-
use crate::pool::AsyncConnection;
37+
use crate::pool::{AsyncConnection, PgConnection};
4138
use crate::{
4239
block_store::ChainStatus, chain_head_listener::ChainHeadUpdateSender, pool::ConnectionPool,
4340
};
@@ -88,16 +85,18 @@ pub use data::Storage;
8885
/// Encapuslate access to the blocks table for a chain.
8986
mod data {
9087
use crate::diesel::dsl::IntervalDsl;
88+
use diesel::dsl::sql;
9189
use diesel::sql_types::{Array, Binary, Bool, Nullable};
9290
use diesel::{connection::SimpleConnection, insert_into};
93-
use diesel::{delete, prelude::*, sql_query};
91+
use diesel::{
92+
delete, sql_query, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl,
93+
};
9494
use diesel::{
9595
deserialize::FromSql,
9696
pg::Pg,
9797
serialize::{Output, ToSql},
9898
sql_types::Text,
9999
};
100-
use diesel::{dsl::sql, pg::PgConnection};
101100
use diesel::{
102101
sql_types::{BigInt, Bytea, Integer, Jsonb},
103102
update,
@@ -119,6 +118,7 @@ mod data {
119118
use std::iter::FromIterator;
120119
use std::str::FromStr;
121120

121+
use crate::pool::PgConnection;
122122
use crate::transaction_receipt::RawTransactionReceipt;
123123

124124
use super::JsonBlock;
@@ -1948,7 +1948,7 @@ impl ChainStore {
19481948
matches!(self.status, ChainStatus::Ingestible)
19491949
}
19501950

1951-
async fn get_conn(&self) -> Result<PooledConnection<ConnectionManager<PgConnection>>, Error> {
1951+
async fn get_conn(&self) -> Result<PgConnection, Error> {
19521952
self.pool.get_sync().await.map_err(Error::from)
19531953
}
19541954

@@ -3019,7 +3019,7 @@ impl EthereumCallCache for ChainStore {
30193019
block: BlockPtr,
30203020
) -> Result<Option<call::Response>, Error> {
30213021
let id = contract_call_id(req, &block);
3022-
let conn = &mut *self.get_conn().await?;
3022+
let conn = &mut self.get_conn().await?;
30233023
let return_value = conn
30243024
.transaction_async::<_, Error, _>(|conn| {
30253025
async {
@@ -3060,7 +3060,7 @@ impl EthereumCallCache for ChainStore {
30603060
.collect();
30613061
let id_refs: Vec<_> = ids.iter().map(|id| id.as_slice()).collect();
30623062

3063-
let conn = &mut *self.get_conn().await?;
3063+
let conn = &mut self.get_conn().await?;
30643064
let rows = conn
30653065
.transaction_async::<_, Error, _>(|conn| {
30663066
self.storage
@@ -3094,7 +3094,7 @@ impl EthereumCallCache for ChainStore {
30943094
}
30953095

30963096
async fn get_calls_in_block(&self, block: BlockPtr) -> Result<Vec<CachedEthereumCall>, Error> {
3097-
let conn = &mut *self.get_conn().await?;
3097+
let conn = &mut self.get_conn().await?;
30983098
conn.transaction_async::<_, Error, _>(|conn| {
30993099
self.storage.get_calls_in_block(conn, block).scope_boxed()
31003100
})

store/postgres/src/copy.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@ use std::{
2323
};
2424

2525
use diesel::{
26-
connection::SimpleConnection as _,
27-
dsl::sql,
28-
insert_into,
29-
r2d2::{ConnectionManager, PooledConnection},
30-
select, sql_query, update, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl,
31-
RunQueryDsl,
26+
connection::SimpleConnection as _, dsl::sql, insert_into, select, sql_query, update,
27+
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
3228
};
3329
use diesel_async::scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
3430
use graph::{
@@ -49,6 +45,7 @@ use itertools::Itertools;
4945
use crate::{
5046
advisory_lock, catalog, deployment,
5147
dynds::DataSourcesTable,
48+
pool::PgConnection,
5249
primary::{DeploymentId, Primary, Site},
5350
relational::{index::IndexList, Layout, Table},
5451
relational_queries as rq,
@@ -679,12 +676,12 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
679676
/// This struct helps us with that. It wraps a connection and tracks whether
680677
/// the connection was used to acquire the copy lock
681678
struct LockTrackingConnection {
682-
inner: PooledConnection<ConnectionManager<PgConnection>>,
679+
inner: PgConnection,
683680
has_lock: bool,
684681
}
685682

686683
impl LockTrackingConnection {
687-
fn new(inner: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
684+
fn new(inner: PgConnection) -> Self {
688685
Self {
689686
inner,
690687
has_lock: false,

store/postgres/src/deployment.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
//! Utilities for dealing with deployment metadata. Any connection passed
22
//! into these methods must be for the shard that holds the actual
33
//! deployment data and metadata
4-
use crate::{advisory_lock, detail::GraphNodeVersion, primary::DeploymentId};
5-
use diesel::pg::PgConnection;
4+
use crate::{advisory_lock, detail::GraphNodeVersion, pool::PgConnection, primary::DeploymentId};
65
use diesel::{
76
connection::SimpleConnection,
87
dsl::{count, delete, insert_into, now, select, sql, update},

0 commit comments

Comments
 (0)