@@ -23,6 +23,7 @@ use itertools::Itertools;
23
23
use once_cell:: sync:: Lazy ;
24
24
use serde_json:: Value ;
25
25
use std:: collections:: HashMap ;
26
+ use std:: num:: NonZeroU32 ;
26
27
use std:: sync:: { Arc , RwLock } ;
27
28
28
29
use self :: error:: stream_info:: { CheckAlertError , LoadError , MetadataError } ;
@@ -53,7 +54,7 @@ pub struct LogStreamMetadata {
53
54
pub created_at : String ,
54
55
pub first_event_at : Option < String > ,
55
56
pub time_partition : Option < String > ,
56
- pub time_partition_limit : Option < String > ,
57
+ pub time_partition_limit : Option < NonZeroU32 > ,
57
58
pub custom_partition : Option < String > ,
58
59
pub static_schema_flag : Option < String > ,
59
60
pub hot_tier_enabled : Option < bool > ,
@@ -113,11 +114,11 @@ impl StreamInfo {
113
114
pub fn get_time_partition_limit (
114
115
& self ,
115
116
stream_name : & str ,
116
- ) -> Result < Option < String > , MetadataError > {
117
+ ) -> Result < Option < NonZeroU32 > , MetadataError > {
117
118
let map = self . read ( ) . expect ( LOCK_EXPECT ) ;
118
119
map. get ( stream_name)
119
120
. ok_or ( MetadataError :: StreamMetaNotFound ( stream_name. to_string ( ) ) )
120
- . map ( |metadata| metadata. time_partition_limit . clone ( ) )
121
+ . map ( |metadata| metadata. time_partition_limit )
121
122
}
122
123
123
124
pub fn get_custom_partition ( & self , stream_name : & str ) -> Result < Option < String > , MetadataError > {
@@ -202,7 +203,7 @@ impl StreamInfo {
202
203
pub fn update_time_partition_limit (
203
204
& self ,
204
205
stream_name : & str ,
205
- time_partition_limit : String ,
206
+ time_partition_limit : NonZeroU32 ,
206
207
) -> Result < ( ) , MetadataError > {
207
208
let mut map = self . write ( ) . expect ( LOCK_EXPECT ) ;
208
209
map. get_mut ( stream_name)
@@ -244,7 +245,7 @@ impl StreamInfo {
244
245
stream_name : String ,
245
246
created_at : String ,
246
247
time_partition : String ,
247
- time_partition_limit : String ,
248
+ time_partition_limit : Option < NonZeroU32 > ,
248
249
custom_partition : String ,
249
250
static_schema_flag : String ,
250
251
static_schema : HashMap < String , Arc < Field > > ,
@@ -262,11 +263,7 @@ impl StreamInfo {
262
263
} else {
263
264
Some ( time_partition)
264
265
} ,
265
- time_partition_limit : if time_partition_limit. is_empty ( ) {
266
- None
267
- } else {
268
- Some ( time_partition_limit)
269
- } ,
266
+ time_partition_limit,
270
267
custom_partition : if custom_partition. is_empty ( ) {
271
268
None
272
269
} else {
@@ -320,7 +317,9 @@ impl StreamInfo {
320
317
created_at : meta. created_at ,
321
318
first_event_at : meta. first_event_at ,
322
319
time_partition : meta. time_partition ,
323
- time_partition_limit : meta. time_partition_limit ,
320
+ time_partition_limit : meta
321
+ . time_partition_limit
322
+ . and_then ( |limit| limit. parse ( ) . ok ( ) ) ,
324
323
custom_partition : meta. custom_partition ,
325
324
static_schema_flag : meta. static_schema_flag ,
326
325
hot_tier_enabled : meta. hot_tier_enabled ,
@@ -473,7 +472,9 @@ pub async fn load_stream_metadata_on_server_start(
473
472
created_at : meta. created_at ,
474
473
first_event_at : meta. first_event_at ,
475
474
time_partition : meta. time_partition ,
476
- time_partition_limit : meta. time_partition_limit ,
475
+ time_partition_limit : meta
476
+ . time_partition_limit
477
+ . and_then ( |limit| limit. parse ( ) . ok ( ) ) ,
477
478
custom_partition : meta. custom_partition ,
478
479
static_schema_flag : meta. static_schema_flag ,
479
480
hot_tier_enabled : meta. hot_tier_enabled ,
0 commit comments