use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

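// Illustration of the index encoding used below: the lower `SHIFT` bits of an index are
// metadata (the head index keeps its `HAS_NEXT` flag there), and `index >> SHIFT` is the
// logical position. A position's offset within its block is `(index >> SHIFT) % LAP`;
// offsets `0..BLOCK_CAP` address real slots, while offset `BLOCK_CAP` marks the end of a
// lap and holds no slot.
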
/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
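    ///
    /// Note on the handoff: if some slot in `start..BLOCK_CAP - 1` is still in use, this only
    /// marks it with `DESTROY` and returns; the thread using that slot notices the bit when it
    /// later sets `READ` and finishes destroying the block itself (see `pop`).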
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

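// Safety reasoning (a sketch): values of type `T` are moved between threads through the queue,
// so `T: Send` is required; `T: Sync` is not, because elements are never accessed by shared
// reference from more than one thread.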
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
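                // (Worked example: after 62 pushes into a fresh queue, `tail >> SHIFT` is 64,
                // because one index per completed lap holds no slot, and `head >> SHIFT` is 0,
                // so the result is 64 - 0 - 64 / 32 = 62.)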
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

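/// An owning iterator over the elements of a `SegQueue<T>`, returned by calling `into_iter()`
/// on the queue. Elements are yielded in the order they were pushed.
///
/// # Examples
///
/// A minimal usage sketch:
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// let mut iter = q.into_iter();
/// assert_eq!(iter.next(), Some(1));
/// assert_eq!(iter.next(), Some(2));
/// assert_eq!(iter.next(), None);
/// ```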
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                let p = &mut *slot.value.get();
                p.as_mut_ptr().read()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in each lap has no slot, so skip it by advancing the head twice.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}