@@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
4444 ) -> impl Future < Output = Result < Vec < ManifestEntry > > > + Send ;
4545 fn existing_manifest (
4646 & self ,
47- table : & Table ,
47+ snapshot_produce : & SnapshotProducer < ' _ > ,
4848 ) -> impl Future < Output = Result < Vec < ManifestFile > > > + Send ;
4949}
5050
5151pub ( crate ) struct DefaultManifestProcess ;
5252
5353impl ManifestProcess for DefaultManifestProcess {
54- fn process_manifests ( & self , manifests : Vec < ManifestFile > ) -> Vec < ManifestFile > {
54+ fn process_manifests (
55+ & self ,
56+ _snapshot_produce : & SnapshotProducer < ' _ > ,
57+ manifests : Vec < ManifestFile > ,
58+ ) -> Vec < ManifestFile > {
5559 manifests
5660 }
5761}
5862
5963pub ( crate ) trait ManifestProcess : Send + Sync {
60- fn process_manifests ( & self , manifests : Vec < ManifestFile > ) -> Vec < ManifestFile > ;
64+ fn process_manifests (
65+ & self ,
66+ snapshot_produce : & SnapshotProducer < ' _ > ,
67+ manifests : Vec < ManifestFile > ,
68+ ) -> Vec < ManifestFile > ;
6169}
6270
63- pub ( crate ) struct SnapshotProducer {
71+ pub ( crate ) struct SnapshotProducer < ' a > {
72+ pub ( crate ) table : & ' a Table ,
6473 snapshot_id : i64 ,
6574 commit_uuid : Uuid ,
6675 key_metadata : Option < Vec < u8 > > ,
@@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer {
7281 manifest_counter : RangeFrom < u64 > ,
7382}
7483
75- impl SnapshotProducer {
84+ impl < ' a > SnapshotProducer < ' a > {
7685 pub ( crate ) fn new (
77- table : & Table ,
86+ table : & ' a Table ,
7887 commit_uuid : Uuid ,
7988 key_metadata : Option < Vec < u8 > > ,
8089 snapshot_properties : HashMap < String , String > ,
8190 added_data_files : Vec < DataFile > ,
8291 ) -> Self {
8392 Self {
93+ table,
8494 snapshot_id : Self :: generate_unique_snapshot_id ( table) ,
8595 commit_uuid,
8696 key_metadata,
@@ -90,10 +100,7 @@ impl SnapshotProducer {
90100 }
91101 }
92102
93- pub ( crate ) fn validate_added_data_files (
94- table : & Table ,
95- added_data_files : & [ DataFile ] ,
96- ) -> Result < ( ) > {
103+ pub ( crate ) fn validate_added_data_files ( & self , added_data_files : & [ DataFile ] ) -> Result < ( ) > {
97104 for data_file in added_data_files {
98105 if data_file. content_type ( ) != crate :: spec:: DataContentType :: Data {
99106 return Err ( Error :: new (
@@ -102,23 +109,23 @@ impl SnapshotProducer {
102109 ) ) ;
103110 }
104111 // Check if the data file partition spec id matches the table default partition spec id.
105- if table. metadata ( ) . default_partition_spec_id ( ) != data_file. partition_spec_id {
112+ if self . table . metadata ( ) . default_partition_spec_id ( ) != data_file. partition_spec_id {
106113 return Err ( Error :: new (
107114 ErrorKind :: DataInvalid ,
108115 "Data file partition spec id does not match table default partition spec id" ,
109116 ) ) ;
110117 }
111118 Self :: validate_partition_value (
112119 data_file. partition ( ) ,
113- table. metadata ( ) . default_partition_type ( ) ,
120+ self . table . metadata ( ) . default_partition_type ( ) ,
114121 ) ?;
115122 }
116123
117124 Ok ( ( ) )
118125 }
119126
120127 pub ( crate ) async fn validate_duplicate_files (
121- table : & Table ,
128+ & self ,
122129 added_data_files : & [ DataFile ] ,
123130 ) -> Result < ( ) > {
124131 let new_files: HashSet < & str > = added_data_files
@@ -127,12 +134,14 @@ impl SnapshotProducer {
127134 . collect ( ) ;
128135
129136 let mut referenced_files = Vec :: new ( ) ;
130- if let Some ( current_snapshot) = table. metadata ( ) . current_snapshot ( ) {
137+ if let Some ( current_snapshot) = self . table . metadata ( ) . current_snapshot ( ) {
131138 let manifest_list = current_snapshot
132- . load_manifest_list ( table. file_io ( ) , & table. metadata_ref ( ) )
139+ . load_manifest_list ( self . table . file_io ( ) , & self . table . metadata_ref ( ) )
133140 . await ?;
134141 for manifest_list_entry in manifest_list. entries ( ) {
135- let manifest = manifest_list_entry. load_manifest ( table. file_io ( ) ) . await ?;
142+ let manifest = manifest_list_entry
143+ . load_manifest ( self . table . file_io ( ) )
144+ . await ?;
136145 for entry in manifest. entries ( ) {
137146 let file_path = entry. file_path ( ) ;
138147 if new_files. contains ( file_path) && entry. is_alive ( ) {
@@ -177,28 +186,28 @@ impl SnapshotProducer {
177186 snapshot_id
178187 }
179188
180- fn new_manifest_writer (
181- & mut self ,
182- content : ManifestContentType ,
183- table : & Table ,
184- ) -> Result < ManifestWriter > {
189+ fn new_manifest_writer ( & mut self , content : ManifestContentType ) -> Result < ManifestWriter > {
185190 let new_manifest_path = format ! (
186191 "{}/{}/{}-m{}.{}" ,
187- table. metadata( ) . location( ) ,
192+ self . table. metadata( ) . location( ) ,
188193 META_ROOT_PATH ,
189194 self . commit_uuid,
190195 self . manifest_counter. next( ) . unwrap( ) ,
191196 DataFileFormat :: Avro
192197 ) ;
193- let output_file = table. file_io ( ) . new_output ( new_manifest_path) ?;
198+ let output_file = self . table . file_io ( ) . new_output ( new_manifest_path) ?;
194199 let builder = ManifestWriterBuilder :: new (
195200 output_file,
196201 Some ( self . snapshot_id ) ,
197202 self . key_metadata . clone ( ) ,
198- table. metadata ( ) . current_schema ( ) . clone ( ) ,
199- table. metadata ( ) . default_partition_spec ( ) . as_ref ( ) . clone ( ) ,
203+ self . table . metadata ( ) . current_schema ( ) . clone ( ) ,
204+ self . table
205+ . metadata ( )
206+ . default_partition_spec ( )
207+ . as_ref ( )
208+ . clone ( ) ,
200209 ) ;
201- if table. metadata ( ) . format_version ( ) == FormatVersion :: V1 {
210+ if self . table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
202211 Ok ( builder. build_v1 ( ) )
203212 } else {
204213 match content {
@@ -240,7 +249,7 @@ impl SnapshotProducer {
240249 }
241250
242251 // Write manifest file for added data files and return the ManifestFile for ManifestList.
243- async fn write_added_manifest ( & mut self , table : & Table ) -> Result < ManifestFile > {
252+ async fn write_added_manifest ( & mut self ) -> Result < ManifestFile > {
244253 let added_data_files = std:: mem:: take ( & mut self . added_data_files ) ;
245254 if added_data_files. is_empty ( ) {
246255 return Err ( Error :: new (
@@ -250,7 +259,7 @@ impl SnapshotProducer {
250259 }
251260
252261 let snapshot_id = self . snapshot_id ;
253- let format_version = table. metadata ( ) . format_version ( ) ;
262+ let format_version = self . table . metadata ( ) . format_version ( ) ;
254263 let manifest_entries = added_data_files. into_iter ( ) . map ( |data_file| {
255264 let builder = ManifestEntry :: builder ( )
256265 . status ( crate :: spec:: ManifestStatus :: Added )
@@ -263,7 +272,7 @@ impl SnapshotProducer {
263272 builder. build ( )
264273 }
265274 } ) ;
266- let mut writer = self . new_manifest_writer ( ManifestContentType :: Data , table ) ?;
275+ let mut writer = self . new_manifest_writer ( ManifestContentType :: Data ) ?;
267276 for entry in manifest_entries {
268277 writer. add_entry ( entry) ?;
269278 }
@@ -272,29 +281,27 @@ impl SnapshotProducer {
272281
273282 async fn manifest_file < OP : SnapshotProduceOperation , MP : ManifestProcess > (
274283 & mut self ,
275- table : & Table ,
276284 snapshot_produce_operation : & OP ,
277285 manifest_process : & MP ,
278286 ) -> Result < Vec < ManifestFile > > {
279- let added_manifest = self . write_added_manifest ( table ) . await ?;
280- let existing_manifests = snapshot_produce_operation. existing_manifest ( table ) . await ?;
287+ let added_manifest = self . write_added_manifest ( ) . await ?;
288+ let existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
281289 // # TODO
282290 // Support process delete entries.
283291
284292 let mut manifest_files = vec ! [ added_manifest] ;
285293 manifest_files. extend ( existing_manifests) ;
286- let manifest_files = manifest_process. process_manifests ( manifest_files) ;
294+ let manifest_files = manifest_process. process_manifests ( self , manifest_files) ;
287295 Ok ( manifest_files)
288296 }
289297
290298 // Returns a `Summary` of the current snapshot
291299 fn summary < OP : SnapshotProduceOperation > (
292300 & self ,
293- table : & Table ,
294301 snapshot_produce_operation : & OP ,
295302 ) -> Result < Summary > {
296303 let mut summary_collector = SnapshotSummaryCollector :: default ( ) ;
297- let table_metadata = table. metadata_ref ( ) ;
304+ let table_metadata = self . table . metadata_ref ( ) ;
298305
299306 let partition_summary_limit = if let Some ( limit) = table_metadata
300307 . properties ( )
@@ -339,10 +346,10 @@ impl SnapshotProducer {
339346 )
340347 }
341348
342- fn generate_manifest_list_file_path ( & self , table : & Table , attempt : i64 ) -> String {
349+ fn generate_manifest_list_file_path ( & self , attempt : i64 ) -> String {
343350 format ! (
344351 "{}/{}/snap-{}-{}-{}.{}" ,
345- table. metadata( ) . location( ) ,
352+ self . table. metadata( ) . location( ) ,
346353 META_ROOT_PATH ,
347354 self . snapshot_id,
348355 attempt,
@@ -354,34 +361,34 @@ impl SnapshotProducer {
354361 /// Finished building the action and return the [`ActionCommit`] to the transaction.
355362 pub ( crate ) async fn commit < OP : SnapshotProduceOperation , MP : ManifestProcess > (
356363 mut self ,
357- table : & Table ,
358364 snapshot_produce_operation : OP ,
359365 process : MP ,
360366 ) -> Result < ActionCommit > {
361367 let new_manifests = self
362- . manifest_file ( table , & snapshot_produce_operation, & process)
368+ . manifest_file ( & snapshot_produce_operation, & process)
363369 . await ?;
364- let next_seq_num = table. metadata ( ) . next_sequence_number ( ) ;
370+ let next_seq_num = self . table . metadata ( ) . next_sequence_number ( ) ;
365371
366- let summary = self
367- . summary ( table, & snapshot_produce_operation)
368- . map_err ( |err| {
369- Error :: new ( ErrorKind :: Unexpected , "Failed to create snapshot summary." )
370- . with_source ( err)
371- } ) ?;
372+ let summary = self . summary ( & snapshot_produce_operation) . map_err ( |err| {
373+ Error :: new ( ErrorKind :: Unexpected , "Failed to create snapshot summary." ) . with_source ( err)
374+ } ) ?;
372375
373- let manifest_list_path = self . generate_manifest_list_file_path ( table , 0 ) ;
376+ let manifest_list_path = self . generate_manifest_list_file_path ( 0 ) ;
374377
375- let mut manifest_list_writer = match table. metadata ( ) . format_version ( ) {
378+ let mut manifest_list_writer = match self . table . metadata ( ) . format_version ( ) {
376379 FormatVersion :: V1 => ManifestListWriter :: v1 (
377- table. file_io ( ) . new_output ( manifest_list_path. clone ( ) ) ?,
380+ self . table
381+ . file_io ( )
382+ . new_output ( manifest_list_path. clone ( ) ) ?,
378383 self . snapshot_id ,
379- table. metadata ( ) . current_snapshot_id ( ) ,
384+ self . table . metadata ( ) . current_snapshot_id ( ) ,
380385 ) ,
381386 FormatVersion :: V2 => ManifestListWriter :: v2 (
382- table. file_io ( ) . new_output ( manifest_list_path. clone ( ) ) ?,
387+ self . table
388+ . file_io ( )
389+ . new_output ( manifest_list_path. clone ( ) ) ?,
383390 self . snapshot_id ,
384- table. metadata ( ) . current_snapshot_id ( ) ,
391+ self . table . metadata ( ) . current_snapshot_id ( ) ,
385392 next_seq_num,
386393 ) ,
387394 } ;
@@ -392,10 +399,10 @@ impl SnapshotProducer {
392399 let new_snapshot = Snapshot :: builder ( )
393400 . with_manifest_list ( manifest_list_path)
394401 . with_snapshot_id ( self . snapshot_id )
395- . with_parent_snapshot_id ( table. metadata ( ) . current_snapshot_id ( ) )
402+ . with_parent_snapshot_id ( self . table . metadata ( ) . current_snapshot_id ( ) )
396403 . with_sequence_number ( next_seq_num)
397404 . with_summary ( summary)
398- . with_schema_id ( table. metadata ( ) . current_schema_id ( ) )
405+ . with_schema_id ( self . table . metadata ( ) . current_schema_id ( ) )
399406 . with_timestamp_ms ( commit_ts)
400407 . build ( ) ;
401408
@@ -414,11 +421,11 @@ impl SnapshotProducer {
414421
415422 let requirements = vec ! [
416423 TableRequirement :: UuidMatch {
417- uuid: table. metadata( ) . uuid( ) ,
424+ uuid: self . table. metadata( ) . uuid( ) ,
418425 } ,
419426 TableRequirement :: RefSnapshotIdMatch {
420427 r#ref: MAIN_BRANCH . to_string( ) ,
421- snapshot_id: table. metadata( ) . current_snapshot_id( ) ,
428+ snapshot_id: self . table. metadata( ) . current_snapshot_id( ) ,
422429 } ,
423430 ] ;
424431
0 commit comments