Skip to content

Commit aec2f55

Browse files
committed
node: Expose SubgraphStore.prune through graphman prune
Fixes #3665
1 parent 4a35be5 commit aec2f55

File tree

3 files changed

+181
-0
lines changed

3 files changed

+181
-0
lines changed

node/src/bin/manager.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,18 @@ pub enum Command {
209209

210210
/// Manage database indexes
211211
Index(IndexCommand),
212+
213+
/// Prune deployments
214+
Prune {
215+
/// The deployment to prune (see `help info`)
216+
deployment: DeploymentSearch,
217+
/// Prune tables with a ratio of entities to entity versions lower than this
218+
#[structopt(long, short, default_value = "0.20")]
219+
prune_ratio: f64,
220+
/// How much history to keep in blocks
221+
#[structopt(long, short, default_value = "10000")]
222+
history: usize,
223+
},
212224
}
213225

214226
impl Command {
@@ -1023,6 +1035,14 @@ async fn main() -> anyhow::Result<()> {
10231035
}
10241036
}
10251037
}
1038+
Prune {
1039+
deployment,
1040+
history,
1041+
prune_ratio,
1042+
} => {
1043+
let (store, primary_pool) = ctx.store_and_primary();
1044+
commands::prune::run(store, primary_pool, deployment, history, prune_ratio).await
1045+
}
10261046
}
10271047
}
10281048

node/src/manager/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod create;
77
pub mod index;
88
pub mod info;
99
pub mod listen;
10+
pub mod prune;
1011
pub mod query;
1112
pub mod remove;
1213
pub mod rewind;

node/src/manager/commands/prune.rs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use std::{io::Write, sync::Arc, time::Instant};
2+
3+
use graph::{
4+
components::store::{PruneReporter, StatusStore},
5+
data::subgraph::status,
6+
prelude::{anyhow, BlockNumber},
7+
};
8+
use graph_chain_ethereum::ENV_VARS as ETH_ENV;
9+
use graph_store_postgres::{connection_pool::ConnectionPool, Store};
10+
11+
use crate::manager::deployment::DeploymentSearch;
12+
13+
struct Progress {
14+
start: Instant,
15+
analyze_start: Instant,
16+
switch_start: Instant,
17+
final_start: Instant,
18+
final_table_start: Instant,
19+
nonfinal_start: Instant,
20+
}
21+
22+
impl Progress {
23+
fn new() -> Self {
24+
Self {
25+
start: Instant::now(),
26+
analyze_start: Instant::now(),
27+
switch_start: Instant::now(),
28+
final_start: Instant::now(),
29+
final_table_start: Instant::now(),
30+
nonfinal_start: Instant::now(),
31+
}
32+
}
33+
}
34+
35+
impl PruneReporter for Progress {
36+
fn start_analyze(&mut self, table: &str) {
37+
print!("analyze {table:48} ");
38+
std::io::stdout().flush().ok();
39+
self.analyze_start = Instant::now();
40+
}
41+
42+
fn finish_analyze(&mut self, table: &str) {
43+
println!(
44+
"\ranalyze {table:48} (done in {}s)",
45+
self.analyze_start.elapsed().as_secs()
46+
);
47+
std::io::stdout().flush().ok();
48+
}
49+
50+
fn copy_final_start(&mut self, earliest_block: BlockNumber, final_block: BlockNumber) {
51+
println!("copy final entities (versions live between {earliest_block} and {final_block})");
52+
self.final_start = Instant::now();
53+
self.final_table_start = self.final_start;
54+
}
55+
56+
fn copy_final_batch(&mut self, table: &str, _rows: usize, total_rows: usize, finished: bool) {
57+
if finished {
58+
println!(
59+
"\r copy final {table:43} ({total_rows} rows in {}s)",
60+
self.final_table_start.elapsed().as_secs()
61+
);
62+
self.final_table_start = Instant::now();
63+
} else {
64+
print!(
65+
"\r copy final {table:43} ({total_rows} rows in {}s)",
66+
self.final_table_start.elapsed().as_secs()
67+
);
68+
}
69+
std::io::stdout().flush().ok();
70+
}
71+
72+
fn copy_final_finish(&mut self) {
73+
println!(
74+
"finished copying final entity versions in {}s",
75+
self.final_start.elapsed().as_secs()
76+
);
77+
}
78+
79+
fn start_switch(&mut self) {
80+
println!("blocking writes and switching tables");
81+
self.switch_start = Instant::now();
82+
}
83+
84+
fn finish_switch(&mut self) {
85+
println!(
86+
"enabling writes. Switching took {}s",
87+
self.switch_start.elapsed().as_secs()
88+
);
89+
}
90+
91+
fn copy_nonfinal_start(&mut self, table: &str) {
92+
print!(" copy nonfinal {table:40}");
93+
std::io::stdout().flush().ok();
94+
self.nonfinal_start = Instant::now();
95+
}
96+
97+
fn copy_nonfinal_finish(&mut self, table: &str, rows: usize) {
98+
println!(
99+
"\r copy nonfinal {table:40} ({rows} rows in {}s)",
100+
self.nonfinal_start.elapsed().as_secs()
101+
);
102+
std::io::stdout().flush().ok();
103+
}
104+
105+
fn finish_prune(&mut self) {
106+
println!("finished pruning in {}s", self.start.elapsed().as_secs());
107+
}
108+
}
109+
110+
pub async fn run(
111+
store: Arc<Store>,
112+
primary_pool: ConnectionPool,
113+
search: DeploymentSearch,
114+
history: usize,
115+
prune_ratio: f64,
116+
) -> Result<(), anyhow::Error> {
117+
let history = history as BlockNumber;
118+
let deployment = search.locate_unique(&primary_pool)?;
119+
let mut info = store
120+
.status(status::Filter::DeploymentIds(vec![deployment.id]))?
121+
.pop()
122+
.ok_or_else(|| anyhow!("deployment {deployment} not found"))?;
123+
if info.chains.len() > 1 {
124+
return Err(anyhow!(
125+
"deployment {deployment} indexes {} chains, not sure how to deal with more than one chain",
126+
info.chains.len()
127+
));
128+
}
129+
let status = info
130+
.chains
131+
.pop()
132+
.ok_or_else(|| anyhow!("deployment {} does not index any chain", deployment))?;
133+
let latest = status.latest_block.map(|ptr| ptr.number()).unwrap_or(0);
134+
if latest <= history {
135+
return Err(anyhow!("deployment {deployment} has only indexed up to block {latest} and we can't preserve {history} blocks of history"));
136+
}
137+
138+
println!("prune {deployment}");
139+
println!(" latest: {latest}");
140+
println!(" final: {}", latest - ETH_ENV.reorg_threshold);
141+
println!(" earliest: {}", latest - history);
142+
143+
let reporter = Box::new(Progress::new());
144+
store
145+
.subgraph_store()
146+
.prune(
147+
reporter,
148+
&deployment,
149+
latest - history,
150+
// Using the setting for eth chains is a bit lazy; the value
151+
// should really depend on the chain, but we don't have a
152+
// convenient way to figure out how each chain deals with
153+
// finality
154+
ETH_ENV.reorg_threshold,
155+
prune_ratio,
156+
)
157+
.await?;
158+
159+
Ok(())
160+
}

0 commit comments

Comments
 (0)