use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
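
// An informal sketch of how indices are encoded (illustrative only, not how
// the code below is written): the lower `SHIFT` bits of an index hold
// metadata and the upper bits hold a sequential position. For a position `i`:
//
//     let index = i << SHIFT;
//     let offset = (index >> SHIFT) % LAP; // slot within the current block
//     let lap = (index >> SHIFT) / LAP;    // how many full laps precede it
//
// The offset equal to `BLOCK_CAP` is the unused "end" of a block and never
// maps to a slot. In the head index the metadata bit is `HAS_NEXT`, which
// records that the tail has already moved on to a later block.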

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

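// Informal note: the queue only moves owned values of `T` between threads and
// never shares `&T` across threads, so `T: Send` is sufficient for the queue
// to be both `Send` and `Sync`.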
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }


    /// Pops the head element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// assert_eq!(q.pop(), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }


    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

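/// An owning iterator over the elements of a `SegQueue<T>`.
///
/// Created when a `SegQueue` is consumed through [`IntoIterator`]; elements are
/// yielded in the order they were pushed.
///
/// # Examples
///
/// A minimal sketch:
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// let v: Vec<i32> = q.into_iter().collect();
/// assert_eq!(v, vec![1, 2]);
/// ```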
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it b/c everything has
                // been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in a block does not hold a value, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}