@@ -64,7 +64,9 @@ impl TablePair {
     }
 
     /// Copy all entity versions visible between `earliest_block` and
-    /// `final_block` in batches, where each batch is a separate transaction
+    /// `final_block` in batches, where each batch is a separate
+    /// transaction. Write activity for nonfinal blocks can happen
+    /// concurrently with this copy
     fn copy_final_entities(
         &self,
         conn: &PgConnection,
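The doc comment above describes the overall shape of the change: the range of vids to copy is computed once up front, and each batch is then copied in its own transaction, so writes for nonfinal blocks only ever contend with a single bounded batch. Purely as an illustration (none of these names are from the commit), a minimal sketch of that loop, with a hypothetical `copy_batch` closure standing in for the per-batch transaction:

// Sketch only: page over a precomputed vid range, one transaction per batch.
fn copy_in_batches(
    min_vid: i64,
    max_vid: i64,
    batch_size: i64,
    // Hypothetical stand-in for "copy rows with vid in [from, to) inside one
    // transaction and return how many rows were copied".
    mut copy_batch: impl FnMut(i64, i64) -> Result<usize, String>,
) -> Result<usize, String> {
    let mut next_vid = min_vid;
    let mut total_rows = 0;
    while next_vid <= max_vid {
        total_rows += copy_batch(next_vid, next_vid + batch_size)?;
        next_vid += batch_size;
    }
    Ok(total_rows)
}

fn main() {
    // Pretend the source has rows with vid 0..=9 and we copy 4 at a time.
    let copied = copy_in_batches(0, 9, 4, |from, to| {
        println!("copying vids {from}..{to} in one transaction");
        Ok((to.min(10) - from) as usize)
    })
    .unwrap();
    assert_eq!(copied, 10);
}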
@@ -73,62 +75,74 @@ impl TablePair {
         final_block: BlockNumber,
         cancel: &CancelHandle,
     ) -> Result<usize, CancelableError<StoreError>> {
-        #[derive(QueryableByName)]
-        struct LastVid {
-            #[sql_type = "BigInt"]
-            rows: i64,
-            #[sql_type = "BigInt"]
-            last_vid: i64,
-        }
-
         let column_list = self.column_list();
 
+        // Determine the range of vids that we need to copy
+        let VidRange { min_vid, max_vid } = sql_query(&format!(
+            "select coalesce(min(vid), 0) as min_vid, \
+             coalesce(max(vid), -1) as max_vid from {src} \
+             where lower(block_range) <= $2 \
+               and coalesce(upper(block_range), 2147483647) > $1 \
+               and coalesce(upper(block_range), 2147483647) <= $2 \
+               and block_range && int4range($1, $2, '[]')",
+            src = self.src.qualified_name,
+        ))
+        .bind::<Integer, _>(earliest_block)
+        .bind::<Integer, _>(final_block)
+        .get_result::<VidRange>(conn)?;
+
         let mut batch_size = AdaptiveBatchSize::new(&self.src);
-        // The first vid we still need to copy. When we start, we start with
-        // 0 so that we don't constrain the set of rows based on their vid
-        let mut next_vid = 0;
+        // The first vid we still need to copy
+        let mut next_vid = min_vid;
         let mut total_rows: usize = 0;
-        loop {
+        while next_vid <= max_vid {
             let start = Instant::now();
-            let LastVid { last_vid, rows } = conn.transaction(|| {
+            let rows = conn.transaction(|| {
+                // Page through all rows in `src` in batches of `batch_size`
+                // and copy the ones that are visible to queries at block
+                // heights between `earliest_block` and `final_block`, but
+                // whose block_range does not extend past `final_block`
+                // since they could still be reverted while we copy.
+                // The conditions on `block_range` are expressed redundantly
+                // to make more indexes usable
                 sql_query(&format!(
-                    "with cp as (insert into {dst}({column_list}) \
-                     select {column_list} from {src} \
-                     where lower(block_range) <= $2 \
-                       and coalesce(upper(block_range), 2147483647) > $1 \
-                       and coalesce(upper(block_range), 2147483647) <= $2 \
-                       and block_range && int4range($1, $2, '[]') \
-                       and vid >= $3 \
-                     order by vid \
-                     limit $4 \
-                     returning vid) \
-                     select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
+                    "insert into {dst}({column_list}) \
+                     select {column_list} from {src} \
+                     where lower(block_range) <= $2 \
+                       and coalesce(upper(block_range), 2147483647) > $1 \
+                       and coalesce(upper(block_range), 2147483647) <= $2 \
+                       and block_range && int4range($1, $2, '[]') \
+                       and vid >= $3 and vid < $3 + $4 \
+                     order by vid",
                     src = self.src.qualified_name,
                     dst = self.dst.qualified_name
                 ))
                 .bind::<Integer, _>(earliest_block)
                 .bind::<Integer, _>(final_block)
                 .bind::<BigInt, _>(next_vid)
                 .bind::<BigInt, _>(&batch_size)
-                .get_result::<LastVid>(conn)
+                .execute(conn)
             })?;
             cancel.check_cancel()?;
 
-            total_rows += rows as usize;
-            let done = rows == 0;
-            reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done);
-
-            if done {
-                return Ok(total_rows);
-            }
+            total_rows += rows;
+            next_vid += batch_size.size;
 
             batch_size.adapt(start.elapsed());
-            next_vid = last_vid + 1;
+
+            reporter.copy_final_batch(
+                self.src.name.as_str(),
+                rows as usize,
+                total_rows,
+                next_vid > max_vid,
+            );
         }
+        Ok(total_rows)
     }
 
     /// Copy all entity versions visible after `final_block` in batches,
-    /// where each batch is a separate transaction
+    /// where each batch is a separate transaction. This assumes that all
+    /// other write activity to the source table is blocked while we copy
     fn copy_nonfinal_entities(
         &self,
         conn: &PgConnection,
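The inline comment in the hunk above notes that the `block_range` conditions are written redundantly so the planner can pick from more indexes. Reading them off the new SQL, a version is copied by the final pass when it became visible at or before `final_block`, was still visible after `earliest_block`, and was closed at or before `final_block` (so it can no longer be reverted); the trailing `&&` overlap test adds no further constraint. A small sketch of that predicate in plain Rust, using my own names and the 2147483647 sentinel the query uses for an open upper bound:

// Sketch only: the visibility test that the final-copy SQL expresses,
// for a version whose block range is [lower, upper).
const BLOCK_NUMBER_MAX: i32 = 2147483647;

fn is_final_version(lower: i32, upper: Option<i32>, earliest_block: i32, final_block: i32) -> bool {
    let upper = upper.unwrap_or(BLOCK_NUMBER_MAX);
    lower <= final_block && upper > earliest_block && upper <= final_block
}

fn main() {
    // Closed at or before `final_block`: handled by `copy_final_entities`.
    assert!(is_final_version(5, Some(8), 3, 10));
    // Still open (no upper bound): left to `copy_nonfinal_entities`.
    assert!(!is_final_version(5, None, 3, 10));
}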
@@ -137,54 +151,59 @@ impl TablePair {
     ) -> Result<usize, StoreError> {
         let column_list = self.column_list();
 
-        #[derive(QueryableByName)]
-        struct LastVid {
-            #[sql_type = "BigInt"]
-            rows: i64,
-            #[sql_type = "BigInt"]
-            last_vid: i64,
-        }
+        // Determine the range of vids that we need to copy
+        let VidRange { min_vid, max_vid } = sql_query(&format!(
+            "select coalesce(min(vid), 0) as min_vid, \
+             coalesce(max(vid), -1) as max_vid from {src} \
+             where coalesce(upper(block_range), 2147483647) > $1 \
+               and block_range && int4range($1, null)",
+            src = self.src.qualified_name,
+        ))
+        .bind::<Integer, _>(final_block)
+        .get_result::<VidRange>(conn)?;
 
         let mut batch_size = AdaptiveBatchSize::new(&self.src);
-        // The first vid we still need to copy. When we start, we start with
-        // 0 so that we don't constrain the set of rows based on their vid
-        let mut next_vid = 0;
+        // The first vid we still need to copy
+        let mut next_vid = min_vid;
         let mut total_rows = 0;
-        loop {
+        while next_vid <= max_vid {
             let start = Instant::now();
-            let LastVid { rows, last_vid } = conn.transaction(|| {
+            let rows = conn.transaction(|| {
+                // Page through all the rows in `src` in batches of
+                // `batch_size` that are visible to queries at block heights
+                // starting right after `final_block`.
+                // The conditions on `block_range` are expressed redundantly
+                // to make more indexes usable
                 sql_query(&format!(
-                    "with cp as (insert into {dst}({column_list}) \
-                     select {column_list} from {src} \
-                     where coalesce(upper(block_range), 2147483647) > $1 \
-                       and block_range && int4range($1, null) \
-                       and vid >= $2 \
-                     order by vid \
-                     limit $3
-                     returning vid) \
-                     select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
+                    "insert into {dst}({column_list}) \
+                     select {column_list} from {src} \
+                     where coalesce(upper(block_range), 2147483647) > $1 \
+                       and block_range && int4range($1, null) \
+                       and vid >= $2 and vid < $2 + $3 \
+                     order by vid",
                     dst = self.dst.qualified_name,
                     src = self.src.qualified_name,
                 ))
                 .bind::<Integer, _>(final_block)
                 .bind::<BigInt, _>(next_vid)
                 .bind::<BigInt, _>(&batch_size)
-                .get_result::<LastVid>(conn)
+                .execute(conn)
                 .map_err(StoreError::from)
             })?;
-            total_rows += rows as usize;
+
+            total_rows += rows;
+            next_vid += batch_size.size;
+
+            batch_size.adapt(start.elapsed());
+
             reporter.copy_nonfinal_batch(
                 self.src.name.as_str(),
                 rows as usize,
                 total_rows,
-                rows == 0,
+                next_vid > max_vid,
             );
-            if rows == 0 {
-                return Ok(total_rows);
-            }
-            batch_size.adapt(start.elapsed());
-            next_vid = last_vid + 1;
         }
+        Ok(total_rows)
     }
 
     /// Replace the `src` table with the `dst` table
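Both rewritten loops advance `next_vid` by the batch size that was actually used for the query and only afterwards call `batch_size.adapt(start.elapsed())`, so the vid window and the rows that were copied always agree. `AdaptiveBatchSize` itself is defined elsewhere in the crate; purely as an illustration of the idea (the real policy may differ), a batch size nudged toward a target duration per batch could look like this sketch:

use std::time::Duration;

// Sketch only: grow the batch when it finished faster than a target
// duration, shrink it when it was slower, clamped to a sane range.
struct SketchBatchSize {
    size: i64,
    target: Duration,
}

impl SketchBatchSize {
    fn adapt(&mut self, elapsed: Duration) {
        let ratio = self.target.as_secs_f64() / elapsed.as_secs_f64().max(1e-6);
        // Damp the adjustment to at most a factor of two per batch.
        let new_size = (self.size as f64 * ratio.clamp(0.5, 2.0)) as i64;
        self.size = new_size.clamp(10, 100_000);
    }
}

fn main() {
    let mut bs = SketchBatchSize { size: 1_000, target: Duration::from_millis(500) };
    bs.adapt(Duration::from_millis(250)); // finished fast -> batch doubles
    assert_eq!(bs.size, 2_000);
    bs.adapt(Duration::from_secs(2)); // too slow -> batch halves
    assert_eq!(bs.size, 1_000);
}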
@@ -368,3 +387,11 @@ impl Layout {
         Ok(())
     }
 }
+
+#[derive(QueryableByName)]
+struct VidRange {
+    #[sql_type = "BigInt"]
+    min_vid: i64,
+    #[sql_type = "BigInt"]
+    max_vid: i64,
+}
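The `VidRange` struct is added at module level so both copy methods can deserialize the min/max aggregate returned by `sql_query`. One detail worth noting from the query: when the source has no matching rows, `coalesce(min(vid), 0)` and `coalesce(max(vid), -1)` produce the pair (0, -1), so `while next_vid <= max_vid` exits immediately and the copy is a no-op. A tiny illustration (not part of the commit) of that sentinel behaviour:

// Sketch only: the (0, -1) range yields zero iterations.
fn main() {
    let (min_vid, max_vid): (i64, i64) = (0, -1); // empty source table
    let mut iterations = 0;
    let mut next_vid = min_vid;
    while next_vid <= max_vid {
        iterations += 1;
        next_vid += 1_000;
    }
    assert_eq!(iterations, 0);
}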