@@ -22,51 +22,71 @@ use core::cell::UnsafeCell;
2222
2323use  sync:: atomic:: { AtomicPtr ,  AtomicUsize ,  Ordering } ; 
2424
25+ use  super :: cache_aligned:: CacheAligned ; 
26+ 
2527// Node within the linked list queue of messages to send 
2628struct  Node < T >  { 
2729    // FIXME: this could be an uninitialized T if we're careful enough, and 
2830    //      that would reduce memory usage (and be a bit faster). 
2931    //      is it worth it? 
3032    value :  Option < T > ,            // nullable for re-use of nodes 
33+     cached :  bool ,                // This node goes into the node cache 
3134    next :  AtomicPtr < Node < T > > ,    // next node in the queue 
3235} 
3336
3437/// The single-producer single-consumer queue. This structure is not cloneable, 
3538/// but it can be safely shared in an Arc if it is guaranteed that there 
3639/// is only one popper and one pusher touching the queue at any one point in 
3740/// time. 
38- pub  struct  Queue < T >  { 
41+ pub  struct  Queue < T ,   ProducerAddition = ( ) ,   ConsumerAddition = ( ) >  { 
3942    // consumer fields 
43+     consumer :  CacheAligned < Consumer < T ,  ConsumerAddition > > , 
44+ 
45+     // producer fields 
46+     producer :  CacheAligned < Producer < T ,  ProducerAddition > > , 
47+ } 
48+ 
49+ struct  Consumer < T ,  Addition >  { 
4050    tail :  UnsafeCell < * mut  Node < T > > ,  // where to pop from 
4151    tail_prev :  AtomicPtr < Node < T > > ,  // where to pop from 
52+     cache_bound :  usize ,  // maximum cache size 
53+     cached_nodes :  AtomicUsize ,  // number of nodes marked as cachable 
54+     addition :  Addition , 
55+ } 
4256
43-      // producer fields 
57+ struct   Producer < T ,   Addition >   { 
4458    head :  UnsafeCell < * mut  Node < T > > ,       // where to push to 
4559    first :  UnsafeCell < * mut  Node < T > > ,      // where to get new nodes from 
4660    tail_copy :  UnsafeCell < * mut  Node < T > > ,  // between first/tail 
47- 
48-     // Cache maintenance fields. Additions and subtractions are stored 
49-     // separately in order to allow them to use nonatomic addition/subtraction. 
50-     cache_bound :  usize , 
51-     cache_additions :  AtomicUsize , 
52-     cache_subtractions :  AtomicUsize , 
61+     addition :  Addition , 
5362} 
5463
55- unsafe  impl < T :  Send >  Send  for  Queue < T >  {  } 
64+ unsafe  impl < T :  Send ,   P :   Send  +  Sync ,   C :   Send  +  Sync >  Send  for  Queue < T ,   P ,   C >  {  } 
5665
57- unsafe  impl < T :  Send >  Sync  for  Queue < T >  {  } 
66+ unsafe  impl < T :  Send ,   P :   Send  +  Sync ,   C :   Send  +  Sync >  Sync  for  Queue < T ,   P ,   C >  {  } 
5867
5968impl < T >  Node < T >  { 
6069    fn  new ( )  -> * mut  Node < T >  { 
6170        Box :: into_raw ( box Node  { 
6271            value :  None , 
72+             cached :  false , 
6373            next :  AtomicPtr :: new ( ptr:: null_mut :: < Node < T > > ( ) ) , 
6474        } ) 
6575    } 
6676} 
6777
68- impl < T >  Queue < T >  { 
69-     /// Creates a new queue. 
78+ impl < T ,  ProducerAddition ,  ConsumerAddition >  Queue < T ,  ProducerAddition ,  ConsumerAddition >  { 
79+ 
80+     /// Creates a new queue. With given additional elements in the producer and 
81+ /// consumer portions of the queue. 
82+ /// 
83+ /// Due to the performance implications of cache-contention, 
84+ /// we wish to keep fields used mainly by the producer on a separate cache 
85+ /// line than those used by the consumer. 
86+ /// Since cache lines are usually 64 bytes, it is unreasonably expensive to 
87+ /// allocate one for small fields, so we allow users to insert additional 
88+ /// fields into the cache lines already allocated by this for the producer 
89+ /// and consumer. 
7090/// 
7191/// This is unsafe as the type system doesn't enforce a single 
7292/// consumer-producer relationship. It also allows the consumer to `pop` 
@@ -83,19 +103,28 @@ impl<T> Queue<T> {
83103///               cache (if desired). If the value is 0, then the cache has 
84104///               no bound. Otherwise, the cache will never grow larger than 
85105///               `bound` (although the queue itself could be much larger. 
86- pub  unsafe  fn  new ( bound :  usize )  -> Queue < T >  { 
106+ pub  unsafe  fn  with_additions ( 
107+         bound :  usize , 
108+         producer_addition :  ProducerAddition , 
109+         consumer_addition :  ConsumerAddition , 
110+     )  -> Self  { 
87111        let  n1 = Node :: new ( ) ; 
88112        let  n2 = Node :: new ( ) ; 
89113        ( * n1) . next . store ( n2,  Ordering :: Relaxed ) ; 
90114        Queue  { 
91-             tail :  UnsafeCell :: new ( n2) , 
92-             tail_prev :  AtomicPtr :: new ( n1) , 
93-             head :  UnsafeCell :: new ( n2) , 
94-             first :  UnsafeCell :: new ( n1) , 
95-             tail_copy :  UnsafeCell :: new ( n1) , 
96-             cache_bound :  bound, 
97-             cache_additions :  AtomicUsize :: new ( 0 ) , 
98-             cache_subtractions :  AtomicUsize :: new ( 0 ) , 
115+             consumer :  CacheAligned :: new ( Consumer  { 
116+                 tail :  UnsafeCell :: new ( n2) , 
117+                 tail_prev :  AtomicPtr :: new ( n1) , 
118+                 cache_bound :  bound, 
119+                 cached_nodes :  AtomicUsize :: new ( 0 ) , 
120+                 addition :  consumer_addition
121+             } ) , 
122+             producer :  CacheAligned :: new ( Producer  { 
123+                 head :  UnsafeCell :: new ( n2) , 
124+                 first :  UnsafeCell :: new ( n1) , 
125+                 tail_copy :  UnsafeCell :: new ( n1) , 
126+                 addition :  producer_addition
127+             } ) , 
99128        } 
100129    } 
101130
@@ -109,35 +138,25 @@ impl<T> Queue<T> {
109138            assert ! ( ( * n) . value. is_none( ) ) ; 
110139            ( * n) . value  = Some ( t) ; 
111140            ( * n) . next . store ( ptr:: null_mut ( ) ,  Ordering :: Relaxed ) ; 
112-             ( * * self . head . get ( ) ) . next . store ( n,  Ordering :: Release ) ; 
113-             * self . head . get ( )  = n; 
141+             ( * * self . producer . head . get ( ) ) . next . store ( n,  Ordering :: Release ) ; 
142+             * ( & self . producer . head ) . get ( )  = n; 
114143        } 
115144    } 
116145
117146    unsafe  fn  alloc ( & self )  -> * mut  Node < T >  { 
118147        // First try to see if we can consume the 'first' node for our uses. 
119-         // We try to avoid as many atomic instructions as possible here, so 
120-         // the addition to cache_subtractions is not atomic (plus we're the 
121-         // only one subtracting from the cache). 
122-         if  * self . first . get ( )  != * self . tail_copy . get ( )  { 
123-             if  self . cache_bound  > 0  { 
124-                 let  b = self . cache_subtractions . load ( Ordering :: Relaxed ) ; 
125-                 self . cache_subtractions . store ( b + 1 ,  Ordering :: Relaxed ) ; 
126-             } 
127-             let  ret = * self . first . get ( ) ; 
128-             * self . first . get ( )  = ( * ret) . next . load ( Ordering :: Relaxed ) ; 
148+         if  * self . producer . first . get ( )  != * self . producer . tail_copy . get ( )  { 
149+             let  ret = * self . producer . first . get ( ) ; 
150+             * self . producer . 0 . first . get ( )  = ( * ret) . next . load ( Ordering :: Relaxed ) ; 
129151            return  ret; 
130152        } 
131153        // If the above fails, then update our copy of the tail and try 
132154        // again. 
133-         * self . tail_copy . get ( )  = self . tail_prev . load ( Ordering :: Acquire ) ; 
134-         if  * self . first . get ( )  != * self . tail_copy . get ( )  { 
135-             if  self . cache_bound  > 0  { 
136-                 let  b = self . cache_subtractions . load ( Ordering :: Relaxed ) ; 
137-                 self . cache_subtractions . store ( b + 1 ,  Ordering :: Relaxed ) ; 
138-             } 
139-             let  ret = * self . first . get ( ) ; 
140-             * self . first . get ( )  = ( * ret) . next . load ( Ordering :: Relaxed ) ; 
155+         * self . producer . 0 . tail_copy . get ( )  =
156+             self . consumer . tail_prev . load ( Ordering :: Acquire ) ; 
157+         if  * self . producer . first . get ( )  != * self . producer . tail_copy . get ( )  { 
158+             let  ret = * self . producer . first . get ( ) ; 
159+             * self . producer . 0 . first . get ( )  = ( * ret) . next . load ( Ordering :: Relaxed ) ; 
141160            return  ret; 
142161        } 
143162        // If all of that fails, then we have to allocate a new node 
@@ -153,27 +172,27 @@ impl<T> Queue<T> {
153172            // sentinel from where we should start popping from. Hence, look at 
154173            // tail's next field and see if we can use it. If we do a pop, then 
155174            // the current tail node is a candidate for going into the cache. 
156-             let  tail = * self . tail . get ( ) ; 
175+             let  tail = * self . consumer . tail . get ( ) ; 
157176            let  next = ( * tail) . next . load ( Ordering :: Acquire ) ; 
158177            if  next. is_null ( )  {  return  None  } 
159178            assert ! ( ( * next) . value. is_some( ) ) ; 
160179            let  ret = ( * next) . value . take ( ) ; 
161180
162-             * self . tail . get ( )  = next; 
163-             if  self . cache_bound  == 0  { 
164-                 self . tail_prev . store ( tail,  Ordering :: Release ) ; 
181+             * self . consumer . 0 . tail . get ( )  = next; 
182+             if  self . consumer . cache_bound  == 0  { 
183+                 self . consumer . tail_prev . store ( tail,  Ordering :: Release ) ; 
165184            }  else  { 
166-                 // FIXME: this is dubious with overflow. 
167-                 let  additions =  self . cache_additions . load ( Ordering :: Relaxed ) ; 
168-                 let  subtractions =  self . cache_subtractions . load ( Ordering :: Relaxed ) ; 
169-                 let  size = additions - subtractions ; 
170- 
171-                  if  size <  self . cache_bound   { 
172-                      self . tail_prev . store ( tail,   Ordering :: Release ) ; 
173-                     self . cache_additions . store ( additions +  1 ,  Ordering :: Relaxed ) ; 
185+                 let  cached_nodes =  self . consumer . cached_nodes . load ( Ordering :: Relaxed ) ; 
186+                 if  cached_nodes <  self . consumer . cache_bound  && ! ( * tail ) . cached   { 
187+                      self . consumer . cached_nodes . store ( cached_nodes ,   Ordering :: Relaxed ) ; 
188+                      ( * tail ) . cached  =  true ; 
189+                  } 
190+ 
191+                 if   ( * tail) . cached   { 
192+                     self . consumer . tail_prev . store ( tail ,  Ordering :: Release ) ; 
174193                }  else  { 
175-                     ( * self . tail_prev . load ( Ordering :: Relaxed ) ) 
176-                            . next . store ( next,  Ordering :: Relaxed ) ; 
194+                     ( * self . consumer . tail_prev . load ( Ordering :: Relaxed ) ) 
195+                         . next . store ( next,  Ordering :: Relaxed ) ; 
177196                    // We have successfully erased all references to 'tail', so 
178197                    // now we can safely drop it. 
179198                    let  _:  Box < Node < T > >  = Box :: from_raw ( tail) ; 
@@ -194,17 +213,25 @@ impl<T> Queue<T> {
194213        // This is essentially the same as above with all the popping bits 
195214        // stripped out. 
196215        unsafe  { 
197-             let  tail = * self . tail . get ( ) ; 
216+             let  tail = * self . consumer . tail . get ( ) ; 
198217            let  next = ( * tail) . next . load ( Ordering :: Acquire ) ; 
199218            if  next. is_null ( )  {  None  }  else  {  ( * next) . value . as_mut ( )  } 
200219        } 
201220    } 
221+ 
222+     pub  fn  producer_addition ( & self )  -> & ProducerAddition  { 
223+         & self . producer . addition 
224+     } 
225+ 
226+     pub  fn  consumer_addition ( & self )  -> & ConsumerAddition  { 
227+         & self . consumer . addition 
228+     } 
202229} 
203230
204- impl < T >  Drop  for  Queue < T >  { 
231+ impl < T ,   ProducerAddition ,   ConsumerAddition >  Drop  for  Queue < T ,   ProducerAddition ,   ConsumerAddition >  { 
205232    fn  drop ( & mut  self )  { 
206233        unsafe  { 
207-             let  mut  cur = * self . first . get ( ) ; 
234+             let  mut  cur = * self . producer . first . get ( ) ; 
208235            while  !cur. is_null ( )  { 
209236                let  next = ( * cur) . next . load ( Ordering :: Relaxed ) ; 
210237                let  _n:  Box < Node < T > >  = Box :: from_raw ( cur) ; 
@@ -224,7 +251,7 @@ mod tests {
224251    #[ test]  
225252    fn  smoke ( )  { 
226253        unsafe  { 
227-             let  queue = Queue :: new ( 0 ) ; 
254+             let  queue = Queue :: with_additions ( 0 ,   ( ) ,   ( ) ) ; 
228255            queue. push ( 1 ) ; 
229256            queue. push ( 2 ) ; 
230257            assert_eq ! ( queue. pop( ) ,  Some ( 1 ) ) ; 
@@ -241,7 +268,7 @@ mod tests {
241268    #[ test]  
242269    fn  peek ( )  { 
243270        unsafe  { 
244-             let  queue = Queue :: new ( 0 ) ; 
271+             let  queue = Queue :: with_additions ( 0 ,   ( ) ,   ( ) ) ; 
245272            queue. push ( vec ! [ 1 ] ) ; 
246273
247274            // Ensure the borrowchecker works 
@@ -264,7 +291,7 @@ mod tests {
264291    #[ test]  
265292    fn  drop_full ( )  { 
266293        unsafe  { 
267-             let  q:  Queue < Box < _ > >  = Queue :: new ( 0 ) ; 
294+             let  q:  Queue < Box < _ > >  = Queue :: with_additions ( 0 ,   ( ) ,   ( ) ) ; 
268295            q. push ( box 1 ) ; 
269296            q. push ( box 2 ) ; 
270297        } 
@@ -273,7 +300,7 @@ mod tests {
273300    #[ test]  
274301    fn  smoke_bound ( )  { 
275302        unsafe  { 
276-             let  q = Queue :: new ( 0 ) ; 
303+             let  q = Queue :: with_additions ( 0 ,   ( ) ,   ( ) ) ; 
277304            q. push ( 1 ) ; 
278305            q. push ( 2 ) ; 
279306            assert_eq ! ( q. pop( ) ,  Some ( 1 ) ) ; 
@@ -295,7 +322,7 @@ mod tests {
295322        } 
296323
297324        unsafe  fn  stress_bound ( bound :  usize )  { 
298-             let  q = Arc :: new ( Queue :: new ( bound) ) ; 
325+             let  q = Arc :: new ( Queue :: with_additions ( bound,   ( ) ,   ( ) ) ) ; 
299326
300327            let  ( tx,  rx)  = channel ( ) ; 
301328            let  q2 = q. clone ( ) ; 
0 commit comments