diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 7fea7be9860..f56b5b7710a 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -288,7 +288,7 @@ impl CopyState { /// batch, but don't step up the size by more than 2x at once #[derive(Debug, Queryable)] pub(crate) struct AdaptiveBatchSize { - size: i64, + pub size: i64, } impl AdaptiveBatchSize { diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 27f0444d188..6a9ccdc8dd5 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -64,7 +64,9 @@ impl TablePair { } /// Copy all entity versions visible between `earliest_block` and - /// `final_block` in batches, where each batch is a separate transaction + /// `final_block` in batches, where each batch is a separate + /// transaction. Write activity for nonfinal blocks can happen + /// concurrently to this copy fn copy_final_entities( &self, conn: &PgConnection, @@ -73,36 +75,45 @@ impl TablePair { final_block: BlockNumber, cancel: &CancelHandle, ) -> Result> { - #[derive(QueryableByName)] - struct LastVid { - #[sql_type = "BigInt"] - rows: i64, - #[sql_type = "BigInt"] - last_vid: i64, - } - let column_list = self.column_list(); + // Determine the last vid that we need to copy + let VidRange { min_vid, max_vid } = sql_query(&format!( + "select coalesce(min(vid), 0) as min_vid, \ + coalesce(max(vid), -1) as max_vid from {src} \ + where lower(block_range) <= $2 \ + and coalesce(upper(block_range), 2147483647) > $1 \ + and coalesce(upper(block_range), 2147483647) <= $2 \ + and block_range && int4range($1, $2, '[]')", + src = self.src.qualified_name, + )) + .bind::(earliest_block) + .bind::(final_block) + .get_result::(conn)?; + let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy. When we start, we start with - // 0 so that we don't constrain the set of rows based on their vid - let mut next_vid = 0; + // The first vid we still need to copy + let mut next_vid = min_vid; let mut total_rows: usize = 0; - loop { + while next_vid <= max_vid { let start = Instant::now(); - let LastVid { last_vid, rows } = conn.transaction(|| { + let rows = conn.transaction(|| { + // Page through all rows in `src` in batches of `batch_size` + // and copy the ones that are visible to queries at block + // heights between `earliest_block` and `final_block`, but + // whose block_range does not extend past `final_block` + // since they could still be reverted while we copy. + // The conditions on `block_range` are expressed redundantly + // to make more indexes useable sql_query(&format!( - "with cp as (insert into {dst}({column_list}) \ - select {column_list} from {src} \ - where lower(block_range) <= $2 \ - and coalesce(upper(block_range), 2147483647) > $1 \ - and coalesce(upper(block_range), 2147483647) <= $2 \ - and block_range && int4range($1, $2, '[]') \ - and vid >= $3 \ - order by vid \ - limit $4 \ - returning vid) \ - select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp", + "insert into {dst}({column_list}) \ + select {column_list} from {src} \ + where lower(block_range) <= $2 \ + and coalesce(upper(block_range), 2147483647) > $1 \ + and coalesce(upper(block_range), 2147483647) <= $2 \ + and block_range && int4range($1, $2, '[]') \ + and vid >= $3 and vid < $3 + $4 \ + order by vid", src = self.src.qualified_name, dst = self.dst.qualified_name )) @@ -110,25 +121,28 @@ impl TablePair { .bind::(final_block) .bind::(next_vid) .bind::(&batch_size) - .get_result::(conn) + .execute(conn) })?; cancel.check_cancel()?; - total_rows += rows as usize; - let done = rows == 0; - reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done); - - if done { - return Ok(total_rows); - } + total_rows += rows; + next_vid += batch_size.size; batch_size.adapt(start.elapsed()); - next_vid = last_vid + 1; + + reporter.copy_final_batch( + self.src.name.as_str(), + rows as usize, + total_rows, + next_vid > max_vid, + ); } + Ok(total_rows) } /// Copy all entity versions visible after `final_block` in batches, - /// where each batch is a separate transaction + /// where each batch is a separate transaction. This assumes that all + /// other write activity to the source table is blocked while we copy fn copy_nonfinal_entities( &self, conn: &PgConnection, @@ -137,54 +151,59 @@ impl TablePair { ) -> Result { let column_list = self.column_list(); - #[derive(QueryableByName)] - struct LastVid { - #[sql_type = "BigInt"] - rows: i64, - #[sql_type = "BigInt"] - last_vid: i64, - } + // Determine the last vid that we need to copy + let VidRange { min_vid, max_vid } = sql_query(&format!( + "select coalesce(min(vid), 0) as min_vid, \ + coalesce(max(vid), -1) as max_vid from {src} \ + where coalesce(upper(block_range), 2147483647) > $1 \ + and block_range && int4range($1, null)", + src = self.src.qualified_name, + )) + .bind::(final_block) + .get_result::(conn)?; let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy. When we start, we start with - // 0 so that we don't constrain the set of rows based on their vid - let mut next_vid = 0; + // The first vid we still need to copy + let mut next_vid = min_vid; let mut total_rows = 0; - loop { + while next_vid <= max_vid { let start = Instant::now(); - let LastVid { rows, last_vid } = conn.transaction(|| { + let rows = conn.transaction(|| { + // Page through all the rows in `src` in batches of + // `batch_size` that are visible to queries at block heights + // starting right after `final_block`. + // The conditions on `block_range` are expressed redundantly + // to make more indexes useable sql_query(&format!( - "with cp as (insert into {dst}({column_list}) \ - select {column_list} from {src} \ - where coalesce(upper(block_range), 2147483647) > $1 \ - and block_range && int4range($1, null) \ - and vid >= $2 \ - order by vid \ - limit $3 - returning vid) \ - select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp", + "insert into {dst}({column_list}) \ + select {column_list} from {src} \ + where coalesce(upper(block_range), 2147483647) > $1 \ + and block_range && int4range($1, null) \ + and vid >= $2 and vid < $2 + $3 \ + order by vid", dst = self.dst.qualified_name, src = self.src.qualified_name, )) .bind::(final_block) .bind::(next_vid) .bind::(&batch_size) - .get_result::(conn) + .execute(conn) .map_err(StoreError::from) })?; - total_rows += rows as usize; + + total_rows += rows; + next_vid += batch_size.size; + + batch_size.adapt(start.elapsed()); + reporter.copy_nonfinal_batch( self.src.name.as_str(), rows as usize, total_rows, - rows == 0, + next_vid > max_vid, ); - if rows == 0 { - return Ok(total_rows); - } - batch_size.adapt(start.elapsed()); - next_vid = last_vid + 1; } + Ok(total_rows) } /// Replace the `src` table with the `dst` table @@ -368,3 +387,11 @@ impl Layout { Ok(()) } } + +#[derive(QueryableByName)] +struct VidRange { + #[sql_type = "BigInt"] + min_vid: i64, + #[sql_type = "BigInt"] + max_vid: i64, +}