32 changes: 32 additions & 0 deletions graph/src/components/store/mod.rs
@@ -1076,3 +1076,35 @@ impl ReadStore for EmptyStore {
self.schema.cheap_clone()
}
}

/// An estimate of the number of entities and the number of entity versions
/// in a database table
#[derive(Clone, Debug)]
pub struct VersionStats {
pub entities: i32,
pub versions: i32,
pub tablename: String,
/// The ratio `entities / versions`
pub ratio: f64,
}

/// Callbacks for `SubgraphStore.prune` so that callers can report progress
/// of the pruning procedure to users
#[allow(unused_variables)]
pub trait PruneReporter: Send + 'static {
fn start_analyze(&mut self) {}
fn start_analyze_table(&mut self, table: &str) {}
fn finish_analyze_table(&mut self, table: &str) {}
fn finish_analyze(&mut self, stats: &[VersionStats]) {}

fn copy_final_start(&mut self, earliest_block: BlockNumber, final_block: BlockNumber) {}
fn copy_final_batch(&mut self, table: &str, rows: usize, total_rows: usize, finished: bool) {}
fn copy_final_finish(&mut self) {}

fn start_switch(&mut self) {}
fn copy_nonfinal_start(&mut self, table: &str) {}
fn copy_nonfinal_finish(&mut self, table: &str, rows: usize) {}
fn finish_switch(&mut self) {}

fn finish_prune(&mut self) {}
}
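
Every method on `PruneReporter` has an empty default body (the `#[allow(unused_variables)]` keeps the parameter names in the signatures), so a caller only needs to override the hooks it cares about. A minimal sketch of such a reporter, where the `LogReporter` name is made up for illustration and not part of this change, could look like:

use graph::components::store::{PruneReporter, VersionStats};

struct LogReporter;

impl PruneReporter for LogReporter {
    fn finish_analyze(&mut self, stats: &[VersionStats]) {
        for s in stats {
            // `ratio` is entities / versions; tables whose ratio falls below the
            // configured prune ratio (default 0.20) are the ones that get pruned.
            println!(
                "{}: {} entities, {} versions (ratio {:.2})",
                s.tablename, s.entities, s.versions, s.ratio
            );
        }
    }

    fn finish_prune(&mut self) {
        println!("pruning finished");
    }
}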
35 changes: 24 additions & 11 deletions node/src/bin/manager.rs
@@ -209,6 +209,18 @@ pub enum Command {

/// Manage database indexes
Index(IndexCommand),

/// Prune deployments
Prune {
/// The deployment to prune (see `help info`)
deployment: DeploymentSearch,
/// Prune tables with a ratio of entities to entity versions lower than this
#[structopt(long, short, default_value = "0.20")]
prune_ratio: f64,
/// How much history to keep in blocks
#[structopt(long, short, default_value = "10000")]
history: usize,
},
}

impl Command {
@@ -399,16 +411,10 @@ pub enum StatsCommand {
///
/// Show how many distinct entities and how many versions the tables of
/// each subgraph have. The data is based on the statistics that
/// Postgres keeps, and only refreshed when a table is analyzed. If a
/// table name is passed, perform a full count of entities and versions
/// in that table, which can be very slow, but is needed since the
/// statistics based data can be off by an order of magnitude.
/// Postgres keeps, and only refreshed when a table is analyzed.
Show {
/// The deployment (see `help info`).
deployment: DeploymentSearch,
/// The name of a table to fully count which can be very slow
#[structopt(long, short)]
table: Option<String>,
},
/// Perform a SQL ANALYZE on an Entity table
Analyze {
@@ -989,18 +995,17 @@ async fn main() -> anyhow::Result<()> {
table,
} => {
let (store, primary_pool) = ctx.store_and_primary();
let subgraph_store = store.subgraph_store();
commands::stats::account_like(
store.subgraph_store(),
subgraph_store,
primary_pool,
clear,
&deployment,
table,
)
.await
}
Show { deployment, table } => {
commands::stats::show(ctx.pools(), &deployment, table)
}
Show { deployment } => commands::stats::show(ctx.pools(), &deployment),
Analyze { deployment, entity } => {
let (store, primary_pool) = ctx.store_and_primary();
let subgraph_store = store.subgraph_store();
@@ -1041,6 +1046,14 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Prune {
deployment,
history,
prune_ratio,
} => {
let (store, primary_pool) = ctx.store_and_primary();
commands::prune::run(store, primary_pool, deployment, history, prune_ratio).await
}
}
}

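The `Prune` variant above becomes a new manager subcommand, with structopt deriving the kebab-case flags from the field names. A hedged example invocation (the deployment id and config path are placeholders, and the exact prefix depends on how the graphman binary is normally run in your setup):

graphman --config config.toml prune sgd1234 --history 10000 --prune-ratio 0.20

The positional argument is resolved through `DeploymentSearch::locate_unique`, so anything that `help info` accepts as a deployment identifier should work here as well.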
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
@@ -7,6 +7,7 @@ pub mod create;
pub mod index;
pub mod info;
pub mod listen;
pub mod prune;
pub mod query;
pub mod remove;
pub mod rewind;
183 changes: 183 additions & 0 deletions node/src/manager/commands/prune.rs
@@ -0,0 +1,183 @@
use std::{
collections::HashSet,
io::Write,
sync::Arc,
time::{Duration, Instant},
};

use graph::{
components::store::{PruneReporter, StatusStore},
data::subgraph::status,
prelude::{anyhow, BlockNumber},
};
use graph_chain_ethereum::ENV_VARS as ETH_ENV;
use graph_store_postgres::{connection_pool::ConnectionPool, Store};

use crate::manager::{
commands::stats::{abbreviate_table_name, show_stats},
deployment::DeploymentSearch,
};

struct Progress {
start: Instant,
analyze_start: Instant,
switch_start: Instant,
final_start: Instant,
final_table_start: Instant,
nonfinal_start: Instant,
}

impl Progress {
fn new() -> Self {
Self {
start: Instant::now(),
analyze_start: Instant::now(),
switch_start: Instant::now(),
final_start: Instant::now(),
final_table_start: Instant::now(),
nonfinal_start: Instant::now(),
}
}
}

fn print_copy_header() {
println!("{:^30} | {:^10} | {:^11}", "table", "versions", "time");
println!("{:-^30}-+-{:-^10}-+-{:-^11}", "", "", "");
std::io::stdout().flush().ok();
}

fn print_copy_row(table: &str, total_rows: usize, elapsed: Duration) {
print!(
"\r{:<30} | {:>10} | {:>9}s",
abbreviate_table_name(table, 30),
total_rows,
elapsed.as_secs()
);
std::io::stdout().flush().ok();
}

impl PruneReporter for Progress {
fn start_analyze(&mut self) {
print!("Analyze tables");
self.analyze_start = Instant::now();
}

fn start_analyze_table(&mut self, table: &str) {
print!("\rAnalyze {table:48} ");
std::io::stdout().flush().ok();
}

fn finish_analyze(&mut self, stats: &[graph::components::store::VersionStats]) {
println!(
"\rAnalyzed {} tables in {}s",
stats.len(),
self.analyze_start.elapsed().as_secs()
);
show_stats(stats, HashSet::new()).ok();
println!("");
}

fn copy_final_start(&mut self, earliest_block: BlockNumber, final_block: BlockNumber) {
println!("Copy final entities (versions live between {earliest_block} and {final_block})");
print_copy_header();

self.final_start = Instant::now();
self.final_table_start = self.final_start;
}

fn copy_final_batch(&mut self, table: &str, _rows: usize, total_rows: usize, finished: bool) {
print_copy_row(table, total_rows, self.final_table_start.elapsed());
if finished {
println!("");
self.final_table_start = Instant::now();
}
std::io::stdout().flush().ok();
}

fn copy_final_finish(&mut self) {
println!(
"Finished copying final entity versions in {}s\n",
self.final_start.elapsed().as_secs()
);
}

fn start_switch(&mut self) {
println!("Blocking writes and switching tables");
print_copy_header();
self.switch_start = Instant::now();
}

fn finish_switch(&mut self) {
println!(
"Enabling writes. Switching took {}s\n",
self.switch_start.elapsed().as_secs()
);
}

fn copy_nonfinal_start(&mut self, table: &str) {
print_copy_row(table, 0, Duration::from_secs(0));
self.nonfinal_start = Instant::now();
}

fn copy_nonfinal_finish(&mut self, table: &str, rows: usize) {
print_copy_row(table, rows, self.nonfinal_start.elapsed());
println!("");
std::io::stdout().flush().ok();
}

fn finish_prune(&mut self) {
println!("Finished pruning in {}s", self.start.elapsed().as_secs());
}
}

pub async fn run(
store: Arc<Store>,
primary_pool: ConnectionPool,
search: DeploymentSearch,
history: usize,
prune_ratio: f64,
) -> Result<(), anyhow::Error> {
let history = history as BlockNumber;
let deployment = search.locate_unique(&primary_pool)?;
let mut info = store
.status(status::Filter::DeploymentIds(vec![deployment.id]))?
.pop()
.ok_or_else(|| anyhow!("deployment {deployment} not found"))?;
if info.chains.len() > 1 {
return Err(anyhow!(
"deployment {deployment} indexes {} chains, not sure how to deal with more than one chain",
info.chains.len()
));
}
let status = info
.chains
.pop()
.ok_or_else(|| anyhow!("deployment {} does not index any chain", deployment))?;
let latest = status.latest_block.map(|ptr| ptr.number()).unwrap_or(0);
if latest <= history {
return Err(anyhow!("deployment {deployment} has only indexed up to block {latest} and we can't preserve {history} blocks of history"));
}

println!("prune {deployment}");
println!(" latest: {latest}");
println!(" final: {}", latest - ETH_ENV.reorg_threshold);
println!(" earliest: {}\n", latest - history);

let reporter = Box::new(Progress::new());
store
.subgraph_store()
.prune(
reporter,
&deployment,
latest - history,
// Using the setting for eth chains is a bit lazy; the value
// should really depend on the chain, but we don't have a
// convenient way to figure out how each chain deals with
// finality
ETH_ENV.reorg_threshold,
prune_ratio,
)
.await?;

Ok(())
}
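
To make the block arithmetic in `run` concrete, here is a hedged worked example; the numbers and the 250-block reorg threshold are assumptions for illustration (the real value comes from `ETH_ENV.reorg_threshold` and is configurable), not output of this code:

// Hypothetical values, mirroring how `run` derives its bounds.
let latest: BlockNumber = 15_000_000;   // from status.latest_block
let history: BlockNumber = 10_000;      // --history
let reorg_threshold: BlockNumber = 250; // assumed ETH_ENV.reorg_threshold

let final_block = latest - reorg_threshold; // 14_999_750: blocks at or below this are final
let earliest = latest - history;            // 14_990_000: history before this block is dropped
assert!(latest > history);                  // otherwise `run` bails out with an error

Final entity versions are copied first, then writes are blocked while the nonfinal remainder is copied and the tables are switched, which is the sequence the `copy_final_*`, `start_switch`, `copy_nonfinal_*`, and `finish_switch` callbacks above report on.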