Merged
store/postgres/src/copy.rs: 2 changes (1 addition, 1 deletion)
@@ -288,7 +288,7 @@ impl CopyState {
/// batch, but don't step up the size by more than 2x at once
#[derive(Debug, Queryable)]
pub(crate) struct AdaptiveBatchSize {
- size: i64,
+ pub size: i64,
}

impl AdaptiveBatchSize {
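The only change here is visibility: `size` becomes `pub` so that the pruning code in `relational/prune.rs` can advance its vid cursor by the batch size directly instead of reading the last copied vid back from the database. A minimal sketch of the access pattern this enables (a standalone stand-in for the real types; the actual copy step is elided):

struct AdaptiveBatchSize {
    pub size: i64, // the field this diff makes public
}

fn copy_in_strides(min_vid: i64, max_vid: i64) {
    let batch_size = AdaptiveBatchSize { size: 10_000 };
    let mut next_vid = min_vid;
    while next_vid <= max_vid {
        // copy rows with vid in [next_vid, next_vid + batch_size.size);
        // elided in this sketch
        next_vid += batch_size.size; // needs read access to `size`
    }
}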
store/postgres/src/relational/prune.rs: 155 changes (91 additions, 64 deletions)
@@ -64,7 +64,9 @@ impl TablePair
}

/// Copy all entity versions visible between `earliest_block` and
- /// `final_block` in batches, where each batch is a separate transaction
+ /// `final_block` in batches, where each batch is a separate
+ /// transaction. Write activity for nonfinal blocks can happen
+ /// concurrently with this copy.
fn copy_final_entities(
&self,
conn: &PgConnection,
@@ -73,62 +75,74 @@
final_block: BlockNumber,
cancel: &CancelHandle,
) -> Result<usize, CancelableError<StoreError>> {
- #[derive(QueryableByName)]
- struct LastVid {
- #[sql_type = "BigInt"]
- rows: i64,
- #[sql_type = "BigInt"]
- last_vid: i64,
- }

let column_list = self.column_list();

+ // Determine the range of vids that we need to copy
+ let VidRange { min_vid, max_vid } = sql_query(&format!(
+ "select coalesce(min(vid), 0) as min_vid, \
+ coalesce(max(vid), -1) as max_vid from {src} \
+ where lower(block_range) <= $2 \
+ and coalesce(upper(block_range), 2147483647) > $1 \
+ and coalesce(upper(block_range), 2147483647) <= $2 \
+ and block_range && int4range($1, $2, '[]')",
+ src = self.src.qualified_name,
+ ))
+ .bind::<Integer, _>(earliest_block)
+ .bind::<Integer, _>(final_block)
+ .get_result::<VidRange>(conn)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
- // The first vid we still need to copy. When we start, we start with
- // 0 so that we don't constrain the set of rows based on their vid
- let mut next_vid = 0;
+ // The first vid we still need to copy
+ let mut next_vid = min_vid;
let mut total_rows: usize = 0;
- loop {
+ while next_vid <= max_vid {
let start = Instant::now();
- let LastVid { last_vid, rows } = conn.transaction(|| {
+ let rows = conn.transaction(|| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes usable
sql_query(&format!(
"with cp as (insert into {dst}({column_list}) \
select {column_list} from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]') \
and vid >= $3 \
order by vid \
limit $4 \
returning vid) \
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
"insert into {dst}({column_list}) \
select {column_list} from {src} \
where lower(block_range) <= $2 \
and coalesce(upper(block_range), 2147483647) > $1 \
and coalesce(upper(block_range), 2147483647) <= $2 \
and block_range && int4range($1, $2, '[]') \
and vid >= $3 and vid < $3 + $4 \
order by vid",
src = self.src.qualified_name,
dst = self.dst.qualified_name
))
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
- .get_result::<LastVid>(conn)
+ .execute(conn)
})?;
cancel.check_cancel()?;

- total_rows += rows as usize;
- let done = rows == 0;
- reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done);
-
- if done {
- return Ok(total_rows);
- }
+ total_rows += rows;
+ next_vid += batch_size.size;

batch_size.adapt(start.elapsed());
- next_vid = last_vid + 1;

+ reporter.copy_final_batch(
+ self.src.name.as_str(),
+ rows as usize,
+ total_rows,
+ next_vid > max_vid,
+ );
}
+ Ok(total_rows)
}
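The restructuring above changes how the loop pages through `src`. The old query copied up to `limit $4` rows past `next_vid`, reported back `max(cp.vid)`, and stopped on an empty batch; the new one copies the half-open vid window `[$3, $3 + $4)` while the driver walks fixed strides over the precomputed `[min_vid, max_vid]` range. A sketch of the stride arithmetic under hypothetical values, showing why a window with no matching rows no longer terminates the copy early:

fn windows(min_vid: i64, max_vid: i64, size: i64) -> Vec<(i64, i64)> {
    // mirrors `vid >= $3 and vid < $3 + $4` plus `next_vid += batch_size.size`
    let mut out = Vec::new();
    let mut next_vid = min_vid;
    while next_vid <= max_vid {
        out.push((next_vid, next_vid + size));
        next_vid += size;
    }
    out
}

// windows(0, 25, 10) == [(0, 10), (10, 20), (20, 30)]: every vid up to
// max_vid is covered exactly once, and a window that happens to contain
// no visible rows simply copies nothing instead of ending the loop the
// way the old `rows == 0` check did.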

/// Copy all entity versions visible after `final_block` in batches,
- /// where each batch is a separate transaction
+ /// where each batch is a separate transaction. This assumes that all
+ /// other write activity to the source table is blocked while we copy.
fn copy_nonfinal_entities(
&self,
conn: &PgConnection,
@@ -137,54 +151,59 @@
) -> Result<usize, StoreError> {
let column_list = self.column_list();

- #[derive(QueryableByName)]
- struct LastVid {
- #[sql_type = "BigInt"]
- rows: i64,
- #[sql_type = "BigInt"]
- last_vid: i64,
- }
+ // Determine the range of vids that we need to copy
+ let VidRange { min_vid, max_vid } = sql_query(&format!(
+ "select coalesce(min(vid), 0) as min_vid, \
+ coalesce(max(vid), -1) as max_vid from {src} \
+ where coalesce(upper(block_range), 2147483647) > $1 \
+ and block_range && int4range($1, null)",
+ src = self.src.qualified_name,
+ ))
+ .bind::<Integer, _>(final_block)
+ .get_result::<VidRange>(conn)?;

let mut batch_size = AdaptiveBatchSize::new(&self.src);
- // The first vid we still need to copy. When we start, we start with
- // 0 so that we don't constrain the set of rows based on their vid
- let mut next_vid = 0;
+ // The first vid we still need to copy
+ let mut next_vid = min_vid;
let mut total_rows = 0;
- loop {
+ while next_vid <= max_vid {
let start = Instant::now();
- let LastVid { rows, last_vid } = conn.transaction(|| {
+ let rows = conn.transaction(|| {
// Page through all the rows in `src` in batches of
// `batch_size` that are visible to queries at block heights
// starting right after `final_block`.
// The conditions on `block_range` are expressed redundantly
// to make more indexes usable
sql_query(&format!(
"with cp as (insert into {dst}({column_list}) \
select {column_list} from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null) \
and vid >= $2 \
order by vid \
limit $3
returning vid) \
select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
"insert into {dst}({column_list}) \
select {column_list} from {src} \
where coalesce(upper(block_range), 2147483647) > $1 \
and block_range && int4range($1, null) \
and vid >= $2 and vid < $2 + $3 \
order by vid",
dst = self.dst.qualified_name,
src = self.src.qualified_name,
))
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(next_vid)
.bind::<BigInt, _>(&batch_size)
- .get_result::<LastVid>(conn)
+ .execute(conn)
.map_err(StoreError::from)
})?;
- total_rows += rows as usize;
+ total_rows += rows;
+ next_vid += batch_size.size;

+ batch_size.adapt(start.elapsed());

reporter.copy_nonfinal_batch(
self.src.name.as_str(),
rows as usize,
total_rows,
- rows == 0,
+ next_vid > max_vid,
);
- if rows == 0 {
- return Ok(total_rows);
- }
- batch_size.adapt(start.elapsed());
- next_vid = last_vid + 1;
}
+ Ok(total_rows)
}
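Both functions compare `coalesce(upper(block_range), 2147483647)` against block numbers. The constant is `i32::MAX`, standing in for the missing upper bound of a still-open `block_range`, i.e. an entity version that has not yet been superseded. A small self-contained illustration of that convention:

fn upper_or_max(upper: Option<i32>) -> i32 {
    // corresponds to coalesce(upper(block_range), 2147483647)
    upper.unwrap_or(i32::MAX)
}

fn main() {
    assert_eq!(upper_or_max(Some(1_000_000)), 1_000_000); // closed version
    assert_eq!(upper_or_max(None), 2_147_483_647); // still-live version
}

Writing the same constraint both in this form and as `block_range && int4range(...)` is what the comments mean by expressing the conditions redundantly: each form can be served by a different kind of index on the table.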

/// Replace the `src` table with the `dst` table
@@ -368,3 +387,11 @@ impl Layout {
Ok(())
}
}

+ #[derive(QueryableByName)]
+ struct VidRange {
+ #[sql_type = "BigInt"]
+ min_vid: i64,
+ #[sql_type = "BigInt"]
+ max_vid: i64,
+ }
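A note on the sentinels in the new `VidRange` queries: `coalesce(min(vid), 0)` and `coalesce(max(vid), -1)` make an empty result set come back as the inverted range `[0, -1]`, so the `while next_vid <= max_vid` loops above run zero times. A minimal demonstration, with no database involved:

struct VidRange {
    min_vid: i64,
    max_vid: i64,
}

fn main() {
    // what the range query yields when no rows match
    let empty = VidRange { min_vid: 0, max_vid: -1 };
    let mut batches = 0;
    let mut next_vid = empty.min_vid;
    while next_vid <= empty.max_vid {
        batches += 1;
        next_vid += 1;
    }
    assert_eq!(batches, 0); // the copy loop is never entered
}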