@@ -48,7 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
48
48
use itertools:: Itertools ;
49
49
use relative_path:: RelativePath ;
50
50
use relative_path:: RelativePathBuf ;
51
- use tracing:: error;
51
+ use tracing:: { debug , error, instrument , trace } ;
52
52
53
53
use std:: collections:: BTreeMap ;
54
54
use std:: {
@@ -67,7 +67,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync
67
67
}
68
68
69
69
#[ async_trait]
70
- pub trait ObjectStorage : Send + Sync + ' static {
70
+ pub trait ObjectStorage : std :: fmt :: Debug + Send + Sync + ' static {
71
71
async fn get_object ( & self , path : & RelativePath ) -> Result < Bytes , ObjectStorageError > ;
72
72
// TODO: make the filter function optional as we may want to get all objects
73
73
async fn get_objects (
@@ -538,8 +538,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
538
538
Ok ( Bytes :: new ( ) )
539
539
}
540
540
541
+ #[ instrument( level = "debug" ) ]
541
542
async fn sync ( & self , shutdown_signal : bool ) -> Result < ( ) , ObjectStorageError > {
542
543
if !Path :: new ( & CONFIG . staging_dir ( ) ) . exists ( ) {
544
+ trace ! ( "Nothing to sync" ) ;
543
545
return Ok ( ( ) ) ;
544
546
}
545
547
@@ -552,6 +554,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
552
554
let cache_enabled = STREAM_INFO
553
555
. get_cache_enabled ( stream)
554
556
. map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
557
+
555
558
let time_partition = STREAM_INFO
556
559
. get_time_partition ( stream)
557
560
. map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
@@ -568,6 +571,8 @@ pub trait ObjectStorage: Send + Sync + 'static {
568
571
)
569
572
. map_err ( |err| ObjectStorageError :: UnhandledError ( Box :: new ( err) ) ) ?;
570
573
574
+ debug ! ( "Arrow files compressed into parquet for stream: {stream}" ) ;
575
+
571
576
if let Some ( schema) = schema {
572
577
let static_schema_flag = STREAM_INFO
573
578
. get_static_schema_flag ( stream)
@@ -614,6 +619,8 @@ pub trait ObjectStorage: Send + Sync + 'static {
614
619
if let Err ( e) = self . upload_file ( & stream_relative_path, & file) . await {
615
620
error ! ( "Failed to upload file {}: {:?}" , filename, e) ;
616
621
continue ; // Skip to the next file
622
+ } else {
623
+ debug ! ( "Parquet file uploaded to s3 for stream: {stream}" ) ;
617
624
}
618
625
619
626
let absolute_path = self
@@ -622,6 +629,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
622
629
let store = CONFIG . storage ( ) . get_object_store ( ) ;
623
630
let manifest =
624
631
catalog:: create_from_parquet_file ( absolute_path. clone ( ) , & file) . unwrap ( ) ;
632
+
625
633
catalog:: update_snapshot ( store, stream, manifest) . await ?;
626
634
if cache_enabled && cache_manager. is_some ( ) {
627
635
cache_updates
0 commit comments