1 use std::cell::{Cell, UnsafeCell};
2 use std::cmp;
3 use std::fmt;
4 use std::iter::FromIterator;
5 use std::marker::PhantomData;
6 use std::mem::{self, MaybeUninit};
7 use std::ptr;
8 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9 use std::sync::Arc;
10 
11 use crate::epoch::{self, Atomic, Owned};
12 use crate::utils::{Backoff, CachePadded};
13 
14 // Minimum buffer capacity.
15 const MIN_CAP: usize = 64;
16 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17 const MAX_BATCH: usize = 32;
18 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19 // deallocated as soon as possible.
20 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
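
As a quick sanity check of the threshold above (a standalone sketch, not part of the library; the `u64` task type and capacities are illustrative assumptions): a retired buffer only triggers an eager epoch flush once `size_of::<T>() * cap` reaches `FLUSH_THRESHOLD_BYTES`, which is the check performed later in `Worker::resize`.

```rust
// Standalone sketch: when does a retired buffer trigger `guard.flush()` in `resize`?
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; // mirrors the constant above

fn main() {
    let small_cap = 64usize;  // MIN_CAP
    let large_cap = 128usize; // after one doubling

    // 512 bytes: below the threshold, no eager flush.
    assert!(std::mem::size_of::<u64>() * small_cap < FLUSH_THRESHOLD_BYTES);
    // 1024 bytes: at the threshold, the resized worker flushes thread-local garbage.
    assert!(std::mem::size_of::<u64>() * large_cap >= FLUSH_THRESHOLD_BYTES);
}
```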
21 
22 /// A buffer that holds tasks in a worker queue.
23 ///
24 /// This is just a pointer to the buffer and its capacity - dropping an instance of this struct will
25 /// *not* deallocate the buffer.
26 struct Buffer<T> {
27     /// Pointer to the allocated memory.
28     ptr: *mut T,
29 
30     /// Capacity of the buffer. Always a power of two.
31     cap: usize,
32 }
33 
34 unsafe impl<T> Send for Buffer<T> {}
35 
36 impl<T> Buffer<T> {
37     /// Allocates a new buffer with the specified capacity.
38     fn alloc(cap: usize) -> Buffer<T> {
39         debug_assert_eq!(cap, cap.next_power_of_two());
40 
41         let mut v = Vec::with_capacity(cap);
42         let ptr = v.as_mut_ptr();
43         mem::forget(v);
44 
45         Buffer { ptr, cap }
46     }
47 
48     /// Deallocates the buffer.
49     unsafe fn dealloc(self) {
50         drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
51     }
52 
53     /// Returns a pointer to the task at the specified `index`.
54     unsafe fn at(&self, index: isize) -> *mut T {
55         // `self.cap` is always a power of two.
56         self.ptr.offset(index & (self.cap - 1) as isize)
57     }
58 
59     /// Writes `task` into the specified `index`.
60     ///
61     /// This method might be concurrently called with another `read` at the same index, which is
62     /// technically speaking a data race and therefore UB. We should use an atomic store here, but
63     /// that would be more expensive and difficult to implement generically for all types `T`.
64     /// Hence, as a hack, we use a volatile write instead.
65     unsafe fn write(&self, index: isize, task: T) {
66         ptr::write_volatile(self.at(index), task)
67     }
68 
69     /// Reads a task from the specified `index`.
70     ///
71     /// This method might be concurrently called with another `write` at the same index, which is
72     /// technically speaking a data race and therefore UB. We should use an atomic load here, but
73     /// that would be more expensive and difficult to implement generically for all types `T`.
74     /// Hence, as a hack, we use a volatile read instead.
75     unsafe fn read(&self, index: isize) -> T {
76         ptr::read_volatile(self.at(index))
77     }
78 }
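
The `at` method above relies on `cap` being a power of two so that `index & (cap - 1)` behaves like wrap-around modulo, even once the deque indices have wrapped into negative territory. A standalone sketch of that identity (plain `std`; the sample indices are arbitrary):

```rust
fn main() {
    // The worker buffer capacity is always a power of two (MIN_CAP = 64), so the mask is cap - 1.
    let cap: isize = 64;
    let mask = cap - 1;

    for index in [0isize, 5, 63, 64, 70, 1000, -1, -64] {
        // In two's complement, `index & mask` equals the Euclidean remainder `index mod cap`,
        // which is exactly the slot a (possibly wrapped) deque index should map to.
        assert_eq!(index & mask, index.rem_euclid(cap));
    }
}
```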
79 
80 impl<T> Clone for Buffer<T> {
81     fn clone(&self) -> Buffer<T> {
82         Buffer {
83             ptr: self.ptr,
84             cap: self.cap,
85         }
86     }
87 }
88 
89 impl<T> Copy for Buffer<T> {}
90 
91 /// Internal queue data shared between the worker and stealers.
92 ///
93 /// The implementation is based on the following work:
94 ///
95 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
96 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
97 ///    PPoPP 2013.][weak-mem]
98 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
99 ///    atomics. OOPSLA 2013.][checker]
100 ///
101 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
102 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
103 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514
104 struct Inner<T> {
105     /// The front index.
106     front: AtomicIsize,
107 
108     /// The back index.
109     back: AtomicIsize,
110 
111     /// The underlying buffer.
112     buffer: CachePadded<Atomic<Buffer<T>>>,
113 }
114 
115 impl<T> Drop for Inner<T> {
116     fn drop(&mut self) {
117         // Load the back index, front index, and buffer.
118         let b = self.back.load(Ordering::Relaxed);
119         let f = self.front.load(Ordering::Relaxed);
120 
121         unsafe {
122             let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
123 
124             // Go through the buffer from front to back and drop all tasks in the queue.
125             let mut i = f;
126             while i != b {
127                 buffer.deref().at(i).drop_in_place();
128                 i = i.wrapping_add(1);
129             }
130 
131             // Free the memory allocated by the buffer.
132             buffer.into_owned().into_box().dealloc();
133         }
134     }
135 }
136 
137 /// Worker queue flavor: FIFO or LIFO.
138 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
139 enum Flavor {
140     /// The first-in first-out flavor.
141     Fifo,
142 
143     /// The last-in first-out flavor.
144     Lifo,
145 }
146 
147 /// A worker queue.
148 ///
149 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
150 /// tasks from it. Task schedulers typically create a single worker queue per thread.
151 ///
152 /// # Examples
153 ///
154 /// A FIFO worker:
155 ///
156 /// ```
157 /// use crossbeam_deque::{Steal, Worker};
158 ///
159 /// let w = Worker::new_fifo();
160 /// let s = w.stealer();
161 ///
162 /// w.push(1);
163 /// w.push(2);
164 /// w.push(3);
165 ///
166 /// assert_eq!(s.steal(), Steal::Success(1));
167 /// assert_eq!(w.pop(), Some(2));
168 /// assert_eq!(w.pop(), Some(3));
169 /// ```
170 ///
171 /// A LIFO worker:
172 ///
173 /// ```
174 /// use crossbeam_deque::{Steal, Worker};
175 ///
176 /// let w = Worker::new_lifo();
177 /// let s = w.stealer();
178 ///
179 /// w.push(1);
180 /// w.push(2);
181 /// w.push(3);
182 ///
183 /// assert_eq!(s.steal(), Steal::Success(1));
184 /// assert_eq!(w.pop(), Some(3));
185 /// assert_eq!(w.pop(), Some(2));
186 /// ```
187 pub struct Worker<T> {
188     /// A reference to the inner representation of the queue.
189     inner: Arc<CachePadded<Inner<T>>>,
190 
191     /// A copy of `inner.buffer` for quick access.
192     buffer: Cell<Buffer<T>>,
193 
194     /// The flavor of the queue.
195     flavor: Flavor,
196 
197     /// Indicates that the worker cannot be shared among threads.
198     _marker: PhantomData<*mut ()>, // !Send + !Sync
199 }
200 
201 unsafe impl<T: Send> Send for Worker<T> {}
202 
203 impl<T> Worker<T> {
204     /// Creates a FIFO worker queue.
205     ///
206     /// Tasks are pushed and popped from opposite ends.
207     ///
208     /// # Examples
209     ///
210     /// ```
211     /// use crossbeam_deque::Worker;
212     ///
213     /// let w = Worker::<i32>::new_fifo();
214     /// ```
215     pub fn new_fifo() -> Worker<T> {
216         let buffer = Buffer::alloc(MIN_CAP);
217 
218         let inner = Arc::new(CachePadded::new(Inner {
219             front: AtomicIsize::new(0),
220             back: AtomicIsize::new(0),
221             buffer: CachePadded::new(Atomic::new(buffer)),
222         }));
223 
224         Worker {
225             inner,
226             buffer: Cell::new(buffer),
227             flavor: Flavor::Fifo,
228             _marker: PhantomData,
229         }
230     }
231 
232     /// Creates a LIFO worker queue.
233     ///
234     /// Tasks are pushed and popped from the same end.
235     ///
236     /// # Examples
237     ///
238     /// ```
239     /// use crossbeam_deque::Worker;
240     ///
241     /// let w = Worker::<i32>::new_lifo();
242     /// ```
243     pub fn new_lifo() -> Worker<T> {
244         let buffer = Buffer::alloc(MIN_CAP);
245 
246         let inner = Arc::new(CachePadded::new(Inner {
247             front: AtomicIsize::new(0),
248             back: AtomicIsize::new(0),
249             buffer: CachePadded::new(Atomic::new(buffer)),
250         }));
251 
252         Worker {
253             inner,
254             buffer: Cell::new(buffer),
255             flavor: Flavor::Lifo,
256             _marker: PhantomData,
257         }
258     }
259 
260     /// Creates a stealer for this queue.
261     ///
262     /// The returned stealer can be shared among threads and cloned.
263     ///
264     /// # Examples
265     ///
266     /// ```
267     /// use crossbeam_deque::Worker;
268     ///
269     /// let w = Worker::<i32>::new_lifo();
270     /// let s = w.stealer();
271     /// ```
272     pub fn stealer(&self) -> Stealer<T> {
273         Stealer {
274             inner: self.inner.clone(),
275             flavor: self.flavor,
276         }
277     }
278 
279     /// Resizes the internal buffer to the new capacity of `new_cap`.
280     #[cold]
281     unsafe fn resize(&self, new_cap: usize) {
282         // Load the back index, front index, and buffer.
283         let b = self.inner.back.load(Ordering::Relaxed);
284         let f = self.inner.front.load(Ordering::Relaxed);
285         let buffer = self.buffer.get();
286 
287         // Allocate a new buffer and copy data from the old buffer to the new one.
288         let new = Buffer::alloc(new_cap);
289         let mut i = f;
290         while i != b {
291             ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
292             i = i.wrapping_add(1);
293         }
294 
295         let guard = &epoch::pin();
296 
297         // Replace the old buffer with the new one.
298         self.buffer.replace(new);
299         let old =
300             self.inner
301                 .buffer
302                 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
303 
304         // Destroy the old buffer later.
305         guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
306 
307         // If the buffer is very large, then flush the thread-local garbage in order to deallocate
308         // it as soon as possible.
309         if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
310             guard.flush();
311         }
312     }
313 
314     /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
315     /// buffer.
316     fn reserve(&self, reserve_cap: usize) {
317         if reserve_cap > 0 {
318             // Compute the current length.
319             let b = self.inner.back.load(Ordering::Relaxed);
320             let f = self.inner.front.load(Ordering::SeqCst);
321             let len = b.wrapping_sub(f) as usize;
322 
323             // The current capacity.
324             let cap = self.buffer.get().cap;
325 
326             // Is there enough capacity to push `reserve_cap` tasks?
327             if cap - len < reserve_cap {
328                 // Keep doubling the capacity as much as is needed.
329                 let mut new_cap = cap * 2;
330                 while new_cap - len < reserve_cap {
331                     new_cap *= 2;
332                 }
333 
334                 // Resize the buffer.
335                 unsafe {
336                     self.resize(new_cap);
337                 }
338             }
339         }
340     }
341 
342     /// Returns `true` if the queue is empty.
343     ///
344     /// ```
345     /// use crossbeam_deque::Worker;
346     ///
347     /// let w = Worker::new_lifo();
348     ///
349     /// assert!(w.is_empty());
350     /// w.push(1);
351     /// assert!(!w.is_empty());
352     /// ```
353     pub fn is_empty(&self) -> bool {
354         let b = self.inner.back.load(Ordering::Relaxed);
355         let f = self.inner.front.load(Ordering::SeqCst);
356         b.wrapping_sub(f) <= 0
357     }
358 
359     /// Returns the number of tasks in the deque.
360     ///
361     /// ```
362     /// use crossbeam_deque::Worker;
363     ///
364     /// let w = Worker::new_lifo();
365     ///
366     /// assert_eq!(w.len(), 0);
367     /// w.push(1);
368     /// assert_eq!(w.len(), 1);
369     /// w.push(1);
370     /// assert_eq!(w.len(), 2);
371     /// ```
372     pub fn len(&self) -> usize {
373         let b = self.inner.back.load(Ordering::Relaxed);
374         let f = self.inner.front.load(Ordering::SeqCst);
375         b.wrapping_sub(f).max(0) as usize
376     }
377 
378     /// Pushes a task into the queue.
379     ///
380     /// # Examples
381     ///
382     /// ```
383     /// use crossbeam_deque::Worker;
384     ///
385     /// let w = Worker::new_lifo();
386     /// w.push(1);
387     /// w.push(2);
388     /// ```
389     pub fn push(&self, task: T) {
390         // Load the back index, front index, and buffer.
391         let b = self.inner.back.load(Ordering::Relaxed);
392         let f = self.inner.front.load(Ordering::Acquire);
393         let mut buffer = self.buffer.get();
394 
395         // Calculate the length of the queue.
396         let len = b.wrapping_sub(f);
397 
398         // Is the queue full?
399         if len >= buffer.cap as isize {
400             // Yes. Grow the underlying buffer.
401             unsafe {
402                 self.resize(2 * buffer.cap);
403             }
404             buffer = self.buffer.get();
405         }
406 
407         // Write `task` into the slot.
408         unsafe {
409             buffer.write(b, task);
410         }
411 
412         atomic::fence(Ordering::Release);
413 
414         // Increment the back index.
415         //
416         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
417         // races because it doesn't understand fences.
418         self.inner.back.store(b.wrapping_add(1), Ordering::Release);
419     }
420 
421     /// Pops a task from the queue.
422     ///
423     /// # Examples
424     ///
425     /// ```
426     /// use crossbeam_deque::Worker;
427     ///
428     /// let w = Worker::new_fifo();
429     /// w.push(1);
430     /// w.push(2);
431     ///
432     /// assert_eq!(w.pop(), Some(1));
433     /// assert_eq!(w.pop(), Some(2));
434     /// assert_eq!(w.pop(), None);
435     /// ```
436     pub fn pop(&self) -> Option<T> {
437         // Load the back and front index.
438         let b = self.inner.back.load(Ordering::Relaxed);
439         let f = self.inner.front.load(Ordering::Relaxed);
440 
441         // Calculate the length of the queue.
442         let len = b.wrapping_sub(f);
443 
444         // Is the queue empty?
445         if len <= 0 {
446             return None;
447         }
448 
449         match self.flavor {
450             // Pop from the front of the queue.
451             Flavor::Fifo => {
452                 // Try incrementing the front index to pop the task.
453                 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
454                 let new_f = f.wrapping_add(1);
455 
456                 if b.wrapping_sub(new_f) < 0 {
457                     self.inner.front.store(f, Ordering::Relaxed);
458                     return None;
459                 }
460 
461                 unsafe {
462                     // Read the popped task.
463                     let buffer = self.buffer.get();
464                     let task = buffer.read(f);
465 
466                     // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
467                     if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
468                         self.resize(buffer.cap / 2);
469                     }
470 
471                     Some(task)
472                 }
473             }
474 
475             // Pop from the back of the queue.
476             Flavor::Lifo => {
477                 // Decrement the back index.
478                 let b = b.wrapping_sub(1);
479                 self.inner.back.store(b, Ordering::Relaxed);
480 
481                 atomic::fence(Ordering::SeqCst);
482 
483                 // Load the front index.
484                 let f = self.inner.front.load(Ordering::Relaxed);
485 
486                 // Compute the length after the back index was decremented.
487                 let len = b.wrapping_sub(f);
488 
489                 if len < 0 {
490                     // The queue is empty. Restore the back index to its original value.
491                     self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
492                     None
493                 } else {
494                     // Read the task to be popped.
495                     let buffer = self.buffer.get();
496                     let mut task = unsafe { Some(buffer.read(b)) };
497 
498                     // Are we popping the last task from the queue?
499                     if len == 0 {
500                         // Try incrementing the front index.
501                         if self
502                             .inner
503                             .front
504                             .compare_exchange(
505                                 f,
506                                 f.wrapping_add(1),
507                                 Ordering::SeqCst,
508                                 Ordering::Relaxed,
509                             )
510                             .is_err()
511                         {
512                             // Failed. We didn't pop anything.
513                             mem::forget(task.take());
514                         }
515 
516                         // Restore the back index to its original value.
517                         self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
518                     } else {
519                         // Shrink the buffer if `len` is less than one fourth of the capacity.
520                         if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
521                             unsafe {
522                                 self.resize(buffer.cap / 2);
523                             }
524                         }
525                     }
526 
527                     task
528                 }
529             }
530         }
531     }
532 }
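
Putting the `Worker`/`Stealer` pair above together across threads: the worker handle stays with its owning thread (it is `!Sync`), while a stealer can be moved elsewhere and drained with the usual three-way `Steal` match. A small usage sketch (the task values and counts are arbitrary):

```rust
use crossbeam_deque::{Steal, Worker};
use std::thread;

fn main() {
    // The worker is owned by this thread; only the stealer handle is sent away.
    let w = Worker::new_fifo();
    let s = w.stealer();

    for i in 0..8 {
        w.push(i);
    }

    let stolen = thread::spawn(move || {
        let mut tasks = Vec::new();
        loop {
            match s.steal() {
                Steal::Success(task) => tasks.push(task),
                Steal::Retry => continue, // a concurrent operation interfered; just try again
                Steal::Empty => break,    // nothing left to steal
            }
        }
        tasks
    })
    .join()
    .unwrap();

    // The owning thread never popped, so the stealer saw all eight tasks in FIFO order.
    assert_eq!(stolen, (0..8).collect::<Vec<_>>());
}
```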
533 
534 impl<T> fmt::Debug for Worker<T> {
535     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536         f.pad("Worker { .. }")
537     }
538 }
539 
540 /// A stealer handle of a worker queue.
541 ///
542 /// Stealers can be shared among threads.
543 ///
544 /// Task schedulers typically have a single worker queue per worker thread.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// use crossbeam_deque::{Steal, Worker};
550 ///
551 /// let w = Worker::new_lifo();
552 /// w.push(1);
553 /// w.push(2);
554 ///
555 /// let s = w.stealer();
556 /// assert_eq!(s.steal(), Steal::Success(1));
557 /// assert_eq!(s.steal(), Steal::Success(2));
558 /// assert_eq!(s.steal(), Steal::Empty);
559 /// ```
560 pub struct Stealer<T> {
561     /// A reference to the inner representation of the queue.
562     inner: Arc<CachePadded<Inner<T>>>,
563 
564     /// The flavor of the queue.
565     flavor: Flavor,
566 }
567 
568 unsafe impl<T: Send> Send for Stealer<T> {}
569 unsafe impl<T: Send> Sync for Stealer<T> {}
570 
571 impl<T> Stealer<T> {
572     /// Returns `true` if the queue is empty.
573     ///
574     /// ```
575     /// use crossbeam_deque::Worker;
576     ///
577     /// let w = Worker::new_lifo();
578     /// let s = w.stealer();
579     ///
580     /// assert!(s.is_empty());
581     /// w.push(1);
582     /// assert!(!s.is_empty());
583     /// ```
584     pub fn is_empty(&self) -> bool {
585         let f = self.inner.front.load(Ordering::Acquire);
586         atomic::fence(Ordering::SeqCst);
587         let b = self.inner.back.load(Ordering::Acquire);
588         b.wrapping_sub(f) <= 0
589     }
590 
591     /// Returns the number of tasks in the deque.
592     ///
593     /// ```
594     /// use crossbeam_deque::Worker;
595     ///
596     /// let w = Worker::new_lifo();
597     /// let s = w.stealer();
598     ///
599     /// assert_eq!(s.len(), 0);
600     /// w.push(1);
601     /// assert_eq!(s.len(), 1);
602     /// w.push(2);
603     /// assert_eq!(s.len(), 2);
604     /// ```
605     pub fn len(&self) -> usize {
606         let f = self.inner.front.load(Ordering::Acquire);
607         atomic::fence(Ordering::SeqCst);
608         let b = self.inner.back.load(Ordering::Acquire);
609         b.wrapping_sub(f).max(0) as usize
610     }
611 
612     /// Steals a task from the queue.
613     ///
614     /// # Examples
615     ///
616     /// ```
617     /// use crossbeam_deque::{Steal, Worker};
618     ///
619     /// let w = Worker::new_lifo();
620     /// w.push(1);
621     /// w.push(2);
622     ///
623     /// let s = w.stealer();
624     /// assert_eq!(s.steal(), Steal::Success(1));
625     /// assert_eq!(s.steal(), Steal::Success(2));
626     /// ```
627     pub fn steal(&self) -> Steal<T> {
628         // Load the front index.
629         let f = self.inner.front.load(Ordering::Acquire);
630 
631         // A SeqCst fence is needed here.
632         //
633         // If the current thread is already pinned (reentrantly), we must manually issue the
634         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
635         // have to.
636         if epoch::is_pinned() {
637             atomic::fence(Ordering::SeqCst);
638         }
639 
640         let guard = &epoch::pin();
641 
642         // Load the back index.
643         let b = self.inner.back.load(Ordering::Acquire);
644 
645         // Is the queue empty?
646         if b.wrapping_sub(f) <= 0 {
647             return Steal::Empty;
648         }
649 
650         // Load the buffer and read the task at the front.
651         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
652         let task = unsafe { buffer.deref().read(f) };
653 
654         // Try incrementing the front index to steal the task.
655         // If the buffer has been swapped or the increment fails, we retry.
656         if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
657             || self
658                 .inner
659                 .front
660                 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
661                 .is_err()
662         {
663             // We didn't steal this task, forget it.
664             mem::forget(task);
665             return Steal::Retry;
666         }
667 
668         // Return the stolen task.
669         Steal::Success(task)
670     }
671 
672     /// Steals a batch of tasks and pushes them into another worker.
673     ///
674     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
675     /// steal around half of the tasks in the queue, but also not more than some constant limit.
676     ///
677     /// # Examples
678     ///
679     /// ```
680     /// use crossbeam_deque::Worker;
681     ///
682     /// let w1 = Worker::new_fifo();
683     /// w1.push(1);
684     /// w1.push(2);
685     /// w1.push(3);
686     /// w1.push(4);
687     ///
688     /// let s = w1.stealer();
689     /// let w2 = Worker::new_fifo();
690     ///
691     /// let _ = s.steal_batch(&w2);
692     /// assert_eq!(w2.pop(), Some(1));
693     /// assert_eq!(w2.pop(), Some(2));
694     /// ```
695     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
696         if Arc::ptr_eq(&self.inner, &dest.inner) {
697             if dest.is_empty() {
698                 return Steal::Empty;
699             } else {
700                 return Steal::Success(());
701             }
702         }
703 
704         // Load the front index.
705         let mut f = self.inner.front.load(Ordering::Acquire);
706 
707         // A SeqCst fence is needed here.
708         //
709         // If the current thread is already pinned (reentrantly), we must manually issue the
710         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
711         // have to.
712         if epoch::is_pinned() {
713             atomic::fence(Ordering::SeqCst);
714         }
715 
716         let guard = &epoch::pin();
717 
718         // Load the back index.
719         let b = self.inner.back.load(Ordering::Acquire);
720 
721         // Is the queue empty?
722         let len = b.wrapping_sub(f);
723         if len <= 0 {
724             return Steal::Empty;
725         }
726 
727         // Reserve capacity for the stolen batch.
728         let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
729         dest.reserve(batch_size);
730         let mut batch_size = batch_size as isize;
731 
732         // Get the destination buffer and back index.
733         let dest_buffer = dest.buffer.get();
734         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
735 
736         // Load the buffer.
737         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
738 
739         match self.flavor {
740             // Steal a batch of tasks from the front at once.
741             Flavor::Fifo => {
742                 // Copy the batch from the source to the destination buffer.
743                 match dest.flavor {
744                     Flavor::Fifo => {
745                         for i in 0..batch_size {
746                             unsafe {
747                                 let task = buffer.deref().read(f.wrapping_add(i));
748                                 dest_buffer.write(dest_b.wrapping_add(i), task);
749                             }
750                         }
751                     }
752                     Flavor::Lifo => {
753                         for i in 0..batch_size {
754                             unsafe {
755                                 let task = buffer.deref().read(f.wrapping_add(i));
756                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
757                             }
758                         }
759                     }
760                 }
761 
762                 // Try incrementing the front index to steal the batch.
763                 // If the buffer has been swapped or the increment fails, we retry.
764                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
765                     || self
766                         .inner
767                         .front
768                         .compare_exchange(
769                             f,
770                             f.wrapping_add(batch_size),
771                             Ordering::SeqCst,
772                             Ordering::Relaxed,
773                         )
774                         .is_err()
775                 {
776                     return Steal::Retry;
777                 }
778 
779                 dest_b = dest_b.wrapping_add(batch_size);
780             }
781 
782             // Steal a batch of tasks from the front one by one.
783             Flavor::Lifo => {
784                 // This loop may modify the batch_size, which triggers a clippy lint warning.
785                 // Use a new variable to avoid the warning, and to make it clear we aren't
786                 // modifying the loop exit condition during iteration.
787                 let original_batch_size = batch_size;
788 
789                 for i in 0..original_batch_size {
790                     // If this is not the first steal, check whether the queue is empty.
791                     if i > 0 {
792                         // We've already got the current front index. Now execute the fence to
793                         // synchronize with other threads.
794                         atomic::fence(Ordering::SeqCst);
795 
796                         // Load the back index.
797                         let b = self.inner.back.load(Ordering::Acquire);
798 
799                         // Is the queue empty?
800                         if b.wrapping_sub(f) <= 0 {
801                             batch_size = i;
802                             break;
803                         }
804                     }
805 
806                     // Read the task at the front.
807                     let task = unsafe { buffer.deref().read(f) };
808 
809                     // Try incrementing the front index to steal the task.
810                     // If the buffer has been swapped or the increment fails, we retry.
811                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
812                         || self
813                             .inner
814                             .front
815                             .compare_exchange(
816                                 f,
817                                 f.wrapping_add(1),
818                                 Ordering::SeqCst,
819                                 Ordering::Relaxed,
820                             )
821                             .is_err()
822                     {
823                         // We didn't steal this task, forget it and break from the loop.
824                         mem::forget(task);
825                         batch_size = i;
826                         break;
827                     }
828 
829                     // Write the stolen task into the destination buffer.
830                     unsafe {
831                         dest_buffer.write(dest_b, task);
832                     }
833 
834                     // Move the source front index and the destination back index one step forward.
835                     f = f.wrapping_add(1);
836                     dest_b = dest_b.wrapping_add(1);
837                 }
838 
839                 // If we didn't steal anything, the operation needs to be retried.
840                 if batch_size == 0 {
841                     return Steal::Retry;
842                 }
843 
844                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
845                 if dest.flavor == Flavor::Fifo {
846                     for i in 0..batch_size / 2 {
847                         unsafe {
848                             let i1 = dest_b.wrapping_sub(batch_size - i);
849                             let i2 = dest_b.wrapping_sub(i + 1);
850                             let t1 = dest_buffer.read(i1);
851                             let t2 = dest_buffer.read(i2);
852                             dest_buffer.write(i1, t2);
853                             dest_buffer.write(i2, t1);
854                         }
855                     }
856                 }
857             }
858         }
859 
860         atomic::fence(Ordering::Release);
861 
862         // Update the back index in the destination queue.
863         //
864         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
865         // races because it doesn't understand fences.
866         dest.inner.back.store(dest_b, Ordering::Release);
867 
868         // Return with success.
869         Steal::Success(())
870     }
871 
872     /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
873     ///
874     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
875     /// steal around half of the tasks in the queue, but also not more than some constant limit.
876     ///
877     /// # Examples
878     ///
879     /// ```
880     /// use crossbeam_deque::{Steal, Worker};
881     ///
882     /// let w1 = Worker::new_fifo();
883     /// w1.push(1);
884     /// w1.push(2);
885     /// w1.push(3);
886     /// w1.push(4);
887     ///
888     /// let s = w1.stealer();
889     /// let w2 = Worker::new_fifo();
890     ///
891     /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
892     /// assert_eq!(w2.pop(), Some(2));
893     /// ```
894     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
895         if Arc::ptr_eq(&self.inner, &dest.inner) {
896             match dest.pop() {
897                 None => return Steal::Empty,
898                 Some(task) => return Steal::Success(task),
899             }
900         }
901 
902         // Load the front index.
903         let mut f = self.inner.front.load(Ordering::Acquire);
904 
905         // A SeqCst fence is needed here.
906         //
907         // If the current thread is already pinned (reentrantly), we must manually issue the
908         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
909         // have to.
910         if epoch::is_pinned() {
911             atomic::fence(Ordering::SeqCst);
912         }
913 
914         let guard = &epoch::pin();
915 
916         // Load the back index.
917         let b = self.inner.back.load(Ordering::Acquire);
918 
919         // Is the queue empty?
920         let len = b.wrapping_sub(f);
921         if len <= 0 {
922             return Steal::Empty;
923         }
924 
925         // Reserve capacity for the stolen batch.
926         let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
927         dest.reserve(batch_size);
928         let mut batch_size = batch_size as isize;
929 
930         // Get the destination buffer and back index.
931         let dest_buffer = dest.buffer.get();
932         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
933 
934         // Load the buffer.
935         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
936 
937         // Read the task at the front.
938         let mut task = unsafe { buffer.deref().read(f) };
939 
940         match self.flavor {
941             // Steal a batch of tasks from the front at once.
942             Flavor::Fifo => {
943                 // Copy the batch from the source to the destination buffer.
944                 match dest.flavor {
945                     Flavor::Fifo => {
946                         for i in 0..batch_size {
947                             unsafe {
948                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
949                                 dest_buffer.write(dest_b.wrapping_add(i), task);
950                             }
951                         }
952                     }
953                     Flavor::Lifo => {
954                         for i in 0..batch_size {
955                             unsafe {
956                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
957                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
958                             }
959                         }
960                     }
961                 }
962 
963                 // Try incrementing the front index to steal the task.
964                 // If the buffer has been swapped or the increment fails, we retry.
965                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
966                     || self
967                         .inner
968                         .front
969                         .compare_exchange(
970                             f,
971                             f.wrapping_add(batch_size + 1),
972                             Ordering::SeqCst,
973                             Ordering::Relaxed,
974                         )
975                         .is_err()
976                 {
977                     // We didn't steal this task, forget it.
978                     mem::forget(task);
979                     return Steal::Retry;
980                 }
981 
982                 dest_b = dest_b.wrapping_add(batch_size);
983             }
984 
985             // Steal a batch of tasks from the front one by one.
986             Flavor::Lifo => {
987                 // Try incrementing the front index to steal the task.
988                 if self
989                     .inner
990                     .front
991                     .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
992                     .is_err()
993                 {
994                     // We didn't steal this task, forget it.
995                     mem::forget(task);
996                     return Steal::Retry;
997                 }
998 
999                 // Move the front index one step forward.
1000                 f = f.wrapping_add(1);
1001 
1002                 // Repeat the same procedure for the batch steals.
1003                 //
1004                 // This loop may modify the batch_size, which triggers a clippy lint warning.
1005                 // Use a new variable to avoid the warning, and to make it clear we aren't
1006                 // modifying the loop exit condition during iteration.
1007                 let original_batch_size = batch_size;
1008                 for i in 0..original_batch_size {
1009                     // We've already got the current front index. Now execute the fence to
1010                     // synchronize with other threads.
1011                     atomic::fence(Ordering::SeqCst);
1012 
1013                     // Load the back index.
1014                     let b = self.inner.back.load(Ordering::Acquire);
1015 
1016                     // Is the queue empty?
1017                     if b.wrapping_sub(f) <= 0 {
1018                         batch_size = i;
1019                         break;
1020                     }
1021 
1022                     // Read the task at the front.
1023                     let tmp = unsafe { buffer.deref().read(f) };
1024 
1025                     // Try incrementing the front index to steal the task.
1026                     // If the buffer has been swapped or the increment fails, we retry.
1027                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1028                         || self
1029                             .inner
1030                             .front
1031                             .compare_exchange(
1032                                 f,
1033                                 f.wrapping_add(1),
1034                                 Ordering::SeqCst,
1035                                 Ordering::Relaxed,
1036                             )
1037                             .is_err()
1038                     {
1039                         // We didn't steal this task, forget it and break from the loop.
1040                         mem::forget(tmp);
1041                         batch_size = i;
1042                         break;
1043                     }
1044 
1045                     // Write the previously stolen task into the destination buffer.
1046                     unsafe {
1047                         dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1048                     }
1049 
1050                     // Move the source front index and the destination back index one step forward.
1051                     f = f.wrapping_add(1);
1052                     dest_b = dest_b.wrapping_add(1);
1053                 }
1054 
1055                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1056                 if dest.flavor == Flavor::Fifo {
1057                     for i in 0..batch_size / 2 {
1058                         unsafe {
1059                             let i1 = dest_b.wrapping_sub(batch_size - i);
1060                             let i2 = dest_b.wrapping_sub(i + 1);
1061                             let t1 = dest_buffer.read(i1);
1062                             let t2 = dest_buffer.read(i2);
1063                             dest_buffer.write(i1, t2);
1064                             dest_buffer.write(i2, t1);
1065                         }
1066                     }
1067                 }
1068             }
1069         }
1070 
1071         atomic::fence(Ordering::Release);
1072 
1073         // Update the back index in the destination queue.
1074         //
1075         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1076         // races because it doesn't understand fences.
1077         dest.inner.back.store(dest_b, Ordering::Release);
1078 
1079         // Return with success.
1080         Steal::Success(task)
1081     }
1082 }
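
The batch-stealing methods above advertise taking "around half of the tasks ... but also not more than some constant limit"; in this file that limit is `MAX_BATCH` (32). A small sketch checking those bounds from the outside (the exact counts are an assumption derived from the constants and formulas above, not a documented guarantee):

```rust
use crossbeam_deque::Worker;

fn main() {
    let w1 = Worker::new_fifo();
    let s = w1.stealer();

    // With 10 queued tasks, one batch steal moves roughly half of them.
    let w2 = Worker::new_fifo();
    for i in 0..10 {
        w1.push(i);
    }
    let _ = s.steal_batch(&w2);
    assert!(!w2.is_empty() && w2.len() <= 5);

    // With around 100 queued tasks, the batch is capped by the constant limit (MAX_BATCH = 32 here).
    let w3 = Worker::new_fifo();
    for i in 0..100 {
        w1.push(i);
    }
    let _ = s.steal_batch(&w3);
    assert!(w3.len() <= 32);
}
```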
1083 
1084 impl<T> Clone for Stealer<T> {
1085     fn clone(&self) -> Stealer<T> {
1086         Stealer {
1087             inner: self.inner.clone(),
1088             flavor: self.flavor,
1089         }
1090     }
1091 }
1092 
1093 impl<T> fmt::Debug for Stealer<T> {
1094     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1095         f.pad("Stealer { .. }")
1096     }
1097 }
1098 
1099 // Bits indicating the state of a slot:
1100 // * If a task has been written into the slot, `WRITE` is set.
1101 // * If a task has been read from the slot, `READ` is set.
1102 // * If the block is being destroyed, `DESTROY` is set.
1103 const WRITE: usize = 1;
1104 const READ: usize = 2;
1105 const DESTROY: usize = 4;
1106 
1107 // Each block covers one "lap" of indices.
1108 const LAP: usize = 64;
1109 // The maximum number of values a block can hold.
1110 const BLOCK_CAP: usize = LAP - 1;
1111 // How many lower bits are reserved for metadata.
1112 const SHIFT: usize = 1;
1113 // Indicates that the block is not the last one.
1114 const HAS_NEXT: usize = 1;
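
The constants above pack several pieces of information into a single `usize` index: the low `SHIFT` bit carries the `HAS_NEXT` flag, and the remaining bits count slots, which the code splits into an offset within the current block and a block ("lap") number. A standalone decoding sketch using the same values (the sample index is arbitrary):

```rust
// Local copies of the constants above, so the sketch compiles on its own.
const LAP: usize = 64;
const SHIFT: usize = 1;
const HAS_NEXT: usize = 1;

fn main() {
    // An index that is 130 slots in, with another block already linked after the current one.
    let index: usize = (130 << SHIFT) | HAS_NEXT;

    let offset = (index >> SHIFT) % LAP; // slot offset inside the current block
    let lap = (index >> SHIFT) / LAP;    // how many full blocks precede it

    assert_eq!(offset, 2);           // 130 % 64
    assert_eq!(lap, 2);              // 130 / 64
    assert_ne!(index & HAS_NEXT, 0); // the HAS_NEXT flag is set
}
```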
1115 
1116 /// A slot in a block.
1117 struct Slot<T> {
1118     /// The task.
1119     task: UnsafeCell<MaybeUninit<T>>,
1120 
1121     /// The state of the slot.
1122     state: AtomicUsize,
1123 }
1124 
1125 impl<T> Slot<T> {
1126     /// Waits until a task is written into the slot.
1127     fn wait_write(&self) {
1128         let backoff = Backoff::new();
1129         while self.state.load(Ordering::Acquire) & WRITE == 0 {
1130             backoff.snooze();
1131         }
1132     }
1133 }
1134 
1135 /// A block in a linked list.
1136 ///
1137 /// Each block in the list can hold up to `BLOCK_CAP` values.
1138 struct Block<T> {
1139     /// The next block in the linked list.
1140     next: AtomicPtr<Block<T>>,
1141 
1142     /// Slots for values.
1143     slots: [Slot<T>; BLOCK_CAP],
1144 }
1145 
1146 impl<T> Block<T> {
1147     /// Creates an empty block.
1148     fn new() -> Block<T> {
1149         // SAFETY: This is safe because:
1150         //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1151         //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1152         //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1153         //       holds a MaybeUninit.
1154         //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1155         unsafe { MaybeUninit::zeroed().assume_init() }
1156     }
1157 
1158     /// Waits until the next pointer is set.
1159     fn wait_next(&self) -> *mut Block<T> {
1160         let backoff = Backoff::new();
1161         loop {
1162             let next = self.next.load(Ordering::Acquire);
1163             if !next.is_null() {
1164                 return next;
1165             }
1166             backoff.snooze();
1167         }
1168     }
1169 
1170     /// Sets the `DESTROY` bit in the first `count` slots and destroys the block.
1171     unsafe fn destroy(this: *mut Block<T>, count: usize) {
1172         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1173         // begun destruction of the block.
1174         for i in (0..count).rev() {
1175             let slot = (*this).slots.get_unchecked(i);
1176 
1177             // Mark the `DESTROY` bit if a thread is still using the slot.
1178             if slot.state.load(Ordering::Acquire) & READ == 0
1179                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1180             {
1181                 // If a thread is still using the slot, it will continue destruction of the block.
1182                 return;
1183             }
1184         }
1185 
1186         // No thread is using the block, now it is safe to destroy it.
1187         drop(Box::from_raw(this));
1188     }
1189 }
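
The `destroy` routine above cooperates with readers through the `READ` and `DESTROY` bits: whichever side sets its bit second observes the other bit already present and takes responsibility for freeing the block. A standalone sketch of that handshake on a single slot state (run sequentially here for clarity; in the queue the two `fetch_or` calls race on different threads):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Local copies of the state bits above.
const READ: usize = 2;
const DESTROY: usize = 4;

fn main() {
    let state = AtomicUsize::new(0);

    // The destroying thread arrives first: READ is not set yet, so it only marks DESTROY
    // and leaves the actual deallocation to whichever reader is still using the slot.
    let prev = state.fetch_or(DESTROY, Ordering::AcqRel);
    assert_eq!(prev & READ, 0);

    // The reader finishes later: it sees DESTROY already set, which tells it that it is now
    // the one responsible for calling `Block::destroy`.
    let prev = state.fetch_or(READ, Ordering::AcqRel);
    assert_ne!(prev & DESTROY, 0);
}
```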
1190 
1191 /// A position in a queue.
1192 struct Position<T> {
1193     /// The index in the queue.
1194     index: AtomicUsize,
1195 
1196     /// The block in the linked list.
1197     block: AtomicPtr<Block<T>>,
1198 }
1199 
1200 /// An injector queue.
1201 ///
1202 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1203 /// a single injector queue, which is the entry point for new tasks.
1204 ///
1205 /// # Examples
1206 ///
1207 /// ```
1208 /// use crossbeam_deque::{Injector, Steal};
1209 ///
1210 /// let q = Injector::new();
1211 /// q.push(1);
1212 /// q.push(2);
1213 ///
1214 /// assert_eq!(q.steal(), Steal::Success(1));
1215 /// assert_eq!(q.steal(), Steal::Success(2));
1216 /// assert_eq!(q.steal(), Steal::Empty);
1217 /// ```
1218 pub struct Injector<T> {
1219     /// The head of the queue.
1220     head: CachePadded<Position<T>>,
1221 
1222     /// The tail of the queue.
1223     tail: CachePadded<Position<T>>,
1224 
1225     /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1226     _marker: PhantomData<T>,
1227 }
1228 
1229 unsafe impl<T: Send> Send for Injector<T> {}
1230 unsafe impl<T: Send> Sync for Injector<T> {}
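
The `Injector`, `Worker`, and `Stealer` types are designed to be combined into a scheduler's task-finding loop: pop locally first, then refill from the injector, then steal from peers, retrying while any source reports `Steal::Retry`. A sketch of that pattern, similar to the one shown in the crate-level documentation of crossbeam-deque (the `find_task` name and the setup in `main` are illustrative, not part of this file):

```rust
use crossbeam_deque::{Injector, Stealer, Worker};
use std::iter;

// Illustrative helper: look for a task in the local queue, then the injector, then other workers.
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            // Refill the local queue from the injector, or steal directly from a peer.
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Keep trying while every source reported `Retry`.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}

fn main() {
    let local = Worker::new_fifo();
    let global = Injector::new();
    let remote = Worker::new_fifo();
    remote.push(42);

    let stealers = vec![remote.stealer()];
    assert_eq!(find_task(&local, &global, &stealers), Some(42));
}
```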
1231 
1232 impl<T> Default for Injector<T> {
1233     fn default() -> Self {
1234         let block = Box::into_raw(Box::new(Block::<T>::new()));
1235         Self {
1236             head: CachePadded::new(Position {
1237                 block: AtomicPtr::new(block),
1238                 index: AtomicUsize::new(0),
1239             }),
1240             tail: CachePadded::new(Position {
1241                 block: AtomicPtr::new(block),
1242                 index: AtomicUsize::new(0),
1243             }),
1244             _marker: PhantomData,
1245         }
1246     }
1247 }
1248 
1249 impl<T> Injector<T> {
1250     /// Creates a new injector queue.
1251     ///
1252     /// # Examples
1253     ///
1254     /// ```
1255     /// use crossbeam_deque::Injector;
1256     ///
1257     /// let q = Injector::<i32>::new();
1258     /// ```
1259     pub fn new() -> Injector<T> {
1260         Self::default()
1261     }
1262 
1263     /// Pushes a task into the queue.
1264     ///
1265     /// # Examples
1266     ///
1267     /// ```
1268     /// use crossbeam_deque::Injector;
1269     ///
1270     /// let w = Injector::new();
1271     /// w.push(1);
1272     /// w.push(2);
1273     /// ```
1274     pub fn push(&self, task: T) {
1275         let backoff = Backoff::new();
1276         let mut tail = self.tail.index.load(Ordering::Acquire);
1277         let mut block = self.tail.block.load(Ordering::Acquire);
1278         let mut next_block = None;
1279 
1280         loop {
1281             // Calculate the offset of the index into the block.
1282             let offset = (tail >> SHIFT) % LAP;
1283 
1284             // If we reached the end of the block, wait until the next one is installed.
1285             if offset == BLOCK_CAP {
1286                 backoff.snooze();
1287                 tail = self.tail.index.load(Ordering::Acquire);
1288                 block = self.tail.block.load(Ordering::Acquire);
1289                 continue;
1290             }
1291 
1292             // If we're going to have to install the next block, allocate it in advance in order to
1293             // make the wait for other threads as short as possible.
1294             if offset + 1 == BLOCK_CAP && next_block.is_none() {
1295                 next_block = Some(Box::new(Block::<T>::new()));
1296             }
1297 
1298             let new_tail = tail + (1 << SHIFT);
1299 
1300             // Try advancing the tail forward.
1301             match self.tail.index.compare_exchange_weak(
1302                 tail,
1303                 new_tail,
1304                 Ordering::SeqCst,
1305                 Ordering::Acquire,
1306             ) {
1307                 Ok(_) => unsafe {
1308                     // If we've reached the end of the block, install the next one.
1309                     if offset + 1 == BLOCK_CAP {
1310                         let next_block = Box::into_raw(next_block.unwrap());
1311                         let next_index = new_tail.wrapping_add(1 << SHIFT);
1312 
1313                         self.tail.block.store(next_block, Ordering::Release);
1314                         self.tail.index.store(next_index, Ordering::Release);
1315                         (*block).next.store(next_block, Ordering::Release);
1316                     }
1317 
1318                     // Write the task into the slot.
1319                     let slot = (*block).slots.get_unchecked(offset);
1320                     slot.task.get().write(MaybeUninit::new(task));
1321                     slot.state.fetch_or(WRITE, Ordering::Release);
1322 
1323                     return;
1324                 },
1325                 Err(t) => {
1326                     tail = t;
1327                     block = self.tail.block.load(Ordering::Acquire);
1328                     backoff.spin();
1329                 }
1330             }
1331         }
1332     }
1333 
1334     /// Steals a task from the queue.
1335     ///
1336     /// # Examples
1337     ///
1338     /// ```
1339     /// use crossbeam_deque::{Injector, Steal};
1340     ///
1341     /// let q = Injector::new();
1342     /// q.push(1);
1343     /// q.push(2);
1344     ///
1345     /// assert_eq!(q.steal(), Steal::Success(1));
1346     /// assert_eq!(q.steal(), Steal::Success(2));
1347     /// assert_eq!(q.steal(), Steal::Empty);
1348     /// ```
1349     pub fn steal(&self) -> Steal<T> {
1350         let mut head;
1351         let mut block;
1352         let mut offset;
1353 
1354         let backoff = Backoff::new();
1355         loop {
1356             head = self.head.index.load(Ordering::Acquire);
1357             block = self.head.block.load(Ordering::Acquire);
1358 
1359             // Calculate the offset of the index into the block.
1360             offset = (head >> SHIFT) % LAP;
1361 
1362             // If we reached the end of the block, wait until the next one is installed.
1363             if offset == BLOCK_CAP {
1364                 backoff.snooze();
1365             } else {
1366                 break;
1367             }
1368         }
1369 
1370         let mut new_head = head + (1 << SHIFT);
1371 
1372         if new_head & HAS_NEXT == 0 {
1373             atomic::fence(Ordering::SeqCst);
1374             let tail = self.tail.index.load(Ordering::Relaxed);
1375 
1376             // If the tail equals the head, that means the queue is empty.
1377             if head >> SHIFT == tail >> SHIFT {
1378                 return Steal::Empty;
1379             }
1380 
1381             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1382             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1383                 new_head |= HAS_NEXT;
1384             }
1385         }
1386 
1387         // Try moving the head index forward.
1388         if self
1389             .head
1390             .index
1391             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1392             .is_err()
1393         {
1394             return Steal::Retry;
1395         }
1396 
1397         unsafe {
1398             // If we've reached the end of the block, move to the next one.
1399             if offset + 1 == BLOCK_CAP {
1400                 let next = (*block).wait_next();
1401                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1402                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1403                     next_index |= HAS_NEXT;
1404                 }
1405 
1406                 self.head.block.store(next, Ordering::Release);
1407                 self.head.index.store(next_index, Ordering::Release);
1408             }
1409 
1410             // Read the task.
1411             let slot = (*block).slots.get_unchecked(offset);
1412             slot.wait_write();
1413             let task = slot.task.get().read().assume_init();
1414 
1415             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1416             // but couldn't because we were busy reading from the slot.
1417             if (offset + 1 == BLOCK_CAP)
1418                 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1419             {
1420                 Block::destroy(block, offset);
1421             }
1422 
1423             Steal::Success(task)
1424         }
1425     }
1426 
1427     /// Steals a batch of tasks and pushes them into a worker.
1428     ///
1429     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1430     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1431     ///
1432     /// # Examples
1433     ///
1434     /// ```
1435     /// use crossbeam_deque::{Injector, Worker};
1436     ///
1437     /// let q = Injector::new();
1438     /// q.push(1);
1439     /// q.push(2);
1440     /// q.push(3);
1441     /// q.push(4);
1442     ///
1443     /// let w = Worker::new_fifo();
1444     /// let _ = q.steal_batch(&w);
1445     /// assert_eq!(w.pop(), Some(1));
1446     /// assert_eq!(w.pop(), Some(2));
1447     /// ```
1448     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1449         let mut head;
1450         let mut block;
1451         let mut offset;
1452 
1453         let backoff = Backoff::new();
1454         loop {
1455             head = self.head.index.load(Ordering::Acquire);
1456             block = self.head.block.load(Ordering::Acquire);
1457 
1458             // Calculate the offset of the index into the block.
1459             offset = (head >> SHIFT) % LAP;
1460 
1461             // If we reached the end of the block, wait until the next one is installed.
1462             if offset == BLOCK_CAP {
1463                 backoff.snooze();
1464             } else {
1465                 break;
1466             }
1467         }
1468 
1469         let mut new_head = head;
1470         let advance;
1471 
1472         if new_head & HAS_NEXT == 0 {
1473             atomic::fence(Ordering::SeqCst);
1474             let tail = self.tail.index.load(Ordering::Relaxed);
1475 
1476             // If the tail equals the head, that means the queue is empty.
1477             if head >> SHIFT == tail >> SHIFT {
1478                 return Steal::Empty;
1479             }
1480 
1481             // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1482             // the right batch size to steal.
1483             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1484                 new_head |= HAS_NEXT;
1485                 // We can steal all tasks till the end of the block.
1486                 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1487             } else {
1488                 let len = (tail - head) >> SHIFT;
1489                 // Steal half of the available tasks.
1490                 advance = ((len + 1) / 2).min(MAX_BATCH);
1491             }
1492         } else {
1493             // We can steal all tasks till the end of the block.
1494             advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1495         }
1496 
1497         new_head += advance << SHIFT;
1498         let new_offset = offset + advance;
1499 
1500         // Try moving the head index forward.
1501         if self
1502             .head
1503             .index
1504             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1505             .is_err()
1506         {
1507             return Steal::Retry;
1508         }
1509 
1510         // Reserve capacity for the stolen batch.
1511         let batch_size = new_offset - offset;
1512         dest.reserve(batch_size);
1513 
1514         // Get the destination buffer and back index.
1515         let dest_buffer = dest.buffer.get();
1516         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1517 
1518         unsafe {
1519             // If we've reached the end of the block, move to the next one.
1520             if new_offset == BLOCK_CAP {
1521                 let next = (*block).wait_next();
1522                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1523                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1524                     next_index |= HAS_NEXT;
1525                 }
1526 
1527                 self.head.block.store(next, Ordering::Release);
1528                 self.head.index.store(next_index, Ordering::Release);
1529             }
1530 
1531             // Copy values from the injector into the destination queue.
1532             match dest.flavor {
1533                 Flavor::Fifo => {
1534                     for i in 0..batch_size {
1535                         // Read the task.
1536                         let slot = (*block).slots.get_unchecked(offset + i);
1537                         slot.wait_write();
1538                         let task = slot.task.get().read().assume_init();
1539 
1540                         // Write it into the destination queue.
1541                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1542                     }
1543                 }
1544 
1545                 Flavor::Lifo => {
1546                     for i in 0..batch_size {
1547                         // Read the task.
1548                         let slot = (*block).slots.get_unchecked(offset + i);
1549                         slot.wait_write();
1550                         let task = slot.task.get().read().assume_init();
1551 
1552                         // Write it into the destination queue.
1553                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1554                     }
1555                 }
1556             }
1557 
1558             atomic::fence(Ordering::Release);
1559 
1560             // Update the back index in the destination queue.
1561             //
1562             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1563             // data races because it doesn't understand fences.
1564             dest.inner
1565                 .back
1566                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1567 
1568             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1569             // but couldn't because we were busy reading from the slot.
1570             if new_offset == BLOCK_CAP {
1571                 Block::destroy(block, offset);
1572             } else {
1573                 for i in offset..new_offset {
1574                     let slot = (*block).slots.get_unchecked(i);
1575 
1576                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1577                         Block::destroy(block, offset);
1578                         break;
1579                     }
1580                 }
1581             }
1582 
1583             Steal::Success(())
1584         }
1585     }

    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH + 1);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
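        // Note: one of the `advance` stolen tasks is returned to the caller rather than pushed
        // into `dest`, hence the `- 1` below (and the `MAX_BATCH + 1` limit above).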
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task)
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
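        // The lower `SHIFT` bits carry metadata (e.g. `HAS_NEXT`), so compare only the slot indices.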
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of tasks in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert_eq!(q.len(), 0);
    /// q.push(1);
    /// assert_eq!(q.len(), 1);
    /// q.push(1);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
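                // (Each block spans `LAP` index slots, but the last slot in a lap never holds a
                // task - see the fixups above - so one slot per block is subtracted via `tail / LAP`.)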
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for Injector<T> {
    fn drop(&mut self) {
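        // `&mut self` guarantees exclusive access, so relaxed loads are sufficient here.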
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the task in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.task.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            drop(Box::from_raw(block));
        }
    }
}

impl<T> fmt::Debug for Injector<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Injector { .. }")
    }
}

/// Possible outcomes of a steal operation.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
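///
/// A minimal sketch of another common pattern (illustrative, using only the `Injector` API from
/// this module): keep stealing until the operation no longer needs to be retried:
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
///
/// // Retry the steal until it settles into `Success` or `Empty`.
/// let res = std::iter::repeat_with(|| q.steal())
///     .find(|s| !s.is_retry())
///     .unwrap();
/// assert_eq!(res, Steal::Success(1));
/// ```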
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match self {
            Steal::Empty => true,
            _ => false,
        }
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        match self {
            Steal::Success(_) => true,
            _ => false,
        }
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        match self {
            Steal::Retry => true,
            _ => false,
        }
    }

    /// Returns the result of the operation, if successful.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            _ => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * Otherwise, `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            Steal::Empty => f(),
            Steal::Success(_) => self,
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
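    ///
    /// A minimal sketch of this behavior (illustrative, mirroring the example on `Steal` itself):
    ///
    /// ```
    /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
    ///
    /// // The first `Success` wins.
    /// let found: Steal<i32> = vec![Retry, Empty, Success(7)].into_iter().collect();
    /// assert_eq!(found, Success(7));
    ///
    /// // No `Success`, but at least one `Retry`.
    /// let none_found: Steal<i32> = vec![Empty, Retry].into_iter().collect();
    /// assert_eq!(none_found, Retry);
    /// ```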
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        let mut retry = false;
        for s in iter {
            match &s {
                Steal::Empty => {}
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}