@@ -24,7 +24,8 @@ use relative_path::RelativePathBuf;
2424use crate :: {
2525 catalog:: manifest:: Manifest ,
2626 query:: PartialTimeFilter ,
27- storage:: { ObjectStorage , ObjectStorageError } ,
27+ storage:: { ObjectStorage , ObjectStorageError , MANIFEST_FILE } ,
28+ utils:: get_address,
2829} ;
2930
3031use self :: { column:: Column , snapshot:: ManifestItem } ;
@@ -105,20 +106,67 @@ pub async fn update_snapshot(
105106 item. time_lower_bound <= lower_bound && lower_bound < item. time_upper_bound
106107 } ) ;
107108
109+ // if the mode in I.S. manifest needs to be created but it is not getting created because
110+ // there is already a pos, to index into stream.json
111+
108112 // We update the manifest referenced by this position
109113 // This updates an existing file so there is no need to create a snapshot entry.
110114 if let Some ( pos) = pos {
111115 let info = & mut manifests[ pos] ;
112116 let path = partition_path ( stream_name, info. time_lower_bound , info. time_upper_bound ) ;
113- let Some ( mut manifest) = storage. get_manifest ( & path) . await ? else {
114- return Err ( ObjectStorageError :: UnhandledError (
115- "Manifest found in snapshot but not in object-storage"
116- . to_string ( )
117- . into ( ) ,
118- ) ) ;
119- } ;
120- manifest. apply_change ( change) ;
121- storage. put_manifest ( & path, manifest) . await ?;
117+
118+ let mut ch = false ;
119+ for m in manifests. iter ( ) {
120+ let s = get_address ( ) ;
121+ let p = format ! ( "{}.{}.{}" , s. 0 , s. 1 , MANIFEST_FILE ) ;
122+ if m. manifest_path . contains ( & p) {
123+ ch = true ;
124+ }
125+ }
126+ if ch {
127+ let Some ( mut manifest) = storage. get_manifest ( & path) . await ? else {
128+ return Err ( ObjectStorageError :: UnhandledError (
129+ "Manifest found in snapshot but not in object-storage"
130+ . to_string ( )
131+ . into ( ) ,
132+ ) ) ;
133+ } ;
134+ manifest. apply_change ( change) ;
135+ storage. put_manifest ( & path, manifest) . await ?;
136+ } else {
137+ let lower_bound = lower_bound. date_naive ( ) . and_time ( NaiveTime :: MIN ) . and_utc ( ) ;
138+ let upper_bound = lower_bound
139+ . date_naive ( )
140+ . and_time (
141+ NaiveTime :: from_num_seconds_from_midnight_opt (
142+ 23 * 3600 + 59 * 60 + 59 ,
143+ 999_999_999 ,
144+ )
145+ . unwrap ( ) ,
146+ )
147+ . and_utc ( ) ;
148+
149+ let manifest = Manifest {
150+ files : vec ! [ change] ,
151+ ..Manifest :: default ( )
152+ } ;
153+
154+ let addr = get_address ( ) ;
155+ let mainfest_file_name = format ! ( "{}.{}.{}" , addr. 0 , addr. 1 , MANIFEST_FILE ) ;
156+ let path =
157+ partition_path ( stream_name, lower_bound, upper_bound) . join ( & mainfest_file_name) ;
158+ storage
159+ . put_object ( & path, serde_json:: to_vec ( & manifest) . unwrap ( ) . into ( ) )
160+ . await ?;
161+ let path = storage. absolute_url ( & path) ;
162+ let new_snapshot_entriy = snapshot:: ManifestItem {
163+ manifest_path : path. to_string ( ) ,
164+ time_lower_bound : lower_bound,
165+ time_upper_bound : upper_bound,
166+ } ;
167+ manifests. push ( new_snapshot_entriy) ;
168+ storage. put_snapshot ( stream_name, meta) . await ?;
169+ }
122170 } else {
123171 let lower_bound = lower_bound. date_naive ( ) . and_time ( NaiveTime :: MIN ) . and_utc ( ) ;
124172 let upper_bound = lower_bound
@@ -137,7 +185,9 @@ pub async fn update_snapshot(
137185 ..Manifest :: default ( )
138186 } ;
139187
140- let path = partition_path ( stream_name, lower_bound, upper_bound) . join ( "manifest.json" ) ;
188+ let addr = get_address ( ) ;
189+ let mainfest_file_name = format ! ( "{}.{}.{}" , addr. 0 , addr. 1 , MANIFEST_FILE ) ;
190+ let path = partition_path ( stream_name, lower_bound, upper_bound) . join ( & mainfest_file_name) ;
141191 storage
142192 . put_object ( & path, serde_json:: to_vec ( & manifest) . unwrap ( ) . into ( ) )
143193 . await ?;
0 commit comments