Skip to content

Commit 6e768dc

Browse files
committed
store: Reap idle connections according to configuration
1 parent a7a9f3a commit 6e768dc

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

store/postgres/src/pool/manager.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use diesel::IntoSql;
88

99
use diesel_async::pooled_connection::{PoolError as DieselPoolError, PoolableConnection};
1010
use diesel_async::{AsyncConnection, RunQueryDsl};
11+
use graph::env::ENV_VARS;
1112
use graph::prelude::error;
1213
use graph::prelude::Counter;
1314
use graph::prelude::Gauge;
@@ -237,6 +238,37 @@ pub(crate) fn spawn_size_stat_collector(
237238
});
238239
}
239240

241+
/// Reap connections that are too old (older than 30 minutes) or if there
242+
/// are more than `connection_min_idle` connections in the pool that have
243+
/// been idle for longer than `idle_timeout`
244+
pub(crate) fn spawn_connection_reaper(pool: AsyncPool, idle_timeout: Duration) {
245+
const MAX_LIFETIME: Duration = Duration::from_secs(30 * 60);
246+
let Some(min_idle) = ENV_VARS.store.connection_min_idle else {
247+
// If this is None, we will never reap anything
248+
return;
249+
};
250+
// What happens here isn't exactly what we would like to have: we would
251+
// like to have at any point `min_idle` unused connections in the pool,
252+
// but there is no way to achieve that with deadpool. Instead, we try to
253+
// keep `min_idle` connections around if they exist
254+
tokio::task::spawn(async move {
255+
loop {
256+
let mut idle_count = 0;
257+
pool.retain(|_, metrics| {
258+
if metrics.age() > MAX_LIFETIME {
259+
return false;
260+
}
261+
if metrics.last_used() > idle_timeout {
262+
idle_count += 1;
263+
return idle_count <= min_idle;
264+
}
265+
true
266+
});
267+
tokio::time::sleep(Duration::from_secs(30)).await;
268+
}
269+
});
270+
}
271+
240272
pub(crate) struct WaitMeter {
241273
wait_gauge: Gauge,
242274
pub(crate) wait_stats: PoolWaitStats,

store/postgres/src/pool/mod.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -438,15 +438,6 @@ impl PoolInner {
438438
const_labels.clone(),
439439
);
440440

441-
// Note: deadpool does not support min_idle configuration
442-
if let Some(min_idle) = ENV_VARS.store.connection_min_idle {
443-
warn!(
444-
logger_pool,
445-
"min_idle configuration ({}) is not supported by deadpool and will be ignored",
446-
min_idle
447-
);
448-
}
449-
450441
let timeouts = Timeouts {
451442
wait: Some(ENV_VARS.store.connection_timeout),
452443
create: Some(ENV_VARS.store.connection_timeout),
@@ -467,6 +458,8 @@ impl PoolInner {
467458

468459
manager::spawn_size_stat_collector(pool.clone(), &registry, const_labels.clone());
469460

461+
manager::spawn_connection_reaper(pool.clone(), ENV_VARS.store.connection_idle_timeout);
462+
470463
let wait_meter = WaitMeter::new(&registry, const_labels.clone());
471464

472465
let fdw_pool = fdw_pool_size.map(|pool_size| {
@@ -476,14 +469,17 @@ impl PoolInner {
476469
recycle: Some(FDW_IDLE_TIMEOUT),
477470
};
478471

479-
AsyncPool::builder(conn_manager)
472+
let fdw_pool = AsyncPool::builder(conn_manager)
480473
.max_size(pool_size as usize)
481474
.timeouts(fdw_timeouts)
482475
.runtime(Runtime::Tokio1)
483476
.post_create(state_tracker.mark_available_hook())
484477
.post_recycle(state_tracker.mark_available_hook())
485478
.build()
486-
.expect("failed to create fdw connection pool")
479+
.expect("failed to create fdw connection pool");
480+
481+
manager::spawn_connection_reaper(fdw_pool.clone(), FDW_IDLE_TIMEOUT);
482+
fdw_pool
487483
});
488484

489485
info!(logger_store, "Pool successfully connected to Postgres");

0 commit comments

Comments
 (0)