Commit fa9a339

node, store: Properly propagate schema changes in a shard
1 parent: d5fad2e

3 files changed, +127 −47 lines

node/src/bin/manager.rs

Lines changed: 3 additions & 1 deletion

@@ -21,6 +21,7 @@ use graph_node::{
     store_builder::StoreBuilder,
     MetricsContext,
 };
+use graph_store_postgres::connection_pool::PoolCoordinator;
 use graph_store_postgres::ChainStore;
 use graph_store_postgres::{
     connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore,
@@ -587,13 +588,14 @@ impl Context {
 
     fn primary_pool(self) -> ConnectionPool {
         let primary = self.config.primary_store();
+        let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![])));
         let pool = StoreBuilder::main_pool(
             &self.logger,
             &self.node_id,
             PRIMARY_SHARD.as_str(),
             primary,
             self.metrics_registry(),
-            Arc::new(vec![]),
+            coord,
         );
         pool.skip_setup();
         pool
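
Note: the empty server list passed to `PoolCoordinator::new` appears safe for `graphman` because the coordinator's `servers` are only consulted in `setup()` (via `configure_fdw` and `propagate_schema_change`, see below), and `pool.skip_setup()` ensures that `setup()` never runs for this pool.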

node/src/store_builder.rs

Lines changed: 13 additions & 11 deletions

@@ -9,7 +9,9 @@ use graph::{
     prelude::{info, CheapClone, Logger},
     util::security::SafeDisplay,
 };
-use graph_store_postgres::connection_pool::{ConnectionPool, ForeignServer, PoolName};
+use graph_store_postgres::connection_pool::{
+    ConnectionPool, ForeignServer, PoolCoordinator, PoolName,
+};
 use graph_store_postgres::{
     BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener,
     NotificationSender, Shard as ShardName, Store as DieselStore, SubgraphStore,
@@ -102,6 +104,7 @@ impl StoreBuilder {
             .collect::<Result<Vec<_>, _>>()
             .expect("connection url's contain enough detail");
         let servers = Arc::new(servers);
+        let coord = Arc::new(PoolCoordinator::new(servers));
 
         let shards: Vec<_> = config
             .stores
@@ -114,7 +117,7 @@ impl StoreBuilder {
                     name,
                     shard,
                     registry.cheap_clone(),
-                    servers.clone(),
+                    coord.clone(),
                 );
 
                 let (read_only_conn_pools, weights) = Self::replica_pools(
@@ -123,7 +126,7 @@ impl StoreBuilder {
                     name,
                     shard,
                     registry.cheap_clone(),
-                    servers.clone(),
+                    coord.clone(),
                 );
 
                 let name =
@@ -191,7 +194,7 @@ impl StoreBuilder {
        name: &str,
        shard: &Shard,
        registry: Arc<dyn MetricsRegistry>,
-        servers: Arc<Vec<ForeignServer>>,
+        coord: Arc<PoolCoordinator>,
    ) -> ConnectionPool {
        let logger = logger.new(o!("pool" => "main"));
        let pool_size = shard.pool_size.size_for(node, name).expect(&format!(
@@ -209,15 +212,14 @@ impl StoreBuilder {
            "conn_pool_size" => pool_size,
            "weight" => shard.weight
        );
-        ConnectionPool::create(
+        coord.create_pool(
+            &logger,
            name,
            PoolName::Main,
            shard.connection.to_owned(),
            pool_size,
            Some(fdw_pool_size),
-            &logger,
            registry.cheap_clone(),
-            servers,
        )
    }
 
@@ -228,7 +230,7 @@ impl StoreBuilder {
        name: &str,
        shard: &Shard,
        registry: Arc<dyn MetricsRegistry>,
-        servers: Arc<Vec<ForeignServer>>,
+        coord: Arc<PoolCoordinator>,
    ) -> (Vec<ConnectionPool>, Vec<usize>) {
        let mut weights: Vec<_> = vec![shard.weight];
        (
@@ -250,15 +252,15 @@ impl StoreBuilder {
                        "we can determine the pool size for replica {}",
                        name
                    ));
-                    ConnectionPool::create(
+
+                    coord.clone().create_pool(
+                        &logger,
                        name,
                        PoolName::Replica(pool),
                        replica.connection.clone(),
                        pool_size,
                        None,
-                        &logger,
                        registry.cheap_clone(),
-                        servers.clone(),
                    )
                })
                .collect(),
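
Taken together, the builder now constructs a single `PoolCoordinator` per process from the full `ForeignServer` list and hands a clone of it to every main and replica pool it creates, so all pools in the process register with the same coordinator through `create_pool`.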

store/postgres/src/connection_pool.rs

Lines changed: 111 additions & 35 deletions

@@ -25,7 +25,7 @@ use graph::{
 
 use std::fmt::{self, Write};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use std::{collections::HashMap, sync::RwLock};
 
@@ -158,9 +158,6 @@ impl ForeignServer {
 
     /// Map key tables from the primary into our local schema. If we are the
     /// primary, set them up as views.
-    ///
-    /// We recreate this mapping on every server start so that migrations that
-    /// change one of the mapped tables actually show up in the imported tables
     fn map_primary(conn: &PgConnection, shard: &Shard) -> Result<(), StoreError> {
         catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?;
 
@@ -226,7 +223,7 @@ const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
 enum PoolState {
     /// A connection pool, and all the servers for which we need to
     /// establish fdw mappings when we call `setup` on the pool
-    Created(Arc<PoolInner>, Arc<Vec<ForeignServer>>),
+    Created(Arc<PoolInner>, Arc<PoolCoordinator>),
     /// The pool has been successfully set up
     Ready(Arc<PoolInner>),
     /// The pool has been disabled by setting its size to 0
@@ -300,15 +297,15 @@ impl PoolStateTracker {
 }
 
 impl ConnectionPool {
-    pub fn create(
+    fn create(
         shard_name: &str,
         pool_name: PoolName,
         postgres_url: String,
         pool_size: u32,
         fdw_pool_size: Option<u32>,
         logger: &Logger,
         registry: Arc<dyn MetricsRegistry>,
-        servers: Arc<Vec<ForeignServer>>,
+        coord: Arc<PoolCoordinator>,
     ) -> ConnectionPool {
         let state_tracker = PoolStateTracker::new();
         let shard =
@@ -330,7 +327,7 @@ impl ConnectionPool {
                 if pool_name.is_replica() {
                     PoolState::Ready(Arc::new(pool))
                 } else {
-                    PoolState::Created(Arc::new(pool), servers)
+                    PoolState::Created(Arc::new(pool), coord)
                 }
             }
         };
@@ -968,7 +965,7 @@ impl PoolInner {
     /// # Panics
     ///
     /// If any errors happen during the migration, the process panics
-    pub fn setup(&self, servers: Arc<Vec<ForeignServer>>) -> Result<(), StoreError> {
+    fn setup(&self, coord: Arc<PoolCoordinator>) -> Result<(), StoreError> {
         fn die(logger: &Logger, msg: &'static str, err: &dyn std::fmt::Display) -> ! {
             crit!(logger, "{}", msg; "error" => format!("{:#}", err));
             panic!("{}: {}", msg, err);
@@ -980,11 +977,29 @@ impl PoolInner {
         let start = Instant::now();
         advisory_lock::lock_migration(&conn)
             .unwrap_or_else(|err| die(&pool.logger, "failed to get migration lock", &err));
+        // This code can cause a race in database setup: if pool A has had
+        // schema changes and pool B then tries to map tables from pool A,
+        // but does so before the concurrent thread running this code for
+        // pool B has at least finished `configure_fdw`, mapping tables will
+        // fail. In that case, the node must be restarted. The restart is
+        // guaranteed because this failure will lead to a panic in the setup
+        // for pool A
+        //
+        // This code can also leave the table mappings in a state where they
+        // have not been updated if the process is killed after migrating
+        // the schema but before finishing remapping in all shards.
+        // Addressing that would require keeping track of the need to remap
+        // in the database instead of just in memory
         let result = pool
-            .configure_fdw(servers.as_ref())
+            .configure_fdw(coord.servers.as_ref())
             .and_then(|()| migrate_schema(&pool.logger, &conn))
-            .and_then(|had_migrations| pool.map_primary())
-            .and_then(|()| pool.map_metadata(servers.as_ref()));
+            .and_then(|had_migrations| {
+                if had_migrations {
+                    coord.propagate_schema_change(&self.shard)
+                } else {
+                    Ok(())
+                }
+            });
         debug!(&pool.logger, "Release migration lock");
         advisory_lock::unlock_migration(&conn).unwrap_or_else(|err| {
             die(&pool.logger, "failed to release migration lock", &err);
@@ -1021,17 +1036,6 @@ impl PoolInner {
         })
     }
 
-    /// Map key tables from the primary into our local schema. If we are the
-    /// primary, set them up as views.
-    ///
-    /// We recreate this mapping on every server start so that migrations that
-    /// change one of the mapped tables actually show up in the imported tables
-    fn map_primary(&self) -> Result<(), StoreError> {
-        info!(&self.logger, "Mapping primary");
-        let conn = self.get()?;
-        conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard))
-    }
-
     /// Copy the data from key tables in the primary into our local schema
     /// so it can be used as a fallback when the primary goes down
     pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> {
@@ -1046,18 +1050,21 @@ impl PoolInner {
             .await
     }
 
-    // Map some tables from the `subgraphs` metadata schema from foreign
-    // servers to ourselves. The mapping is recreated on every server start
-    // so that we pick up possible schema changes in the mappings
-    fn map_metadata(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
-        info!(&self.logger, "Mapping metadata");
-        let conn = self.get()?;
-        conn.transaction(|| {
-            for server in servers.iter().filter(|server| server.shard != self.shard) {
-                server.map_metadata(&conn)?;
-            }
-            Ok(())
-        })
+    // The foreign server `server` had schema changes, and we therefore need
+    // to remap anything that we are importing via fdw to make sure we are
+    // using this updated schema
+    fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
+        if &server.shard == &*PRIMARY_SHARD {
+            info!(&self.logger, "Mapping primary");
+            let conn = self.get()?;
+            conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard))?;
+        }
+        if &server.shard != &self.shard {
+            info!(&self.logger, "Mapping metadata");
+            let conn = self.get()?;
+            conn.transaction(|| server.map_metadata(&conn))?;
+        }
+        Ok(())
     }
 }
 
@@ -1101,3 +1108,72 @@ fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<bool, StoreError> {
 
     Ok(had_migrations)
 }
+
+/// Helper to coordinate propagating schema changes from the database that
+/// changes schema to all other shards so they can update their fdw mappings
+/// of tables imported from that shard
+pub struct PoolCoordinator {
+    pools: Mutex<HashMap<Shard, Arc<PoolInner>>>,
+    servers: Arc<Vec<ForeignServer>>,
+}
+
+impl PoolCoordinator {
+    pub fn new(servers: Arc<Vec<ForeignServer>>) -> Self {
+        Self {
+            pools: Mutex::new(HashMap::new()),
+            servers,
+        }
+    }
+
+    pub fn create_pool(
+        self: Arc<Self>,
+        logger: &Logger,
+        name: &str,
+        pool_name: PoolName,
+        postgres_url: String,
+        pool_size: u32,
+        fdw_pool_size: Option<u32>,
+        registry: Arc<dyn MetricsRegistry>,
+    ) -> ConnectionPool {
+        let pool = ConnectionPool::create(
+            name,
+            pool_name,
+            postgres_url,
+            pool_size,
+            fdw_pool_size,
+            logger,
+            registry,
+            self.cheap_clone(),
+        );
+        // It is safe to take this lock here since nobody has seen the pool
+        // yet. We remember the `PoolInner` so that later, when we have to
+        // call `remap()`, we do not have to take this lock as that will be
+        // already held in `get_ready()`
+        match &*pool.inner.lock(logger) {
+            PoolState::Created(inner, _) | PoolState::Ready(inner) => {
+                self.pools
+                    .lock()
+                    .unwrap()
+                    .insert(pool.shard.clone(), inner.clone());
+            }
+            PoolState::Disabled => { /* nothing to do */ }
+        }
+        pool
+    }
+
+    /// Propagate changes to the schema in `shard` to all other pools. Those
+    /// other pools will then recreate any tables that they imported from
+    /// `shard`
+    fn propagate_schema_change(&self, shard: &Shard) -> Result<(), StoreError> {
+        let server = self
+            .servers
+            .iter()
+            .find(|server| &server.shard == shard)
+            .ok_or_else(|| constraint_violation!("unknown shard {shard}"))?;
+
+        for pool in self.pools.lock().unwrap().values() {
+            pool.remap(server)?;
+        }
+        Ok(())
+    }
+}
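
To make the new control flow concrete, here is a minimal, self-contained model of the coordination pattern; `ToyPool`, `ToyCoordinator`, and the `String`-based `Shard` alias are illustrative stand-ins, not graph-node types. Pools register with a shared coordinator as they are created; when migrations change a shard's schema, the coordinator asks every registered pool to rebuild what it imports from that shard, mirroring how `propagate_schema_change` calls `remap` on each `PoolInner`.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Shard = String;

struct ToyPool {
    shard: Shard,
}

impl ToyPool {
    // Stand-in for `PoolInner::remap`: rebuild everything this pool imports
    // via fdw from the shard that just changed its schema (in this toy, a
    // pool imports nothing from itself, so it skips its own shard)
    fn remap(&self, changed: &Shard) -> Result<(), String> {
        if changed != &self.shard {
            println!("pool {}: remapping tables imported from {}", self.shard, changed);
        }
        Ok(())
    }
}

struct ToyCoordinator {
    // Stand-in for `PoolCoordinator::pools`: every pool registers here
    pools: Mutex<HashMap<Shard, Arc<ToyPool>>>,
}

impl ToyCoordinator {
    fn new() -> Self {
        Self {
            pools: Mutex::new(HashMap::new()),
        }
    }

    // Stand-in for `create_pool`: construct the pool and remember it so
    // later schema changes can be propagated to it
    fn create_pool(&self, shard: &str) -> Arc<ToyPool> {
        let pool = Arc::new(ToyPool {
            shard: shard.to_string(),
        });
        self.pools.lock().unwrap().insert(shard.to_string(), pool.clone());
        pool
    }

    // Stand-in for `propagate_schema_change`: after `shard` ran its
    // migrations, ask every registered pool to refresh its fdw mappings
    fn propagate_schema_change(&self, shard: &Shard) -> Result<(), String> {
        for pool in self.pools.lock().unwrap().values() {
            pool.remap(shard)?;
        }
        Ok(())
    }
}

fn main() {
    let coord = Arc::new(ToyCoordinator::new());
    let _primary = coord.create_pool("primary");
    let _shard_a = coord.create_pool("shard_a");
    // Suppose migrations ran in the primary: every pool is asked to remap;
    // `shard_a` rebuilds its imports, `primary` skips itself
    coord.propagate_schema_change(&"primary".to_string()).unwrap();
}

In the commit itself, this propagation happens while the advisory migration lock is held in `setup()`, which bounds, but per the comment there does not fully eliminate, the races between concurrently starting shards.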
