Skip to content

Commit a7a9f3a

Browse files
committed
store: Move ErrorHandler functionality to ConnectionManager
1 parent d195ee3 commit a7a9f3a

File tree

2 files changed

+72
-85
lines changed

2 files changed

+72
-85
lines changed

store/postgres/src/pool/manager.rs

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! tracking availability of the underlying database
55
66
use deadpool::managed::{Hook, RecycleError, RecycleResult};
7-
use diesel::{r2d2, IntoSql};
7+
use diesel::IntoSql;
88

99
use diesel_async::pooled_connection::{PoolError as DieselPoolError, PoolableConnection};
1010
use diesel_async::{AsyncConnection, RunQueryDsl};
@@ -18,7 +18,6 @@ use graph::slog::info;
1818
use graph::slog::Logger;
1919

2020
use std::collections::HashMap;
21-
use std::fmt;
2221
use std::sync::atomic::AtomicBool;
2322
use std::sync::atomic::Ordering;
2423
use std::sync::Arc;
@@ -30,13 +29,63 @@ use crate::pool::AsyncPool;
3029
/// Our own connection manager. It is pretty much the same as
3130
/// `AsyncDieselConnectionManager` but makes it easier to instrument and
3231
/// track connection errors
32+
#[derive(Clone)]
3333
pub struct ConnectionManager {
34+
logger: Logger,
3435
connection_url: String,
36+
state_tracker: StateTracker,
37+
error_counter: Counter,
3538
}
3639

3740
impl ConnectionManager {
38-
pub(super) fn new(connection_url: String) -> Self {
39-
Self { connection_url }
41+
pub(super) fn new(
42+
logger: Logger,
43+
connection_url: String,
44+
state_tracker: StateTracker,
45+
registry: &MetricsRegistry,
46+
const_labels: HashMap<String, String>,
47+
) -> Self {
48+
let error_counter = registry
49+
.global_counter(
50+
"store_connection_error_count",
51+
"The number of Postgres connections errors",
52+
const_labels,
53+
)
54+
.expect("failed to create `store_connection_error_count` counter");
55+
56+
Self {
57+
logger,
58+
connection_url,
59+
state_tracker,
60+
error_counter,
61+
}
62+
}
63+
64+
fn handle_error(&self, error: &dyn std::error::Error) {
65+
let msg = brief_error_msg(&error);
66+
67+
// Don't count canceling statements for timeouts etc. as a
68+
// connection error. Unfortunately, we only have the textual error
69+
// and need to infer whether the error indicates that the database
70+
// is down or if something else happened. When querying a replica,
71+
// these messages indicate that a query was canceled because it
72+
// conflicted with replication, but does not indicate that there is
73+
// a problem with the database itself.
74+
//
75+
// This check will break if users run Postgres (or even graph-node)
76+
// in a locale other than English. In that case, their database will
77+
// be marked as unavailable even though it is perfectly fine.
78+
if msg.contains("canceling statement")
79+
|| msg.contains("terminating connection due to conflict with recovery")
80+
{
81+
return;
82+
}
83+
84+
self.error_counter.inc();
85+
if self.state_tracker.is_available() {
86+
error!(self.logger, "Connection checkout"; "error" => msg);
87+
}
88+
self.state_tracker.mark_unavailable(Duration::from_secs(0));
4089
}
4190
}
4291

@@ -46,9 +95,11 @@ impl deadpool::managed::Manager for ConnectionManager {
4695
type Error = DieselPoolError;
4796

4897
async fn create(&self) -> Result<Self::Type, Self::Error> {
49-
diesel_async::AsyncPgConnection::establish(&self.connection_url)
50-
.await
51-
.map_err(DieselPoolError::ConnectionError)
98+
let res = diesel_async::AsyncPgConnection::establish(&self.connection_url).await;
99+
if let Err(ref e) = res {
100+
self.handle_error(e);
101+
}
102+
res.map_err(DieselPoolError::ConnectionError)
52103
}
53104

54105
async fn recycle(
@@ -59,11 +110,14 @@ impl deadpool::managed::Manager for ConnectionManager {
59110
if std::thread::panicking() || obj.is_broken() {
60111
return Err(RecycleError::Message("Broken connection".into()));
61112
}
62-
diesel::select(67_i32.into_sql::<diesel::sql_types::Integer>())
113+
let res = diesel::select(67_i32.into_sql::<diesel::sql_types::Integer>())
63114
.execute(obj)
64115
.await
65-
.map(|_| ())
66-
.map_err(DieselPoolError::QueryError)?;
116+
.map(|_| ());
117+
if let Err(ref e) = res {
118+
self.handle_error(e);
119+
}
120+
res.map_err(DieselPoolError::QueryError)?;
67121
Ok(())
68122
}
69123
}
@@ -139,57 +193,6 @@ impl StateTracker {
139193
}
140194
}
141195

142-
#[derive(Clone)]
143-
pub(super) struct ErrorHandler {
144-
logger: Logger,
145-
counter: Counter,
146-
state_tracker: StateTracker,
147-
}
148-
149-
impl ErrorHandler {
150-
pub(super) fn new(logger: Logger, counter: Counter, state_tracker: StateTracker) -> Self {
151-
Self {
152-
logger,
153-
counter,
154-
state_tracker,
155-
}
156-
}
157-
}
158-
impl std::fmt::Debug for ErrorHandler {
159-
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
160-
fmt::Result::Ok(())
161-
}
162-
}
163-
164-
impl r2d2::HandleError<r2d2::Error> for ErrorHandler {
165-
fn handle_error(&self, error: r2d2::Error) {
166-
let msg = brief_error_msg(&error);
167-
168-
// Don't count canceling statements for timeouts etc. as a
169-
// connection error. Unfortunately, we only have the textual error
170-
// and need to infer whether the error indicates that the database
171-
// is down or if something else happened. When querying a replica,
172-
// these messages indicate that a query was canceled because it
173-
// conflicted with replication, but does not indicate that there is
174-
// a problem with the database itself.
175-
//
176-
// This check will break if users run Postgres (or even graph-node)
177-
// in a locale other than English. In that case, their database will
178-
// be marked as unavailable even though it is perfectly fine.
179-
if msg.contains("canceling statement")
180-
|| msg.contains("terminating connection due to conflict with recovery")
181-
{
182-
return;
183-
}
184-
185-
self.counter.inc();
186-
if self.state_tracker.is_available() {
187-
error!(self.logger, "Postgres connection error"; "error" => msg);
188-
}
189-
self.state_tracker.mark_unavailable(Duration::from_secs(0));
190-
}
191-
}
192-
193196
fn brief_error_msg(error: &dyn std::error::Error) -> String {
194197
// For 'Connection refused' errors, Postgres includes the IP and
195198
// port number in the error message. We want to suppress that and

store/postgres/src/pool/mod.rs

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub use diesel_async::scoped_futures::ScopedFutureExt;
3636

3737
pub use coordinator::PoolCoordinator;
3838
pub use foreign_server::ForeignServer;
39-
use manager::{ErrorHandler, StateTracker};
39+
use manager::StateTracker;
4040

4141
type AsyncPool = deadpool::managed::Pool<ConnectionManager>;
4242
/// A database connection for asynchronous diesel operations
@@ -429,29 +429,14 @@ impl PoolInner {
429429

430430
let state_tracker = StateTracker::new(logger_pool.cheap_clone());
431431

432-
// Note: deadpool provides built-in metrics via pool.status()
433-
// The r2d2-style ErrorHandler and EventHandler are not needed with deadpool.
434-
// Metrics can be obtained from pool.status() and custom hooks can be added
435-
// to the pool builder if needed.
436-
let error_counter = registry
437-
.global_counter(
438-
"store_connection_error_count",
439-
"The number of Postgres connections errors",
440-
const_labels.clone(),
441-
)
442-
.expect("failed to create `store_connection_error_count` counter");
443-
crit!(
444-
logger_pool,
445-
"Unfinished: not to replicate ErrorHandler with mobc"
446-
);
447-
let _error_handler = Box::new(ErrorHandler::new(
432+
// Connect to Postgres
433+
let conn_manager = ConnectionManager::new(
448434
logger_pool.clone(),
449-
error_counter,
435+
postgres_url.clone(),
450436
state_tracker.clone(),
451-
));
452-
453-
// Connect to Postgres
454-
let conn_manager = ConnectionManager::new(postgres_url.clone());
437+
&registry,
438+
const_labels.clone(),
439+
);
455440

456441
// Note: deadpool does not support min_idle configuration
457442
if let Some(min_idle) = ENV_VARS.store.connection_min_idle {
@@ -471,7 +456,7 @@ impl PoolInner {
471456
// The post_create and post_recycle hooks are only called when
472457
// create and recycle succeed; we can therefore mark the pool
473458
// available
474-
let pool = AsyncPool::builder(conn_manager)
459+
let pool = AsyncPool::builder(conn_manager.clone())
475460
.max_size(pool_size as usize)
476461
.timeouts(timeouts)
477462
.runtime(Runtime::Tokio1)
@@ -485,7 +470,6 @@ impl PoolInner {
485470
let wait_meter = WaitMeter::new(&registry, const_labels.clone());
486471

487472
let fdw_pool = fdw_pool_size.map(|pool_size| {
488-
let conn_manager = ConnectionManager::new(postgres_url.clone());
489473
let fdw_timeouts = Timeouts {
490474
wait: Some(ENV_VARS.store.connection_timeout),
491475
create: None,

0 commit comments

Comments
 (0)