//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>

use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::mem::MaybeUninit;
use core::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Only initialized while the stamp marks the slot as occupied; reads and
    /// writes are synchronized through `stamp`'s acquire/release ordering.
    value: UnsafeCell<MaybeUninit<T>>,
}
25 
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err('c'));
/// assert_eq!(q.pop(), Some('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// Always a power of two, so the index part of a stamp is `stamp & (one_lap - 1)`
    /// and the lap part is `stamp & !(one_lap - 1)`.
    one_lap: usize,
}
75 
// SAFETY: all accesses to the slots are synchronized through the atomic stamps
// (acquire loads pair with release stores), so sharing the queue between
// threads is sound whenever the elements themselves can be sent.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
78 
impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized
        // with stamps.
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|i| {
                // Set the stamp to `{ lap: 0, index: i }`.
                Slot {
                    stamp: AtomicUsize::new(i),
                    value: UnsafeCell::new(MaybeUninit::uninit()),
                }
            })
            .collect();

        // One lap is the smallest power of two greater than `cap`.
        // A power of two lets a stamp be split with simple masking:
        // index = stamp & (one_lap - 1), lap = stamp & !(one_lap - 1).
        let one_lap = (cap + 1).next_power_of_two();

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
        }
    }

    /// Core push loop shared by [`push`] and [`force_push`].
    ///
    /// Tries to claim the slot at the tail and write `value` into it. When
    /// the slot still holds an element from the previous lap (i.e. the queue
    /// looks full), the closure `f(value, tail, new_tail, slot)` decides what
    /// happens: returning `Ok(v)` retries the loop with `v`, while `Err(v)`
    /// aborts the push and propagates `Err(v)` to the caller.
    ///
    /// [`push`]: ArrayQueue::push
    /// [`force_push`]: ArrayQueue::force_push
    fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
    where
        F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
    {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            let new_tail = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, index: index + 1 }`.
                tail + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        // The release store publishes the written value to the
                        // consumer that will later acquire-load this stamp.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Another producer moved the tail; retry with the new value.
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp lags a full lap behind the tail: the slot still
                // holds an element from the previous lap, so the queue appears
                // full. Let the caller-supplied closure decide whether to bail
                // out (`push`) or displace the oldest element (`force_push`).
                atomic::fence(Ordering::SeqCst);
                value = f(value, tail, new_tail, slot)?;
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(20));
    /// ```
    pub fn push(&self, value: T) -> Result<(), T> {
        self.push_or_else(value, |v, tail, _, _| {
            let head = self.head.load(Ordering::Relaxed);

            // If the head lags one lap behind the tail as well...
            if head.wrapping_add(self.one_lap) == tail {
                // ...then the queue is full.
                Err(v)
            } else {
                // A pop is in progress; report "not full" so the loop retries.
                Ok(v)
            }
        })
    }

    /// Pushes an element into the queue, replacing the oldest element if necessary.
    ///
    /// If the queue is full, the oldest element is replaced and returned,
    /// otherwise `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(2);
    ///
    /// assert_eq!(q.force_push(10), None);
    /// assert_eq!(q.force_push(20), None);
    /// assert_eq!(q.force_push(30), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// ```
    pub fn force_push(&self, value: T) -> Option<T> {
        self.push_or_else(value, |v, tail, new_tail, slot| {
            // When the queue is full, the head is exactly one lap behind the tail.
            let head = tail.wrapping_sub(self.one_lap);
            let new_head = new_tail.wrapping_sub(self.one_lap);

            // Try moving the head.
            if self
                .head
                .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
            {
                // Move the tail.
                self.tail.store(new_tail, Ordering::SeqCst);

                // Swap the previous value.
                let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

                // Update the stamp.
                slot.stamp.store(tail + 1, Ordering::Release);

                // `Err` here aborts the retry loop; `.err()` below turns the
                // displaced element into `Some(old)`.
                Err(old)
            } else {
                // A concurrent pop freed a slot; retry the normal push path.
                Ok(v)
            }
        })
        .err()
    }

    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        // Advancing the stamp by a full lap marks the slot as
                        // writable for the producer of the next lap.
                        let msg = unsafe { slot.value.get().read().assume_init() };
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Some(msg);
                    }
                    Err(h) => {
                        // Another consumer moved the head; retry with the new value.
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot has not been written for this lap: the queue may be empty.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if tail == head {
                    return None;
                }

                // A push is in progress; spin until its stamp store lands.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the channel was not empty, so it is safe to just return `false`.
        tail == head
    }

    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);

                // Equal indices mean either an empty queue (stamps equal) or a
                // full one (head exactly one lap behind the tail).
                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if tail == head {
                    0
                } else {
                    self.cap
                };
            }
        }
    }
}
443 
444 impl<T> Drop for ArrayQueue<T> {
drop(&mut self)445     fn drop(&mut self) {
446         // Get the index of the head.
447         let head = *self.head.get_mut();
448         let tail = *self.tail.get_mut();
449 
450         let hix = head & (self.one_lap - 1);
451         let tix = tail & (self.one_lap - 1);
452 
453         let len = if hix < tix {
454             tix - hix
455         } else if hix > tix {
456             self.cap - hix + tix
457         } else if tail == head {
458             0
459         } else {
460             self.cap
461         };
462 
463         // Loop over all slots that hold a message and drop them.
464         for i in 0..len {
465             // Compute the index of the next slot holding a message.
466             let index = if hix + i < self.cap {
467                 hix + i
468             } else {
469                 hix + i - self.cap
470             };
471 
472             unsafe {
473                 debug_assert!(index < self.buffer.len());
474                 let slot = self.buffer.get_unchecked_mut(index);
475                 let value = &mut *slot.value.get();
476                 value.as_mut_ptr().drop_in_place();
477             }
478         }
479     }
480 }
481 
impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The elements cannot be inspected without popping them, so the
        // contents are elided; `pad` keeps width/alignment flags working.
        f.pad("ArrayQueue { .. }")
    }
}
487 
impl<T> IntoIterator for ArrayQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    /// Consumes the queue, returning an iterator over its remaining elements.
    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}
497 
/// An owning iterator yielded by [`ArrayQueue::into_iter`].
#[derive(Debug)]
pub struct IntoIter<T> {
    // The queue being drained; owning it gives the iterator exclusive access.
    value: ArrayQueue<T>,
}
502 
503 impl<T> Iterator for IntoIter<T> {
504     type Item = T;
505 
next(&mut self) -> Option<Self::Item>506     fn next(&mut self) -> Option<Self::Item> {
507         let value = &mut self.value;
508         let head = *value.head.get_mut();
509         if value.head.get_mut() != value.tail.get_mut() {
510             let index = head & (value.one_lap - 1);
511             let lap = head & !(value.one_lap - 1);
512             // SAFETY: We have mutable access to this, so we can read without
513             // worrying about concurrency. Furthermore, we know this is
514             // initialized because it is the value pointed at by `value.head`
515             // and this is a non-empty queue.
516             let val = unsafe {
517                 debug_assert!(index < value.buffer.len());
518                 let slot = value.buffer.get_unchecked_mut(index);
519                 slot.value.get().read().assume_init()
520             };
521             let new = if index + 1 < value.cap {
522                 // Same lap, incremented index.
523                 // Set to `{ lap: lap, index: index + 1 }`.
524                 head + 1
525             } else {
526                 // One lap forward, index wraps around to zero.
527                 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
528                 lap.wrapping_add(value.one_lap)
529             };
530             *value.head.get_mut() = new;
531             Option::Some(val)
532         } else {
533             Option::None
534         }
535     }
536 }
537