@@ -22,51 +22,71 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     // that would reduce memory usage (and be a bit faster).
     // is it worth it?
     value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>,  // where to pop from
+    cache_bound: usize,             // maximum cache size
+    cached_nodes: AtomicUsize,      // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
-impl<T> Queue<T> {
-    /// Creates a new queue.
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements stored in the
+    /// producer and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache contention, we wish to
+    /// keep fields used mainly by the producer on a separate cache line from
+    /// those used by the consumer. Since cache lines are usually 64 bytes, it
+    /// is unreasonably expensive to allocate one for small fields, so we allow
+    /// users to insert additional fields into the cache lines already
+    /// allocated by this queue for the producer and consumer.
     ///
     /// This is unsafe as the type system doesn't enforce a single
     /// consumer-producer relationship. It also allows the consumer to `pop`
@@ -83,19 +103,28 @@ impl<T> Queue<T> {
     /// cache (if desired). If the value is 0, then the cache has
     /// no bound. Otherwise, the cache will never grow larger than
     /// `bound` (although the queue itself could be much larger).
-    pub unsafe fn new(bound: usize) -> Queue<T> {
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
 
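The `cache_aligned::CacheAligned` wrapper imported at the top of this diff is not itself shown here. Judging from how it is used in the hunks above and below (`CacheAligned::new(..)`, field access through auto-deref, and explicit `.0` tuple access), a plausible sketch is a 64-byte-aligned newtype with `Deref`/`DerefMut`. This is an assumption about the companion module, not its actual contents:

```rust
use core::ops::{Deref, DerefMut};

// Assumed shape of super::cache_aligned::CacheAligned: a 64-byte-aligned
// newtype so that the wrapped producer and consumer halves land on separate
// cache lines. The real module may differ (derives, alignment value, etc.).
#[repr(align(64))]
pub(super) struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub(super) fn new(t: T) -> Self {
        CacheAligned(t)
    }
}

impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}
```

With a wrapper like this, `self.producer.head` resolves through `Deref`, while `self.producer.0.head` names the inner struct directly; both spellings appear in the hunks below.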
@@ -109,35 +138,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +172,27 @@ impl<T> Queue<T> {
         // sentinel from where we should start popping from. Hence, look at
         // tail's next field and see if we can use it. If we do a pop, then
         // the current tail node is a candidate for going into the cache.
-        let tail = *self.tail.get();
+        let tail = *self.consumer.tail.get();
         let next = (*tail).next.load(Ordering::Acquire);
         if next.is_null() { return None }
         assert!((*next).value.is_some());
         let ret = (*next).value.take();
 
-        *self.tail.get() = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Ordering::Release);
+        *self.consumer.0.tail.get() = next;
+        if self.consumer.cache_bound == 0 {
+            self.consumer.tail_prev.store(tail, Ordering::Release);
         } else {
-            // FIXME: this is dubious with overflow.
-            let additions = self.cache_additions.load(Ordering::Relaxed);
-            let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-            let size = additions - subtractions;
-
-            if size < self.cache_bound {
-                self.tail_prev.store(tail, Ordering::Release);
-                self.cache_additions.store(additions + 1, Ordering::Relaxed);
+            let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+            if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                (*tail).cached = true;
+            }
+
+            if (*tail).cached {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                (*self.tail_prev.load(Ordering::Relaxed))
-                    .next.store(next, Ordering::Relaxed);
+                (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                    .next.store(next, Ordering::Relaxed);
                 // We have successfully erased all references to 'tail', so
                 // now we can safely drop it.
                 let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +213,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
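The new `producer_addition`/`consumer_addition` accessors expose whatever extra state the caller chose to co-locate on the queue's padded cache lines; the additions are opaque to the queue itself. A hypothetical caller-side sketch follows — the `ProducerState`/`ConsumerState` types and fields are illustrative only, not taken from this commit or from the mpsc flavors that actually use this API:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative extra state; real callers define their own addition types.
struct ProducerState {
    pending_sends: AtomicUsize,
}
struct ConsumerState {
    steals: UnsafeCell<isize>,
}

fn example() {
    // Safety: this sketch is the only pusher and the only popper.
    let q: Queue<u32, ProducerState, ConsumerState> = unsafe {
        Queue::with_additions(
            128,
            ProducerState { pending_sends: AtomicUsize::new(0) },
            ConsumerState { steals: UnsafeCell::new(0) },
        )
    };

    // The sending side touches only producer-line state before pushing...
    q.producer_addition().pending_sends.fetch_add(1, Ordering::Relaxed);
    q.push(42);

    // ...and the receiving side touches only consumer-line state after popping.
    assert_eq!(q.pop(), Some(42));
    unsafe { *q.consumer_addition().steals.get() += 1 };
}
```

Keeping the producer-only and consumer-only state on their respective `CacheAligned` halves is the point of the change: neither endpoint's hot fields share a cache line with the other's.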
@@ -224,7 +251,7 @@ mod tests {
     #[test]
     fn smoke() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(1);
             queue.push(2);
             assert_eq!(queue.pop(), Some(1));
@@ -241,7 +268,7 @@ mod tests {
     #[test]
     fn peek() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(vec![1]);
 
             // Ensure the borrowchecker works
@@ -264,7 +291,7 @@ mod tests {
     #[test]
     fn drop_full() {
         unsafe {
-            let q: Queue<Box<_>> = Queue::new(0);
+            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
             q.push(box 1);
             q.push(box 2);
         }
@@ -273,7 +300,7 @@ mod tests {
     #[test]
     fn smoke_bound() {
         unsafe {
-            let q = Queue::new(0);
+            let q = Queue::with_additions(0, (), ());
             q.push(1);
             q.push(2);
             assert_eq!(q.pop(), Some(1));
@@ -295,7 +322,7 @@ mod tests {
     }
 
     unsafe fn stress_bound(bound: usize) {
-        let q = Arc::new(Queue::new(bound));
+        let q = Arc::new(Queue::with_additions(bound, (), ()));
 
         let (tx, rx) = channel();
         let q2 = q.clone();
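The stress test above exercises the sharing pattern described in the struct documentation: one pusher and one popper coordinated through an `Arc`. As a standalone illustration (not part of the commit), that pattern looks roughly like the following minimal sketch, assuming `std::thread` and `std::sync::Arc` are available as they are in the test module:

```rust
use std::sync::Arc;
use std::thread;

fn spsc_example() {
    // Safety: exactly one pusher (the spawned thread) and one popper (this
    // thread) ever touch the queue, as the struct documentation requires.
    let q = Arc::new(unsafe { Queue::with_additions(0, (), ()) });
    let producer = q.clone();

    let handle = thread::spawn(move || {
        for i in 0..100 {
            producer.push(i); // single producer
        }
    });

    let mut received = 0;
    while received < 100 {
        if q.pop().is_some() { // single consumer, spinning for simplicity
            received += 1;
        }
    }
    handle.join().unwrap();
}
```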