1 use std::alloc::{alloc_zeroed, handle_alloc_error, Layout}; 2 use std::boxed::Box; 3 use std::cell::{Cell, UnsafeCell}; 4 use std::cmp; 5 use std::fmt; 6 use std::marker::PhantomData; 7 use std::mem::{self, MaybeUninit}; 8 use std::ptr; 9 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; 10 use std::sync::Arc; 11 12 use crossbeam_epoch::{self as epoch, Atomic, Owned}; 13 use crossbeam_utils::{Backoff, CachePadded}; 14 15 // Minimum buffer capacity. 16 const MIN_CAP: usize = 64; 17 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. 18 const MAX_BATCH: usize = 32; 19 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets 20 // deallocated as soon as possible. 21 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; 22 23 /// A buffer that holds tasks in a worker queue. 24 /// 25 /// This is just a pointer to the buffer and its length - dropping an instance of this struct will 26 /// *not* deallocate the buffer. 27 struct Buffer<T> { 28 /// Pointer to the allocated memory. 29 ptr: *mut T, 30 31 /// Capacity of the buffer. Always a power of two. 32 cap: usize, 33 } 34 35 unsafe impl<T> Send for Buffer<T> {} 36 37 impl<T> Buffer<T> { 38 /// Allocates a new buffer with the specified capacity. alloc(cap: usize) -> Buffer<T>39 fn alloc(cap: usize) -> Buffer<T> { 40 debug_assert_eq!(cap, cap.next_power_of_two()); 41 42 let ptr = Box::into_raw( 43 (0..cap) 44 .map(|_| MaybeUninit::<T>::uninit()) 45 .collect::<Box<[_]>>(), 46 ) 47 .cast::<T>(); 48 49 Buffer { ptr, cap } 50 } 51 52 /// Deallocates the buffer. dealloc(self)53 unsafe fn dealloc(self) { 54 drop(Box::from_raw(ptr::slice_from_raw_parts_mut( 55 self.ptr.cast::<MaybeUninit<T>>(), 56 self.cap, 57 ))); 58 } 59 60 /// Returns a pointer to the task at the specified `index`. at(&self, index: isize) -> *mut T61 unsafe fn at(&self, index: isize) -> *mut T { 62 // `self.cap` is always a power of two. 
63 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we 64 // don't actually have the right to access this memory. 65 self.ptr.offset(index & (self.cap - 1) as isize) 66 } 67 68 /// Writes `task` into the specified `index`. 69 /// 70 /// This method might be concurrently called with another `read` at the same index, which is 71 /// technically speaking a data race and therefore UB. We should use an atomic store here, but 72 /// that would be more expensive and difficult to implement generically for all types `T`. 73 /// Hence, as a hack, we use a volatile write instead. write(&self, index: isize, task: MaybeUninit<T>)74 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) { 75 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) 76 } 77 78 /// Reads a task from the specified `index`. 79 /// 80 /// This method might be concurrently called with another `write` at the same index, which is 81 /// technically speaking a data race and therefore UB. We should use an atomic load here, but 82 /// that would be more expensive and difficult to implement generically for all types `T`. 83 /// Hence, as a hack, we use a volatile load instead. read(&self, index: isize) -> MaybeUninit<T>84 unsafe fn read(&self, index: isize) -> MaybeUninit<T> { 85 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) 86 } 87 } 88 89 impl<T> Clone for Buffer<T> { clone(&self) -> Buffer<T>90 fn clone(&self) -> Buffer<T> { 91 *self 92 } 93 } 94 95 impl<T> Copy for Buffer<T> {} 96 97 /// Internal queue data shared between the worker and stealers. 98 /// 99 /// The implementation is based on the following work: 100 /// 101 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev] 102 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models. 103 /// PPoPP 2013.][weak-mem] 104 /// 3. [Norris and Demsky. 
CDSchecker: checking concurrent data structures written with C/C++ 105 /// atomics. OOPSLA 2013.][checker] 106 /// 107 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974 108 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524 109 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514 110 struct Inner<T> { 111 /// The front index. 112 front: AtomicIsize, 113 114 /// The back index. 115 back: AtomicIsize, 116 117 /// The underlying buffer. 118 buffer: CachePadded<Atomic<Buffer<T>>>, 119 } 120 121 impl<T> Drop for Inner<T> { drop(&mut self)122 fn drop(&mut self) { 123 // Load the back index, front index, and buffer. 124 let b = *self.back.get_mut(); 125 let f = *self.front.get_mut(); 126 127 unsafe { 128 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); 129 130 // Go through the buffer from front to back and drop all tasks in the queue. 131 let mut i = f; 132 while i != b { 133 buffer.deref().at(i).drop_in_place(); 134 i = i.wrapping_add(1); 135 } 136 137 // Free the memory allocated by the buffer. 138 buffer.into_owned().into_box().dealloc(); 139 } 140 } 141 } 142 143 /// Worker queue flavor: FIFO or LIFO. 144 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 145 enum Flavor { 146 /// The first-in first-out flavor. 147 Fifo, 148 149 /// The last-in first-out flavor. 150 Lifo, 151 } 152 153 /// A worker queue. 154 /// 155 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal 156 /// tasks from it. Task schedulers typically create a single worker queue per thread. 
157 /// 158 /// # Examples 159 /// 160 /// A FIFO worker: 161 /// 162 /// ``` 163 /// use crossbeam_deque::{Steal, Worker}; 164 /// 165 /// let w = Worker::new_fifo(); 166 /// let s = w.stealer(); 167 /// 168 /// w.push(1); 169 /// w.push(2); 170 /// w.push(3); 171 /// 172 /// assert_eq!(s.steal(), Steal::Success(1)); 173 /// assert_eq!(w.pop(), Some(2)); 174 /// assert_eq!(w.pop(), Some(3)); 175 /// ``` 176 /// 177 /// A LIFO worker: 178 /// 179 /// ``` 180 /// use crossbeam_deque::{Steal, Worker}; 181 /// 182 /// let w = Worker::new_lifo(); 183 /// let s = w.stealer(); 184 /// 185 /// w.push(1); 186 /// w.push(2); 187 /// w.push(3); 188 /// 189 /// assert_eq!(s.steal(), Steal::Success(1)); 190 /// assert_eq!(w.pop(), Some(3)); 191 /// assert_eq!(w.pop(), Some(2)); 192 /// ``` 193 pub struct Worker<T> { 194 /// A reference to the inner representation of the queue. 195 inner: Arc<CachePadded<Inner<T>>>, 196 197 /// A copy of `inner.buffer` for quick access. 198 buffer: Cell<Buffer<T>>, 199 200 /// The flavor of the queue. 201 flavor: Flavor, 202 203 /// Indicates that the worker cannot be shared among threads. 204 _marker: PhantomData<*mut ()>, // !Send + !Sync 205 } 206 207 unsafe impl<T: Send> Send for Worker<T> {} 208 209 impl<T> Worker<T> { 210 /// Creates a FIFO worker queue. 211 /// 212 /// Tasks are pushed and popped from opposite ends. 213 /// 214 /// # Examples 215 /// 216 /// ``` 217 /// use crossbeam_deque::Worker; 218 /// 219 /// let w = Worker::<i32>::new_fifo(); 220 /// ``` new_fifo() -> Worker<T>221 pub fn new_fifo() -> Worker<T> { 222 let buffer = Buffer::alloc(MIN_CAP); 223 224 let inner = Arc::new(CachePadded::new(Inner { 225 front: AtomicIsize::new(0), 226 back: AtomicIsize::new(0), 227 buffer: CachePadded::new(Atomic::new(buffer)), 228 })); 229 230 Worker { 231 inner, 232 buffer: Cell::new(buffer), 233 flavor: Flavor::Fifo, 234 _marker: PhantomData, 235 } 236 } 237 238 /// Creates a LIFO worker queue. 
///
    /// Tasks are pushed and popped from the same end.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// ```
    pub fn new_lifo() -> Worker<T> {
        let buffer = Buffer::alloc(MIN_CAP);

        // The same buffer handle is published in the shared state and cached in the worker.
        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Lifo,
            _marker: PhantomData,
        }
    }

    /// Creates a stealer for this queue.
    ///
    /// The returned stealer can be shared among threads and cloned. It references the same
    /// shared queue state as this worker and carries the same flavor.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// let s = w.stealer();
    /// ```
    pub fn stealer(&self) -> Stealer<T> {
        Stealer {
            inner: self.inner.clone(),
            flavor: self.flavor,
        }
    }

    /// Resizes the internal buffer to the new capacity of `new_cap`.
    ///
    /// Tasks between the front and back indices are copied bitwise into a freshly allocated
    /// buffer, which then replaces both the worker's cached buffer and the shared one. The old
    /// buffer is freed through a deferred epoch callback rather than immediately.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `new_cap` must be a power of two (only debug-asserted in
    /// `Buffer::alloc`) and large enough to hold the tasks currently in the queue — confirm
    /// against callers.
    #[cold]
    unsafe fn resize(&self, new_cap: usize) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        let buffer = self.buffer.get();

        // Allocate a new buffer and copy data from the old buffer to the new one.
        // The same logical index `i` is used for both buffers; each one applies its own mask.
        let new = Buffer::alloc(new_cap);
        let mut i = f;
        while i != b {
            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
            i = i.wrapping_add(1);
        }

        let guard = &epoch::pin();

        // Replace the old buffer with the new one.
        self.buffer.replace(new);
        let old =
            self.inner
                .buffer
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);

        // Destroy the old buffer later.
        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());

        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
        // it as soon as possible.
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
            guard.flush();
        }
    }

    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
    /// buffer.
    ///
    /// Does nothing when `reserve_cap` is zero or the free space is already sufficient.
    fn reserve(&self, reserve_cap: usize) {
        if reserve_cap > 0 {
            // Compute the current length.
            let b = self.inner.back.load(Ordering::Relaxed);
            let f = self.inner.front.load(Ordering::SeqCst);
            let len = b.wrapping_sub(f) as usize;

            // The current capacity.
            let cap = self.buffer.get().cap;

            // Is there enough capacity to push `reserve_cap` tasks?
            if cap - len < reserve_cap {
                // Keep doubling the capacity as much as is needed.
                // Doubling keeps the capacity a power of two.
                let mut new_cap = cap * 2;
                while new_cap - len < reserve_cap {
                    new_cap *= 2;
                }

                // Resize the buffer.
                unsafe {
                    self.resize(new_cap);
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert!(w.is_empty());
    /// w.push(1);
    /// assert!(!w.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        // A non-positive difference is treated as empty.
        b.wrapping_sub(f) <= 0
    }

    /// Returns the number of tasks in the deque.
///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert_eq!(w.len(), 0);
    /// w.push(1);
    /// assert_eq!(w.len(), 1);
    /// w.push(1);
    /// assert_eq!(w.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        // Clamp at zero so a negative difference is reported as an empty queue.
        b.wrapping_sub(f).max(0) as usize
    }

    /// Pushes a task into the queue.
    ///
    /// The task is written at the back index; if the buffer is full, its capacity is doubled
    /// first.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Acquire);
        let mut buffer = self.buffer.get();

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue full?
        if len >= buffer.cap as isize {
            // Yes. Grow the underlying buffer.
            unsafe {
                self.resize(2 * buffer.cap);
            }
            // `resize` replaced the cached buffer, so reload it before writing.
            buffer = self.buffer.get();
        }

        // Write `task` into the slot.
        unsafe {
            buffer.write(b, MaybeUninit::new(task));
        }

        // The slot write above must be ordered before the new back index is stored.
        atomic::fence(Ordering::Release);

        // Increment the back index.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
    }

    /// Pops a task from the queue.
///
    /// Returns `None` if the queue is empty, or if the last remaining task was taken by a
    /// stealer while this call was in progress. A FIFO worker pops from the front; a LIFO
    /// worker pops from the back.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_fifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Load the back and front index.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue empty?
        if len <= 0 {
            return None;
        }

        match self.flavor {
            // Pop from the front of the queue.
            Flavor::Fifo => {
                // Try incrementing the front index to pop the task.
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
                let new_f = f.wrapping_add(1);

                // If the front moved past the back, the queue was emptied in the meantime:
                // undo the increment and report nothing popped.
                if b.wrapping_sub(new_f) < 0 {
                    self.inner.front.store(f, Ordering::Relaxed);
                    return None;
                }

                unsafe {
                    // Read the popped task.
                    let buffer = self.buffer.get();
                    let task = buffer.read(f).assume_init();

                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                        self.resize(buffer.cap / 2);
                    }

                    Some(task)
                }
            }

            // Pop from the back of the queue.
            Flavor::Lifo => {
                // Decrement the back index.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);

                atomic::fence(Ordering::SeqCst);

                // Load the front index.
                let f = self.inner.front.load(Ordering::Relaxed);

                // Compute the length after the back index was decremented.
                let len = b.wrapping_sub(f);

                if len < 0 {
                    // The queue is empty. Restore the back index to the original task.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    None
                } else {
                    // Read the task to be popped.
                    // It stays wrapped in `MaybeUninit` until we know the pop succeeded.
                    let buffer = self.buffer.get();
                    let mut task = unsafe { Some(buffer.read(b)) };

                    // Are we popping the last task from the queue?
                    if len == 0 {
                        // Try incrementing the front index.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                        {
                            // Failed. We didn't pop anything. Reset to `None`.
                            task.take();
                        }

                        // Restore the back index to the original task.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    } else {
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                            unsafe {
                                self.resize(buffer.cap / 2);
                            }
                        }
                    }

                    task.map(|t| unsafe { t.assume_init() })
                }
            }
        }
    }
}

impl<T> fmt::Debug for Worker<T> {
    /// Writes the fixed placeholder `Worker { .. }`; queue contents are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Worker { .. }")
    }
}

/// A stealer handle of a worker queue.
///
/// Stealers can be shared among threads.
///
/// Task schedulers typically have a single worker queue per worker thread.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// w.push(1);
/// w.push(2);
///
/// let s = w.stealer();
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(s.steal(), Steal::Success(2));
/// assert_eq!(s.steal(), Steal::Empty);
/// ```
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue.
    flavor: Flavor,
}

unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}

impl<T> Stealer<T> {
    /// Returns `true` if the queue is empty.
///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// let s = w.stealer();
    ///
    /// assert!(s.is_empty());
    /// w.push(1);
    /// assert!(!s.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        // The front index is loaded before the back index, with a SeqCst fence in between.
        let f = self.inner.front.load(Ordering::Acquire);
        atomic::fence(Ordering::SeqCst);
        let b = self.inner.back.load(Ordering::Acquire);
        b.wrapping_sub(f) <= 0
    }

    /// Returns the number of tasks in the deque.
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// let s = w.stealer();
    ///
    /// assert_eq!(s.len(), 0);
    /// w.push(1);
    /// assert_eq!(s.len(), 1);
    /// w.push(2);
    /// assert_eq!(s.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        let f = self.inner.front.load(Ordering::Acquire);
        atomic::fence(Ordering::SeqCst);
        let b = self.inner.back.load(Ordering::Acquire);
        // Clamp at zero so a negative difference is reported as an empty queue.
        b.wrapping_sub(f).max(0) as usize
    }

    /// Steals a task from the queue.
    ///
    /// Returns `Steal::Empty` if no task was observed, `Steal::Retry` if the buffer was swapped
    /// or the front index changed while the steal was in progress, and `Steal::Success` with the
    /// task at the front otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// let s = w.stealer();
    /// assert_eq!(s.steal(), Steal::Success(1));
    /// assert_eq!(s.steal(), Steal::Success(2));
    /// ```
    pub fn steal(&self) -> Steal<T> {
        // Load the front index.
        let f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        if b.wrapping_sub(f) <= 0 {
            return Steal::Empty;
        }

        // Load the buffer and read the task at the front.
        // The value stays in `MaybeUninit` until the compare-exchange below confirms ownership.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let task = unsafe { buffer.deref().read(f) };

        // Try incrementing the front index to steal the task.
        // If the buffer has been swapped or the increment fails, we retry.
        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
            || self
                .inner
                .front
                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                .is_err()
        {
            // We didn't steal this task, forget it.
            return Steal::Retry;
        }

        // Return the stolen task.
        Steal::Success(unsafe { task.assume_init() })
    }

    /// Steals a batch of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// This forwards to `steal_batch_with_limit` with a limit of `MAX_BATCH`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch(&w2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        self.steal_batch_with_limit(dest, MAX_BATCH)
    }

    /// Steals no more than `limit` of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
///
    /// If `dest` is the worker this stealer was created from, nothing is moved: the result is
    /// `Steal::Empty` when the queue is empty and `Steal::Success(())` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    /// w1.push(5);
    /// w1.push(6);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch_with_limit(&w2, 2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    ///
    /// w1.push(7);
    /// w1.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
    /// assert_eq!(w2.len(), 3);
    /// ```
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        // Source and destination are the same queue: there is nothing to move.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            if dest.is_empty() {
                return Steal::Empty;
            } else {
                return Steal::Success(());
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        // The batch is half of the observed length, rounded up, capped at `limit`.
        let batch_size = cmp::min((len as usize + 1) / 2, limit);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        // Written in reverse order into the destination.
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the batch.
                // If the buffer has been swapped or the increment fails, we retry.
                // On failure the destination back index is never advanced, so the copies made
                // above are never published.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;

                for i in 0..original_batch_size {
                    // If this is not the first steal, check whether the queue is empty.
                    if i > 0 {
                        // We've already got the current front index. Now execute the fence to
                        // synchronize with other threads.
                        atomic::fence(Ordering::SeqCst);

                        // Load the back index.
                        let b = self.inner.back.load(Ordering::Acquire);

                        // Is the queue empty?
                        if b.wrapping_sub(f) <= 0 {
                            batch_size = i;
                            break;
                        }
                    }

                    // Read the task at the front.
                    let task = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the stolen task into the destination buffer.
                    unsafe {
                        dest_buffer.write(dest_b, task);
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If we didn't steal anything, the operation needs to be retried.
                if batch_size == 0 {
                    return Steal::Retry;
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    // Swap the slots pairwise from both ends of the freshly written range.
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(())
    }

    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// This forwards to `steal_batch_with_limit_and_pop` with a limit of `MAX_BATCH`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
    }

    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
    /// that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    /// w1.push(5);
    /// w1.push(6);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    ///
    /// w1.push(7);
    /// w1.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped.
/// // In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
    /// assert_eq!(w2.pop(), Some(4));
    /// assert_eq!(w2.pop(), Some(5));
    /// assert_eq!(w2.pop(), None);
    /// ```
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
        // Panics when `limit` is zero.
        assert!(limit > 0);
        // Source and destination are the same queue: this degenerates to a plain `pop`.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            match dest.pop() {
                None => return Steal::Empty,
                Some(task) => return Steal::Success(task),
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        // One task is returned directly to the caller, so the batch pushed into `dest` is
        // sized from `len - 1` and `limit - 1`.
        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        // Read the task at the front.
        let mut task = unsafe { buffer.deref().read(f) };

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                // Source reads start at `f + 1`; the task at `f` is the one returned.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the task.
                // If the buffer has been swapped or the increment fails, we retry.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size + 1),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // Try incrementing the front index to steal the task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                // Move the front index one step forward.
                f = f.wrapping_add(1);

                // Repeat the same procedure for the batch steals.
                //
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;
                for i in 0..original_batch_size {
                    // We've already got the current front index. Now execute the fence to
                    // synchronize with other threads.
                    atomic::fence(Ordering::SeqCst);

                    // Load the back index.
                    let b = self.inner.back.load(Ordering::Acquire);

                    // Is the queue empty?
                    if b.wrapping_sub(f) <= 0 {
                        batch_size = i;
                        break;
                    }

                    // Read the task at the front.
                    let tmp = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the previously stolen task into the destination buffer.
                    // `task` now holds the most recently stolen value, which is the one
                    // eventually returned to the caller.
                    unsafe {
                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(unsafe { task.assume_init() })
    }
}

impl<T> Clone for Stealer<T> {
    /// Returns another stealer for the same queue, sharing the same inner state.
    fn clone(&self) -> Stealer<T> {
        Stealer {
            inner: self.inner.clone(),
            flavor: self.flavor,
        }
    }
}

impl<T> fmt::Debug for Stealer<T> {
    /// Writes the fixed placeholder `Stealer { .. }`; queue contents are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Stealer { .. }")
    }
}

// Bits indicating the state of a slot:
// * If a task has been written into the slot, `WRITE` is set.
// * If a task has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 64;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The task.
    task: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a task is written into the slot.
    ///
    /// Spins with backoff until the `WRITE` bit is observed in `state`.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list. Null until a successor block is installed.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Allocation layout of a block, checked at compile time to be non-zero-sized so that it can
    /// be handed to `alloc_zeroed`.
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    ///
    /// The block is allocated zeroed: `next` is null and every slot state has no bits set.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    ///
    /// Spins with backoff until `next` becomes non-null and returns that pointer.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in the slots below index `count` and destroys the block.
    ///
    /// Slots are visited from index `count - 1` down to `0`. If a slot that has not been marked
    /// `READ` is found, `DESTROY` is set on it and this function returns without freeing the
    /// block; the thread that later marks that slot as `READ` is expected to resume destruction.
    /// The block is deallocated only if every visited slot already has `READ` set.
    ///
    /// # Safety
    ///
    /// `this` is dereferenced and may be freed, so it must point to a live block.
    /// NOTE(review): presumably the block must have been created by `Block::new` and must not be
    /// freed by anyone else afterwards; confirm against callers.
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in (0..count).rev() {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    ///
    /// The lowest `SHIFT` bits hold metadata (`HAS_NEXT`); the remaining bits are the position.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```
pub struct Injector<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}

impl<T> Default for Injector<T> {
    /// Creates an empty queue whose head and tail both point at one freshly allocated block,
    /// with both indices at zero.
    fn default() -> Self {
        let block = Box::into_raw(Block::<T>::new());
        Self {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(block),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(block),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }
}

impl<T> Injector<T> {
    /// Creates a new injector queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::<i32>::new();
    /// ```
    pub fn new() -> Injector<T> {
        Self::default()
    }

    /// Pushes a task into the queue.
    ///
    /// The tail index is advanced with a compare-exchange loop; the thread that claims the last
    /// slot of a block also installs the next block.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let w = Injector::new();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Lazily allocated successor block, kept across retries so it is allocated at most once.
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // Skip the index whose offset equals `BLOCK_CAP`; it maps to no slot.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(MaybeUninit::new(task));
                    // Publish the task: readers spin in `wait_write` until `WRITE` is set.
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Steals a task from the queue.
    ///
    /// Returns `Steal::Empty` if the head and tail positions are equal, `Steal::Retry` if the
    /// head index could not be advanced (the compare-exchange failed), and `Steal::Success`
    /// with the task otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    ///
    /// assert_eq!(q.steal(), Steal::Success(1));
    /// assert_eq!(q.steal(), Steal::Success(2));
    /// assert_eq!(q.steal(), Steal::Empty);
    /// ```
    pub fn steal(&self) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head + (1 << SHIFT);

        // Without `HAS_NEXT` the head block may be the last one, so the tail must be consulted
        // to find out whether there is anything to steal.
        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if offset + 1 == BLOCK_CAP {
                let next = (*block).wait_next();
                // Clear the metadata bit and skip the index that maps to no slot.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if (offset + 1 == BLOCK_CAP)
                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
            {
                Block::destroy(block, offset);
            }

            Steal::Success(task)
        }
    }

    /// Steals a batch of tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified.
    /// That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit
    /// (it forwards to `steal_batch_with_limit` with `MAX_BATCH` as the limit).
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch(&w);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        self.steal_batch_with_limit(dest, MAX_BATCH)
    }

    /// Steals no more than `limit` tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    /// q.push(5);
    /// q.push(6);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch_with_limit(&w, 2);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    ///
    /// q.push(7);
    /// q.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
    /// assert_eq!(w.len(), 3);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of slots the head index will be moved forward by; a batch never crosses the
        // end of the current block.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(limit);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(limit);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(limit);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // Clear the metadata bit and skip the index that maps to no slot.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                Flavor::Fifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue, in reverse order of stealing.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }

    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injector as the performance is slightly
        // better, but we may change it in the future to be compatible with the same method in Stealer.
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
    }

    /// Steals no more than `limit` tasks, pushes them into a worker, and pops a task from that
    /// worker.
    ///
    /// How many tasks exactly will be stolen is not specified.
That said, this method will try to 1750 /// steal around half of the tasks in the queue, but also not more than the given limit. 1751 /// 1752 /// # Examples 1753 /// 1754 /// ``` 1755 /// use crossbeam_deque::{Injector, Steal, Worker}; 1756 /// 1757 /// let q = Injector::new(); 1758 /// q.push(1); 1759 /// q.push(2); 1760 /// q.push(3); 1761 /// q.push(4); 1762 /// q.push(5); 1763 /// q.push(6); 1764 /// 1765 /// let w = Worker::new_fifo(); 1766 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); 1767 /// assert_eq!(w.pop(), Some(2)); 1768 /// assert_eq!(w.pop(), None); 1769 /// 1770 /// q.push(7); 1771 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 1772 /// // half of the elements are currently popped, but the number of popped elements is considered 1773 /// // an implementation detail that may be changed in the future. 1774 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); 1775 /// assert_eq!(w.pop(), Some(4)); 1776 /// assert_eq!(w.pop(), Some(5)); 1777 /// assert_eq!(w.pop(), None); 1778 /// ``` steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T>1779 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { 1780 assert!(limit > 0); 1781 let mut head; 1782 let mut block; 1783 let mut offset; 1784 1785 let backoff = Backoff::new(); 1786 loop { 1787 head = self.head.index.load(Ordering::Acquire); 1788 block = self.head.block.load(Ordering::Acquire); 1789 1790 // Calculate the offset of the index into the block. 1791 offset = (head >> SHIFT) % LAP; 1792 1793 // If we reached the end of the block, wait until the next one is installed. 
1794 if offset == BLOCK_CAP { 1795 backoff.snooze(); 1796 } else { 1797 break; 1798 } 1799 } 1800 1801 let mut new_head = head; 1802 let advance; 1803 1804 if new_head & HAS_NEXT == 0 { 1805 atomic::fence(Ordering::SeqCst); 1806 let tail = self.tail.index.load(Ordering::Relaxed); 1807 1808 // If the tail equals the head, that means the queue is empty. 1809 if head >> SHIFT == tail >> SHIFT { 1810 return Steal::Empty; 1811 } 1812 1813 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1814 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1815 new_head |= HAS_NEXT; 1816 // We can steal all tasks till the end of the block. 1817 advance = (BLOCK_CAP - offset).min(limit); 1818 } else { 1819 let len = (tail - head) >> SHIFT; 1820 // Steal half of the available tasks. 1821 advance = ((len + 1) / 2).min(limit); 1822 } 1823 } else { 1824 // We can steal all tasks till the end of the block. 1825 advance = (BLOCK_CAP - offset).min(limit); 1826 } 1827 1828 new_head += advance << SHIFT; 1829 let new_offset = offset + advance; 1830 1831 // Try moving the head index forward. 1832 if self 1833 .head 1834 .index 1835 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1836 .is_err() 1837 { 1838 return Steal::Retry; 1839 } 1840 1841 // Reserve capacity for the stolen batch. 1842 let batch_size = new_offset - offset - 1; 1843 dest.reserve(batch_size); 1844 1845 // Get the destination buffer and back index. 1846 let dest_buffer = dest.buffer.get(); 1847 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1848 1849 unsafe { 1850 // If we've reached the end of the block, move to the next one. 
1851 if new_offset == BLOCK_CAP { 1852 let next = (*block).wait_next(); 1853 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1854 if !(*next).next.load(Ordering::Relaxed).is_null() { 1855 next_index |= HAS_NEXT; 1856 } 1857 1858 self.head.block.store(next, Ordering::Release); 1859 self.head.index.store(next_index, Ordering::Release); 1860 } 1861 1862 // Read the task. 1863 let slot = (*block).slots.get_unchecked(offset); 1864 slot.wait_write(); 1865 let task = slot.task.get().read(); 1866 1867 match dest.flavor { 1868 Flavor::Fifo => { 1869 // Copy values from the injector into the destination queue. 1870 for i in 0..batch_size { 1871 // Read the task. 1872 let slot = (*block).slots.get_unchecked(offset + i + 1); 1873 slot.wait_write(); 1874 let task = slot.task.get().read(); 1875 1876 // Write it into the destination queue. 1877 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1878 } 1879 } 1880 1881 Flavor::Lifo => { 1882 // Copy values from the injector into the destination queue. 1883 for i in 0..batch_size { 1884 // Read the task. 1885 let slot = (*block).slots.get_unchecked(offset + i + 1); 1886 slot.wait_write(); 1887 let task = slot.task.get().read(); 1888 1889 // Write it into the destination queue. 1890 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1891 } 1892 } 1893 } 1894 1895 atomic::fence(Ordering::Release); 1896 1897 // Update the back index in the destination queue. 1898 // 1899 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1900 // data races because it doesn't understand fences. 1901 dest.inner 1902 .back 1903 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1904 1905 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1906 // but couldn't because we were busy reading from the slot. 
1907 if new_offset == BLOCK_CAP { 1908 Block::destroy(block, offset); 1909 } else { 1910 for i in offset..new_offset { 1911 let slot = (*block).slots.get_unchecked(i); 1912 1913 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1914 Block::destroy(block, offset); 1915 break; 1916 } 1917 } 1918 } 1919 1920 Steal::Success(task.assume_init()) 1921 } 1922 } 1923 1924 /// Returns `true` if the queue is empty. 1925 /// 1926 /// # Examples 1927 /// 1928 /// ``` 1929 /// use crossbeam_deque::Injector; 1930 /// 1931 /// let q = Injector::new(); 1932 /// 1933 /// assert!(q.is_empty()); 1934 /// q.push(1); 1935 /// assert!(!q.is_empty()); 1936 /// ``` is_empty(&self) -> bool1937 pub fn is_empty(&self) -> bool { 1938 let head = self.head.index.load(Ordering::SeqCst); 1939 let tail = self.tail.index.load(Ordering::SeqCst); 1940 head >> SHIFT == tail >> SHIFT 1941 } 1942 1943 /// Returns the number of tasks in the queue. 1944 /// 1945 /// # Examples 1946 /// 1947 /// ``` 1948 /// use crossbeam_deque::Injector; 1949 /// 1950 /// let q = Injector::new(); 1951 /// 1952 /// assert_eq!(q.len(), 0); 1953 /// q.push(1); 1954 /// assert_eq!(q.len(), 1); 1955 /// q.push(1); 1956 /// assert_eq!(q.len(), 2); 1957 /// ``` len(&self) -> usize1958 pub fn len(&self) -> usize { 1959 loop { 1960 // Load the tail index, then load the head index. 1961 let mut tail = self.tail.index.load(Ordering::SeqCst); 1962 let mut head = self.head.index.load(Ordering::SeqCst); 1963 1964 // If the tail index didn't change, we've got consistent indices to work with. 1965 if self.tail.index.load(Ordering::SeqCst) == tail { 1966 // Erase the lower bits. 1967 tail &= !((1 << SHIFT) - 1); 1968 head &= !((1 << SHIFT) - 1); 1969 1970 // Fix up indices if they fall onto block ends. 
1971 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { 1972 tail = tail.wrapping_add(1 << SHIFT); 1973 } 1974 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { 1975 head = head.wrapping_add(1 << SHIFT); 1976 } 1977 1978 // Rotate indices so that head falls into the first block. 1979 let lap = (head >> SHIFT) / LAP; 1980 tail = tail.wrapping_sub((lap * LAP) << SHIFT); 1981 head = head.wrapping_sub((lap * LAP) << SHIFT); 1982 1983 // Remove the lower bits. 1984 tail >>= SHIFT; 1985 head >>= SHIFT; 1986 1987 // Return the difference minus the number of blocks between tail and head. 1988 return tail - head - tail / LAP; 1989 } 1990 } 1991 } 1992 } 1993 1994 impl<T> Drop for Injector<T> { drop(&mut self)1995 fn drop(&mut self) { 1996 let mut head = *self.head.index.get_mut(); 1997 let mut tail = *self.tail.index.get_mut(); 1998 let mut block = *self.head.block.get_mut(); 1999 2000 // Erase the lower bits. 2001 head &= !((1 << SHIFT) - 1); 2002 tail &= !((1 << SHIFT) - 1); 2003 2004 unsafe { 2005 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. 2006 while head != tail { 2007 let offset = (head >> SHIFT) % LAP; 2008 2009 if offset < BLOCK_CAP { 2010 // Drop the task in the slot. 2011 let slot = (*block).slots.get_unchecked(offset); 2012 (*slot.task.get()).assume_init_drop(); 2013 } else { 2014 // Deallocate the block and move to the next one. 2015 let next = *(*block).next.get_mut(); 2016 drop(Box::from_raw(block)); 2017 block = next; 2018 } 2019 2020 head = head.wrapping_add(1 << SHIFT); 2021 } 2022 2023 // Deallocate the last remaining block. 2024 drop(Box::from_raw(block)); 2025 } 2026 } 2027 } 2028 2029 impl<T> fmt::Debug for Injector<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result2030 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 2031 f.pad("Worker { .. }") 2032 } 2033 } 2034 2035 /// Possible outcomes of a steal operation. 
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        matches!(self, Steal::Empty)
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        matches!(self, Steal::Success(_))
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        matches!(self, Steal::Retry)
    }

    /// Returns the result of the operation, if successful.
    ///
    /// `Empty` and `Retry` both map to `None`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            Steal::Empty | Steal::Retry => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both resulted in `Empty`, then `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            // Nothing observed so far: the second attempt decides the outcome on its own.
            Steal::Empty => f(),
            // Already successful: `f` is never called.
            Steal::Success(_) => self,
            // A pending retry is only superseded by an actual success.
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    /// Formats the variant name only; a stolen task is shown as `..` so that `T` does not need
    /// to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        let mut retry = false;
        for s in iter {
            match s {
                Steal::Empty => {}
                // Stop at the first success; the rest of the iterator is not consumed.
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}