@@ -2,7 +2,7 @@ use std::{
22 any:: { Any , TypeId } ,
33 collections:: HashSet ,
44 fs:: { self , File , OpenOptions , ReadDir } ,
5- io:: Write ,
5+ io:: { BufWriter , Write } ,
66 mem:: { swap, transmute, MaybeUninit } ,
77 path:: { Path , PathBuf } ,
88 sync:: {
@@ -13,6 +13,7 @@ use std::{
1313
1414use anyhow:: { bail, Context , Result } ;
1515use byteorder:: { ReadBytesExt , WriteBytesExt , BE } ;
16+ use jiff:: Timestamp ;
1617use lzzzz:: lz4:: decompress;
1718use memmap2:: Mmap ;
1819use parking_lot:: { Mutex , RwLock } ;
@@ -287,6 +288,9 @@ impl TurboPersistence {
287288 Some ( "CURRENT" ) => {
288289 // Already read
289290 }
291+ Some ( "LOG" ) => {
292+ // Ignored, write-only
293+ }
290294 _ => {
291295 if !path
292296 . file_name ( )
@@ -393,6 +397,15 @@ impl TurboPersistence {
393397 Ok ( WriteBatch :: new ( self . path . clone ( ) , current) )
394398 }
395399
400+ fn open_log ( & self ) -> Result < BufWriter < File > > {
401+ let log_path = self . path . join ( "LOG" ) ;
402+ let log_file = OpenOptions :: new ( )
403+ . create ( true )
404+ . append ( true )
405+ . open ( log_path) ?;
406+ Ok ( BufWriter :: new ( log_file) )
407+ }
408+
396409 /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
397410 /// visible to readers.
398411 pub fn commit_write_batch < K : StoreKey + Send + Sync + ' static , const FAMILIES : usize > (
@@ -418,10 +431,12 @@ impl TurboPersistence {
418431 fn commit (
419432 & self ,
420433 mut new_sst_files : Vec < ( u32 , File ) > ,
421- new_blob_files : Vec < File > ,
434+ new_blob_files : Vec < ( u32 , File ) > ,
422435 mut indicies_to_delete : Vec < usize > ,
423436 mut seq : u32 ,
424437 ) -> Result < ( ) , anyhow:: Error > {
438+ let time = Timestamp :: now ( ) ;
439+
425440 new_sst_files. sort_unstable_by_key ( |( seq, _) | * seq) ;
426441
427442 let mut new_sst_files = new_sst_files
@@ -432,10 +447,20 @@ impl TurboPersistence {
432447 } )
433448 . collect :: < Result < Vec < _ > > > ( ) ?;
434449
435- for file in new_blob_files {
450+ for ( _ , file) in new_blob_files. iter ( ) {
436451 file. sync_all ( ) ?;
437452 }
438453
454+ let new_sst_info = new_sst_files
455+ . iter ( )
456+ . map ( |sst| {
457+ let seq = sst. sequence_number ( ) ;
458+ let range = sst. range ( ) ?;
459+ let size = sst. size ( ) ;
460+ Ok ( ( seq, range. family , range. min_hash , range. max_hash , size) )
461+ } )
462+ . collect :: < Result < Vec < _ > > > ( ) ?;
463+
439464 if !indicies_to_delete. is_empty ( ) {
440465 seq += 1 ;
441466 }
@@ -479,6 +504,30 @@ impl TurboPersistence {
479504 fs:: remove_file ( self . path . join ( format ! ( "{seq:08}.sst" ) ) ) ?;
480505 }
481506
507+ {
508+ let mut log = self . open_log ( ) ?;
509+ writeln ! ( log, "Time {}" , time) ?;
510+ let span = time. until ( Timestamp :: now ( ) ) ?;
511+ writeln ! ( log, "Commit {seq:08} {:#}" , span) ?;
512+ for ( index, family, min, max, size) in new_sst_info. iter ( ) {
513+ writeln ! (
514+ log,
515+ "{:08} SST family:{} {:016x}-{:016x} {} MiB" ,
516+ index,
517+ family,
518+ min,
519+ max,
520+ size / 1024 / 1024
521+ ) ?;
522+ }
523+ for ( seq, _) in new_blob_files. iter ( ) {
524+ writeln ! ( log, "{:08} BLOB" , seq) ?;
525+ }
526+ for index in indicies_to_delete. iter ( ) {
527+ writeln ! ( log, "{:08} DELETED" , index) ?;
528+ }
529+ }
530+
482531 Ok ( ( ) )
483532 }
484533
@@ -583,6 +632,7 @@ impl TurboPersistence {
583632 let value_block_cache = & self . value_block_cache ;
584633 let path = & self . path ;
585634
635+ let log_mutex = Mutex :: new ( ( ) ) ;
586636 let result = sst_by_family
587637 . into_par_iter ( )
588638 . with_min_len ( 1 )
@@ -604,6 +654,32 @@ impl TurboPersistence {
604654 } ,
605655 ) ;
606656
657+ if !merge_jobs. is_empty ( ) {
658+ let guard = log_mutex. lock ( ) ;
659+ let mut log = self . open_log ( ) ?;
660+ writeln ! (
661+ log,
662+ "Compaction for family {family} (coverage: {coverage}):"
663+ ) ?;
664+ for job in merge_jobs. iter ( ) {
665+ writeln ! ( log, " merge" ) ?;
666+ for i in job. iter ( ) {
667+ let index = ssts_with_ranges[ * i] . index ;
668+ let ( min, max) = ssts_with_ranges[ * i] . range ( ) ;
669+ writeln ! ( log, " {index:08} {min:016x}-{max:016x}" ) ?;
670+ }
671+ }
672+ if !move_jobs. is_empty ( ) {
673+ writeln ! ( log, " move" ) ?;
674+ for i in move_jobs. iter ( ) {
675+ let index = ssts_with_ranges[ * i] . index ;
676+ let ( min, max) = ssts_with_ranges[ * i] . range ( ) ;
677+ writeln ! ( log, " {index:08} {min:016x}-{max:016x}" ) ?;
678+ }
679+ }
680+ drop ( guard) ;
681+ }
682+
607683 // Later we will remove the merged and moved files
608684 let indicies_to_delete = merge_jobs
609685 . iter ( )
0 commit comments