Skip to content

Commit 75dad84

Browse files
committed
node, store: Expose migrating and remapping schemas in graphman
1 parent fa9a339 commit 75dad84

File tree

5 files changed

+78
-6
lines changed

5 files changed

+78
-6
lines changed

node/src/bin/manager.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,10 @@ pub enum Command {
233233
#[clap(long, short, default_value = "10000")]
234234
history: usize,
235235
},
236+
237+
/// General database management
238+
#[clap(subcommand)]
239+
Database(DatabaseCommand),
236240
}
237241

238242
impl Command {
@@ -495,6 +499,18 @@ pub enum IndexCommand {
495499
},
496500
}
497501

502+
#[derive(Clone, Debug, Subcommand)]
503+
pub enum DatabaseCommand {
504+
/// Apply any pending migrations to the database schema in all shards
505+
Migrate,
506+
/// Refresh the mapping of tables into different shards
507+
///
508+
/// This command rebuilds the mappings of tables from one shard into all
509+
/// other shards. It makes it possible to fix these mappings when a
510+
/// database migration was interrupted before it could rebuild the
511+
/// mappings
512+
Remap,
513+
}
498514
#[derive(Clone, Debug, Subcommand)]
499515
pub enum CheckBlockMethod {
500516
/// The number of the target block
@@ -644,7 +660,7 @@ impl Context {
644660
}
645661

646662
fn store_and_pools(self) -> (Arc<Store>, HashMap<Shard, ConnectionPool>) {
647-
let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools(
663+
let (subgraph_store, pools, _) = StoreBuilder::make_subgraph_store_and_pools(
648664
&self.logger,
649665
&self.node_id,
650666
&self.config,
@@ -1059,6 +1075,20 @@ async fn main() -> anyhow::Result<()> {
10591075
}
10601076
}
10611077
}
1078+
Database(cmd) => {
1079+
match cmd {
1080+
DatabaseCommand::Migrate => {
1081+
/* creating the store builder runs migrations */
1082+
let _store_builder = ctx.store_builder().await;
1083+
println!("All database migrations have been applied");
1084+
Ok(())
1085+
}
1086+
DatabaseCommand::Remap => {
1087+
let store_builder = ctx.store_builder().await;
1088+
commands::database::remap(&store_builder.coord).await
1089+
}
1090+
}
1091+
}
10621092
Prune {
10631093
deployment,
10641094
history,
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use std::time::Instant;
2+
3+
use graph::prelude::anyhow;
4+
use graph_store_postgres::connection_pool::PoolCoordinator;
5+
6+
pub async fn remap(coord: &PoolCoordinator) -> Result<(), anyhow::Error> {
7+
let pools = coord.pools();
8+
let servers = coord.servers();
9+
10+
for server in servers.iter() {
11+
for pool in pools.iter() {
12+
let start = Instant::now();
13+
print!(
14+
"Remapping imports from {} in shard {}",
15+
server.shard, pool.shard
16+
);
17+
pool.remap(server)?;
18+
println!(" (done in {}s)", start.elapsed().as_secs());
19+
}
20+
}
21+
Ok(())
22+
}

node/src/manager/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod check_blocks;
44
pub mod config;
55
pub mod copy;
66
pub mod create;
7+
pub mod database;
78
pub mod index;
89
pub mod info;
910
pub mod listen;

node/src/store_builder.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub struct StoreBuilder {
2828
chain_head_update_listener: Arc<PostgresChainHeadUpdateListener>,
2929
/// Map network names to the shards where they are/should be stored
3030
chains: HashMap<String, ShardName>,
31+
pub coord: Arc<PoolCoordinator>,
3132
}
3233

3334
impl StoreBuilder {
@@ -49,7 +50,7 @@ impl StoreBuilder {
4950
registry.clone(),
5051
));
5152

52-
let (store, pools) = Self::make_subgraph_store_and_pools(
53+
let (store, pools, coord) = Self::make_subgraph_store_and_pools(
5354
logger,
5455
node,
5556
config,
@@ -82,6 +83,7 @@ impl StoreBuilder {
8283
subscription_manager,
8384
chain_head_update_listener,
8485
chains,
86+
coord,
8587
}
8688
}
8789

@@ -94,7 +96,11 @@ impl StoreBuilder {
9496
config: &Config,
9597
fork_base: Option<Url>,
9698
registry: Arc<dyn MetricsRegistry>,
97-
) -> (Arc<SubgraphStore>, HashMap<ShardName, ConnectionPool>) {
99+
) -> (
100+
Arc<SubgraphStore>,
101+
HashMap<ShardName, ConnectionPool>,
102+
Arc<PoolCoordinator>,
103+
) {
98104
let notification_sender = Arc::new(NotificationSender::new(registry.cheap_clone()));
99105

100106
let servers = config
@@ -150,7 +156,7 @@ impl StoreBuilder {
150156
registry,
151157
));
152158

153-
(store, pools)
159+
(store, pools, coord)
154160
}
155161

156162
pub fn make_store(

store/postgres/src/connection_pool.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ impl HandleEvent for EventHandler {
673673
#[derive(Clone)]
674674
pub struct PoolInner {
675675
logger: Logger,
676-
shard: Shard,
676+
pub shard: Shard,
677677
pool: Pool<ConnectionManager<PgConnection>>,
678678
// A separate pool for connections that will use foreign data wrappers.
679679
// Once such a connection accesses a foreign table, Postgres keeps a
@@ -1053,7 +1053,7 @@ impl PoolInner {
10531053
// The foreign server `server` had schema changes, and we therefore need
10541054
// to remap anything that we are importing via fdw to make sure we are
10551055
// using this updated schema
1056-
fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
1056+
pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
10571057
if &server.shard == &*PRIMARY_SHARD {
10581058
info!(&self.logger, "Mapping primary");
10591059
let conn = self.get()?;
@@ -1176,4 +1176,17 @@ impl PoolCoordinator {
11761176
}
11771177
Ok(())
11781178
}
1179+
1180+
pub fn pools(&self) -> Vec<Arc<PoolInner>> {
1181+
self.pools
1182+
.lock()
1183+
.unwrap()
1184+
.values()
1185+
.map(|pool| pool.clone())
1186+
.collect()
1187+
}
1188+
1189+
pub fn servers(&self) -> Arc<Vec<ForeignServer>> {
1190+
self.servers.clone()
1191+
}
11791192
}

0 commit comments

Comments
 (0)