1 use std::cell::{Cell, UnsafeCell};
2 use std::cmp;
3 use std::fmt;
4 use std::iter::FromIterator;
5 use std::marker::PhantomData;
6 use std::mem::{self, ManuallyDrop, MaybeUninit};
7 use std::ptr;
8 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9 use std::sync::Arc;
10 
11 use crate::epoch::{self, Atomic, Owned};
12 use crate::utils::{Backoff, CachePadded};
13 
14 // Minimum buffer capacity.
15 const MIN_CAP: usize = 64;
16 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17 const MAX_BATCH: usize = 32;
18 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19 // deallocated as soon as possible.
20 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21 
22 /// A buffer that holds tasks in a worker queue.
23 ///
24 /// This is just a pointer to the buffer and its capacity - dropping an instance of this struct will
25 /// *not* deallocate the buffer.
26 struct Buffer<T> {
27     /// Pointer to the allocated memory.
28     ptr: *mut T,
29 
30     /// Capacity of the buffer. Always a power of two.
31     cap: usize,
32 }
33 
34 unsafe impl<T> Send for Buffer<T> {}
35 
36 impl<T> Buffer<T> {
37     /// Allocates a new buffer with the specified capacity.
38     fn alloc(cap: usize) -> Buffer<T> {
39         debug_assert_eq!(cap, cap.next_power_of_two());
40 
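        // Obtain the allocation from a `Vec` and intentionally leak it; `dealloc` later
        // reconstructs the `Vec` with `Vec::from_raw_parts` in order to free the memory.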
41         let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
42         let ptr = v.as_mut_ptr();
43 
44         Buffer { ptr, cap }
45     }
46 
47     /// Deallocates the buffer.
48     unsafe fn dealloc(self) {
49         drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
50     }
51 
52     /// Returns a pointer to the task at the specified `index`.
53     unsafe fn at(&self, index: isize) -> *mut T {
54         // `self.cap` is always a power of two.
55         // We do all the loads as `MaybeUninit` because we might realize, after loading, that we
56         // don't actually have the right to access this memory.
57         self.ptr.offset(index & (self.cap - 1) as isize)
58     }
59 
60     /// Writes `task` into the specified `index`.
61     ///
62     /// This method might be concurrently called with another `read` at the same index, which is
63     /// technically speaking a data race and therefore UB. We should use an atomic store here, but
64     /// that would be more expensive and difficult to implement generically for all types `T`.
65     /// Hence, as a hack, we use a volatile write instead.
66     unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
67         ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
68     }
69 
70     /// Reads a task from the specified `index`.
71     ///
72     /// This method might be concurrently called with another `write` at the same index, which is
73     /// technically speaking a data race and therefore UB. We should use an atomic load here, but
74     /// that would be more expensive and difficult to implement generically for all types `T`.
75     /// Hence, as a hack, we use a volatile load instead.
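    /// Callers keep the returned value as `MaybeUninit` and only call `assume_init` after the
    /// corresponding index has been successfully claimed (for example by a CAS on `front`), so a
    /// value read concurrently with a `write` is never interpreted as an initialized `T`.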
76     unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
77         ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
78     }
79 }
80 
81 impl<T> Clone for Buffer<T> {
82     fn clone(&self) -> Buffer<T> {
83         Buffer {
84             ptr: self.ptr,
85             cap: self.cap,
86         }
87     }
88 }
89 
90 impl<T> Copy for Buffer<T> {}
91 
92 /// Internal queue data shared between the worker and stealers.
93 ///
94 /// The implementation is based on the following work:
95 ///
96 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
97 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
98 ///    PPoPP 2013.][weak-mem]
99 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
100 ///    atomics. OOPSLA 2013.][checker]
101 ///
102 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
103 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
104 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514
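///
/// Tasks live in a circular `buffer` indexed by the `front` and `back` counters: the queue length
/// is `back - front` (wrapping arithmetic), and an index is mapped to a slot by masking it with
/// `capacity - 1`, which is why the capacity is always a power of two.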
105 struct Inner<T> {
106     /// The front index.
107     front: AtomicIsize,
108 
109     /// The back index.
110     back: AtomicIsize,
111 
112     /// The underlying buffer.
113     buffer: CachePadded<Atomic<Buffer<T>>>,
114 }
115 
116 impl<T> Drop for Inner<T> {
117     fn drop(&mut self) {
118         // Load the back index, front index, and buffer.
119         let b = *self.back.get_mut();
120         let f = *self.front.get_mut();
121 
122         unsafe {
123             let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
124 
125             // Go through the buffer from front to back and drop all tasks in the queue.
126             let mut i = f;
127             while i != b {
128                 buffer.deref().at(i).drop_in_place();
129                 i = i.wrapping_add(1);
130             }
131 
132             // Free the memory allocated by the buffer.
133             buffer.into_owned().into_box().dealloc();
134         }
135     }
136 }
137 
138 /// Worker queue flavor: FIFO or LIFO.
139 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
140 enum Flavor {
141     /// The first-in first-out flavor.
142     Fifo,
143 
144     /// The last-in first-out flavor.
145     Lifo,
146 }
147 
148 /// A worker queue.
149 ///
150 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
151 /// tasks from it. Task schedulers typically create a single worker queue per thread.
152 ///
153 /// # Examples
154 ///
155 /// A FIFO worker:
156 ///
157 /// ```
158 /// use crossbeam_deque::{Steal, Worker};
159 ///
160 /// let w = Worker::new_fifo();
161 /// let s = w.stealer();
162 ///
163 /// w.push(1);
164 /// w.push(2);
165 /// w.push(3);
166 ///
167 /// assert_eq!(s.steal(), Steal::Success(1));
168 /// assert_eq!(w.pop(), Some(2));
169 /// assert_eq!(w.pop(), Some(3));
170 /// ```
171 ///
172 /// A LIFO worker:
173 ///
174 /// ```
175 /// use crossbeam_deque::{Steal, Worker};
176 ///
177 /// let w = Worker::new_lifo();
178 /// let s = w.stealer();
179 ///
180 /// w.push(1);
181 /// w.push(2);
182 /// w.push(3);
183 ///
184 /// assert_eq!(s.steal(), Steal::Success(1));
185 /// assert_eq!(w.pop(), Some(3));
186 /// assert_eq!(w.pop(), Some(2));
187 /// ```
188 pub struct Worker<T> {
189     /// A reference to the inner representation of the queue.
190     inner: Arc<CachePadded<Inner<T>>>,
191 
192     /// A copy of `inner.buffer` for quick access.
193     buffer: Cell<Buffer<T>>,
194 
195     /// The flavor of the queue.
196     flavor: Flavor,
197 
198     /// Indicates that the worker cannot be shared among threads.
199     _marker: PhantomData<*mut ()>, // !Send + !Sync
200 }
201 
202 unsafe impl<T: Send> Send for Worker<T> {}
203 
204 impl<T> Worker<T> {
205     /// Creates a FIFO worker queue.
206     ///
207     /// Tasks are pushed and popped from opposite ends.
208     ///
209     /// # Examples
210     ///
211     /// ```
212     /// use crossbeam_deque::Worker;
213     ///
214     /// let w = Worker::<i32>::new_fifo();
215     /// ```
216     pub fn new_fifo() -> Worker<T> {
217         let buffer = Buffer::alloc(MIN_CAP);
218 
219         let inner = Arc::new(CachePadded::new(Inner {
220             front: AtomicIsize::new(0),
221             back: AtomicIsize::new(0),
222             buffer: CachePadded::new(Atomic::new(buffer)),
223         }));
224 
225         Worker {
226             inner,
227             buffer: Cell::new(buffer),
228             flavor: Flavor::Fifo,
229             _marker: PhantomData,
230         }
231     }
232 
233     /// Creates a LIFO worker queue.
234     ///
235     /// Tasks are pushed and popped from the same end.
236     ///
237     /// # Examples
238     ///
239     /// ```
240     /// use crossbeam_deque::Worker;
241     ///
242     /// let w = Worker::<i32>::new_lifo();
243     /// ```
244     pub fn new_lifo() -> Worker<T> {
245         let buffer = Buffer::alloc(MIN_CAP);
246 
247         let inner = Arc::new(CachePadded::new(Inner {
248             front: AtomicIsize::new(0),
249             back: AtomicIsize::new(0),
250             buffer: CachePadded::new(Atomic::new(buffer)),
251         }));
252 
253         Worker {
254             inner,
255             buffer: Cell::new(buffer),
256             flavor: Flavor::Lifo,
257             _marker: PhantomData,
258         }
259     }
260 
261     /// Creates a stealer for this queue.
262     ///
263     /// The returned stealer can be shared among threads and cloned.
264     ///
265     /// # Examples
266     ///
267     /// ```
268     /// use crossbeam_deque::Worker;
269     ///
270     /// let w = Worker::<i32>::new_lifo();
271     /// let s = w.stealer();
272     /// ```
273     pub fn stealer(&self) -> Stealer<T> {
274         Stealer {
275             inner: self.inner.clone(),
276             flavor: self.flavor,
277         }
278     }
279 
280     /// Resizes the internal buffer to the new capacity of `new_cap`.
281     #[cold]
282     unsafe fn resize(&self, new_cap: usize) {
283         // Load the back index, front index, and buffer.
284         let b = self.inner.back.load(Ordering::Relaxed);
285         let f = self.inner.front.load(Ordering::Relaxed);
286         let buffer = self.buffer.get();
287 
288         // Allocate a new buffer and copy data from the old buffer to the new one.
289         let new = Buffer::alloc(new_cap);
290         let mut i = f;
291         while i != b {
292             ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
293             i = i.wrapping_add(1);
294         }
295 
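        // Pin the current thread with an epoch guard so that freeing the old buffer can be
        // deferred until no stealer can still be reading from it.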
296         let guard = &epoch::pin();
297 
298         // Replace the old buffer with the new one.
299         self.buffer.replace(new);
300         let old =
301             self.inner
302                 .buffer
303                 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
304 
305         // Destroy the old buffer later.
306         guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
307 
308         // If the buffer is very large, then flush the thread-local garbage in order to deallocate
309         // it as soon as possible.
310         if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
311             guard.flush();
312         }
313     }
314 
315     /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
316     /// buffer.
317     fn reserve(&self, reserve_cap: usize) {
318         if reserve_cap > 0 {
319             // Compute the current length.
320             let b = self.inner.back.load(Ordering::Relaxed);
321             let f = self.inner.front.load(Ordering::SeqCst);
322             let len = b.wrapping_sub(f) as usize;
323 
324             // The current capacity.
325             let cap = self.buffer.get().cap;
326 
327             // Is there enough capacity to push `reserve_cap` tasks?
328             if cap - len < reserve_cap {
329                 // Keep doubling the capacity as much as is needed.
330                 let mut new_cap = cap * 2;
331                 while new_cap - len < reserve_cap {
332                     new_cap *= 2;
333                 }
334 
335                 // Resize the buffer.
336                 unsafe {
337                     self.resize(new_cap);
338                 }
339             }
340         }
341     }
342 
343     /// Returns `true` if the queue is empty.
344     ///
345     /// ```
346     /// use crossbeam_deque::Worker;
347     ///
348     /// let w = Worker::new_lifo();
349     ///
350     /// assert!(w.is_empty());
351     /// w.push(1);
352     /// assert!(!w.is_empty());
353     /// ```
354     pub fn is_empty(&self) -> bool {
355         let b = self.inner.back.load(Ordering::Relaxed);
356         let f = self.inner.front.load(Ordering::SeqCst);
357         b.wrapping_sub(f) <= 0
358     }
359 
360     /// Returns the number of tasks in the deque.
361     ///
362     /// ```
363     /// use crossbeam_deque::Worker;
364     ///
365     /// let w = Worker::new_lifo();
366     ///
367     /// assert_eq!(w.len(), 0);
368     /// w.push(1);
369     /// assert_eq!(w.len(), 1);
370     /// w.push(1);
371     /// assert_eq!(w.len(), 2);
372     /// ```
373     pub fn len(&self) -> usize {
374         let b = self.inner.back.load(Ordering::Relaxed);
375         let f = self.inner.front.load(Ordering::SeqCst);
376         b.wrapping_sub(f).max(0) as usize
377     }
378 
379     /// Pushes a task into the queue.
380     ///
381     /// # Examples
382     ///
383     /// ```
384     /// use crossbeam_deque::Worker;
385     ///
386     /// let w = Worker::new_lifo();
387     /// w.push(1);
388     /// w.push(2);
389     /// ```
390     pub fn push(&self, task: T) {
391         // Load the back index, front index, and buffer.
392         let b = self.inner.back.load(Ordering::Relaxed);
393         let f = self.inner.front.load(Ordering::Acquire);
394         let mut buffer = self.buffer.get();
395 
396         // Calculate the length of the queue.
397         let len = b.wrapping_sub(f);
398 
399         // Is the queue full?
400         if len >= buffer.cap as isize {
401             // Yes. Grow the underlying buffer.
402             unsafe {
403                 self.resize(2 * buffer.cap);
404             }
405             buffer = self.buffer.get();
406         }
407 
408         // Write `task` into the slot.
409         unsafe {
410             buffer.write(b, MaybeUninit::new(task));
411         }
412 
413         atomic::fence(Ordering::Release);
414 
415         // Increment the back index.
416         //
417         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
418         // races because it doesn't understand fences.
419         self.inner.back.store(b.wrapping_add(1), Ordering::Release);
420     }
421 
422     /// Pops a task from the queue.
423     ///
424     /// # Examples
425     ///
426     /// ```
427     /// use crossbeam_deque::Worker;
428     ///
429     /// let w = Worker::new_fifo();
430     /// w.push(1);
431     /// w.push(2);
432     ///
433     /// assert_eq!(w.pop(), Some(1));
434     /// assert_eq!(w.pop(), Some(2));
435     /// assert_eq!(w.pop(), None);
436     /// ```
437     pub fn pop(&self) -> Option<T> {
438         // Load the back and front index.
439         let b = self.inner.back.load(Ordering::Relaxed);
440         let f = self.inner.front.load(Ordering::Relaxed);
441 
442         // Calculate the length of the queue.
443         let len = b.wrapping_sub(f);
444 
445         // Is the queue empty?
446         if len <= 0 {
447             return None;
448         }
449 
450         match self.flavor {
451             // Pop from the front of the queue.
452             Flavor::Fifo => {
453                 // Try incrementing the front index to pop the task.
454                 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
455                 let new_f = f.wrapping_add(1);
456 
457                 if b.wrapping_sub(new_f) < 0 {
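                    // The queue was empty after all: undo the `fetch_add` by restoring the
                    // original front index.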
458                     self.inner.front.store(f, Ordering::Relaxed);
459                     return None;
460                 }
461 
462                 unsafe {
463                     // Read the popped task.
464                     let buffer = self.buffer.get();
465                     let task = buffer.read(f).assume_init();
466 
467                     // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
468                     if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
469                         self.resize(buffer.cap / 2);
470                     }
471 
472                     Some(task)
473                 }
474             }
475 
476             // Pop from the back of the queue.
477             Flavor::Lifo => {
478                 // Decrement the back index.
479                 let b = b.wrapping_sub(1);
480                 self.inner.back.store(b, Ordering::Relaxed);
481 
482                 atomic::fence(Ordering::SeqCst);
483 
484                 // Load the front index.
485                 let f = self.inner.front.load(Ordering::Relaxed);
486 
487                 // Compute the length after the back index was decremented.
488                 let len = b.wrapping_sub(f);
489 
490                 if len < 0 {
491                     // The queue is empty. Restore the back index to its original value.
492                     self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
493                     None
494                 } else {
495                     // Read the task to be popped.
496                     let buffer = self.buffer.get();
497                     let mut task = unsafe { Some(buffer.read(b)) };
498 
499                     // Are we popping the last task from the queue?
500                     if len == 0 {
501                         // Try incrementing the front index.
502                         if self
503                             .inner
504                             .front
505                             .compare_exchange(
506                                 f,
507                                 f.wrapping_add(1),
508                                 Ordering::SeqCst,
509                                 Ordering::Relaxed,
510                             )
511                             .is_err()
512                         {
513                             // Failed. We didn't pop anything. Reset to `None`.
514                             task.take();
515                         }
516 
517                         // Restore the back index to its original value.
518                         self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
519                     } else {
520                         // Shrink the buffer if `len` is less than one fourth of the capacity.
521                         if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
522                             unsafe {
523                                 self.resize(buffer.cap / 2);
524                             }
525                         }
526                     }
527 
528                     task.map(|t| unsafe { t.assume_init() })
529                 }
530             }
531         }
532     }
533 }
534 
535 impl<T> fmt::Debug for Worker<T> {
536     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537         f.pad("Worker { .. }")
538     }
539 }
540 
541 /// A stealer handle of a worker queue.
542 ///
543 /// Stealers can be shared among threads.
544 ///
545 /// Task schedulers typically have a single worker queue per worker thread.
546 ///
547 /// # Examples
548 ///
549 /// ```
550 /// use crossbeam_deque::{Steal, Worker};
551 ///
552 /// let w = Worker::new_lifo();
553 /// w.push(1);
554 /// w.push(2);
555 ///
556 /// let s = w.stealer();
557 /// assert_eq!(s.steal(), Steal::Success(1));
558 /// assert_eq!(s.steal(), Steal::Success(2));
559 /// assert_eq!(s.steal(), Steal::Empty);
560 /// ```
561 pub struct Stealer<T> {
562     /// A reference to the inner representation of the queue.
563     inner: Arc<CachePadded<Inner<T>>>,
564 
565     /// The flavor of the queue.
566     flavor: Flavor,
567 }
568 
569 unsafe impl<T: Send> Send for Stealer<T> {}
570 unsafe impl<T: Send> Sync for Stealer<T> {}
571 
572 impl<T> Stealer<T> {
573     /// Returns `true` if the queue is empty.
574     ///
575     /// ```
576     /// use crossbeam_deque::Worker;
577     ///
578     /// let w = Worker::new_lifo();
579     /// let s = w.stealer();
580     ///
581     /// assert!(s.is_empty());
582     /// w.push(1);
583     /// assert!(!s.is_empty());
584     /// ```
585     pub fn is_empty(&self) -> bool {
586         let f = self.inner.front.load(Ordering::Acquire);
587         atomic::fence(Ordering::SeqCst);
588         let b = self.inner.back.load(Ordering::Acquire);
589         b.wrapping_sub(f) <= 0
590     }
591 
592     /// Returns the number of tasks in the deque.
593     ///
594     /// ```
595     /// use crossbeam_deque::Worker;
596     ///
597     /// let w = Worker::new_lifo();
598     /// let s = w.stealer();
599     ///
600     /// assert_eq!(s.len(), 0);
601     /// w.push(1);
602     /// assert_eq!(s.len(), 1);
603     /// w.push(2);
604     /// assert_eq!(s.len(), 2);
605     /// ```
606     pub fn len(&self) -> usize {
607         let f = self.inner.front.load(Ordering::Acquire);
608         atomic::fence(Ordering::SeqCst);
609         let b = self.inner.back.load(Ordering::Acquire);
610         b.wrapping_sub(f).max(0) as usize
611     }
612 
613     /// Steals a task from the queue.
614     ///
615     /// # Examples
616     ///
617     /// ```
618     /// use crossbeam_deque::{Steal, Worker};
619     ///
620     /// let w = Worker::new_lifo();
621     /// w.push(1);
622     /// w.push(2);
623     ///
624     /// let s = w.stealer();
625     /// assert_eq!(s.steal(), Steal::Success(1));
626     /// assert_eq!(s.steal(), Steal::Success(2));
627     /// ```
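    ///
    /// A steal may also return `Steal::Retry` when it loses a race with another thread. A minimal
    /// sketch of handling that case (single-threaded here only to keep the example short):
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// let s = w.stealer();
    ///
    /// // Retry until the operation no longer asks to be retried.
    /// let task = loop {
    ///     match s.steal() {
    ///         Steal::Success(t) => break Some(t),
    ///         Steal::Empty => break None,
    ///         Steal::Retry => continue,
    ///     }
    /// };
    /// assert_eq!(task, Some(1));
    /// ```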
628     pub fn steal(&self) -> Steal<T> {
629         // Load the front index.
630         let f = self.inner.front.load(Ordering::Acquire);
631 
632         // A SeqCst fence is needed here.
633         //
634         // If the current thread is already pinned (reentrantly), we must manually issue the
635         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
636         // have to.
637         if epoch::is_pinned() {
638             atomic::fence(Ordering::SeqCst);
639         }
640 
641         let guard = &epoch::pin();
642 
643         // Load the back index.
644         let b = self.inner.back.load(Ordering::Acquire);
645 
646         // Is the queue empty?
647         if b.wrapping_sub(f) <= 0 {
648             return Steal::Empty;
649         }
650 
651         // Load the buffer and read the task at the front.
652         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
653         let task = unsafe { buffer.deref().read(f) };
654 
655         // Try incrementing the front index to steal the task.
656         // If the buffer has been swapped or the increment fails, we retry.
657         if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
658             || self
659                 .inner
660                 .front
661                 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
662                 .is_err()
663         {
664             // We didn't steal this task, forget it.
665             return Steal::Retry;
666         }
667 
668         // Return the stolen task.
669         Steal::Success(unsafe { task.assume_init() })
670     }
671 
672     /// Steals a batch of tasks and pushes them into another worker.
673     ///
674     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
675     /// steal around half of the tasks in the queue, but also not more than some constant limit.
676     ///
677     /// # Examples
678     ///
679     /// ```
680     /// use crossbeam_deque::Worker;
681     ///
682     /// let w1 = Worker::new_fifo();
683     /// w1.push(1);
684     /// w1.push(2);
685     /// w1.push(3);
686     /// w1.push(4);
687     ///
688     /// let s = w1.stealer();
689     /// let w2 = Worker::new_fifo();
690     ///
691     /// let _ = s.steal_batch(&w2);
692     /// assert_eq!(w2.pop(), Some(1));
693     /// assert_eq!(w2.pop(), Some(2));
694     /// ```
695     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
696         self.steal_batch_with_limit(dest, MAX_BATCH)
697     }
698 
699     /// Steals no more than `limit` tasks and pushes them into another worker.
700     ///
701     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
702     /// steal around half of the tasks in the queue, but also not more than the given limit.
703     ///
704     /// # Examples
705     ///
706     /// ```
707     /// use crossbeam_deque::Worker;
708     ///
709     /// let w1 = Worker::new_fifo();
710     /// w1.push(1);
711     /// w1.push(2);
712     /// w1.push(3);
713     /// w1.push(4);
714     /// w1.push(5);
715     /// w1.push(6);
716     ///
717     /// let s = w1.stealer();
718     /// let w2 = Worker::new_fifo();
719     ///
720     /// let _ = s.steal_batch_with_limit(&w2, 2);
721     /// assert_eq!(w2.pop(), Some(1));
722     /// assert_eq!(w2.pop(), Some(2));
723     /// assert_eq!(w2.pop(), None);
724     ///
725     /// w1.push(7);
726     /// w1.push(8);
727     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
728     /// // half of the elements are currently popped, but the number of popped elements is considered
729     /// // an implementation detail that may be changed in the future.
730     /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
731     /// assert_eq!(w2.len(), 3);
732     /// ```
733     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
734         assert!(limit > 0);
735         if Arc::ptr_eq(&self.inner, &dest.inner) {
736             if dest.is_empty() {
737                 return Steal::Empty;
738             } else {
739                 return Steal::Success(());
740             }
741         }
742 
743         // Load the front index.
744         let mut f = self.inner.front.load(Ordering::Acquire);
745 
746         // A SeqCst fence is needed here.
747         //
748         // If the current thread is already pinned (reentrantly), we must manually issue the
749         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
750         // have to.
751         if epoch::is_pinned() {
752             atomic::fence(Ordering::SeqCst);
753         }
754 
755         let guard = &epoch::pin();
756 
757         // Load the back index.
758         let b = self.inner.back.load(Ordering::Acquire);
759 
760         // Is the queue empty?
761         let len = b.wrapping_sub(f);
762         if len <= 0 {
763             return Steal::Empty;
764         }
765 
766         // Reserve capacity for the stolen batch.
767         let batch_size = cmp::min((len as usize + 1) / 2, limit);
768         dest.reserve(batch_size);
769         let mut batch_size = batch_size as isize;
770 
771         // Get the destination buffer and back index.
772         let dest_buffer = dest.buffer.get();
773         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
774 
775         // Load the buffer.
776         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
777 
778         match self.flavor {
779             // Steal a batch of tasks from the front at once.
780             Flavor::Fifo => {
781                 // Copy the batch from the source to the destination buffer.
782                 match dest.flavor {
783                     Flavor::Fifo => {
784                         for i in 0..batch_size {
785                             unsafe {
786                                 let task = buffer.deref().read(f.wrapping_add(i));
787                                 dest_buffer.write(dest_b.wrapping_add(i), task);
788                             }
789                         }
790                     }
791                     Flavor::Lifo => {
792                         for i in 0..batch_size {
793                             unsafe {
794                                 let task = buffer.deref().read(f.wrapping_add(i));
795                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
796                             }
797                         }
798                     }
799                 }
800 
801                 // Try incrementing the front index to steal the batch.
802                 // If the buffer has been swapped or the increment fails, we retry.
803                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
804                     || self
805                         .inner
806                         .front
807                         .compare_exchange(
808                             f,
809                             f.wrapping_add(batch_size),
810                             Ordering::SeqCst,
811                             Ordering::Relaxed,
812                         )
813                         .is_err()
814                 {
815                     return Steal::Retry;
816                 }
817 
818                 dest_b = dest_b.wrapping_add(batch_size);
819             }
820 
821             // Steal a batch of tasks from the front one by one.
822             Flavor::Lifo => {
823                 // This loop may modify the batch_size, which triggers a clippy lint warning.
824                 // Use a new variable to avoid the warning, and to make it clear we aren't
825                 // modifying the loop exit condition during iteration.
826                 let original_batch_size = batch_size;
827 
828                 for i in 0..original_batch_size {
829                     // If this is not the first steal, check whether the queue is empty.
830                     if i > 0 {
831                         // We've already got the current front index. Now execute the fence to
832                         // synchronize with other threads.
833                         atomic::fence(Ordering::SeqCst);
834 
835                         // Load the back index.
836                         let b = self.inner.back.load(Ordering::Acquire);
837 
838                         // Is the queue empty?
839                         if b.wrapping_sub(f) <= 0 {
840                             batch_size = i;
841                             break;
842                         }
843                     }
844 
845                     // Read the task at the front.
846                     let task = unsafe { buffer.deref().read(f) };
847 
848                     // Try incrementing the front index to steal the task.
849                     // If the buffer has been swapped or the increment fails, we retry.
850                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
851                         || self
852                             .inner
853                             .front
854                             .compare_exchange(
855                                 f,
856                                 f.wrapping_add(1),
857                                 Ordering::SeqCst,
858                                 Ordering::Relaxed,
859                             )
860                             .is_err()
861                     {
862                         // We didn't steal this task, forget it and break from the loop.
863                         batch_size = i;
864                         break;
865                     }
866 
867                     // Write the stolen task into the destination buffer.
868                     unsafe {
869                         dest_buffer.write(dest_b, task);
870                     }
871 
872                     // Move the source front index and the destination back index one step forward.
873                     f = f.wrapping_add(1);
874                     dest_b = dest_b.wrapping_add(1);
875                 }
876 
877                 // If we didn't steal anything, the operation needs to be retried.
878                 if batch_size == 0 {
879                     return Steal::Retry;
880                 }
881 
882                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
883                 if dest.flavor == Flavor::Fifo {
884                     for i in 0..batch_size / 2 {
885                         unsafe {
886                             let i1 = dest_b.wrapping_sub(batch_size - i);
887                             let i2 = dest_b.wrapping_sub(i + 1);
888                             let t1 = dest_buffer.read(i1);
889                             let t2 = dest_buffer.read(i2);
890                             dest_buffer.write(i1, t2);
891                             dest_buffer.write(i2, t1);
892                         }
893                     }
894                 }
895             }
896         }
897 
898         atomic::fence(Ordering::Release);
899 
900         // Update the back index in the destination queue.
901         //
902         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
903         // races because it doesn't understand fences.
904         dest.inner.back.store(dest_b, Ordering::Release);
905 
906         // Return with success.
907         Steal::Success(())
908     }
909 
910     /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
911     ///
912     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
913     /// steal around half of the tasks in the queue, but also not more than some constant limit.
914     ///
915     /// # Examples
916     ///
917     /// ```
918     /// use crossbeam_deque::{Steal, Worker};
919     ///
920     /// let w1 = Worker::new_fifo();
921     /// w1.push(1);
922     /// w1.push(2);
923     /// w1.push(3);
924     /// w1.push(4);
925     ///
926     /// let s = w1.stealer();
927     /// let w2 = Worker::new_fifo();
928     ///
929     /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
930     /// assert_eq!(w2.pop(), Some(2));
931     /// ```
932     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
933         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
934     }
935 
936     /// Steals no more than `limit` tasks, pushes them into another worker, and pops a task from
937     /// that worker.
938     ///
939     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
940     /// steal around half of the tasks in the queue, but also not more than the given limit.
941     ///
942     /// # Examples
943     ///
944     /// ```
945     /// use crossbeam_deque::{Steal, Worker};
946     ///
947     /// let w1 = Worker::new_fifo();
948     /// w1.push(1);
949     /// w1.push(2);
950     /// w1.push(3);
951     /// w1.push(4);
952     /// w1.push(5);
953     /// w1.push(6);
954     ///
955     /// let s = w1.stealer();
956     /// let w2 = Worker::new_fifo();
957     ///
958     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
959     /// assert_eq!(w2.pop(), Some(2));
960     /// assert_eq!(w2.pop(), None);
961     ///
962     /// w1.push(7);
963     /// w1.push(8);
964     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
965     /// // half of the elements are currently popped, but the number of popped elements is considered
966     /// // an implementation detail that may be changed in the future.
967     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
968     /// assert_eq!(w2.pop(), Some(4));
969     /// assert_eq!(w2.pop(), Some(5));
970     /// assert_eq!(w2.pop(), None);
971     /// ```
972     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
973         assert!(limit > 0);
974         if Arc::ptr_eq(&self.inner, &dest.inner) {
975             match dest.pop() {
976                 None => return Steal::Empty,
977                 Some(task) => return Steal::Success(task),
978             }
979         }
980 
981         // Load the front index.
982         let mut f = self.inner.front.load(Ordering::Acquire);
983 
984         // A SeqCst fence is needed here.
985         //
986         // If the current thread is already pinned (reentrantly), we must manually issue the
987         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
988         // have to.
989         if epoch::is_pinned() {
990             atomic::fence(Ordering::SeqCst);
991         }
992 
993         let guard = &epoch::pin();
994 
995         // Load the back index.
996         let b = self.inner.back.load(Ordering::Acquire);
997 
998         // Is the queue empty?
999         let len = b.wrapping_sub(f);
1000         if len <= 0 {
1001             return Steal::Empty;
1002         }
1003 
1004         // Reserve capacity for the stolen batch.
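        // One task (the one at the front) is returned directly to the caller, so at most
        // `limit - 1` additional tasks are moved into `dest`.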
1005         let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1006         dest.reserve(batch_size);
1007         let mut batch_size = batch_size as isize;
1008 
1009         // Get the destination buffer and back index.
1010         let dest_buffer = dest.buffer.get();
1011         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1012 
1013         // Load the buffer
1014         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1015 
1016         // Read the task at the front.
1017         let mut task = unsafe { buffer.deref().read(f) };
1018 
1019         match self.flavor {
1020             // Steal a batch of tasks from the front at once.
1021             Flavor::Fifo => {
1022                 // Copy the batch from the source to the destination buffer.
1023                 match dest.flavor {
1024                     Flavor::Fifo => {
1025                         for i in 0..batch_size {
1026                             unsafe {
1027                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1028                                 dest_buffer.write(dest_b.wrapping_add(i), task);
1029                             }
1030                         }
1031                     }
1032                     Flavor::Lifo => {
1033                         for i in 0..batch_size {
1034                             unsafe {
1035                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1036                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1037                             }
1038                         }
1039                     }
1040                 }
1041 
1042                 // Try incrementing the front index to steal the task.
1043                 // If the buffer has been swapped or the increment fails, we retry.
1044                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1045                     || self
1046                         .inner
1047                         .front
1048                         .compare_exchange(
1049                             f,
1050                             f.wrapping_add(batch_size + 1),
1051                             Ordering::SeqCst,
1052                             Ordering::Relaxed,
1053                         )
1054                         .is_err()
1055                 {
1056                     // We didn't steal this task, forget it.
1057                     return Steal::Retry;
1058                 }
1059 
1060                 dest_b = dest_b.wrapping_add(batch_size);
1061             }
1062 
1063             // Steal a batch of tasks from the front one by one.
1064             Flavor::Lifo => {
1065                 // Try incrementing the front index to steal the task.
1066                 if self
1067                     .inner
1068                     .front
1069                     .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1070                     .is_err()
1071                 {
1072                     // We didn't steal this task, forget it.
1073                     return Steal::Retry;
1074                 }
1075 
1076                 // Move the front index one step forward.
1077                 f = f.wrapping_add(1);
1078 
1079                 // Repeat the same procedure for the batch steals.
1080                 //
1081                 // This loop may modify the batch_size, which triggers a clippy lint warning.
1082                 // Use a new variable to avoid the warning, and to make it clear we aren't
1083                 // modifying the loop exit condition during iteration.
1084                 let original_batch_size = batch_size;
1085                 for i in 0..original_batch_size {
1086                     // We've already got the current front index. Now execute the fence to
1087                     // synchronize with other threads.
1088                     atomic::fence(Ordering::SeqCst);
1089 
1090                     // Load the back index.
1091                     let b = self.inner.back.load(Ordering::Acquire);
1092 
1093                     // Is the queue empty?
1094                     if b.wrapping_sub(f) <= 0 {
1095                         batch_size = i;
1096                         break;
1097                     }
1098 
1099                     // Read the task at the front.
1100                     let tmp = unsafe { buffer.deref().read(f) };
1101 
1102                     // Try incrementing the front index to steal the task.
1103                     // If the buffer has been swapped or the increment fails, we retry.
1104                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1105                         || self
1106                             .inner
1107                             .front
1108                             .compare_exchange(
1109                                 f,
1110                                 f.wrapping_add(1),
1111                                 Ordering::SeqCst,
1112                                 Ordering::Relaxed,
1113                             )
1114                             .is_err()
1115                     {
1116                         // We didn't steal this task, forget it and break from the loop.
1117                         batch_size = i;
1118                         break;
1119                     }
1120 
1121                     // Write the previously stolen task into the destination buffer.
1122                     unsafe {
1123                         dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1124                     }
1125 
1126                     // Move the source front index and the destination back index one step forward.
1127                     f = f.wrapping_add(1);
1128                     dest_b = dest_b.wrapping_add(1);
1129                 }
1130 
1131                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1132                 if dest.flavor == Flavor::Fifo {
1133                     for i in 0..batch_size / 2 {
1134                         unsafe {
1135                             let i1 = dest_b.wrapping_sub(batch_size - i);
1136                             let i2 = dest_b.wrapping_sub(i + 1);
1137                             let t1 = dest_buffer.read(i1);
1138                             let t2 = dest_buffer.read(i2);
1139                             dest_buffer.write(i1, t2);
1140                             dest_buffer.write(i2, t1);
1141                         }
1142                     }
1143                 }
1144             }
1145         }
1146 
1147         atomic::fence(Ordering::Release);
1148 
1149         // Update the back index in the destination queue.
1150         //
1151         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1152         // races because it doesn't understand fences.
1153         dest.inner.back.store(dest_b, Ordering::Release);
1154 
1155         // Return with success.
1156         Steal::Success(unsafe { task.assume_init() })
1157     }
1158 }
1159 
1160 impl<T> Clone for Stealer<T> {
1161     fn clone(&self) -> Stealer<T> {
1162         Stealer {
1163             inner: self.inner.clone(),
1164             flavor: self.flavor,
1165         }
1166     }
1167 }
1168 
1169 impl<T> fmt::Debug for Stealer<T> {
1170     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1171         f.pad("Stealer { .. }")
1172     }
1173 }
1174 
1175 // Bits indicating the state of a slot:
1176 // * If a task has been written into the slot, `WRITE` is set.
1177 // * If a task has been read from the slot, `READ` is set.
1178 // * If the block is being destroyed, `DESTROY` is set.
1179 const WRITE: usize = 1;
1180 const READ: usize = 2;
1181 const DESTROY: usize = 4;
1182 
1183 // Each block covers one "lap" of indices.
1184 const LAP: usize = 64;
1185 // The maximum number of values a block can hold.
1186 const BLOCK_CAP: usize = LAP - 1;
1187 // How many lower bits are reserved for metadata.
1188 const SHIFT: usize = 1;
1189 // Indicates that the block is not the last one.
1190 const HAS_NEXT: usize = 1;
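//
// An index stored in `Position::index` keeps its metadata in the lowest `SHIFT` bits: the slot
// offset inside the current block is `(index >> SHIFT) % LAP`, and `index & HAS_NEXT` records
// whether the next block is already known to exist.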
1191 
1192 /// A slot in a block.
1193 struct Slot<T> {
1194     /// The task.
1195     task: UnsafeCell<MaybeUninit<T>>,
1196 
1197     /// The state of the slot.
1198     state: AtomicUsize,
1199 }
1200 
1201 impl<T> Slot<T> {
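    /// An uninitialized slot, used to initialize the `slots` array in `Block::new`. A constant
    /// can be repeated in an array initializer even though `Slot<T>` is not `Copy`.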
1202     const UNINIT: Self = Self {
1203         task: UnsafeCell::new(MaybeUninit::uninit()),
1204         state: AtomicUsize::new(0),
1205     };
1206 
1207     /// Waits until a task is written into the slot.
1208     fn wait_write(&self) {
1209         let backoff = Backoff::new();
1210         while self.state.load(Ordering::Acquire) & WRITE == 0 {
1211             backoff.snooze();
1212         }
1213     }
1214 }
1215 
1216 /// A block in a linked list.
1217 ///
1218 /// Each block in the list can hold up to `BLOCK_CAP` values.
1219 struct Block<T> {
1220     /// The next block in the linked list.
1221     next: AtomicPtr<Block<T>>,
1222 
1223     /// Slots for values.
1224     slots: [Slot<T>; BLOCK_CAP],
1225 }
1226 
1227 impl<T> Block<T> {
1228     /// Creates an empty block.
1229     fn new() -> Block<T> {
1230         Self {
1231             next: AtomicPtr::new(ptr::null_mut()),
1232             slots: [Slot::UNINIT; BLOCK_CAP],
1233         }
1234     }
1235 
1236     /// Waits until the next pointer is set.
1237     fn wait_next(&self) -> *mut Block<T> {
1238         let backoff = Backoff::new();
1239         loop {
1240             let next = self.next.load(Ordering::Acquire);
1241             if !next.is_null() {
1242                 return next;
1243             }
1244             backoff.snooze();
1245         }
1246     }
1247 
1248     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1249     unsafe fn destroy(this: *mut Block<T>, count: usize) {
1250         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1251         // begun destruction of the block.
1252         for i in (0..count).rev() {
1253             let slot = (*this).slots.get_unchecked(i);
1254 
1255             // Mark the `DESTROY` bit if a thread is still using the slot.
1256             if slot.state.load(Ordering::Acquire) & READ == 0
1257                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1258             {
1259                 // If a thread is still using the slot, it will continue destruction of the block.
1260                 return;
1261             }
1262         }
1263 
1264         // No thread is using the block, now it is safe to destroy it.
1265         drop(Box::from_raw(this));
1266     }
1267 }
1268 
1269 /// A position in a queue.
1270 struct Position<T> {
1271     /// The index in the queue.
1272     index: AtomicUsize,
1273 
1274     /// The block in the linked list.
1275     block: AtomicPtr<Block<T>>,
1276 }
1277 
1278 /// An injector queue.
1279 ///
1280 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1281 /// a single injector queue, which is the entry point for new tasks.
1282 ///
1283 /// # Examples
1284 ///
1285 /// ```
1286 /// use crossbeam_deque::{Injector, Steal};
1287 ///
1288 /// let q = Injector::new();
1289 /// q.push(1);
1290 /// q.push(2);
1291 ///
1292 /// assert_eq!(q.steal(), Steal::Success(1));
1293 /// assert_eq!(q.steal(), Steal::Success(2));
1294 /// assert_eq!(q.steal(), Steal::Empty);
1295 /// ```
1296 pub struct Injector<T> {
1297     /// The head of the queue.
1298     head: CachePadded<Position<T>>,
1299 
1300     /// The tail of the queue.
1301     tail: CachePadded<Position<T>>,
1302 
1303     /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1304     _marker: PhantomData<T>,
1305 }
1306 
1307 unsafe impl<T: Send> Send for Injector<T> {}
1308 unsafe impl<T: Send> Sync for Injector<T> {}
1309 
1310 impl<T> Default for Injector<T> {
1311     fn default() -> Self {
1312         let block = Box::into_raw(Box::new(Block::<T>::new()));
1313         Self {
1314             head: CachePadded::new(Position {
1315                 block: AtomicPtr::new(block),
1316                 index: AtomicUsize::new(0),
1317             }),
1318             tail: CachePadded::new(Position {
1319                 block: AtomicPtr::new(block),
1320                 index: AtomicUsize::new(0),
1321             }),
1322             _marker: PhantomData,
1323         }
1324     }
1325 }
1326 
1327 impl<T> Injector<T> {
1328     /// Creates a new injector queue.
1329     ///
1330     /// # Examples
1331     ///
1332     /// ```
1333     /// use crossbeam_deque::Injector;
1334     ///
1335     /// let q = Injector::<i32>::new();
1336     /// ```
1337     pub fn new() -> Injector<T> {
1338         Self::default()
1339     }
1340 
1341     /// Pushes a task into the queue.
1342     ///
1343     /// # Examples
1344     ///
1345     /// ```
1346     /// use crossbeam_deque::Injector;
1347     ///
1348     /// let w = Injector::new();
1349     /// w.push(1);
1350     /// w.push(2);
1351     /// ```
1352     pub fn push(&self, task: T) {
1353         let backoff = Backoff::new();
1354         let mut tail = self.tail.index.load(Ordering::Acquire);
1355         let mut block = self.tail.block.load(Ordering::Acquire);
1356         let mut next_block = None;
1357 
1358         loop {
1359             // Calculate the offset of the index into the block.
1360             let offset = (tail >> SHIFT) % LAP;
1361 
1362             // If we reached the end of the block, wait until the next one is installed.
1363             if offset == BLOCK_CAP {
1364                 backoff.snooze();
1365                 tail = self.tail.index.load(Ordering::Acquire);
1366                 block = self.tail.block.load(Ordering::Acquire);
1367                 continue;
1368             }
1369 
1370             // If we're going to have to install the next block, allocate it in advance in order to
1371             // make the wait for other threads as short as possible.
1372             if offset + 1 == BLOCK_CAP && next_block.is_none() {
1373                 next_block = Some(Box::new(Block::<T>::new()));
1374             }
1375 
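            // Indices advance in steps of `1 << SHIFT`, leaving the low bits free for metadata
            // such as `HAS_NEXT`.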
1376             let new_tail = tail + (1 << SHIFT);
1377 
1378             // Try advancing the tail forward.
1379             match self.tail.index.compare_exchange_weak(
1380                 tail,
1381                 new_tail,
1382                 Ordering::SeqCst,
1383                 Ordering::Acquire,
1384             ) {
1385                 Ok(_) => unsafe {
1386                     // If we've reached the end of the block, install the next one.
1387                     if offset + 1 == BLOCK_CAP {
1388                         let next_block = Box::into_raw(next_block.unwrap());
1389                         let next_index = new_tail.wrapping_add(1 << SHIFT);
1390 
1391                         self.tail.block.store(next_block, Ordering::Release);
1392                         self.tail.index.store(next_index, Ordering::Release);
1393                         (*block).next.store(next_block, Ordering::Release);
1394                     }
1395 
1396                     // Write the task into the slot.
1397                     let slot = (*block).slots.get_unchecked(offset);
1398                     slot.task.get().write(MaybeUninit::new(task));
1399                     slot.state.fetch_or(WRITE, Ordering::Release);
1400 
1401                     return;
1402                 },
1403                 Err(t) => {
1404                     tail = t;
1405                     block = self.tail.block.load(Ordering::Acquire);
1406                     backoff.spin();
1407                 }
1408             }
1409         }
1410     }
1411 
1412     /// Steals a task from the queue.
1413     ///
1414     /// # Examples
1415     ///
1416     /// ```
1417     /// use crossbeam_deque::{Injector, Steal};
1418     ///
1419     /// let q = Injector::new();
1420     /// q.push(1);
1421     /// q.push(2);
1422     ///
1423     /// assert_eq!(q.steal(), Steal::Success(1));
1424     /// assert_eq!(q.steal(), Steal::Success(2));
1425     /// assert_eq!(q.steal(), Steal::Empty);
1426     /// ```
1427     pub fn steal(&self) -> Steal<T> {
1428         let mut head;
1429         let mut block;
1430         let mut offset;
1431 
1432         let backoff = Backoff::new();
1433         loop {
1434             head = self.head.index.load(Ordering::Acquire);
1435             block = self.head.block.load(Ordering::Acquire);
1436 
1437             // Calculate the offset of the index into the block.
1438             offset = (head >> SHIFT) % LAP;
1439 
1440             // If we reached the end of the block, wait until the next one is installed.
1441             if offset == BLOCK_CAP {
1442                 backoff.snooze();
1443             } else {
1444                 break;
1445             }
1446         }
1447 
1448         let mut new_head = head + (1 << SHIFT);
1449 
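        // If we don't yet know whether a next block exists, check the tail to find out whether
        // the queue is empty and whether the head and tail are in different blocks.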
1450         if new_head & HAS_NEXT == 0 {
1451             atomic::fence(Ordering::SeqCst);
1452             let tail = self.tail.index.load(Ordering::Relaxed);
1453 
1454             // If the tail equals the head, that means the queue is empty.
1455             if head >> SHIFT == tail >> SHIFT {
1456                 return Steal::Empty;
1457             }
1458 
1459             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1460             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1461                 new_head |= HAS_NEXT;
1462             }
1463         }
1464 
1465         // Try moving the head index forward.
1466         if self
1467             .head
1468             .index
1469             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1470             .is_err()
1471         {
1472             return Steal::Retry;
1473         }
1474 
1475         unsafe {
1476             // If we've reached the end of the block, move to the next one.
1477             if offset + 1 == BLOCK_CAP {
1478                 let next = (*block).wait_next();
1479                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1480                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1481                     next_index |= HAS_NEXT;
1482                 }
1483 
1484                 self.head.block.store(next, Ordering::Release);
1485                 self.head.index.store(next_index, Ordering::Release);
1486             }
1487 
1488             // Read the task.
1489             let slot = (*block).slots.get_unchecked(offset);
1490             slot.wait_write();
1491             let task = slot.task.get().read().assume_init();
1492 
1493             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1494             // but couldn't because we were busy reading from the slot.
1495             if (offset + 1 == BLOCK_CAP)
1496                 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1497             {
1498                 Block::destroy(block, offset);
1499             }
1500 
1501             Steal::Success(task)
1502         }
1503     }
1504 
1505     /// Steals a batch of tasks and pushes them into a worker.
1506     ///
1507     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1508     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1509     ///
1510     /// # Examples
1511     ///
1512     /// ```
1513     /// use crossbeam_deque::{Injector, Worker};
1514     ///
1515     /// let q = Injector::new();
1516     /// q.push(1);
1517     /// q.push(2);
1518     /// q.push(3);
1519     /// q.push(4);
1520     ///
1521     /// let w = Worker::new_fifo();
1522     /// let _ = q.steal_batch(&w);
1523     /// assert_eq!(w.pop(), Some(1));
1524     /// assert_eq!(w.pop(), Some(2));
1525     /// ```
1526     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1527         self.steal_batch_with_limit(dest, MAX_BATCH)
1528     }
1529 
1530     /// Steals no more than `limit` tasks and pushes them into a worker.
1531     ///
1532     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1533     /// steal around half of the tasks in the queue, but also not more than the given limit.
1534     ///
1535     /// # Examples
1536     ///
1537     /// ```
1538     /// use crossbeam_deque::{Injector, Worker};
1539     ///
1540     /// let q = Injector::new();
1541     /// q.push(1);
1542     /// q.push(2);
1543     /// q.push(3);
1544     /// q.push(4);
1545     /// q.push(5);
1546     /// q.push(6);
1547     ///
1548     /// let w = Worker::new_fifo();
1549     /// let _ = q.steal_batch_with_limit(&w, 2);
1550     /// assert_eq!(w.pop(), Some(1));
1551     /// assert_eq!(w.pop(), Some(2));
1552     /// assert_eq!(w.pop(), None);
1553     ///
1554     /// q.push(7);
1555     /// q.push(8);
1556     /// // Setting a large limit does not guarantee that all elements will be stolen. In this case,
1557     /// // about half of the remaining elements are stolen, but the exact number is considered
1558     /// // an implementation detail that may change in the future.
1559     /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1560     /// assert_eq!(w.len(), 3);
1561     /// ```
1562     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1563         assert!(limit > 0);
1564         let mut head;
1565         let mut block;
1566         let mut offset;
1567 
1568         let backoff = Backoff::new();
1569         loop {
1570             head = self.head.index.load(Ordering::Acquire);
1571             block = self.head.block.load(Ordering::Acquire);
1572 
1573             // Calculate the offset of the index into the block.
1574             offset = (head >> SHIFT) % LAP;
1575 
1576             // If we reached the end of the block, wait until the next one is installed.
1577             if offset == BLOCK_CAP {
1578                 backoff.snooze();
1579             } else {
1580                 break;
1581             }
1582         }
1583 
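             // `advance` is the number of indices the head will move forward, i.e. how many tasks this
             // call takes out of the injector.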
1584         let mut new_head = head;
1585         let advance;
1586 
1587         if new_head & HAS_NEXT == 0 {
1588             atomic::fence(Ordering::SeqCst);
1589             let tail = self.tail.index.load(Ordering::Relaxed);
1590 
1591             // If the tail equals the head, that means the queue is empty.
1592             if head >> SHIFT == tail >> SHIFT {
1593                 return Steal::Empty;
1594             }
1595 
1596             // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1597             // the right batch size to steal.
1598             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1599                 new_head |= HAS_NEXT;
1600                 // We can steal all tasks till the end of the block.
1601                 advance = (BLOCK_CAP - offset).min(limit);
1602             } else {
1603                 let len = (tail - head) >> SHIFT;
1604                 // Steal half of the available tasks.
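                     // For example, with 5 available tasks and a `limit` of 4, this steals
                     // (5 + 1) / 2 = 3 of them.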
1605                 advance = ((len + 1) / 2).min(limit);
1606             }
1607         } else {
1608             // We can steal all tasks till the end of the block.
1609             advance = (BLOCK_CAP - offset).min(limit);
1610         }
1611 
1612         new_head += advance << SHIFT;
1613         let new_offset = offset + advance;
1614 
1615         // Try moving the head index forward.
1616         if self
1617             .head
1618             .index
1619             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1620             .is_err()
1621         {
1622             return Steal::Retry;
1623         }
1624 
1625         // Reserve capacity for the stolen batch.
1626         let batch_size = new_offset - offset;
1627         dest.reserve(batch_size);
1628 
1629         // Get the destination buffer and back index.
1630         let dest_buffer = dest.buffer.get();
1631         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1632 
1633         unsafe {
1634             // If we've reached the end of the block, move to the next one.
1635             if new_offset == BLOCK_CAP {
1636                 let next = (*block).wait_next();
1637                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1638                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1639                     next_index |= HAS_NEXT;
1640                 }
1641 
1642                 self.head.block.store(next, Ordering::Release);
1643                 self.head.index.store(next_index, Ordering::Release);
1644             }
1645 
1646             // Copy values from the injector into the destination queue.
1647             match dest.flavor {
1648                 Flavor::Fifo => {
1649                     for i in 0..batch_size {
1650                         // Read the task.
1651                         let slot = (*block).slots.get_unchecked(offset + i);
1652                         slot.wait_write();
1653                         let task = slot.task.get().read();
1654 
1655                         // Write it into the destination queue.
1656                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1657                     }
1658                 }
1659 
1660                 Flavor::Lifo => {
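                         // Writing in reverse order means the slot at the back of `dest` (the one a
                         // LIFO worker pops first) holds the oldest stolen task.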
1661                     for i in 0..batch_size {
1662                         // Read the task.
1663                         let slot = (*block).slots.get_unchecked(offset + i);
1664                         slot.wait_write();
1665                         let task = slot.task.get().read();
1666 
1667                         // Write it into the destination queue.
1668                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1669                     }
1670                 }
1671             }
1672 
1673             atomic::fence(Ordering::Release);
1674 
1675             // Update the back index in the destination queue.
1676             //
1677             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1678             // data races because it doesn't understand fences.
1679             dest.inner
1680                 .back
1681                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1682 
1683             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1684             // but couldn't because we were busy reading from the slot.
1685             if new_offset == BLOCK_CAP {
1686                 Block::destroy(block, offset);
1687             } else {
1688                 for i in offset..new_offset {
1689                     let slot = (*block).slots.get_unchecked(i);
1690 
1691                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1692                         Block::destroy(block, offset);
1693                         break;
1694                     }
1695                 }
1696             }
1697 
1698             Steal::Success(())
1699         }
1700     }
1701 
1702     /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1703     ///
1704     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1705     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1706     ///
1707     /// # Examples
1708     ///
1709     /// ```
1710     /// use crossbeam_deque::{Injector, Steal, Worker};
1711     ///
1712     /// let q = Injector::new();
1713     /// q.push(1);
1714     /// q.push(2);
1715     /// q.push(3);
1716     /// q.push(4);
1717     ///
1718     /// let w = Worker::new_fifo();
1719     /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1720     /// assert_eq!(w.pop(), Some(2));
1721     /// ```
1722     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1723         // TODO: we use `MAX_BATCH + 1` as the hard limit for `Injector` as the performance is slightly
1724         // better, but we may change it in the future to be consistent with the same method in `Stealer`.
1725         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1726     }
1727 
1728     /// Steals no more than `limit` tasks, pushes them into a worker, and pops a task from that worker.
1729     ///
1730     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1731     /// steal around half of the tasks in the queue, but also not more than the given limit.
1732     ///
1733     /// # Examples
1734     ///
1735     /// ```
1736     /// use crossbeam_deque::{Injector, Steal, Worker};
1737     ///
1738     /// let q = Injector::new();
1739     /// q.push(1);
1740     /// q.push(2);
1741     /// q.push(3);
1742     /// q.push(4);
1743     /// q.push(5);
1744     /// q.push(6);
1745     ///
1746     /// let w = Worker::new_fifo();
1747     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1748     /// assert_eq!(w.pop(), Some(2));
1749     /// assert_eq!(w.pop(), None);
1750     ///
1751     /// q.push(7);
1752     /// // Setting a large limit does not guarantee that all elements will be taken. In this case,
1753     /// // about half of the remaining elements are taken, but the exact number is considered
1754     /// // an implementation detail that may change in the future.
1755     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1756     /// assert_eq!(w.pop(), Some(4));
1757     /// assert_eq!(w.pop(), Some(5));
1758     /// assert_eq!(w.pop(), None);
1759     /// ```
1760     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1761         assert!(limit > 0);
1762         let mut head;
1763         let mut block;
1764         let mut offset;
1765 
1766         let backoff = Backoff::new();
1767         loop {
1768             head = self.head.index.load(Ordering::Acquire);
1769             block = self.head.block.load(Ordering::Acquire);
1770 
1771             // Calculate the offset of the index into the block.
1772             offset = (head >> SHIFT) % LAP;
1773 
1774             // If we reached the end of the block, wait until the next one is installed.
1775             if offset == BLOCK_CAP {
1776                 backoff.snooze();
1777             } else {
1778                 break;
1779             }
1780         }
1781 
1782         let mut new_head = head;
1783         let advance;
1784 
1785         if new_head & HAS_NEXT == 0 {
1786             atomic::fence(Ordering::SeqCst);
1787             let tail = self.tail.index.load(Ordering::Relaxed);
1788 
1789             // If the tail equals the head, that means the queue is empty.
1790             if head >> SHIFT == tail >> SHIFT {
1791                 return Steal::Empty;
1792             }
1793 
1794             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1795             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1796                 new_head |= HAS_NEXT;
1797                 // We can steal all tasks till the end of the block.
1798                 advance = (BLOCK_CAP - offset).min(limit);
1799             } else {
1800                 let len = (tail - head) >> SHIFT;
1801                 // Steal half of the available tasks.
1802                 advance = ((len + 1) / 2).min(limit);
1803             }
1804         } else {
1805             // We can steal all tasks till the end of the block.
1806             advance = (BLOCK_CAP - offset).min(limit);
1807         }
1808 
1809         new_head += advance << SHIFT;
1810         let new_offset = offset + advance;
1811 
1812         // Try moving the head index forward.
1813         if self
1814             .head
1815             .index
1816             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1817             .is_err()
1818         {
1819             return Steal::Retry;
1820         }
1821 
1822         // Reserve capacity for the stolen batch.
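             // One of the stolen tasks is returned directly from this method rather than pushed into
             // `dest`, hence the `- 1`.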
1823         let batch_size = new_offset - offset - 1;
1824         dest.reserve(batch_size);
1825 
1826         // Get the destination buffer and back index.
1827         let dest_buffer = dest.buffer.get();
1828         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1829 
1830         unsafe {
1831             // If we've reached the end of the block, move to the next one.
1832             if new_offset == BLOCK_CAP {
1833                 let next = (*block).wait_next();
1834                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1835                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1836                     next_index |= HAS_NEXT;
1837                 }
1838 
1839                 self.head.block.store(next, Ordering::Release);
1840                 self.head.index.store(next_index, Ordering::Release);
1841             }
1842 
1843             // Read the task.
1844             let slot = (*block).slots.get_unchecked(offset);
1845             slot.wait_write();
1846             let task = slot.task.get().read();
1847 
1848             match dest.flavor {
1849                 Flavor::Fifo => {
1850                     // Copy values from the injector into the destination queue.
1851                     for i in 0..batch_size {
1852                         // Read the task.
1853                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1854                         slot.wait_write();
1855                         let task = slot.task.get().read();
1856 
1857                         // Write it into the destination queue.
1858                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1859                     }
1860                 }
1861 
1862                 Flavor::Lifo => {
1863                     // Copy values from the injector into the destination queue.
1864                     for i in 0..batch_size {
1865                         // Read the task.
1866                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1867                         slot.wait_write();
1868                         let task = slot.task.get().read();
1869 
1870                         // Write it into the destination queue.
1871                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1872                     }
1873                 }
1874             }
1875 
1876             atomic::fence(Ordering::Release);
1877 
1878             // Update the back index in the destination queue.
1879             //
1880             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1881             // data races because it doesn't understand fences.
1882             dest.inner
1883                 .back
1884                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1885 
1886             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1887             // but couldn't because we were busy reading from the slot.
1888             if new_offset == BLOCK_CAP {
1889                 Block::destroy(block, offset);
1890             } else {
1891                 for i in offset..new_offset {
1892                     let slot = (*block).slots.get_unchecked(i);
1893 
1894                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1895                         Block::destroy(block, offset);
1896                         break;
1897                     }
1898                 }
1899             }
1900 
1901             Steal::Success(task.assume_init())
1902         }
1903     }
1904 
1905     /// Returns `true` if the queue is empty.
1906     ///
1907     /// # Examples
1908     ///
1909     /// ```
1910     /// use crossbeam_deque::Injector;
1911     ///
1912     /// let q = Injector::new();
1913     ///
1914     /// assert!(q.is_empty());
1915     /// q.push(1);
1916     /// assert!(!q.is_empty());
1917     /// ```
1918     pub fn is_empty(&self) -> bool {
1919         let head = self.head.index.load(Ordering::SeqCst);
1920         let tail = self.tail.index.load(Ordering::SeqCst);
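             // Compare the task indices with the metadata bits stripped off; equal indices mean there
             // are no tasks left.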
1921         head >> SHIFT == tail >> SHIFT
1922     }
1923 
1924     /// Returns the number of tasks in the queue.
1925     ///
1926     /// # Examples
1927     ///
1928     /// ```
1929     /// use crossbeam_deque::Injector;
1930     ///
1931     /// let q = Injector::new();
1932     ///
1933     /// assert_eq!(q.len(), 0);
1934     /// q.push(1);
1935     /// assert_eq!(q.len(), 1);
1936     /// q.push(1);
1937     /// assert_eq!(q.len(), 2);
1938     /// ```
1939     pub fn len(&self) -> usize {
1940         loop {
1941             // Load the tail index, then load the head index.
1942             let mut tail = self.tail.index.load(Ordering::SeqCst);
1943             let mut head = self.head.index.load(Ordering::SeqCst);
1944 
1945             // If the tail index didn't change, we've got consistent indices to work with.
1946             if self.tail.index.load(Ordering::SeqCst) == tail {
1947                 // Erase the lower bits.
1948                 tail &= !((1 << SHIFT) - 1);
1949                 head &= !((1 << SHIFT) - 1);
1950 
1951                 // Fix up indices if they fall onto block ends.
1952                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1953                     tail = tail.wrapping_add(1 << SHIFT);
1954                 }
1955                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1956                     head = head.wrapping_add(1 << SHIFT);
1957                 }
1958 
1959                 // Rotate indices so that head falls into the first block.
1960                 let lap = (head >> SHIFT) / LAP;
1961                 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1962                 head = head.wrapping_sub((lap * LAP) << SHIFT);
1963 
1964                 // Remove the lower bits.
1965                 tail >>= SHIFT;
1966                 head >>= SHIFT;
1967 
1968                 // Return the difference minus the number of blocks between tail and head.
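                     // One index in every lap is reserved as a block boundary and never holds a task,
                     // so one index is subtracted for each full lap before `tail` (`tail / LAP`).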
1969                 return tail - head - tail / LAP;
1970             }
1971         }
1972     }
1973 }
1974 
1975 impl<T> Drop for Injector<T> {
1976     fn drop(&mut self) {
1977         let mut head = *self.head.index.get_mut();
1978         let mut tail = *self.tail.index.get_mut();
1979         let mut block = *self.head.block.get_mut();
1980 
1981         // Erase the lower bits.
1982         head &= !((1 << SHIFT) - 1);
1983         tail &= !((1 << SHIFT) - 1);
1984 
1985         unsafe {
1986             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1987             while head != tail {
1988                 let offset = (head >> SHIFT) % LAP;
1989 
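                     // An offset equal to `BLOCK_CAP` is the reserved end-of-block index; it holds no
                     // task, so the `else` branch below frees the block and moves on to the next one.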
1990                 if offset < BLOCK_CAP {
1991                     // Drop the task in the slot.
1992                     let slot = (*block).slots.get_unchecked(offset);
1993                     let p = &mut *slot.task.get();
1994                     p.as_mut_ptr().drop_in_place();
1995                 } else {
1996                     // Deallocate the block and move to the next one.
1997                     let next = *(*block).next.get_mut();
1998                     drop(Box::from_raw(block));
1999                     block = next;
2000                 }
2001 
2002                 head = head.wrapping_add(1 << SHIFT);
2003             }
2004 
2005             // Deallocate the last remaining block.
2006             drop(Box::from_raw(block));
2007         }
2008     }
2009 }
2010 
2011 impl<T> fmt::Debug for Injector<T> {
2012     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2013         f.pad("Injector { .. }")
2014     }
2015 }
2016 
2017 /// Possible outcomes of a steal operation.
2018 ///
2019 /// # Examples
2020 ///
2021 /// There are lots of ways to chain results of steal operations together:
2022 ///
2023 /// ```
2024 /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2025 ///
2026 /// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2027 ///
2028 /// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2029 /// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2030 /// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2031 ///
2032 /// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2033 /// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2034 /// ```
2035 #[must_use]
2036 #[derive(PartialEq, Eq, Copy, Clone)]
2037 pub enum Steal<T> {
2038     /// The queue was empty at the time of stealing.
2039     Empty,
2040 
2041     /// At least one task was successfully stolen.
2042     Success(T),
2043 
2044     /// The steal operation needs to be retried.
2045     Retry,
2046 }
2047 
2048 impl<T> Steal<T> {
2049     /// Returns `true` if the queue was empty at the time of stealing.
2050     ///
2051     /// # Examples
2052     ///
2053     /// ```
2054     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2055     ///
2056     /// assert!(!Success(7).is_empty());
2057     /// assert!(!Retry::<i32>.is_empty());
2058     ///
2059     /// assert!(Empty::<i32>.is_empty());
2060     /// ```
2061     pub fn is_empty(&self) -> bool {
2062         match self {
2063             Steal::Empty => true,
2064             _ => false,
2065         }
2066     }
2067 
2068     /// Returns `true` if at least one task was stolen.
2069     ///
2070     /// # Examples
2071     ///
2072     /// ```
2073     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2074     ///
2075     /// assert!(!Empty::<i32>.is_success());
2076     /// assert!(!Retry::<i32>.is_success());
2077     ///
2078     /// assert!(Success(7).is_success());
2079     /// ```
2080     pub fn is_success(&self) -> bool {
2081         match self {
2082             Steal::Success(_) => true,
2083             _ => false,
2084         }
2085     }
2086 
2087     /// Returns `true` if the steal operation needs to be retried.
2088     ///
2089     /// # Examples
2090     ///
2091     /// ```
2092     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2093     ///
2094     /// assert!(!Empty::<i32>.is_retry());
2095     /// assert!(!Success(7).is_retry());
2096     ///
2097     /// assert!(Retry::<i32>.is_retry());
2098     /// ```
2099     pub fn is_retry(&self) -> bool {
2100         match self {
2101             Steal::Retry => true,
2102             _ => false,
2103         }
2104     }
2105 
2106     /// Returns the result of the operation, if successful.
2107     ///
2108     /// # Examples
2109     ///
2110     /// ```
2111     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2112     ///
2113     /// assert_eq!(Empty::<i32>.success(), None);
2114     /// assert_eq!(Retry::<i32>.success(), None);
2115     ///
2116     /// assert_eq!(Success(7).success(), Some(7));
2117     /// ```
2118     pub fn success(self) -> Option<T> {
2119         match self {
2120             Steal::Success(res) => Some(res),
2121             _ => None,
2122         }
2123     }
2124 
2125     /// If no task was stolen, attempts another steal operation.
2126     ///
2127     /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2128     ///
2129     /// * If the second steal resulted in `Success`, it is returned.
2130     /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2131     /// * If both resulted in `Empty`, then `Empty` is returned.
2132     ///
2133     /// # Examples
2134     ///
2135     /// ```
2136     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2137     ///
2138     /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2139     /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2140     ///
2141     /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2142     /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2143     ///
2144     /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2145     /// ```
2146     pub fn or_else<F>(self, f: F) -> Steal<T>
2147     where
2148         F: FnOnce() -> Steal<T>,
2149     {
2150         match self {
2151             Steal::Empty => f(),
2152             Steal::Success(_) => self,
2153             Steal::Retry => {
2154                 if let Steal::Success(res) = f() {
2155                     Steal::Success(res)
2156                 } else {
2157                     Steal::Retry
2158                 }
2159             }
2160         }
2161     }
2162 }
2163 
2164 impl<T> fmt::Debug for Steal<T> {
2165     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2166         match self {
2167             Steal::Empty => f.pad("Empty"),
2168             Steal::Success(_) => f.pad("Success(..)"),
2169             Steal::Retry => f.pad("Retry"),
2170         }
2171     }
2172 }
2173 
2174 impl<T> FromIterator<Steal<T>> for Steal<T> {
2175     /// Consumes items until a `Success` is found and returns it.
2176     ///
2177     /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2178     /// Otherwise, `Empty` is returned.
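         ///
         /// # Examples
         ///
         /// A small sketch of the collection behavior (mirroring the implementation below):
         ///
         /// ```
         /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
         ///
         /// // Iteration stops at the first `Success`.
         /// let s: Steal<i32> = vec![Empty, Retry, Success(7), Retry].into_iter().collect();
         /// assert_eq!(s, Success(7));
         ///
         /// // Without a `Success`, a single `Retry` wins over any number of `Empty`s.
         /// let s: Steal<i32> = vec![Empty, Retry, Empty].into_iter().collect();
         /// assert_eq!(s, Retry);
         /// ```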
2179     fn from_iter<I>(iter: I) -> Steal<T>
2180     where
2181         I: IntoIterator<Item = Steal<T>>,
2182     {
2183         let mut retry = false;
2184         for s in iter {
2185             match &s {
2186                 Steal::Empty => {}
2187                 Steal::Success(_) => return s,
2188                 Steal::Retry => retry = true,
2189             }
2190         }
2191 
2192         if retry {
2193             Steal::Retry
2194         } else {
2195             Steal::Empty
2196         }
2197     }
2198 }
2199