@@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE};
1212use  lzzzz:: lz4:: { self ,  ACC_LEVEL_DEFAULT } ; 
1313use  parking_lot:: Mutex ; 
1414use  rayon:: { 
15-     iter:: { IndexedParallelIterator ,  IntoParallelIterator ,  ParallelIterator } , 
15+     iter:: { Either ,   IndexedParallelIterator ,  IntoParallelIterator ,  ParallelIterator } , 
1616    scope, 
1717} ; 
1818use  smallvec:: SmallVec ; 
@@ -39,13 +39,25 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
3939new_blob_files :  Vec < ( u32 ,  File ) > , 
4040} 
4141
42+ const  COLLECTOR_SHARDS :  usize  = 4 ; 
43+ const  COLLECTOR_SHARD_SHIFT :  usize  =
44+     u64:: BITS  as  usize  - COLLECTOR_SHARDS . trailing_zeros ( )  as  usize ; 
45+ 
4246/// The result of a `WriteBatch::finish` operation. 
4347pub ( crate )  struct  FinishResult  { 
4448    pub ( crate )  sequence_number :  u32 , 
4549    pub ( crate )  new_sst_files :  Vec < ( u32 ,  File ) > , 
4650    pub ( crate )  new_blob_files :  Vec < ( u32 ,  File ) > , 
4751} 
4852
53+ enum  GlobalCollectorState < K :  StoreKey  + Send >  { 
54+     /// Initial state. Single collector. Once the collector is full, we switch to sharded mode. 
55+ Unsharded ( Collector < K > ) , 
56+     /// Sharded mode. 
57+ /// We use multiple collectors, and select one based on the first bits of the key hash. 
58+ Sharded ( [ Collector < K > ;  COLLECTOR_SHARDS ] ) , 
59+ } 
60+ 
4961/// A write batch. 
5062pub  struct  WriteBatch < K :  StoreKey  + Send ,  const  FAMILIES :  usize >  { 
5163    /// The database path 
@@ -55,7 +67,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
5567    /// The thread local state. 
5668thread_locals :  ThreadLocal < UnsafeCell < ThreadLocalState < K ,  FAMILIES > > > , 
5769    /// Collectors in use. The thread local collectors flush into these when they are full. 
58- collectors :  [ Mutex < Collector < K > > ;  FAMILIES ] , 
70+ collectors :  [ Mutex < GlobalCollectorState < K > > ;  FAMILIES ] , 
5971    /// The list of new SST files that have been created. 
6072new_sst_files :  Mutex < Vec < ( u32 ,  File ) > > , 
6173    /// Collectors are are current unused, but have memory preallocated. 
@@ -74,7 +86,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
7486            path, 
7587            current_sequence_number :  AtomicU32 :: new ( current) , 
7688            thread_locals :  ThreadLocal :: new ( ) , 
77-             collectors :  [ ( ) ;  FAMILIES ] . map ( |_| Mutex :: new ( Collector :: new ( ) ) ) , 
89+             collectors :  [ ( ) ;  FAMILIES ] 
90+                 . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) , 
7891            new_sst_files :  Mutex :: new ( Vec :: new ( ) ) , 
7992            idle_collectors :  Mutex :: new ( Vec :: new ( ) ) , 
8093            idle_thread_local_collectors :  Mutex :: new ( Vec :: new ( ) ) , 
@@ -127,17 +140,39 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
127140    )  -> Result < ( ) >  { 
128141        let  mut  full_collectors = SmallVec :: < [ _ ;  2 ] > :: new ( ) ; 
129142        { 
130-             let  mut  global_collector  = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ; 
143+             let  mut  global_collector_state  = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ; 
131144            for  entry in  collector. drain ( )  { 
132-                 global_collector. add_entry ( entry) ; 
133-                 if  global_collector. is_full ( )  { 
134-                     full_collectors. push ( replace ( 
135-                         & mut  * global_collector, 
136-                         self . idle_collectors 
137-                             . lock ( ) 
138-                             . pop ( ) 
139-                             . unwrap_or_else ( || Collector :: new ( ) ) , 
140-                     ) ) ; 
145+                 match  & mut  * global_collector_state { 
146+                     GlobalCollectorState :: Unsharded ( collector)  => { 
147+                         collector. add_entry ( entry) ; 
148+                         if  collector. is_full ( )  { 
149+                             // When full, split the entries into shards. 
150+                             let  mut  shards:  [ Collector < K > ;  4 ]  =
151+                                 [ ( ) ;  COLLECTOR_SHARDS ] . map ( |_| Collector :: new ( ) ) ; 
152+                             for  entry in  collector. drain ( )  { 
153+                                 let  shard = ( entry. key . hash  >> COLLECTOR_SHARD_SHIFT )  as  usize ; 
154+                                 shards[ shard] . add_entry ( entry) ; 
155+                             } 
156+                             // There is a rare edge case where all entries are in the same shard, 
157+                             // and the collector is full after the split. 
158+                             for  collector in  shards. iter_mut ( )  { 
159+                                 if  collector. is_full ( )  { 
160+                                     full_collectors
161+                                         . push ( replace ( & mut  * collector,  self . get_new_collector ( ) ) ) ; 
162+                                 } 
163+                             } 
164+                             * global_collector_state = GlobalCollectorState :: Sharded ( shards) ; 
165+                         } 
166+                     } 
167+                     GlobalCollectorState :: Sharded ( shards)  => { 
168+                         let  shard = ( entry. key . hash  >> COLLECTOR_SHARD_SHIFT )  as  usize ; 
169+                         let  collector = & mut  shards[ shard] ; 
170+                         collector. add_entry ( entry) ; 
171+                         if  collector. is_full ( )  { 
172+                             full_collectors
173+                                 . push ( replace ( & mut  * collector,  self . get_new_collector ( ) ) ) ; 
174+                         } 
175+                     } 
141176                } 
142177            } 
143178        } 
@@ -151,6 +186,13 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
151186        Ok ( ( ) ) 
152187    } 
153188
189+     fn  get_new_collector ( & self )  -> Collector < K >  { 
190+         self . idle_collectors 
191+             . lock ( ) 
192+             . pop ( ) 
193+             . unwrap_or_else ( || Collector :: new ( ) ) 
194+     } 
195+ 
154196    /// Puts a key-value pair into the write batch. 
155197pub  fn  put ( & self ,  family :  u32 ,  key :  K ,  value :  ValueBuffer < ' _ > )  -> Result < ( ) >  { 
156198        let  state = self . thread_local_state ( ) ; 
@@ -213,23 +255,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
213255        // Now we reduce the global collectors in parallel 
214256        let  shared_new_sst_files = Mutex :: new ( & mut  new_sst_files) ; 
215257
216-         let  collectors = replace ( 
217-             & mut  self . collectors , 
218-             [ ( ) ;  FAMILIES ] . map ( |_| { 
219-                 Mutex :: new ( 
220-                     self . idle_collectors 
221-                         . lock ( ) 
222-                         . pop ( ) 
223-                         . unwrap_or_else ( || Collector :: new ( ) ) , 
224-                 ) 
225-             } ) , 
226-         ) ; 
258+         let  new_collectors = [ ( ) ;  FAMILIES ] 
259+             . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ) ) ; 
260+         let  collectors = replace ( & mut  self . collectors ,  new_collectors) ; 
227261        collectors
228262            . into_par_iter ( ) 
229263            . enumerate ( ) 
230-             . try_for_each ( |( family,  collector) | { 
264+             . flat_map ( |( family,  state) | { 
265+                 let  collector = state. into_inner ( ) ; 
266+                 match  collector { 
267+                     GlobalCollectorState :: Unsharded ( collector)  => { 
268+                         Either :: Left ( [ ( family,  collector) ] . into_par_iter ( ) ) 
269+                     } 
270+                     GlobalCollectorState :: Sharded ( shards)  => Either :: Right ( 
271+                         shards
272+                             . into_par_iter ( ) 
273+                             . map ( move  |collector| ( family,  collector) ) , 
274+                     ) , 
275+                 } 
276+             } ) 
277+             . try_for_each ( |( family,  mut  collector) | { 
231278                let  family = family as  u32 ; 
232-                 let  mut  collector = collector. into_inner ( ) ; 
233279                if  !collector. is_empty ( )  { 
234280                    let  sst = self . create_sst_file ( family,  collector. sorted ( ) ) ?; 
235281                    collector. clear ( ) ; 
0 commit comments