1 use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
2 use std::boxed::Box;
3 use std::cell::{Cell, UnsafeCell};
4 use std::cmp;
5 use std::fmt;
6 use std::marker::PhantomData;
7 use std::mem::{self, MaybeUninit};
8 use std::ptr;
9 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
10 use std::sync::Arc;
11 
12 use crossbeam_epoch::{self as epoch, Atomic, Owned};
13 use crossbeam_utils::{Backoff, CachePadded};
14 
15 // Minimum buffer capacity.
16 const MIN_CAP: usize = 64;
17 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
18 const MAX_BATCH: usize = 32;
19 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
20 // deallocated as soon as possible.
21 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
22 
23 /// A buffer that holds tasks in a worker queue.
24 ///
25 /// This is just a pointer to the buffer and its length - dropping an instance of this struct will
26 /// *not* deallocate the buffer.
27 struct Buffer<T> {
28     /// Pointer to the allocated memory.
29     ptr: *mut T,
30 
31     /// Capacity of the buffer. Always a power of two.
32     cap: usize,
33 }
34 
35 unsafe impl<T> Send for Buffer<T> {}
36 
37 impl<T> Buffer<T> {
38     /// Allocates a new buffer with the specified capacity.
39     fn alloc(cap: usize) -> Buffer<T> {
40         debug_assert_eq!(cap, cap.next_power_of_two());
41 
42         let ptr = Box::into_raw(
43             (0..cap)
44                 .map(|_| MaybeUninit::<T>::uninit())
45                 .collect::<Box<[_]>>(),
46         )
47         .cast::<T>();
48 
49         Buffer { ptr, cap }
50     }
51 
52     /// Deallocates the buffer.
53     unsafe fn dealloc(self) {
54         drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
55             self.ptr.cast::<MaybeUninit<T>>(),
56             self.cap,
57         )));
58     }
59 
60     /// Returns a pointer to the task at the specified `index`.
61     unsafe fn at(&self, index: isize) -> *mut T {
62         // `self.cap` is always a power of two.
63         // We do all the loads as `MaybeUninit` because we might realize, after loading, that we
64         // don't actually have the right to access this memory.
65         self.ptr.offset(index & (self.cap - 1) as isize)
66     }
67 
68     /// Writes `task` into the specified `index`.
69     ///
70     /// This method might be concurrently called with another `read` at the same index, which is
71     /// technically speaking a data race and therefore UB. We should use an atomic store here, but
72     /// that would be more expensive and difficult to implement generically for all types `T`.
73     /// Hence, as a hack, we use a volatile write instead.
74     unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
75         ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
76     }
77 
78     /// Reads a task from the specified `index`.
79     ///
80     /// This method might be concurrently called with another `write` at the same index, which is
81     /// technically speaking a data race and therefore UB. We should use an atomic load here, but
82     /// that would be more expensive and difficult to implement generically for all types `T`.
83     /// Hence, as a hack, we use a volatile load instead.
84     unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
85         ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
86     }
87 }
88 
89 impl<T> Clone for Buffer<T> {
90     fn clone(&self) -> Buffer<T> {
91         *self
92     }
93 }
94 
95 impl<T> Copy for Buffer<T> {}
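
// `Buffer` is intentionally `Copy`: it does not own its allocation (see the note on the struct
// above), so the worker can keep a cached copy in a `Cell` (`Worker::buffer`) while the shared
// pointer lives in `Inner::buffer`. The memory itself is only freed by an explicit `dealloc`.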
96 
97 /// Internal queue data shared between the worker and stealers.
98 ///
99 /// The implementation is based on the following work:
100 ///
101 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
102 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
103 ///    PPoPP 2013.][weak-mem]
104 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
105 ///    atomics. OOPSLA 2013.][checker]
106 ///
107 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
108 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
109 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514
110 struct Inner<T> {
111     /// The front index.
112     front: AtomicIsize,
113 
114     /// The back index.
115     back: AtomicIsize,
116 
117     /// The underlying buffer.
118     buffer: CachePadded<Atomic<Buffer<T>>>,
119 }
120 
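// A minimal sketch of the index arithmetic used throughout this file (the variable names are
// illustrative only and not part of the implementation):
//
//     let cap: isize = 8;                    // buffer capacity, always a power of two
//     let front: isize = 5;                  // logical index of the first task
//     let back: isize = 9;                   // logical index one past the last task
//     let len = back.wrapping_sub(front);    // 4 tasks currently in the queue
//     let slot = front & (cap - 1);          // logical index 5 maps to physical slot 5
//     let wrap = back & (cap - 1);           // logical index 9 wraps to physical slot 1
//
// Indices are never reduced modulo the capacity; lengths are computed with `wrapping_sub` and
// physical slots with a power-of-two mask, exactly as in `Buffer::at`.
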
121 impl<T> Drop for Inner<T> {
122     fn drop(&mut self) {
123         // Load the back index, front index, and buffer.
124         let b = *self.back.get_mut();
125         let f = *self.front.get_mut();
126 
127         unsafe {
128             let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
129 
130             // Go through the buffer from front to back and drop all tasks in the queue.
131             let mut i = f;
132             while i != b {
133                 buffer.deref().at(i).drop_in_place();
134                 i = i.wrapping_add(1);
135             }
136 
137             // Free the memory allocated by the buffer.
138             buffer.into_owned().into_box().dealloc();
139         }
140     }
141 }
142 
143 /// Worker queue flavor: FIFO or LIFO.
144 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
145 enum Flavor {
146     /// The first-in first-out flavor.
147     Fifo,
148 
149     /// The last-in first-out flavor.
150     Lifo,
151 }
152 
153 /// A worker queue.
154 ///
155 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
156 /// tasks from it. Task schedulers typically create a single worker queue per thread.
157 ///
158 /// # Examples
159 ///
160 /// A FIFO worker:
161 ///
162 /// ```
163 /// use crossbeam_deque::{Steal, Worker};
164 ///
165 /// let w = Worker::new_fifo();
166 /// let s = w.stealer();
167 ///
168 /// w.push(1);
169 /// w.push(2);
170 /// w.push(3);
171 ///
172 /// assert_eq!(s.steal(), Steal::Success(1));
173 /// assert_eq!(w.pop(), Some(2));
174 /// assert_eq!(w.pop(), Some(3));
175 /// ```
176 ///
177 /// A LIFO worker:
178 ///
179 /// ```
180 /// use crossbeam_deque::{Steal, Worker};
181 ///
182 /// let w = Worker::new_lifo();
183 /// let s = w.stealer();
184 ///
185 /// w.push(1);
186 /// w.push(2);
187 /// w.push(3);
188 ///
189 /// assert_eq!(s.steal(), Steal::Success(1));
190 /// assert_eq!(w.pop(), Some(3));
191 /// assert_eq!(w.pop(), Some(2));
192 /// ```
193 pub struct Worker<T> {
194     /// A reference to the inner representation of the queue.
195     inner: Arc<CachePadded<Inner<T>>>,
196 
197     /// A copy of `inner.buffer` for quick access.
198     buffer: Cell<Buffer<T>>,
199 
200     /// The flavor of the queue.
201     flavor: Flavor,
202 
203     /// Indicates that the worker cannot be shared among threads.
204     _marker: PhantomData<*mut ()>, // !Send + !Sync
205 }
206 
207 unsafe impl<T: Send> Send for Worker<T> {}
208 
209 impl<T> Worker<T> {
210     /// Creates a FIFO worker queue.
211     ///
212     /// Tasks are pushed and popped from opposite ends.
213     ///
214     /// # Examples
215     ///
216     /// ```
217     /// use crossbeam_deque::Worker;
218     ///
219     /// let w = Worker::<i32>::new_fifo();
220     /// ```
221     pub fn new_fifo() -> Worker<T> {
222         let buffer = Buffer::alloc(MIN_CAP);
223 
224         let inner = Arc::new(CachePadded::new(Inner {
225             front: AtomicIsize::new(0),
226             back: AtomicIsize::new(0),
227             buffer: CachePadded::new(Atomic::new(buffer)),
228         }));
229 
230         Worker {
231             inner,
232             buffer: Cell::new(buffer),
233             flavor: Flavor::Fifo,
234             _marker: PhantomData,
235         }
236     }
237 
238     /// Creates a LIFO worker queue.
239     ///
240     /// Tasks are pushed and popped from the same end.
241     ///
242     /// # Examples
243     ///
244     /// ```
245     /// use crossbeam_deque::Worker;
246     ///
247     /// let w = Worker::<i32>::new_lifo();
248     /// ```
249     pub fn new_lifo() -> Worker<T> {
250         let buffer = Buffer::alloc(MIN_CAP);
251 
252         let inner = Arc::new(CachePadded::new(Inner {
253             front: AtomicIsize::new(0),
254             back: AtomicIsize::new(0),
255             buffer: CachePadded::new(Atomic::new(buffer)),
256         }));
257 
258         Worker {
259             inner,
260             buffer: Cell::new(buffer),
261             flavor: Flavor::Lifo,
262             _marker: PhantomData,
263         }
264     }
265 
266     /// Creates a stealer for this queue.
267     ///
268     /// The returned stealer can be shared among threads and cloned.
269     ///
270     /// # Examples
271     ///
272     /// ```
273     /// use crossbeam_deque::Worker;
274     ///
275     /// let w = Worker::<i32>::new_lifo();
276     /// let s = w.stealer();
277     /// ```
278     pub fn stealer(&self) -> Stealer<T> {
279         Stealer {
280             inner: self.inner.clone(),
281             flavor: self.flavor,
282         }
283     }
284 
285     /// Resizes the internal buffer to the new capacity of `new_cap`.
286     #[cold]
287     unsafe fn resize(&self, new_cap: usize) {
288         // Load the back index, front index, and buffer.
289         let b = self.inner.back.load(Ordering::Relaxed);
290         let f = self.inner.front.load(Ordering::Relaxed);
291         let buffer = self.buffer.get();
292 
293         // Allocate a new buffer and copy data from the old buffer to the new one.
294         let new = Buffer::alloc(new_cap);
295         let mut i = f;
296         while i != b {
297             ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
298             i = i.wrapping_add(1);
299         }
300 
301         let guard = &epoch::pin();
302 
303         // Replace the old buffer with the new one.
304         self.buffer.replace(new);
305         let old =
306             self.inner
307                 .buffer
308                 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
309 
310         // Destroy the old buffer later.
311         guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
312 
313         // If the buffer is very large, then flush the thread-local garbage in order to deallocate
314         // it as soon as possible.
315         if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
316             guard.flush();
317         }
318     }
319 
320     /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
321     /// buffer.
322     fn reserve(&self, reserve_cap: usize) {
323         if reserve_cap > 0 {
324             // Compute the current length.
325             let b = self.inner.back.load(Ordering::Relaxed);
326             let f = self.inner.front.load(Ordering::SeqCst);
327             let len = b.wrapping_sub(f) as usize;
328 
329             // The current capacity.
330             let cap = self.buffer.get().cap;
331 
332             // Is there enough capacity to push `reserve_cap` tasks?
333             if cap - len < reserve_cap {
334                 // Keep doubling the capacity as much as is needed.
335                 let mut new_cap = cap * 2;
336                 while new_cap - len < reserve_cap {
337                     new_cap *= 2;
338                 }
339 
340                 // Resize the buffer.
341                 unsafe {
342                     self.resize(new_cap);
343                 }
344             }
345         }
346     }
347 
348     /// Returns `true` if the queue is empty.
349     ///
350     /// ```
351     /// use crossbeam_deque::Worker;
352     ///
353     /// let w = Worker::new_lifo();
354     ///
355     /// assert!(w.is_empty());
356     /// w.push(1);
357     /// assert!(!w.is_empty());
358     /// ```
359     pub fn is_empty(&self) -> bool {
360         let b = self.inner.back.load(Ordering::Relaxed);
361         let f = self.inner.front.load(Ordering::SeqCst);
362         b.wrapping_sub(f) <= 0
363     }
364 
365     /// Returns the number of tasks in the deque.
366     ///
367     /// ```
368     /// use crossbeam_deque::Worker;
369     ///
370     /// let w = Worker::new_lifo();
371     ///
372     /// assert_eq!(w.len(), 0);
373     /// w.push(1);
374     /// assert_eq!(w.len(), 1);
375     /// w.push(1);
376     /// assert_eq!(w.len(), 2);
377     /// ```
378     pub fn len(&self) -> usize {
379         let b = self.inner.back.load(Ordering::Relaxed);
380         let f = self.inner.front.load(Ordering::SeqCst);
381         b.wrapping_sub(f).max(0) as usize
382     }
383 
384     /// Pushes a task into the queue.
385     ///
386     /// # Examples
387     ///
388     /// ```
389     /// use crossbeam_deque::Worker;
390     ///
391     /// let w = Worker::new_lifo();
392     /// w.push(1);
393     /// w.push(2);
394     /// ```
395     pub fn push(&self, task: T) {
396         // Load the back index, front index, and buffer.
397         let b = self.inner.back.load(Ordering::Relaxed);
398         let f = self.inner.front.load(Ordering::Acquire);
399         let mut buffer = self.buffer.get();
400 
401         // Calculate the length of the queue.
402         let len = b.wrapping_sub(f);
403 
404         // Is the queue full?
405         if len >= buffer.cap as isize {
406             // Yes. Grow the underlying buffer.
407             unsafe {
408                 self.resize(2 * buffer.cap);
409             }
410             buffer = self.buffer.get();
411         }
412 
413         // Write `task` into the slot.
414         unsafe {
415             buffer.write(b, MaybeUninit::new(task));
416         }
417 
418         atomic::fence(Ordering::Release);
419 
420         // Increment the back index.
421         //
422         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
423         // races because it doesn't understand fences.
424         self.inner.back.store(b.wrapping_add(1), Ordering::Release);
425     }
426 
427     /// Pops a task from the queue.
428     ///
429     /// # Examples
430     ///
431     /// ```
432     /// use crossbeam_deque::Worker;
433     ///
434     /// let w = Worker::new_fifo();
435     /// w.push(1);
436     /// w.push(2);
437     ///
438     /// assert_eq!(w.pop(), Some(1));
439     /// assert_eq!(w.pop(), Some(2));
440     /// assert_eq!(w.pop(), None);
441     /// ```
442     pub fn pop(&self) -> Option<T> {
443         // Load the back and front index.
444         let b = self.inner.back.load(Ordering::Relaxed);
445         let f = self.inner.front.load(Ordering::Relaxed);
446 
447         // Calculate the length of the queue.
448         let len = b.wrapping_sub(f);
449 
450         // Is the queue empty?
451         if len <= 0 {
452             return None;
453         }
454 
455         match self.flavor {
456             // Pop from the front of the queue.
457             Flavor::Fifo => {
458                 // Try incrementing the front index to pop the task.
459                 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
460                 let new_f = f.wrapping_add(1);
461 
462                 if b.wrapping_sub(new_f) < 0 {
463                     self.inner.front.store(f, Ordering::Relaxed);
464                     return None;
465                 }
466 
467                 unsafe {
468                     // Read the popped task.
469                     let buffer = self.buffer.get();
470                     let task = buffer.read(f).assume_init();
471 
472                     // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
473                     if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
474                         self.resize(buffer.cap / 2);
475                     }
476 
477                     Some(task)
478                 }
479             }
480 
481             // Pop from the back of the queue.
482             Flavor::Lifo => {
483                 // Decrement the back index.
484                 let b = b.wrapping_sub(1);
485                 self.inner.back.store(b, Ordering::Relaxed);
486 
487                 atomic::fence(Ordering::SeqCst);
488 
489                 // Load the front index.
490                 let f = self.inner.front.load(Ordering::Relaxed);
491 
492                 // Compute the length after the back index was decremented.
493                 let len = b.wrapping_sub(f);
494 
495                 if len < 0 {
496                     // The queue is empty. Restore the back index to its original value.
497                     self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
498                     None
499                 } else {
500                     // Read the task to be popped.
501                     let buffer = self.buffer.get();
502                     let mut task = unsafe { Some(buffer.read(b)) };
503 
504                     // Are we popping the last task from the queue?
505                     if len == 0 {
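                        // A concurrent stealer may be competing for this last task, so we must
                        // win the CAS on `front` below to actually claim it.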
506                         // Try incrementing the front index.
507                         if self
508                             .inner
509                             .front
510                             .compare_exchange(
511                                 f,
512                                 f.wrapping_add(1),
513                                 Ordering::SeqCst,
514                                 Ordering::Relaxed,
515                             )
516                             .is_err()
517                         {
518                             // Failed. We didn't pop anything. Reset to `None`.
519                             task.take();
520                         }
521 
522                         // Restore the back index to its original value.
523                         self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
524                     } else {
525                         // Shrink the buffer if `len` is less than one fourth of the capacity.
526                         if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
527                             unsafe {
528                                 self.resize(buffer.cap / 2);
529                             }
530                         }
531                     }
532 
533                     task.map(|t| unsafe { t.assume_init() })
534                 }
535             }
536         }
537     }
538 }
539 
540 impl<T> fmt::Debug for Worker<T> {
541     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
542         f.pad("Worker { .. }")
543     }
544 }
545 
546 /// A stealer handle of a worker queue.
547 ///
548 /// Stealers can be shared among threads.
549 ///
550 /// Task schedulers typically have a single worker queue per worker thread.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// use crossbeam_deque::{Steal, Worker};
556 ///
557 /// let w = Worker::new_lifo();
558 /// w.push(1);
559 /// w.push(2);
560 ///
561 /// let s = w.stealer();
562 /// assert_eq!(s.steal(), Steal::Success(1));
563 /// assert_eq!(s.steal(), Steal::Success(2));
564 /// assert_eq!(s.steal(), Steal::Empty);
565 /// ```
566 pub struct Stealer<T> {
567     /// A reference to the inner representation of the queue.
568     inner: Arc<CachePadded<Inner<T>>>,
569 
570     /// The flavor of the queue.
571     flavor: Flavor,
572 }
573 
574 unsafe impl<T: Send> Send for Stealer<T> {}
575 unsafe impl<T: Send> Sync for Stealer<T> {}
576 
577 impl<T> Stealer<T> {
578     /// Returns `true` if the queue is empty.
579     ///
580     /// ```
581     /// use crossbeam_deque::Worker;
582     ///
583     /// let w = Worker::new_lifo();
584     /// let s = w.stealer();
585     ///
586     /// assert!(s.is_empty());
587     /// w.push(1);
588     /// assert!(!s.is_empty());
589     /// ```
590     pub fn is_empty(&self) -> bool {
591         let f = self.inner.front.load(Ordering::Acquire);
592         atomic::fence(Ordering::SeqCst);
593         let b = self.inner.back.load(Ordering::Acquire);
594         b.wrapping_sub(f) <= 0
595     }
596 
597     /// Returns the number of tasks in the deque.
598     ///
599     /// ```
600     /// use crossbeam_deque::Worker;
601     ///
602     /// let w = Worker::new_lifo();
603     /// let s = w.stealer();
604     ///
605     /// assert_eq!(s.len(), 0);
606     /// w.push(1);
607     /// assert_eq!(s.len(), 1);
608     /// w.push(2);
609     /// assert_eq!(s.len(), 2);
610     /// ```
611     pub fn len(&self) -> usize {
612         let f = self.inner.front.load(Ordering::Acquire);
613         atomic::fence(Ordering::SeqCst);
614         let b = self.inner.back.load(Ordering::Acquire);
615         b.wrapping_sub(f).max(0) as usize
616     }
617 
618     /// Steals a task from the queue.
619     ///
620     /// # Examples
621     ///
622     /// ```
623     /// use crossbeam_deque::{Steal, Worker};
624     ///
625     /// let w = Worker::new_lifo();
626     /// w.push(1);
627     /// w.push(2);
628     ///
629     /// let s = w.stealer();
630     /// assert_eq!(s.steal(), Steal::Success(1));
631     /// assert_eq!(s.steal(), Steal::Success(2));
632     /// ```
633     pub fn steal(&self) -> Steal<T> {
634         // Load the front index.
635         let f = self.inner.front.load(Ordering::Acquire);
636 
637         // A SeqCst fence is needed here.
638         //
639         // If the current thread is already pinned (reentrantly), we must manually issue the
640         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
641         // have to.
642         if epoch::is_pinned() {
643             atomic::fence(Ordering::SeqCst);
644         }
645 
646         let guard = &epoch::pin();
647 
648         // Load the back index.
649         let b = self.inner.back.load(Ordering::Acquire);
650 
651         // Is the queue empty?
652         if b.wrapping_sub(f) <= 0 {
653             return Steal::Empty;
654         }
655 
656         // Load the buffer and read the task at the front.
657         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
658         let task = unsafe { buffer.deref().read(f) };
659 
660         // Try incrementing the front index to steal the task.
661         // If the buffer has been swapped or the increment fails, we retry.
662         if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
663             || self
664                 .inner
665                 .front
666                 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
667                 .is_err()
668         {
669             // We didn't steal this task, forget it.
670             return Steal::Retry;
671         }
672 
673         // Return the stolen task.
674         Steal::Success(unsafe { task.assume_init() })
675     }
676 
677     /// Steals a batch of tasks and pushes them into another worker.
678     ///
679     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
680     /// steal around half of the tasks in the queue, but also not more than some constant limit.
681     ///
682     /// # Examples
683     ///
684     /// ```
685     /// use crossbeam_deque::Worker;
686     ///
687     /// let w1 = Worker::new_fifo();
688     /// w1.push(1);
689     /// w1.push(2);
690     /// w1.push(3);
691     /// w1.push(4);
692     ///
693     /// let s = w1.stealer();
694     /// let w2 = Worker::new_fifo();
695     ///
696     /// let _ = s.steal_batch(&w2);
697     /// assert_eq!(w2.pop(), Some(1));
698     /// assert_eq!(w2.pop(), Some(2));
699     /// ```
700     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
701         self.steal_batch_with_limit(dest, MAX_BATCH)
702     }
703 
704     /// Steals no more than `limit` tasks and pushes them into another worker.
705     ///
706     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
707     /// steal around half of the tasks in the queue, but also not more than the given limit.
708     ///
709     /// # Examples
710     ///
711     /// ```
712     /// use crossbeam_deque::Worker;
713     ///
714     /// let w1 = Worker::new_fifo();
715     /// w1.push(1);
716     /// w1.push(2);
717     /// w1.push(3);
718     /// w1.push(4);
719     /// w1.push(5);
720     /// w1.push(6);
721     ///
722     /// let s = w1.stealer();
723     /// let w2 = Worker::new_fifo();
724     ///
725     /// let _ = s.steal_batch_with_limit(&w2, 2);
726     /// assert_eq!(w2.pop(), Some(1));
727     /// assert_eq!(w2.pop(), Some(2));
728     /// assert_eq!(w2.pop(), None);
729     ///
730     /// w1.push(7);
731     /// w1.push(8);
732     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
733     /// // half of the elements are currently popped, but the number of popped elements is considered
734     /// // an implementation detail that may be changed in the future.
735     /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
736     /// assert_eq!(w2.len(), 3);
737     /// ```
738     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
739         assert!(limit > 0);
740         if Arc::ptr_eq(&self.inner, &dest.inner) {
741             if dest.is_empty() {
742                 return Steal::Empty;
743             } else {
744                 return Steal::Success(());
745             }
746         }
747 
748         // Load the front index.
749         let mut f = self.inner.front.load(Ordering::Acquire);
750 
751         // A SeqCst fence is needed here.
752         //
753         // If the current thread is already pinned (reentrantly), we must manually issue the
754         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
755         // have to.
756         if epoch::is_pinned() {
757             atomic::fence(Ordering::SeqCst);
758         }
759 
760         let guard = &epoch::pin();
761 
762         // Load the back index.
763         let b = self.inner.back.load(Ordering::Acquire);
764 
765         // Is the queue empty?
766         let len = b.wrapping_sub(f);
767         if len <= 0 {
768             return Steal::Empty;
769         }
770 
771         // Reserve capacity for the stolen batch.
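        // `(len + 1) / 2` rounds up, so at least one task is taken whenever the queue is
        // non-empty, and the batch is additionally capped at `limit`.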
772         let batch_size = cmp::min((len as usize + 1) / 2, limit);
773         dest.reserve(batch_size);
774         let mut batch_size = batch_size as isize;
775 
776         // Get the destination buffer and back index.
777         let dest_buffer = dest.buffer.get();
778         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
779 
780         // Load the buffer.
781         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
782 
783         match self.flavor {
784             // Steal a batch of tasks from the front at once.
785             Flavor::Fifo => {
786                 // Copy the batch from the source to the destination buffer.
787                 match dest.flavor {
788                     Flavor::Fifo => {
789                         for i in 0..batch_size {
790                             unsafe {
791                                 let task = buffer.deref().read(f.wrapping_add(i));
792                                 dest_buffer.write(dest_b.wrapping_add(i), task);
793                             }
794                         }
795                     }
796                     Flavor::Lifo => {
797                         for i in 0..batch_size {
798                             unsafe {
799                                 let task = buffer.deref().read(f.wrapping_add(i));
800                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
801                             }
802                         }
803                     }
804                 }
805 
806                 // Try incrementing the front index to steal the batch.
807                 // If the buffer has been swapped or the increment fails, we retry.
808                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
809                     || self
810                         .inner
811                         .front
812                         .compare_exchange(
813                             f,
814                             f.wrapping_add(batch_size),
815                             Ordering::SeqCst,
816                             Ordering::Relaxed,
817                         )
818                         .is_err()
819                 {
820                     return Steal::Retry;
821                 }
822 
823                 dest_b = dest_b.wrapping_add(batch_size);
824             }
825 
826             // Steal a batch of tasks from the front one by one.
827             Flavor::Lifo => {
828                 // This loop may modify the batch_size, which triggers a clippy lint warning.
829                 // Use a new variable to avoid the warning, and to make it clear we aren't
830                 // modifying the loop exit condition during iteration.
831                 let original_batch_size = batch_size;
832 
833                 for i in 0..original_batch_size {
834                     // If this is not the first steal, check whether the queue is empty.
835                     if i > 0 {
836                         // We've already got the current front index. Now execute the fence to
837                         // synchronize with other threads.
838                         atomic::fence(Ordering::SeqCst);
839 
840                         // Load the back index.
841                         let b = self.inner.back.load(Ordering::Acquire);
842 
843                         // Is the queue empty?
844                         if b.wrapping_sub(f) <= 0 {
845                             batch_size = i;
846                             break;
847                         }
848                     }
849 
850                     // Read the task at the front.
851                     let task = unsafe { buffer.deref().read(f) };
852 
853                     // Try incrementing the front index to steal the task.
854                     // If the buffer has been swapped or the increment fails, we retry.
855                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
856                         || self
857                             .inner
858                             .front
859                             .compare_exchange(
860                                 f,
861                                 f.wrapping_add(1),
862                                 Ordering::SeqCst,
863                                 Ordering::Relaxed,
864                             )
865                             .is_err()
866                     {
867                         // We didn't steal this task, forget it and break from the loop.
868                         batch_size = i;
869                         break;
870                     }
871 
872                     // Write the stolen task into the destination buffer.
873                     unsafe {
874                         dest_buffer.write(dest_b, task);
875                     }
876 
877                     // Move the source front index and the destination back index one step forward.
878                     f = f.wrapping_add(1);
879                     dest_b = dest_b.wrapping_add(1);
880                 }
881 
882                 // If we didn't steal anything, the operation needs to be retried.
883                 if batch_size == 0 {
884                     return Steal::Retry;
885                 }
886 
887                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
888                 if dest.flavor == Flavor::Fifo {
889                     for i in 0..batch_size / 2 {
890                         unsafe {
891                             let i1 = dest_b.wrapping_sub(batch_size - i);
892                             let i2 = dest_b.wrapping_sub(i + 1);
893                             let t1 = dest_buffer.read(i1);
894                             let t2 = dest_buffer.read(i2);
895                             dest_buffer.write(i1, t2);
896                             dest_buffer.write(i2, t1);
897                         }
898                     }
899                 }
900             }
901         }
902 
903         atomic::fence(Ordering::Release);
904 
905         // Update the back index in the destination queue.
906         //
907         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
908         // races because it doesn't understand fences.
909         dest.inner.back.store(dest_b, Ordering::Release);
910 
911         // Return with success.
912         Steal::Success(())
913     }
914 
915     /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
916     ///
917     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
918     /// steal around half of the tasks in the queue, but also not more than some constant limit.
919     ///
920     /// # Examples
921     ///
922     /// ```
923     /// use crossbeam_deque::{Steal, Worker};
924     ///
925     /// let w1 = Worker::new_fifo();
926     /// w1.push(1);
927     /// w1.push(2);
928     /// w1.push(3);
929     /// w1.push(4);
930     ///
931     /// let s = w1.stealer();
932     /// let w2 = Worker::new_fifo();
933     ///
934     /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
935     /// assert_eq!(w2.pop(), Some(2));
936     /// ```
937     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
938         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
939     }
940 
941     /// Steals no more than `limit` tasks, pushes them into another worker, and pops a task from
942     /// that worker.
943     ///
944     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
945     /// steal around half of the tasks in the queue, but also not more than the given limit.
946     ///
947     /// # Examples
948     ///
949     /// ```
950     /// use crossbeam_deque::{Steal, Worker};
951     ///
952     /// let w1 = Worker::new_fifo();
953     /// w1.push(1);
954     /// w1.push(2);
955     /// w1.push(3);
956     /// w1.push(4);
957     /// w1.push(5);
958     /// w1.push(6);
959     ///
960     /// let s = w1.stealer();
961     /// let w2 = Worker::new_fifo();
962     ///
963     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
964     /// assert_eq!(w2.pop(), Some(2));
965     /// assert_eq!(w2.pop(), None);
966     ///
967     /// w1.push(7);
968     /// w1.push(8);
969     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
970     /// // half of the elements are currently popped, but the number of popped elements is considered
971     /// // an implementation detail that may be changed in the future.
972     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
973     /// assert_eq!(w2.pop(), Some(4));
974     /// assert_eq!(w2.pop(), Some(5));
975     /// assert_eq!(w2.pop(), None);
976     /// ```
977     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
978         assert!(limit > 0);
979         if Arc::ptr_eq(&self.inner, &dest.inner) {
980             match dest.pop() {
981                 None => return Steal::Empty,
982                 Some(task) => return Steal::Success(task),
983             }
984         }
985 
986         // Load the front index.
987         let mut f = self.inner.front.load(Ordering::Acquire);
988 
989         // A SeqCst fence is needed here.
990         //
991         // If the current thread is already pinned (reentrantly), we must manually issue the
992         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
993         // have to.
994         if epoch::is_pinned() {
995             atomic::fence(Ordering::SeqCst);
996         }
997 
998         let guard = &epoch::pin();
999 
1000         // Load the back index.
1001         let b = self.inner.back.load(Ordering::Acquire);
1002 
1003         // Is the queue empty?
1004         let len = b.wrapping_sub(f);
1005         if len <= 0 {
1006             return Steal::Empty;
1007         }
1008 
1009         // Reserve capacity for the stolen batch.
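        // One task is reserved for the final pop, so at most `limit - 1` extra tasks are moved
        // into `dest`, and `(len - 1) / 2` likewise excludes the task returned to the caller.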
1010         let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1011         dest.reserve(batch_size);
1012         let mut batch_size = batch_size as isize;
1013 
1014         // Get the destination buffer and back index.
1015         let dest_buffer = dest.buffer.get();
1016         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1017 
1018         // Load the buffer
1019         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1020 
1021         // Read the task at the front.
1022         let mut task = unsafe { buffer.deref().read(f) };
1023 
1024         match self.flavor {
1025             // Steal a batch of tasks from the front at once.
1026             Flavor::Fifo => {
1027                 // Copy the batch from the source to the destination buffer.
1028                 match dest.flavor {
1029                     Flavor::Fifo => {
1030                         for i in 0..batch_size {
1031                             unsafe {
1032                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1033                                 dest_buffer.write(dest_b.wrapping_add(i), task);
1034                             }
1035                         }
1036                     }
1037                     Flavor::Lifo => {
1038                         for i in 0..batch_size {
1039                             unsafe {
1040                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1041                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1042                             }
1043                         }
1044                     }
1045                 }
1046 
1047                 // Try incrementing the front index to steal the batch plus the task that will be popped.
1048                 // If the buffer has been swapped or the increment fails, we retry.
1049                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1050                     || self
1051                         .inner
1052                         .front
1053                         .compare_exchange(
1054                             f,
1055                             f.wrapping_add(batch_size + 1),
1056                             Ordering::SeqCst,
1057                             Ordering::Relaxed,
1058                         )
1059                         .is_err()
1060                 {
1061                     // We didn't steal this task, forget it.
1062                     return Steal::Retry;
1063                 }
1064 
1065                 dest_b = dest_b.wrapping_add(batch_size);
1066             }
1067 
1068             // Steal a batch of tasks from the front one by one.
1069             Flavor::Lifo => {
1070                 // Try incrementing the front index to steal the task.
1071                 if self
1072                     .inner
1073                     .front
1074                     .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1075                     .is_err()
1076                 {
1077                     // We didn't steal this task, forget it.
1078                     return Steal::Retry;
1079                 }
1080 
1081                 // Move the front index one step forward.
1082                 f = f.wrapping_add(1);
1083 
1084                 // Repeat the same procedure for the batch steals.
1085                 //
1086                 // This loop may modify the batch_size, which triggers a clippy lint warning.
1087                 // Use a new variable to avoid the warning, and to make it clear we aren't
1088                 // modifying the loop exit condition during iteration.
1089                 let original_batch_size = batch_size;
1090                 for i in 0..original_batch_size {
1091                     // We've already got the current front index. Now execute the fence to
1092                     // synchronize with other threads.
1093                     atomic::fence(Ordering::SeqCst);
1094 
1095                     // Load the back index.
1096                     let b = self.inner.back.load(Ordering::Acquire);
1097 
1098                     // Is the queue empty?
1099                     if b.wrapping_sub(f) <= 0 {
1100                         batch_size = i;
1101                         break;
1102                     }
1103 
1104                     // Read the task at the front.
1105                     let tmp = unsafe { buffer.deref().read(f) };
1106 
1107                     // Try incrementing the front index to steal the task.
1108                     // If the buffer has been swapped or the increment fails, we retry.
1109                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1110                         || self
1111                             .inner
1112                             .front
1113                             .compare_exchange(
1114                                 f,
1115                                 f.wrapping_add(1),
1116                                 Ordering::SeqCst,
1117                                 Ordering::Relaxed,
1118                             )
1119                             .is_err()
1120                     {
1121                         // We didn't steal this task, forget it and break from the loop.
1122                         batch_size = i;
1123                         break;
1124                     }
1125 
1126                     // Write the previously stolen task into the destination buffer.
1127                     unsafe {
1128                         dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1129                     }
1130 
1131                     // Move the source front index and the destination back index one step forward.
1132                     f = f.wrapping_add(1);
1133                     dest_b = dest_b.wrapping_add(1);
1134                 }
1135 
1136                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1137                 if dest.flavor == Flavor::Fifo {
1138                     for i in 0..batch_size / 2 {
1139                         unsafe {
1140                             let i1 = dest_b.wrapping_sub(batch_size - i);
1141                             let i2 = dest_b.wrapping_sub(i + 1);
1142                             let t1 = dest_buffer.read(i1);
1143                             let t2 = dest_buffer.read(i2);
1144                             dest_buffer.write(i1, t2);
1145                             dest_buffer.write(i2, t1);
1146                         }
1147                     }
1148                 }
1149             }
1150         }
1151 
1152         atomic::fence(Ordering::Release);
1153 
1154         // Update the back index in the destination queue.
1155         //
1156         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1157         // races because it doesn't understand fences.
1158         dest.inner.back.store(dest_b, Ordering::Release);
1159 
1160         // Return with success.
1161         Steal::Success(unsafe { task.assume_init() })
1162     }
1163 }
1164 
1165 impl<T> Clone for Stealer<T> {
1166     fn clone(&self) -> Stealer<T> {
1167         Stealer {
1168             inner: self.inner.clone(),
1169             flavor: self.flavor,
1170         }
1171     }
1172 }
1173 
1174 impl<T> fmt::Debug for Stealer<T> {
1175     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176         f.pad("Stealer { .. }")
1177     }
1178 }
1179 
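// A minimal sketch of how the pieces defined in this file are typically combined by a scheduler
// (illustrative only; `find_task` is not part of this crate's API):
//
//     fn find_task<T>(
//         local: &Worker<T>,
//         global: &Injector<T>,
//         stealers: &[Stealer<T>],
//     ) -> Option<T> {
//         // Prefer the local queue, then fall back to the injector and to stealing from peers,
//         // retrying any operation that lost a race (`Steal::Retry`).
//         local.pop().or_else(|| {
//             std::iter::repeat_with(|| {
//                 global
//                     .steal_batch_and_pop(local)
//                     .or_else(|| stealers.iter().map(|s| s.steal()).collect())
//             })
//             .find(|s| !s.is_retry())
//             .and_then(|s| s.success())
//         })
//     }
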
1180 // Bits indicating the state of a slot:
1181 // * If a task has been written into the slot, `WRITE` is set.
1182 // * If a task has been read from the slot, `READ` is set.
1183 // * If the block is being destroyed, `DESTROY` is set.
1184 const WRITE: usize = 1;
1185 const READ: usize = 2;
1186 const DESTROY: usize = 4;
1187 
1188 // Each block covers one "lap" of indices.
1189 const LAP: usize = 64;
1190 // The maximum number of values a block can hold.
1191 const BLOCK_CAP: usize = LAP - 1;
1192 // How many lower bits are reserved for metadata.
1193 const SHIFT: usize = 1;
1194 // Indicates that the block is not the last one.
1195 const HAS_NEXT: usize = 1;
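
// A minimal sketch of the index encoding used by the injector below (illustrative only; the
// variables are not part of the implementation):
//
//     let index: usize = 5 << SHIFT;         // the index after five tasks have been pushed
//     let offset = (index >> SHIFT) % LAP;   // slot 5 within the current block
//     let has_next = index & HAS_NEXT != 0;  // set on the head index when the block has a successor
//
// The lowest `SHIFT` bits carry metadata, so advancing by one task adds `1 << SHIFT`. Only
// `BLOCK_CAP = LAP - 1` slots are usable: offset `LAP - 1` marks the end of a block, at which
// point the next block must be installed before pushing or stealing can continue.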
1196 
1197 /// A slot in a block.
1198 struct Slot<T> {
1199     /// The task.
1200     task: UnsafeCell<MaybeUninit<T>>,
1201 
1202     /// The state of the slot.
1203     state: AtomicUsize,
1204 }
1205 
1206 impl<T> Slot<T> {
1207     /// Waits until a task is written into the slot.
1208     fn wait_write(&self) {
1209         let backoff = Backoff::new();
1210         while self.state.load(Ordering::Acquire) & WRITE == 0 {
1211             backoff.snooze();
1212         }
1213     }
1214 }
1215 
1216 /// A block in a linked list.
1217 ///
1218 /// Each block in the list can hold up to `BLOCK_CAP` values.
1219 struct Block<T> {
1220     /// The next block in the linked list.
1221     next: AtomicPtr<Block<T>>,
1222 
1223     /// Slots for values.
1224     slots: [Slot<T>; BLOCK_CAP],
1225 }
1226 
1227 impl<T> Block<T> {
1228     const LAYOUT: Layout = {
1229         let layout = Layout::new::<Self>();
1230         assert!(
1231             layout.size() != 0,
1232             "Block should never be zero-sized, as it has an AtomicPtr field"
1233         );
1234         layout
1235     };
1236 
1237     /// Creates an empty block.
1238     fn new() -> Box<Self> {
1239         // SAFETY: layout is not zero-sized
1240         let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
1241         // Handle allocation failure
1242         if ptr.is_null() {
1243             handle_alloc_error(Self::LAYOUT)
1244         }
1245         // SAFETY: This is safe because:
1246         //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1247         //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1248         //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1249         //       holds a MaybeUninit.
1250         //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1251         // TODO: unsafe { Box::new_zeroed().assume_init() }
1252         unsafe { Box::from_raw(ptr.cast()) }
1253     }
1254 
1255     /// Waits until the next pointer is set.
1256     fn wait_next(&self) -> *mut Block<T> {
1257         let backoff = Backoff::new();
1258         loop {
1259             let next = self.next.load(Ordering::Acquire);
1260             if !next.is_null() {
1261                 return next;
1262             }
1263             backoff.snooze();
1264         }
1265     }
1266 
1267     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1268     unsafe fn destroy(this: *mut Block<T>, count: usize) {
1269         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1270         // begun destruction of the block.
1271         for i in (0..count).rev() {
1272             let slot = (*this).slots.get_unchecked(i);
1273 
1274             // Mark the `DESTROY` bit if a thread is still using the slot.
1275             if slot.state.load(Ordering::Acquire) & READ == 0
1276                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1277             {
1278                 // If a thread is still using the slot, it will continue destruction of the block.
1279                 return;
1280             }
1281         }
1282 
1283         // No thread is using the block, now it is safe to destroy it.
1284         drop(Box::from_raw(this));
1285     }
1286 }
1287 
1288 /// A position in a queue.
1289 struct Position<T> {
1290     /// The index in the queue.
1291     index: AtomicUsize,
1292 
1293     /// The block in the linked list.
1294     block: AtomicPtr<Block<T>>,
1295 }
1296 
1297 /// An injector queue.
1298 ///
1299 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1300 /// a single injector queue, which is the entry point for new tasks.
1301 ///
1302 /// # Examples
1303 ///
1304 /// ```
1305 /// use crossbeam_deque::{Injector, Steal};
1306 ///
1307 /// let q = Injector::new();
1308 /// q.push(1);
1309 /// q.push(2);
1310 ///
1311 /// assert_eq!(q.steal(), Steal::Success(1));
1312 /// assert_eq!(q.steal(), Steal::Success(2));
1313 /// assert_eq!(q.steal(), Steal::Empty);
1314 /// ```
1315 pub struct Injector<T> {
1316     /// The head of the queue.
1317     head: CachePadded<Position<T>>,
1318 
1319     /// The tail of the queue.
1320     tail: CachePadded<Position<T>>,
1321 
1322     /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1323     _marker: PhantomData<T>,
1324 }
1325 
1326 unsafe impl<T: Send> Send for Injector<T> {}
1327 unsafe impl<T: Send> Sync for Injector<T> {}
1328 
1329 impl<T> Default for Injector<T> {
1330     fn default() -> Self {
1331         let block = Box::into_raw(Block::<T>::new());
1332         Self {
1333             head: CachePadded::new(Position {
1334                 block: AtomicPtr::new(block),
1335                 index: AtomicUsize::new(0),
1336             }),
1337             tail: CachePadded::new(Position {
1338                 block: AtomicPtr::new(block),
1339                 index: AtomicUsize::new(0),
1340             }),
1341             _marker: PhantomData,
1342         }
1343     }
1344 }
1345 
1346 impl<T> Injector<T> {
1347     /// Creates a new injector queue.
1348     ///
1349     /// # Examples
1350     ///
1351     /// ```
1352     /// use crossbeam_deque::Injector;
1353     ///
1354     /// let q = Injector::<i32>::new();
1355     /// ```
1356     pub fn new() -> Injector<T> {
1357         Self::default()
1358     }
1359 
1360     /// Pushes a task into the queue.
1361     ///
1362     /// # Examples
1363     ///
1364     /// ```
1365     /// use crossbeam_deque::Injector;
1366     ///
1367     /// let w = Injector::new();
1368     /// w.push(1);
1369     /// w.push(2);
1370     /// ```
1371     pub fn push(&self, task: T) {
1372         let backoff = Backoff::new();
1373         let mut tail = self.tail.index.load(Ordering::Acquire);
1374         let mut block = self.tail.block.load(Ordering::Acquire);
1375         let mut next_block = None;
1376 
1377         loop {
1378             // Calculate the offset of the index into the block.
1379             let offset = (tail >> SHIFT) % LAP;
1380 
1381             // If we reached the end of the block, wait until the next one is installed.
1382             if offset == BLOCK_CAP {
1383                 backoff.snooze();
1384                 tail = self.tail.index.load(Ordering::Acquire);
1385                 block = self.tail.block.load(Ordering::Acquire);
1386                 continue;
1387             }
1388 
1389             // If we're going to have to install the next block, allocate it in advance in order to
1390             // make the wait for other threads as short as possible.
1391             if offset + 1 == BLOCK_CAP && next_block.is_none() {
1392                 next_block = Some(Block::<T>::new());
1393             }
1394 
1395             let new_tail = tail + (1 << SHIFT);
1396 
1397             // Try advancing the tail forward.
1398             match self.tail.index.compare_exchange_weak(
1399                 tail,
1400                 new_tail,
1401                 Ordering::SeqCst,
1402                 Ordering::Acquire,
1403             ) {
1404                 Ok(_) => unsafe {
1405                     // If we've reached the end of the block, install the next one.
1406                     if offset + 1 == BLOCK_CAP {
1407                         let next_block = Box::into_raw(next_block.unwrap());
1408                         let next_index = new_tail.wrapping_add(1 << SHIFT);
1409 
1410                         self.tail.block.store(next_block, Ordering::Release);
1411                         self.tail.index.store(next_index, Ordering::Release);
1412                         (*block).next.store(next_block, Ordering::Release);
1413                     }
1414 
1415                     // Write the task into the slot.
1416                     let slot = (*block).slots.get_unchecked(offset);
1417                     slot.task.get().write(MaybeUninit::new(task));
1418                     slot.state.fetch_or(WRITE, Ordering::Release);
1419 
1420                     return;
1421                 },
1422                 Err(t) => {
1423                     tail = t;
1424                     block = self.tail.block.load(Ordering::Acquire);
1425                     backoff.spin();
1426                 }
1427             }
1428         }
1429     }
1430 
1431     /// Steals a task from the queue.
1432     ///
1433     /// # Examples
1434     ///
1435     /// ```
1436     /// use crossbeam_deque::{Injector, Steal};
1437     ///
1438     /// let q = Injector::new();
1439     /// q.push(1);
1440     /// q.push(2);
1441     ///
1442     /// assert_eq!(q.steal(), Steal::Success(1));
1443     /// assert_eq!(q.steal(), Steal::Success(2));
1444     /// assert_eq!(q.steal(), Steal::Empty);
1445     /// ```
1446     pub fn steal(&self) -> Steal<T> {
1447         let mut head;
1448         let mut block;
1449         let mut offset;
1450 
1451         let backoff = Backoff::new();
1452         loop {
1453             head = self.head.index.load(Ordering::Acquire);
1454             block = self.head.block.load(Ordering::Acquire);
1455 
1456             // Calculate the offset of the index into the block.
1457             offset = (head >> SHIFT) % LAP;
1458 
1459             // If we reached the end of the block, wait until the next one is installed.
1460             if offset == BLOCK_CAP {
1461                 backoff.snooze();
1462             } else {
1463                 break;
1464             }
1465         }
1466 
1467         let mut new_head = head + (1 << SHIFT);
1468 
1469         if new_head & HAS_NEXT == 0 {
1470             atomic::fence(Ordering::SeqCst);
1471             let tail = self.tail.index.load(Ordering::Relaxed);
1472 
1473             // If the tail equals the head, that means the queue is empty.
1474             if head >> SHIFT == tail >> SHIFT {
1475                 return Steal::Empty;
1476             }
1477 
1478             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1479             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1480                 new_head |= HAS_NEXT;
1481             }
1482         }
1483 
1484         // Try moving the head index forward.
1485         if self
1486             .head
1487             .index
1488             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1489             .is_err()
1490         {
1491             return Steal::Retry;
1492         }
1493 
1494         unsafe {
1495             // If we've reached the end of the block, move to the next one.
1496             if offset + 1 == BLOCK_CAP {
1497                 let next = (*block).wait_next();
1498                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1499                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1500                     next_index |= HAS_NEXT;
1501                 }
1502 
1503                 self.head.block.store(next, Ordering::Release);
1504                 self.head.index.store(next_index, Ordering::Release);
1505             }
1506 
1507             // Read the task.
1508             let slot = (*block).slots.get_unchecked(offset);
1509             slot.wait_write();
1510             let task = slot.task.get().read().assume_init();
1511 
1512             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1513             // but couldn't because we were busy reading from the slot.
1514             if (offset + 1 == BLOCK_CAP)
1515                 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1516             {
1517                 Block::destroy(block, offset);
1518             }
1519 
1520             Steal::Success(task)
1521         }
1522     }
1523 
1524     /// Steals a batch of tasks and pushes them into a worker.
1525     ///
1526     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1527     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1528     ///
1529     /// # Examples
1530     ///
1531     /// ```
1532     /// use crossbeam_deque::{Injector, Worker};
1533     ///
1534     /// let q = Injector::new();
1535     /// q.push(1);
1536     /// q.push(2);
1537     /// q.push(3);
1538     /// q.push(4);
1539     ///
1540     /// let w = Worker::new_fifo();
1541     /// let _ = q.steal_batch(&w);
1542     /// assert_eq!(w.pop(), Some(1));
1543     /// assert_eq!(w.pop(), Some(2));
1544     /// ```
1545     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1546         self.steal_batch_with_limit(dest, MAX_BATCH)
1547     }
1548 
1549     /// Steals no more than `limit` tasks and pushes them into a worker.
1550     ///
1551     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1552     /// steal around half of the tasks in the queue, but also not more than the given limit.
1553     ///
1554     /// # Examples
1555     ///
1556     /// ```
1557     /// use crossbeam_deque::{Injector, Worker};
1558     ///
1559     /// let q = Injector::new();
1560     /// q.push(1);
1561     /// q.push(2);
1562     /// q.push(3);
1563     /// q.push(4);
1564     /// q.push(5);
1565     /// q.push(6);
1566     ///
1567     /// let w = Worker::new_fifo();
1568     /// let _ = q.steal_batch_with_limit(&w, 2);
1569     /// assert_eq!(w.pop(), Some(1));
1570     /// assert_eq!(w.pop(), Some(2));
1571     /// assert_eq!(w.pop(), None);
1572     ///
1573     /// q.push(7);
1574     /// q.push(8);
1575     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1576     /// // half of the elements are currently popped, but the number of popped elements is considered
1577     /// // an implementation detail that may be changed in the future.
1578     /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1579     /// assert_eq!(w.len(), 3);
1580     /// ```
1581     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1582         assert!(limit > 0);
1583         let mut head;
1584         let mut block;
1585         let mut offset;
1586 
1587         let backoff = Backoff::new();
1588         loop {
1589             head = self.head.index.load(Ordering::Acquire);
1590             block = self.head.block.load(Ordering::Acquire);
1591 
1592             // Calculate the offset of the index into the block.
1593             offset = (head >> SHIFT) % LAP;
1594 
1595             // If we reached the end of the block, wait until the next one is installed.
1596             if offset == BLOCK_CAP {
1597                 backoff.snooze();
1598             } else {
1599                 break;
1600             }
1601         }
1602 
1603         let mut new_head = head;
1604         let advance;
1605 
1606         if new_head & HAS_NEXT == 0 {
1607             atomic::fence(Ordering::SeqCst);
1608             let tail = self.tail.index.load(Ordering::Relaxed);
1609 
1610             // If the tail equals the head, that means the queue is empty.
1611             if head >> SHIFT == tail >> SHIFT {
1612                 return Steal::Empty;
1613             }
1614 
1615             // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1616             // the right batch size to steal.
1617             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1618                 new_head |= HAS_NEXT;
1619                 // We can steal all tasks till the end of the block.
1620                 advance = (BLOCK_CAP - offset).min(limit);
1621             } else {
1622                 let len = (tail - head) >> SHIFT;
1623                 // Steal half of the available tasks.
1624                 advance = ((len + 1) / 2).min(limit);
1625             }
1626         } else {
1627             // We can steal all tasks till the end of the block.
1628             advance = (BLOCK_CAP - offset).min(limit);
1629         }
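        // `advance` is the number of slots this batch claims; it never crosses the end of the
        // current block. Illustrative example (assuming `BLOCK_CAP` = 63): with `offset` = 60 and
        // `limit` = 32, at most 3 slots can be claimed before the block boundary.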
1630 
1631         new_head += advance << SHIFT;
1632         let new_offset = offset + advance;
1633 
1634         // Try moving the head index forward.
1635         if self
1636             .head
1637             .index
1638             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1639             .is_err()
1640         {
1641             return Steal::Retry;
1642         }
1643 
1644         // Reserve capacity for the stolen batch.
1645         let batch_size = new_offset - offset;
1646         dest.reserve(batch_size);
1647 
1648         // Get the destination buffer and back index.
1649         let dest_buffer = dest.buffer.get();
1650         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1651 
1652         unsafe {
1653             // If we've reached the end of the block, move to the next one.
1654             if new_offset == BLOCK_CAP {
1655                 let next = (*block).wait_next();
1656                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1657                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1658                     next_index |= HAS_NEXT;
1659                 }
1660 
1661                 self.head.block.store(next, Ordering::Release);
1662                 self.head.index.store(next_index, Ordering::Release);
1663             }
1664 
1665             // Copy values from the injector into the destination queue.
1666             match dest.flavor {
1667                 Flavor::Fifo => {
1668                     for i in 0..batch_size {
1669                         // Read the task.
1670                         let slot = (*block).slots.get_unchecked(offset + i);
1671                         slot.wait_write();
1672                         let task = slot.task.get().read();
1673 
1674                         // Write it into the destination queue.
1675                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1676                     }
1677                 }
1678 
1679                 Flavor::Lifo => {
1680                     for i in 0..batch_size {
1681                         // Read the task.
1682                         let slot = (*block).slots.get_unchecked(offset + i);
1683                         slot.wait_write();
1684                         let task = slot.task.get().read();
1685 
1686                         // Write it into the destination queue.
1687                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1688                     }
1689                 }
1690             }
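            // Note on the `Lifo` arm above: the batch is written into `dest` in reverse so that
            // subsequent `pop()` calls on the LIFO worker (which take from the back) still yield
            // the tasks in the order they were pushed into the injector.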
1691 
1692             atomic::fence(Ordering::Release);
1693 
1694             // Update the back index in the destination queue.
1695             //
1696             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1697             // data races because it doesn't understand fences.
1698             dest.inner
1699                 .back
1700                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1701 
1702             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1703             // but couldn't because we were busy reading from the slot.
1704             if new_offset == BLOCK_CAP {
1705                 Block::destroy(block, offset);
1706             } else {
1707                 for i in offset..new_offset {
1708                     let slot = (*block).slots.get_unchecked(i);
1709 
1710                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1711                         Block::destroy(block, offset);
1712                         break;
1713                     }
1714                 }
1715             }
1716 
1717             Steal::Success(())
1718         }
1719     }
1720 
1721     /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1722     ///
1723     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1724     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1725     ///
1726     /// # Examples
1727     ///
1728     /// ```
1729     /// use crossbeam_deque::{Injector, Steal, Worker};
1730     ///
1731     /// let q = Injector::new();
1732     /// q.push(1);
1733     /// q.push(2);
1734     /// q.push(3);
1735     /// q.push(4);
1736     ///
1737     /// let w = Worker::new_fifo();
1738     /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1739     /// assert_eq!(w.pop(), Some(2));
1740     /// ```
1741     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1742         // TODO: we use `MAX_BATCH + 1` as the hard limit for Injector as the performance is slightly
1743         // better, but we may change it in the future to be compatible with the same method in Stealer.
1744         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1745     }
1746 
1747     /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1748     ///
1749     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1750     /// steal around half of the tasks in the queue, but also not more than the given limit.
1751     ///
1752     /// # Examples
1753     ///
1754     /// ```
1755     /// use crossbeam_deque::{Injector, Steal, Worker};
1756     ///
1757     /// let q = Injector::new();
1758     /// q.push(1);
1759     /// q.push(2);
1760     /// q.push(3);
1761     /// q.push(4);
1762     /// q.push(5);
1763     /// q.push(6);
1764     ///
1765     /// let w = Worker::new_fifo();
1766     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1767     /// assert_eq!(w.pop(), Some(2));
1768     /// assert_eq!(w.pop(), None);
1769     ///
1770     /// q.push(7);
1771     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1772     /// // half of the elements are currently popped, but the number of popped elements is considered
1773     /// // an implementation detail that may be changed in the future.
1774     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1775     /// assert_eq!(w.pop(), Some(4));
1776     /// assert_eq!(w.pop(), Some(5));
1777     /// assert_eq!(w.pop(), None);
1778     /// ```
1779     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1780         assert!(limit > 0);
1781         let mut head;
1782         let mut block;
1783         let mut offset;
1784 
1785         let backoff = Backoff::new();
1786         loop {
1787             head = self.head.index.load(Ordering::Acquire);
1788             block = self.head.block.load(Ordering::Acquire);
1789 
1790             // Calculate the offset of the index into the block.
1791             offset = (head >> SHIFT) % LAP;
1792 
1793             // If we reached the end of the block, wait until the next one is installed.
1794             if offset == BLOCK_CAP {
1795                 backoff.snooze();
1796             } else {
1797                 break;
1798             }
1799         }
1800 
1801         let mut new_head = head;
1802         let advance;
1803 
1804         if new_head & HAS_NEXT == 0 {
1805             atomic::fence(Ordering::SeqCst);
1806             let tail = self.tail.index.load(Ordering::Relaxed);
1807 
1808             // If the tail equals the head, that means the queue is empty.
1809             if head >> SHIFT == tail >> SHIFT {
1810                 return Steal::Empty;
1811             }
1812 
1813             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1814             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1815                 new_head |= HAS_NEXT;
1816                 // We can steal all tasks till the end of the block.
1817                 advance = (BLOCK_CAP - offset).min(limit);
1818             } else {
1819                 let len = (tail - head) >> SHIFT;
1820                 // Steal half of the available tasks.
1821                 advance = ((len + 1) / 2).min(limit);
1822             }
1823         } else {
1824             // We can steal all tasks till the end of the block.
1825             advance = (BLOCK_CAP - offset).min(limit);
1826         }
1827 
1828         new_head += advance << SHIFT;
1829         let new_offset = offset + advance;
1830 
1831         // Try moving the head index forward.
1832         if self
1833             .head
1834             .index
1835             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1836             .is_err()
1837         {
1838             return Steal::Retry;
1839         }
1840 
1841         // Reserve capacity for the stolen batch.
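        // Note the `- 1` below: one of the claimed slots is not moved into `dest` because its
        // task is read directly and returned to the caller at the end of this method.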
1842         let batch_size = new_offset - offset - 1;
1843         dest.reserve(batch_size);
1844 
1845         // Get the destination buffer and back index.
1846         let dest_buffer = dest.buffer.get();
1847         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1848 
1849         unsafe {
1850             // If we've reached the end of the block, move to the next one.
1851             if new_offset == BLOCK_CAP {
1852                 let next = (*block).wait_next();
1853                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1854                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1855                     next_index |= HAS_NEXT;
1856                 }
1857 
1858                 self.head.block.store(next, Ordering::Release);
1859                 self.head.index.store(next_index, Ordering::Release);
1860             }
1861 
1862             // Read the task.
1863             let slot = (*block).slots.get_unchecked(offset);
1864             slot.wait_write();
1865             let task = slot.task.get().read();
1866 
1867             match dest.flavor {
1868                 Flavor::Fifo => {
1869                     // Copy values from the injector into the destination queue.
1870                     for i in 0..batch_size {
1871                         // Read the task.
1872                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1873                         slot.wait_write();
1874                         let task = slot.task.get().read();
1875 
1876                         // Write it into the destination queue.
1877                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1878                     }
1879                 }
1880 
1881                 Flavor::Lifo => {
1882                     // Copy values from the injector into the destination queue.
1883                     for i in 0..batch_size {
1884                         // Read the task.
1885                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1886                         slot.wait_write();
1887                         let task = slot.task.get().read();
1888 
1889                         // Write it into the destination queue.
1890                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1891                     }
1892                 }
1893             }
1894 
1895             atomic::fence(Ordering::Release);
1896 
1897             // Update the back index in the destination queue.
1898             //
1899             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1900             // data races because it doesn't understand fences.
1901             dest.inner
1902                 .back
1903                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1904 
1905             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1906             // but couldn't because we were busy reading from the slot.
1907             if new_offset == BLOCK_CAP {
1908                 Block::destroy(block, offset);
1909             } else {
1910                 for i in offset..new_offset {
1911                     let slot = (*block).slots.get_unchecked(i);
1912 
1913                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1914                         Block::destroy(block, offset);
1915                         break;
1916                     }
1917                 }
1918             }
1919 
1920             Steal::Success(task.assume_init())
1921         }
1922     }
1923 
1924     /// Returns `true` if the queue is empty.
1925     ///
1926     /// # Examples
1927     ///
1928     /// ```
1929     /// use crossbeam_deque::Injector;
1930     ///
1931     /// let q = Injector::new();
1932     ///
1933     /// assert!(q.is_empty());
1934     /// q.push(1);
1935     /// assert!(!q.is_empty());
1936     /// ```
1937     pub fn is_empty(&self) -> bool {
1938         let head = self.head.index.load(Ordering::SeqCst);
1939         let tail = self.tail.index.load(Ordering::SeqCst);
1940         head >> SHIFT == tail >> SHIFT
1941     }
1942 
1943     /// Returns the number of tasks in the queue.
1944     ///
1945     /// # Examples
1946     ///
1947     /// ```
1948     /// use crossbeam_deque::Injector;
1949     ///
1950     /// let q = Injector::new();
1951     ///
1952     /// assert_eq!(q.len(), 0);
1953     /// q.push(1);
1954     /// assert_eq!(q.len(), 1);
1955     /// q.push(1);
1956     /// assert_eq!(q.len(), 2);
1957     /// ```
1958     pub fn len(&self) -> usize {
1959         loop {
1960             // Load the tail index, then load the head index.
1961             let mut tail = self.tail.index.load(Ordering::SeqCst);
1962             let mut head = self.head.index.load(Ordering::SeqCst);
1963 
1964             // If the tail index didn't change, we've got consistent indices to work with.
1965             if self.tail.index.load(Ordering::SeqCst) == tail {
1966                 // Erase the lower bits.
1967                 tail &= !((1 << SHIFT) - 1);
1968                 head &= !((1 << SHIFT) - 1);
1969 
1970                 // Fix up indices if they fall onto block ends.
1971                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1972                     tail = tail.wrapping_add(1 << SHIFT);
1973                 }
1974                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1975                     head = head.wrapping_add(1 << SHIFT);
1976                 }
1977 
1978                 // Rotate indices so that head falls into the first block.
1979                 let lap = (head >> SHIFT) / LAP;
1980                 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1981                 head = head.wrapping_sub((lap * LAP) << SHIFT);
1982 
1983                 // Remove the lower bits.
1984                 tail >>= SHIFT;
1985                 head >>= SHIFT;
1986 
1987                 // Return the difference minus the number of blocks between tail and head.
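                // Each block contributes one index that never holds a task (the block boundary),
                // which is what the `tail / LAP` term subtracts. Illustrative example (assuming
                // `LAP` = 64): head = 3 and tail = 130 span the boundaries at 63 and 127, giving
                // 130 - 3 - 130 / 64 = 125 tasks.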
1988                 return tail - head - tail / LAP;
1989             }
1990         }
1991     }
1992 }
1993 
1994 impl<T> Drop for Injector<T> {
1995     fn drop(&mut self) {
1996         let mut head = *self.head.index.get_mut();
1997         let mut tail = *self.tail.index.get_mut();
1998         let mut block = *self.head.block.get_mut();
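        // Plain `get_mut` accesses are sufficient here: `&mut self` guarantees exclusive access,
        // so no other thread can be pushing into or stealing from the queue during the drop.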
1999 
2000         // Erase the lower bits.
2001         head &= !((1 << SHIFT) - 1);
2002         tail &= !((1 << SHIFT) - 1);
2003 
2004         unsafe {
2005             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
2006             while head != tail {
2007                 let offset = (head >> SHIFT) % LAP;
2008 
2009                 if offset < BLOCK_CAP {
2010                     // Drop the task in the slot.
2011                     let slot = (*block).slots.get_unchecked(offset);
2012                     (*slot.task.get()).assume_init_drop();
2013                 } else {
2014                     // Deallocate the block and move to the next one.
2015                     let next = *(*block).next.get_mut();
2016                     drop(Box::from_raw(block));
2017                     block = next;
2018                 }
2019 
2020                 head = head.wrapping_add(1 << SHIFT);
2021             }
2022 
2023             // Deallocate the last remaining block.
2024             drop(Box::from_raw(block));
2025         }
2026     }
2027 }
2028 
2029 impl<T> fmt::Debug for Injector<T> {
2030     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2031         f.pad("Injector { .. }")
2032     }
2033 }
2034 
2035 /// Possible outcomes of a steal operation.
2036 ///
2037 /// # Examples
2038 ///
2039 /// There are lots of ways to chain results of steal operations together:
2040 ///
2041 /// ```
2042 /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2043 ///
2044 /// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2045 ///
2046 /// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2047 /// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2048 /// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2049 ///
2050 /// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2051 /// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2052 /// ```
2053 #[must_use]
2054 #[derive(PartialEq, Eq, Copy, Clone)]
2055 pub enum Steal<T> {
2056     /// The queue was empty at the time of stealing.
2057     Empty,
2058 
2059     /// At least one task was successfully stolen.
2060     Success(T),
2061 
2062     /// The steal operation needs to be retried.
2063     Retry,
2064 }
2065 
2066 impl<T> Steal<T> {
2067     /// Returns `true` if the queue was empty at the time of stealing.
2068     ///
2069     /// # Examples
2070     ///
2071     /// ```
2072     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2073     ///
2074     /// assert!(!Success(7).is_empty());
2075     /// assert!(!Retry::<i32>.is_empty());
2076     ///
2077     /// assert!(Empty::<i32>.is_empty());
2078     /// ```
2079     pub fn is_empty(&self) -> bool {
2080         match self {
2081             Steal::Empty => true,
2082             _ => false,
2083         }
2084     }
2085 
2086     /// Returns `true` if at least one task was stolen.
2087     ///
2088     /// # Examples
2089     ///
2090     /// ```
2091     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2092     ///
2093     /// assert!(!Empty::<i32>.is_success());
2094     /// assert!(!Retry::<i32>.is_success());
2095     ///
2096     /// assert!(Success(7).is_success());
2097     /// ```
2098     pub fn is_success(&self) -> bool {
2099         match self {
2100             Steal::Success(_) => true,
2101             _ => false,
2102         }
2103     }
2104 
2105     /// Returns `true` if the steal operation needs to be retried.
2106     ///
2107     /// # Examples
2108     ///
2109     /// ```
2110     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2111     ///
2112     /// assert!(!Empty::<i32>.is_retry());
2113     /// assert!(!Success(7).is_retry());
2114     ///
2115     /// assert!(Retry::<i32>.is_retry());
2116     /// ```
2117     pub fn is_retry(&self) -> bool {
2118         match self {
2119             Steal::Retry => true,
2120             _ => false,
2121         }
2122     }
2123 
2124     /// Returns the result of the operation, if successful.
2125     ///
2126     /// # Examples
2127     ///
2128     /// ```
2129     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2130     ///
2131     /// assert_eq!(Empty::<i32>.success(), None);
2132     /// assert_eq!(Retry::<i32>.success(), None);
2133     ///
2134     /// assert_eq!(Success(7).success(), Some(7));
2135     /// ```
2136     pub fn success(self) -> Option<T> {
2137         match self {
2138             Steal::Success(res) => Some(res),
2139             _ => None,
2140         }
2141     }
2142 
2143     /// If no task was stolen, attempts another steal operation.
2144     ///
2145     /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2146     ///
2147     /// * If the second steal resulted in `Success`, it is returned.
2148     /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2149     /// * If both resulted in `Empty`, then `Empty` is returned.
2150     ///
2151     /// # Examples
2152     ///
2153     /// ```
2154     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2155     ///
2156     /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2157     /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2158     ///
2159     /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2160     /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2161     ///
2162     /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2163     /// ```
2164     pub fn or_else<F>(self, f: F) -> Steal<T>
2165     where
2166         F: FnOnce() -> Steal<T>,
2167     {
2168         match self {
2169             Steal::Empty => f(),
2170             Steal::Success(_) => self,
2171             Steal::Retry => {
2172                 if let Steal::Success(res) = f() {
2173                     Steal::Success(res)
2174                 } else {
2175                     Steal::Retry
2176                 }
2177             }
2178         }
2179     }
2180 }
2181 
2182 impl<T> fmt::Debug for Steal<T> {
2183     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2184         match self {
2185             Steal::Empty => f.pad("Empty"),
2186             Steal::Success(_) => f.pad("Success(..)"),
2187             Steal::Retry => f.pad("Retry"),
2188         }
2189     }
2190 }
2191 
2192 impl<T> FromIterator<Steal<T>> for Steal<T> {
2193     /// Consumes items until a `Success` is found and returns it.
2194     ///
2195     /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2196     /// Otherwise, `Empty` is returned.
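    ///
    /// # Examples
    ///
    /// A small illustration of the rules above, mirroring the `collect` example on [`Steal`]:
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(vec![Empty, Retry, Success(7)].into_iter().collect::<Steal<i32>>(), Success(7));
    /// assert_eq!(vec![Empty, Retry, Empty].into_iter().collect::<Steal<i32>>(), Retry);
    /// assert_eq!(vec![Empty::<i32>, Empty].into_iter().collect::<Steal<i32>>(), Empty);
    /// ```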
2197     fn from_iter<I>(iter: I) -> Steal<T>
2198     where
2199         I: IntoIterator<Item = Steal<T>>,
2200     {
2201         let mut retry = false;
2202         for s in iter {
2203             match &s {
2204                 Steal::Empty => {}
2205                 Steal::Success(_) => return s,
2206                 Steal::Retry => retry = true,
2207             }
2208         }
2209 
2210         if retry {
2211             Steal::Retry
2212         } else {
2213             Steal::Empty
2214         }
2215     }
2216 }
2217