1 use std::cell::{Cell, UnsafeCell}; 2 use std::cmp; 3 use std::fmt; 4 use std::iter::FromIterator; 5 use std::marker::PhantomData; 6 use std::mem::{self, ManuallyDrop, MaybeUninit}; 7 use std::ptr; 8 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; 9 use std::sync::Arc; 10 11 use crate::epoch::{self, Atomic, Owned}; 12 use crate::utils::{Backoff, CachePadded}; 13 14 // Minimum buffer capacity. 15 const MIN_CAP: usize = 64; 16 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. 17 const MAX_BATCH: usize = 32; 18 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets 19 // deallocated as soon as possible. 20 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; 21 22 /// A buffer that holds tasks in a worker queue. 23 /// 24 /// This is just a pointer to the buffer and its length - dropping an instance of this struct will 25 /// *not* deallocate the buffer. 26 struct Buffer<T> { 27 /// Pointer to the allocated memory. 28 ptr: *mut T, 29 30 /// Capacity of the buffer. Always a power of two. 31 cap: usize, 32 } 33 34 unsafe impl<T> Send for Buffer<T> {} 35 36 impl<T> Buffer<T> { 37 /// Allocates a new buffer with the specified capacity. alloc(cap: usize) -> Buffer<T>38 fn alloc(cap: usize) -> Buffer<T> { 39 debug_assert_eq!(cap, cap.next_power_of_two()); 40 41 let mut v = ManuallyDrop::new(Vec::with_capacity(cap)); 42 let ptr = v.as_mut_ptr(); 43 44 Buffer { ptr, cap } 45 } 46 47 /// Deallocates the buffer. dealloc(self)48 unsafe fn dealloc(self) { 49 drop(Vec::from_raw_parts(self.ptr, 0, self.cap)); 50 } 51 52 /// Returns a pointer to the task at the specified `index`. at(&self, index: isize) -> *mut T53 unsafe fn at(&self, index: isize) -> *mut T { 54 // `self.cap` is always a power of two. 55 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we 56 // don't actually have the right to access this memory. 57 self.ptr.offset(index & (self.cap - 1) as isize) 58 } 59 60 /// Writes `task` into the specified `index`. 61 /// 62 /// This method might be concurrently called with another `read` at the same index, which is 63 /// technically speaking a data race and therefore UB. We should use an atomic store here, but 64 /// that would be more expensive and difficult to implement generically for all types `T`. 65 /// Hence, as a hack, we use a volatile write instead. write(&self, index: isize, task: MaybeUninit<T>)66 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) { 67 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) 68 } 69 70 /// Reads a task from the specified `index`. 71 /// 72 /// This method might be concurrently called with another `write` at the same index, which is 73 /// technically speaking a data race and therefore UB. We should use an atomic load here, but 74 /// that would be more expensive and difficult to implement generically for all types `T`. 75 /// Hence, as a hack, we use a volatile load instead. read(&self, index: isize) -> MaybeUninit<T>76 unsafe fn read(&self, index: isize) -> MaybeUninit<T> { 77 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) 78 } 79 } 80 81 impl<T> Clone for Buffer<T> { clone(&self) -> Buffer<T>82 fn clone(&self) -> Buffer<T> { 83 Buffer { 84 ptr: self.ptr, 85 cap: self.cap, 86 } 87 } 88 } 89 90 impl<T> Copy for Buffer<T> {} 91 92 /// Internal queue data shared between the worker and stealers. 93 /// 94 /// The implementation is based on the following work: 95 /// 96 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev] 97 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models. 98 /// PPoPP 2013.][weak-mem] 99 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++ 100 /// atomics. OOPSLA 2013.][checker] 101 /// 102 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974 103 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524 104 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514 105 struct Inner<T> { 106 /// The front index. 107 front: AtomicIsize, 108 109 /// The back index. 110 back: AtomicIsize, 111 112 /// The underlying buffer. 113 buffer: CachePadded<Atomic<Buffer<T>>>, 114 } 115 116 impl<T> Drop for Inner<T> { drop(&mut self)117 fn drop(&mut self) { 118 // Load the back index, front index, and buffer. 119 let b = *self.back.get_mut(); 120 let f = *self.front.get_mut(); 121 122 unsafe { 123 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); 124 125 // Go through the buffer from front to back and drop all tasks in the queue. 126 let mut i = f; 127 while i != b { 128 buffer.deref().at(i).drop_in_place(); 129 i = i.wrapping_add(1); 130 } 131 132 // Free the memory allocated by the buffer. 133 buffer.into_owned().into_box().dealloc(); 134 } 135 } 136 } 137 138 /// Worker queue flavor: FIFO or LIFO. 139 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 140 enum Flavor { 141 /// The first-in first-out flavor. 142 Fifo, 143 144 /// The last-in first-out flavor. 145 Lifo, 146 } 147 148 /// A worker queue. 149 /// 150 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal 151 /// tasks from it. Task schedulers typically create a single worker queue per thread. 152 /// 153 /// # Examples 154 /// 155 /// A FIFO worker: 156 /// 157 /// ``` 158 /// use crossbeam_deque::{Steal, Worker}; 159 /// 160 /// let w = Worker::new_fifo(); 161 /// let s = w.stealer(); 162 /// 163 /// w.push(1); 164 /// w.push(2); 165 /// w.push(3); 166 /// 167 /// assert_eq!(s.steal(), Steal::Success(1)); 168 /// assert_eq!(w.pop(), Some(2)); 169 /// assert_eq!(w.pop(), Some(3)); 170 /// ``` 171 /// 172 /// A LIFO worker: 173 /// 174 /// ``` 175 /// use crossbeam_deque::{Steal, Worker}; 176 /// 177 /// let w = Worker::new_lifo(); 178 /// let s = w.stealer(); 179 /// 180 /// w.push(1); 181 /// w.push(2); 182 /// w.push(3); 183 /// 184 /// assert_eq!(s.steal(), Steal::Success(1)); 185 /// assert_eq!(w.pop(), Some(3)); 186 /// assert_eq!(w.pop(), Some(2)); 187 /// ``` 188 pub struct Worker<T> { 189 /// A reference to the inner representation of the queue. 190 inner: Arc<CachePadded<Inner<T>>>, 191 192 /// A copy of `inner.buffer` for quick access. 193 buffer: Cell<Buffer<T>>, 194 195 /// The flavor of the queue. 196 flavor: Flavor, 197 198 /// Indicates that the worker cannot be shared among threads. 199 _marker: PhantomData<*mut ()>, // !Send + !Sync 200 } 201 202 unsafe impl<T: Send> Send for Worker<T> {} 203 204 impl<T> Worker<T> { 205 /// Creates a FIFO worker queue. 206 /// 207 /// Tasks are pushed and popped from opposite ends. 208 /// 209 /// # Examples 210 /// 211 /// ``` 212 /// use crossbeam_deque::Worker; 213 /// 214 /// let w = Worker::<i32>::new_fifo(); 215 /// ``` new_fifo() -> Worker<T>216 pub fn new_fifo() -> Worker<T> { 217 let buffer = Buffer::alloc(MIN_CAP); 218 219 let inner = Arc::new(CachePadded::new(Inner { 220 front: AtomicIsize::new(0), 221 back: AtomicIsize::new(0), 222 buffer: CachePadded::new(Atomic::new(buffer)), 223 })); 224 225 Worker { 226 inner, 227 buffer: Cell::new(buffer), 228 flavor: Flavor::Fifo, 229 _marker: PhantomData, 230 } 231 } 232 233 /// Creates a LIFO worker queue. 234 /// 235 /// Tasks are pushed and popped from the same end. 236 /// 237 /// # Examples 238 /// 239 /// ``` 240 /// use crossbeam_deque::Worker; 241 /// 242 /// let w = Worker::<i32>::new_lifo(); 243 /// ``` new_lifo() -> Worker<T>244 pub fn new_lifo() -> Worker<T> { 245 let buffer = Buffer::alloc(MIN_CAP); 246 247 let inner = Arc::new(CachePadded::new(Inner { 248 front: AtomicIsize::new(0), 249 back: AtomicIsize::new(0), 250 buffer: CachePadded::new(Atomic::new(buffer)), 251 })); 252 253 Worker { 254 inner, 255 buffer: Cell::new(buffer), 256 flavor: Flavor::Lifo, 257 _marker: PhantomData, 258 } 259 } 260 261 /// Creates a stealer for this queue. 262 /// 263 /// The returned stealer can be shared among threads and cloned. 264 /// 265 /// # Examples 266 /// 267 /// ``` 268 /// use crossbeam_deque::Worker; 269 /// 270 /// let w = Worker::<i32>::new_lifo(); 271 /// let s = w.stealer(); 272 /// ``` stealer(&self) -> Stealer<T>273 pub fn stealer(&self) -> Stealer<T> { 274 Stealer { 275 inner: self.inner.clone(), 276 flavor: self.flavor, 277 } 278 } 279 280 /// Resizes the internal buffer to the new capacity of `new_cap`. 281 #[cold] resize(&self, new_cap: usize)282 unsafe fn resize(&self, new_cap: usize) { 283 // Load the back index, front index, and buffer. 284 let b = self.inner.back.load(Ordering::Relaxed); 285 let f = self.inner.front.load(Ordering::Relaxed); 286 let buffer = self.buffer.get(); 287 288 // Allocate a new buffer and copy data from the old buffer to the new one. 289 let new = Buffer::alloc(new_cap); 290 let mut i = f; 291 while i != b { 292 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); 293 i = i.wrapping_add(1); 294 } 295 296 let guard = &epoch::pin(); 297 298 // Replace the old buffer with the new one. 299 self.buffer.replace(new); 300 let old = 301 self.inner 302 .buffer 303 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard); 304 305 // Destroy the old buffer later. 306 guard.defer_unchecked(move || old.into_owned().into_box().dealloc()); 307 308 // If the buffer is very large, then flush the thread-local garbage in order to deallocate 309 // it as soon as possible. 310 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES { 311 guard.flush(); 312 } 313 } 314 315 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the 316 /// buffer. reserve(&self, reserve_cap: usize)317 fn reserve(&self, reserve_cap: usize) { 318 if reserve_cap > 0 { 319 // Compute the current length. 320 let b = self.inner.back.load(Ordering::Relaxed); 321 let f = self.inner.front.load(Ordering::SeqCst); 322 let len = b.wrapping_sub(f) as usize; 323 324 // The current capacity. 325 let cap = self.buffer.get().cap; 326 327 // Is there enough capacity to push `reserve_cap` tasks? 328 if cap - len < reserve_cap { 329 // Keep doubling the capacity as much as is needed. 330 let mut new_cap = cap * 2; 331 while new_cap - len < reserve_cap { 332 new_cap *= 2; 333 } 334 335 // Resize the buffer. 336 unsafe { 337 self.resize(new_cap); 338 } 339 } 340 } 341 } 342 343 /// Returns `true` if the queue is empty. 344 /// 345 /// ``` 346 /// use crossbeam_deque::Worker; 347 /// 348 /// let w = Worker::new_lifo(); 349 /// 350 /// assert!(w.is_empty()); 351 /// w.push(1); 352 /// assert!(!w.is_empty()); 353 /// ``` is_empty(&self) -> bool354 pub fn is_empty(&self) -> bool { 355 let b = self.inner.back.load(Ordering::Relaxed); 356 let f = self.inner.front.load(Ordering::SeqCst); 357 b.wrapping_sub(f) <= 0 358 } 359 360 /// Returns the number of tasks in the deque. 361 /// 362 /// ``` 363 /// use crossbeam_deque::Worker; 364 /// 365 /// let w = Worker::new_lifo(); 366 /// 367 /// assert_eq!(w.len(), 0); 368 /// w.push(1); 369 /// assert_eq!(w.len(), 1); 370 /// w.push(1); 371 /// assert_eq!(w.len(), 2); 372 /// ``` len(&self) -> usize373 pub fn len(&self) -> usize { 374 let b = self.inner.back.load(Ordering::Relaxed); 375 let f = self.inner.front.load(Ordering::SeqCst); 376 b.wrapping_sub(f).max(0) as usize 377 } 378 379 /// Pushes a task into the queue. 380 /// 381 /// # Examples 382 /// 383 /// ``` 384 /// use crossbeam_deque::Worker; 385 /// 386 /// let w = Worker::new_lifo(); 387 /// w.push(1); 388 /// w.push(2); 389 /// ``` push(&self, task: T)390 pub fn push(&self, task: T) { 391 // Load the back index, front index, and buffer. 392 let b = self.inner.back.load(Ordering::Relaxed); 393 let f = self.inner.front.load(Ordering::Acquire); 394 let mut buffer = self.buffer.get(); 395 396 // Calculate the length of the queue. 397 let len = b.wrapping_sub(f); 398 399 // Is the queue full? 400 if len >= buffer.cap as isize { 401 // Yes. Grow the underlying buffer. 402 unsafe { 403 self.resize(2 * buffer.cap); 404 } 405 buffer = self.buffer.get(); 406 } 407 408 // Write `task` into the slot. 409 unsafe { 410 buffer.write(b, MaybeUninit::new(task)); 411 } 412 413 atomic::fence(Ordering::Release); 414 415 // Increment the back index. 416 // 417 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 418 // races because it doesn't understand fences. 419 self.inner.back.store(b.wrapping_add(1), Ordering::Release); 420 } 421 422 /// Pops a task from the queue. 423 /// 424 /// # Examples 425 /// 426 /// ``` 427 /// use crossbeam_deque::Worker; 428 /// 429 /// let w = Worker::new_fifo(); 430 /// w.push(1); 431 /// w.push(2); 432 /// 433 /// assert_eq!(w.pop(), Some(1)); 434 /// assert_eq!(w.pop(), Some(2)); 435 /// assert_eq!(w.pop(), None); 436 /// ``` pop(&self) -> Option<T>437 pub fn pop(&self) -> Option<T> { 438 // Load the back and front index. 439 let b = self.inner.back.load(Ordering::Relaxed); 440 let f = self.inner.front.load(Ordering::Relaxed); 441 442 // Calculate the length of the queue. 443 let len = b.wrapping_sub(f); 444 445 // Is the queue empty? 446 if len <= 0 { 447 return None; 448 } 449 450 match self.flavor { 451 // Pop from the front of the queue. 452 Flavor::Fifo => { 453 // Try incrementing the front index to pop the task. 454 let f = self.inner.front.fetch_add(1, Ordering::SeqCst); 455 let new_f = f.wrapping_add(1); 456 457 if b.wrapping_sub(new_f) < 0 { 458 self.inner.front.store(f, Ordering::Relaxed); 459 return None; 460 } 461 462 unsafe { 463 // Read the popped task. 464 let buffer = self.buffer.get(); 465 let task = buffer.read(f).assume_init(); 466 467 // Shrink the buffer if `len - 1` is less than one fourth of the capacity. 468 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { 469 self.resize(buffer.cap / 2); 470 } 471 472 Some(task) 473 } 474 } 475 476 // Pop from the back of the queue. 477 Flavor::Lifo => { 478 // Decrement the back index. 479 let b = b.wrapping_sub(1); 480 self.inner.back.store(b, Ordering::Relaxed); 481 482 atomic::fence(Ordering::SeqCst); 483 484 // Load the front index. 485 let f = self.inner.front.load(Ordering::Relaxed); 486 487 // Compute the length after the back index was decremented. 488 let len = b.wrapping_sub(f); 489 490 if len < 0 { 491 // The queue is empty. Restore the back index to the original task. 492 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 493 None 494 } else { 495 // Read the task to be popped. 496 let buffer = self.buffer.get(); 497 let mut task = unsafe { Some(buffer.read(b)) }; 498 499 // Are we popping the last task from the queue? 500 if len == 0 { 501 // Try incrementing the front index. 502 if self 503 .inner 504 .front 505 .compare_exchange( 506 f, 507 f.wrapping_add(1), 508 Ordering::SeqCst, 509 Ordering::Relaxed, 510 ) 511 .is_err() 512 { 513 // Failed. We didn't pop anything. Reset to `None`. 514 task.take(); 515 } 516 517 // Restore the back index to the original task. 518 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 519 } else { 520 // Shrink the buffer if `len` is less than one fourth of the capacity. 521 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 { 522 unsafe { 523 self.resize(buffer.cap / 2); 524 } 525 } 526 } 527 528 task.map(|t| unsafe { t.assume_init() }) 529 } 530 } 531 } 532 } 533 } 534 535 impl<T> fmt::Debug for Worker<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 537 f.pad("Worker { .. }") 538 } 539 } 540 541 /// A stealer handle of a worker queue. 542 /// 543 /// Stealers can be shared among threads. 544 /// 545 /// Task schedulers typically have a single worker queue per worker thread. 546 /// 547 /// # Examples 548 /// 549 /// ``` 550 /// use crossbeam_deque::{Steal, Worker}; 551 /// 552 /// let w = Worker::new_lifo(); 553 /// w.push(1); 554 /// w.push(2); 555 /// 556 /// let s = w.stealer(); 557 /// assert_eq!(s.steal(), Steal::Success(1)); 558 /// assert_eq!(s.steal(), Steal::Success(2)); 559 /// assert_eq!(s.steal(), Steal::Empty); 560 /// ``` 561 pub struct Stealer<T> { 562 /// A reference to the inner representation of the queue. 563 inner: Arc<CachePadded<Inner<T>>>, 564 565 /// The flavor of the queue. 566 flavor: Flavor, 567 } 568 569 unsafe impl<T: Send> Send for Stealer<T> {} 570 unsafe impl<T: Send> Sync for Stealer<T> {} 571 572 impl<T> Stealer<T> { 573 /// Returns `true` if the queue is empty. 574 /// 575 /// ``` 576 /// use crossbeam_deque::Worker; 577 /// 578 /// let w = Worker::new_lifo(); 579 /// let s = w.stealer(); 580 /// 581 /// assert!(s.is_empty()); 582 /// w.push(1); 583 /// assert!(!s.is_empty()); 584 /// ``` is_empty(&self) -> bool585 pub fn is_empty(&self) -> bool { 586 let f = self.inner.front.load(Ordering::Acquire); 587 atomic::fence(Ordering::SeqCst); 588 let b = self.inner.back.load(Ordering::Acquire); 589 b.wrapping_sub(f) <= 0 590 } 591 592 /// Returns the number of tasks in the deque. 593 /// 594 /// ``` 595 /// use crossbeam_deque::Worker; 596 /// 597 /// let w = Worker::new_lifo(); 598 /// let s = w.stealer(); 599 /// 600 /// assert_eq!(s.len(), 0); 601 /// w.push(1); 602 /// assert_eq!(s.len(), 1); 603 /// w.push(2); 604 /// assert_eq!(s.len(), 2); 605 /// ``` len(&self) -> usize606 pub fn len(&self) -> usize { 607 let f = self.inner.front.load(Ordering::Acquire); 608 atomic::fence(Ordering::SeqCst); 609 let b = self.inner.back.load(Ordering::Acquire); 610 b.wrapping_sub(f).max(0) as usize 611 } 612 613 /// Steals a task from the queue. 614 /// 615 /// # Examples 616 /// 617 /// ``` 618 /// use crossbeam_deque::{Steal, Worker}; 619 /// 620 /// let w = Worker::new_lifo(); 621 /// w.push(1); 622 /// w.push(2); 623 /// 624 /// let s = w.stealer(); 625 /// assert_eq!(s.steal(), Steal::Success(1)); 626 /// assert_eq!(s.steal(), Steal::Success(2)); 627 /// ``` steal(&self) -> Steal<T>628 pub fn steal(&self) -> Steal<T> { 629 // Load the front index. 630 let f = self.inner.front.load(Ordering::Acquire); 631 632 // A SeqCst fence is needed here. 633 // 634 // If the current thread is already pinned (reentrantly), we must manually issue the 635 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 636 // have to. 637 if epoch::is_pinned() { 638 atomic::fence(Ordering::SeqCst); 639 } 640 641 let guard = &epoch::pin(); 642 643 // Load the back index. 644 let b = self.inner.back.load(Ordering::Acquire); 645 646 // Is the queue empty? 647 if b.wrapping_sub(f) <= 0 { 648 return Steal::Empty; 649 } 650 651 // Load the buffer and read the task at the front. 652 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 653 let task = unsafe { buffer.deref().read(f) }; 654 655 // Try incrementing the front index to steal the task. 656 // If the buffer has been swapped or the increment fails, we retry. 657 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 658 || self 659 .inner 660 .front 661 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 662 .is_err() 663 { 664 // We didn't steal this task, forget it. 665 return Steal::Retry; 666 } 667 668 // Return the stolen task. 669 Steal::Success(unsafe { task.assume_init() }) 670 } 671 672 /// Steals a batch of tasks and pushes them into another worker. 673 /// 674 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 675 /// steal around half of the tasks in the queue, but also not more than some constant limit. 676 /// 677 /// # Examples 678 /// 679 /// ``` 680 /// use crossbeam_deque::Worker; 681 /// 682 /// let w1 = Worker::new_fifo(); 683 /// w1.push(1); 684 /// w1.push(2); 685 /// w1.push(3); 686 /// w1.push(4); 687 /// 688 /// let s = w1.stealer(); 689 /// let w2 = Worker::new_fifo(); 690 /// 691 /// let _ = s.steal_batch(&w2); 692 /// assert_eq!(w2.pop(), Some(1)); 693 /// assert_eq!(w2.pop(), Some(2)); 694 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>695 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 696 self.steal_batch_with_limit(dest, MAX_BATCH) 697 } 698 699 /// Steals no more than `limit` of tasks and pushes them into another worker. 700 /// 701 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 702 /// steal around half of the tasks in the queue, but also not more than the given limit. 703 /// 704 /// # Examples 705 /// 706 /// ``` 707 /// use crossbeam_deque::Worker; 708 /// 709 /// let w1 = Worker::new_fifo(); 710 /// w1.push(1); 711 /// w1.push(2); 712 /// w1.push(3); 713 /// w1.push(4); 714 /// w1.push(5); 715 /// w1.push(6); 716 /// 717 /// let s = w1.stealer(); 718 /// let w2 = Worker::new_fifo(); 719 /// 720 /// let _ = s.steal_batch_with_limit(&w2, 2); 721 /// assert_eq!(w2.pop(), Some(1)); 722 /// assert_eq!(w2.pop(), Some(2)); 723 /// assert_eq!(w2.pop(), None); 724 /// 725 /// w1.push(7); 726 /// w1.push(8); 727 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 728 /// // half of the elements are currently popped, but the number of popped elements is considered 729 /// // an implementation detail that may be changed in the future. 730 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); 731 /// assert_eq!(w2.len(), 3); 732 /// ``` steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()>733 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { 734 assert!(limit > 0); 735 if Arc::ptr_eq(&self.inner, &dest.inner) { 736 if dest.is_empty() { 737 return Steal::Empty; 738 } else { 739 return Steal::Success(()); 740 } 741 } 742 743 // Load the front index. 744 let mut f = self.inner.front.load(Ordering::Acquire); 745 746 // A SeqCst fence is needed here. 747 // 748 // If the current thread is already pinned (reentrantly), we must manually issue the 749 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 750 // have to. 751 if epoch::is_pinned() { 752 atomic::fence(Ordering::SeqCst); 753 } 754 755 let guard = &epoch::pin(); 756 757 // Load the back index. 758 let b = self.inner.back.load(Ordering::Acquire); 759 760 // Is the queue empty? 761 let len = b.wrapping_sub(f); 762 if len <= 0 { 763 return Steal::Empty; 764 } 765 766 // Reserve capacity for the stolen batch. 767 let batch_size = cmp::min((len as usize + 1) / 2, limit); 768 dest.reserve(batch_size); 769 let mut batch_size = batch_size as isize; 770 771 // Get the destination buffer and back index. 772 let dest_buffer = dest.buffer.get(); 773 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 774 775 // Load the buffer. 776 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 777 778 match self.flavor { 779 // Steal a batch of tasks from the front at once. 780 Flavor::Fifo => { 781 // Copy the batch from the source to the destination buffer. 782 match dest.flavor { 783 Flavor::Fifo => { 784 for i in 0..batch_size { 785 unsafe { 786 let task = buffer.deref().read(f.wrapping_add(i)); 787 dest_buffer.write(dest_b.wrapping_add(i), task); 788 } 789 } 790 } 791 Flavor::Lifo => { 792 for i in 0..batch_size { 793 unsafe { 794 let task = buffer.deref().read(f.wrapping_add(i)); 795 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 796 } 797 } 798 } 799 } 800 801 // Try incrementing the front index to steal the batch. 802 // If the buffer has been swapped or the increment fails, we retry. 803 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 804 || self 805 .inner 806 .front 807 .compare_exchange( 808 f, 809 f.wrapping_add(batch_size), 810 Ordering::SeqCst, 811 Ordering::Relaxed, 812 ) 813 .is_err() 814 { 815 return Steal::Retry; 816 } 817 818 dest_b = dest_b.wrapping_add(batch_size); 819 } 820 821 // Steal a batch of tasks from the front one by one. 822 Flavor::Lifo => { 823 // This loop may modify the batch_size, which triggers a clippy lint warning. 824 // Use a new variable to avoid the warning, and to make it clear we aren't 825 // modifying the loop exit condition during iteration. 826 let original_batch_size = batch_size; 827 828 for i in 0..original_batch_size { 829 // If this is not the first steal, check whether the queue is empty. 830 if i > 0 { 831 // We've already got the current front index. Now execute the fence to 832 // synchronize with other threads. 833 atomic::fence(Ordering::SeqCst); 834 835 // Load the back index. 836 let b = self.inner.back.load(Ordering::Acquire); 837 838 // Is the queue empty? 839 if b.wrapping_sub(f) <= 0 { 840 batch_size = i; 841 break; 842 } 843 } 844 845 // Read the task at the front. 846 let task = unsafe { buffer.deref().read(f) }; 847 848 // Try incrementing the front index to steal the task. 849 // If the buffer has been swapped or the increment fails, we retry. 850 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 851 || self 852 .inner 853 .front 854 .compare_exchange( 855 f, 856 f.wrapping_add(1), 857 Ordering::SeqCst, 858 Ordering::Relaxed, 859 ) 860 .is_err() 861 { 862 // We didn't steal this task, forget it and break from the loop. 863 batch_size = i; 864 break; 865 } 866 867 // Write the stolen task into the destination buffer. 868 unsafe { 869 dest_buffer.write(dest_b, task); 870 } 871 872 // Move the source front index and the destination back index one step forward. 873 f = f.wrapping_add(1); 874 dest_b = dest_b.wrapping_add(1); 875 } 876 877 // If we didn't steal anything, the operation needs to be retried. 878 if batch_size == 0 { 879 return Steal::Retry; 880 } 881 882 // If stealing into a FIFO queue, stolen tasks need to be reversed. 883 if dest.flavor == Flavor::Fifo { 884 for i in 0..batch_size / 2 { 885 unsafe { 886 let i1 = dest_b.wrapping_sub(batch_size - i); 887 let i2 = dest_b.wrapping_sub(i + 1); 888 let t1 = dest_buffer.read(i1); 889 let t2 = dest_buffer.read(i2); 890 dest_buffer.write(i1, t2); 891 dest_buffer.write(i2, t1); 892 } 893 } 894 } 895 } 896 } 897 898 atomic::fence(Ordering::Release); 899 900 // Update the back index in the destination queue. 901 // 902 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 903 // races because it doesn't understand fences. 904 dest.inner.back.store(dest_b, Ordering::Release); 905 906 // Return with success. 907 Steal::Success(()) 908 } 909 910 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. 911 /// 912 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 913 /// steal around half of the tasks in the queue, but also not more than some constant limit. 914 /// 915 /// # Examples 916 /// 917 /// ``` 918 /// use crossbeam_deque::{Steal, Worker}; 919 /// 920 /// let w1 = Worker::new_fifo(); 921 /// w1.push(1); 922 /// w1.push(2); 923 /// w1.push(3); 924 /// w1.push(4); 925 /// 926 /// let s = w1.stealer(); 927 /// let w2 = Worker::new_fifo(); 928 /// 929 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); 930 /// assert_eq!(w2.pop(), Some(2)); 931 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>932 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 933 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) 934 } 935 936 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from 937 /// that worker. 938 /// 939 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 940 /// steal around half of the tasks in the queue, but also not more than the given limit. 941 /// 942 /// # Examples 943 /// 944 /// ``` 945 /// use crossbeam_deque::{Steal, Worker}; 946 /// 947 /// let w1 = Worker::new_fifo(); 948 /// w1.push(1); 949 /// w1.push(2); 950 /// w1.push(3); 951 /// w1.push(4); 952 /// w1.push(5); 953 /// w1.push(6); 954 /// 955 /// let s = w1.stealer(); 956 /// let w2 = Worker::new_fifo(); 957 /// 958 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); 959 /// assert_eq!(w2.pop(), Some(2)); 960 /// assert_eq!(w2.pop(), None); 961 /// 962 /// w1.push(7); 963 /// w1.push(8); 964 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 965 /// // half of the elements are currently popped, but the number of popped elements is considered 966 /// // an implementation detail that may be changed in the future. 967 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); 968 /// assert_eq!(w2.pop(), Some(4)); 969 /// assert_eq!(w2.pop(), Some(5)); 970 /// assert_eq!(w2.pop(), None); 971 /// ``` steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T>972 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { 973 assert!(limit > 0); 974 if Arc::ptr_eq(&self.inner, &dest.inner) { 975 match dest.pop() { 976 None => return Steal::Empty, 977 Some(task) => return Steal::Success(task), 978 } 979 } 980 981 // Load the front index. 982 let mut f = self.inner.front.load(Ordering::Acquire); 983 984 // A SeqCst fence is needed here. 985 // 986 // If the current thread is already pinned (reentrantly), we must manually issue the 987 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 988 // have to. 989 if epoch::is_pinned() { 990 atomic::fence(Ordering::SeqCst); 991 } 992 993 let guard = &epoch::pin(); 994 995 // Load the back index. 996 let b = self.inner.back.load(Ordering::Acquire); 997 998 // Is the queue empty? 999 let len = b.wrapping_sub(f); 1000 if len <= 0 { 1001 return Steal::Empty; 1002 } 1003 1004 // Reserve capacity for the stolen batch. 1005 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); 1006 dest.reserve(batch_size); 1007 let mut batch_size = batch_size as isize; 1008 1009 // Get the destination buffer and back index. 1010 let dest_buffer = dest.buffer.get(); 1011 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 1012 1013 // Load the buffer 1014 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 1015 1016 // Read the task at the front. 1017 let mut task = unsafe { buffer.deref().read(f) }; 1018 1019 match self.flavor { 1020 // Steal a batch of tasks from the front at once. 1021 Flavor::Fifo => { 1022 // Copy the batch from the source to the destination buffer. 1023 match dest.flavor { 1024 Flavor::Fifo => { 1025 for i in 0..batch_size { 1026 unsafe { 1027 let task = buffer.deref().read(f.wrapping_add(i + 1)); 1028 dest_buffer.write(dest_b.wrapping_add(i), task); 1029 } 1030 } 1031 } 1032 Flavor::Lifo => { 1033 for i in 0..batch_size { 1034 unsafe { 1035 let task = buffer.deref().read(f.wrapping_add(i + 1)); 1036 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 1037 } 1038 } 1039 } 1040 } 1041 1042 // Try incrementing the front index to steal the task. 1043 // If the buffer has been swapped or the increment fails, we retry. 1044 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 1045 || self 1046 .inner 1047 .front 1048 .compare_exchange( 1049 f, 1050 f.wrapping_add(batch_size + 1), 1051 Ordering::SeqCst, 1052 Ordering::Relaxed, 1053 ) 1054 .is_err() 1055 { 1056 // We didn't steal this task, forget it. 1057 return Steal::Retry; 1058 } 1059 1060 dest_b = dest_b.wrapping_add(batch_size); 1061 } 1062 1063 // Steal a batch of tasks from the front one by one. 1064 Flavor::Lifo => { 1065 // Try incrementing the front index to steal the task. 1066 if self 1067 .inner 1068 .front 1069 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 1070 .is_err() 1071 { 1072 // We didn't steal this task, forget it. 1073 return Steal::Retry; 1074 } 1075 1076 // Move the front index one step forward. 1077 f = f.wrapping_add(1); 1078 1079 // Repeat the same procedure for the batch steals. 1080 // 1081 // This loop may modify the batch_size, which triggers a clippy lint warning. 1082 // Use a new variable to avoid the warning, and to make it clear we aren't 1083 // modifying the loop exit condition during iteration. 1084 let original_batch_size = batch_size; 1085 for i in 0..original_batch_size { 1086 // We've already got the current front index. Now execute the fence to 1087 // synchronize with other threads. 1088 atomic::fence(Ordering::SeqCst); 1089 1090 // Load the back index. 1091 let b = self.inner.back.load(Ordering::Acquire); 1092 1093 // Is the queue empty? 1094 if b.wrapping_sub(f) <= 0 { 1095 batch_size = i; 1096 break; 1097 } 1098 1099 // Read the task at the front. 1100 let tmp = unsafe { buffer.deref().read(f) }; 1101 1102 // Try incrementing the front index to steal the task. 1103 // If the buffer has been swapped or the increment fails, we retry. 1104 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 1105 || self 1106 .inner 1107 .front 1108 .compare_exchange( 1109 f, 1110 f.wrapping_add(1), 1111 Ordering::SeqCst, 1112 Ordering::Relaxed, 1113 ) 1114 .is_err() 1115 { 1116 // We didn't steal this task, forget it and break from the loop. 1117 batch_size = i; 1118 break; 1119 } 1120 1121 // Write the previously stolen task into the destination buffer. 1122 unsafe { 1123 dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); 1124 } 1125 1126 // Move the source front index and the destination back index one step forward. 1127 f = f.wrapping_add(1); 1128 dest_b = dest_b.wrapping_add(1); 1129 } 1130 1131 // If stealing into a FIFO queue, stolen tasks need to be reversed. 1132 if dest.flavor == Flavor::Fifo { 1133 for i in 0..batch_size / 2 { 1134 unsafe { 1135 let i1 = dest_b.wrapping_sub(batch_size - i); 1136 let i2 = dest_b.wrapping_sub(i + 1); 1137 let t1 = dest_buffer.read(i1); 1138 let t2 = dest_buffer.read(i2); 1139 dest_buffer.write(i1, t2); 1140 dest_buffer.write(i2, t1); 1141 } 1142 } 1143 } 1144 } 1145 } 1146 1147 atomic::fence(Ordering::Release); 1148 1149 // Update the back index in the destination queue. 1150 // 1151 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 1152 // races because it doesn't understand fences. 1153 dest.inner.back.store(dest_b, Ordering::Release); 1154 1155 // Return with success. 1156 Steal::Success(unsafe { task.assume_init() }) 1157 } 1158 } 1159 1160 impl<T> Clone for Stealer<T> { clone(&self) -> Stealer<T>1161 fn clone(&self) -> Stealer<T> { 1162 Stealer { 1163 inner: self.inner.clone(), 1164 flavor: self.flavor, 1165 } 1166 } 1167 } 1168 1169 impl<T> fmt::Debug for Stealer<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1171 f.pad("Stealer { .. }") 1172 } 1173 } 1174 1175 // Bits indicating the state of a slot: 1176 // * If a task has been written into the slot, `WRITE` is set. 1177 // * If a task has been read from the slot, `READ` is set. 1178 // * If the block is being destroyed, `DESTROY` is set. 1179 const WRITE: usize = 1; 1180 const READ: usize = 2; 1181 const DESTROY: usize = 4; 1182 1183 // Each block covers one "lap" of indices. 1184 const LAP: usize = 64; 1185 // The maximum number of values a block can hold. 1186 const BLOCK_CAP: usize = LAP - 1; 1187 // How many lower bits are reserved for metadata. 1188 const SHIFT: usize = 1; 1189 // Indicates that the block is not the last one. 1190 const HAS_NEXT: usize = 1; 1191 1192 /// A slot in a block. 1193 struct Slot<T> { 1194 /// The task. 1195 task: UnsafeCell<MaybeUninit<T>>, 1196 1197 /// The state of the slot. 1198 state: AtomicUsize, 1199 } 1200 1201 impl<T> Slot<T> { 1202 const UNINIT: Self = Self { 1203 task: UnsafeCell::new(MaybeUninit::uninit()), 1204 state: AtomicUsize::new(0), 1205 }; 1206 1207 /// Waits until a task is written into the slot. wait_write(&self)1208 fn wait_write(&self) { 1209 let backoff = Backoff::new(); 1210 while self.state.load(Ordering::Acquire) & WRITE == 0 { 1211 backoff.snooze(); 1212 } 1213 } 1214 } 1215 1216 /// A block in a linked list. 1217 /// 1218 /// Each block in the list can hold up to `BLOCK_CAP` values. 1219 struct Block<T> { 1220 /// The next block in the linked list. 1221 next: AtomicPtr<Block<T>>, 1222 1223 /// Slots for values. 1224 slots: [Slot<T>; BLOCK_CAP], 1225 } 1226 1227 impl<T> Block<T> { 1228 /// Creates an empty block that starts at `start_index`. new() -> Block<T>1229 fn new() -> Block<T> { 1230 Self { 1231 next: AtomicPtr::new(ptr::null_mut()), 1232 slots: [Slot::UNINIT; BLOCK_CAP], 1233 } 1234 } 1235 1236 /// Waits until the next pointer is set. wait_next(&self) -> *mut Block<T>1237 fn wait_next(&self) -> *mut Block<T> { 1238 let backoff = Backoff::new(); 1239 loop { 1240 let next = self.next.load(Ordering::Acquire); 1241 if !next.is_null() { 1242 return next; 1243 } 1244 backoff.snooze(); 1245 } 1246 } 1247 1248 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. destroy(this: *mut Block<T>, count: usize)1249 unsafe fn destroy(this: *mut Block<T>, count: usize) { 1250 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has 1251 // begun destruction of the block. 1252 for i in (0..count).rev() { 1253 let slot = (*this).slots.get_unchecked(i); 1254 1255 // Mark the `DESTROY` bit if a thread is still using the slot. 1256 if slot.state.load(Ordering::Acquire) & READ == 0 1257 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 1258 { 1259 // If a thread is still using the slot, it will continue destruction of the block. 1260 return; 1261 } 1262 } 1263 1264 // No thread is using the block, now it is safe to destroy it. 1265 drop(Box::from_raw(this)); 1266 } 1267 } 1268 1269 /// A position in a queue. 1270 struct Position<T> { 1271 /// The index in the queue. 1272 index: AtomicUsize, 1273 1274 /// The block in the linked list. 1275 block: AtomicPtr<Block<T>>, 1276 } 1277 1278 /// An injector queue. 1279 /// 1280 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have 1281 /// a single injector queue, which is the entry point for new tasks. 1282 /// 1283 /// # Examples 1284 /// 1285 /// ``` 1286 /// use crossbeam_deque::{Injector, Steal}; 1287 /// 1288 /// let q = Injector::new(); 1289 /// q.push(1); 1290 /// q.push(2); 1291 /// 1292 /// assert_eq!(q.steal(), Steal::Success(1)); 1293 /// assert_eq!(q.steal(), Steal::Success(2)); 1294 /// assert_eq!(q.steal(), Steal::Empty); 1295 /// ``` 1296 pub struct Injector<T> { 1297 /// The head of the queue. 1298 head: CachePadded<Position<T>>, 1299 1300 /// The tail of the queue. 1301 tail: CachePadded<Position<T>>, 1302 1303 /// Indicates that dropping a `Injector<T>` may drop values of type `T`. 1304 _marker: PhantomData<T>, 1305 } 1306 1307 unsafe impl<T: Send> Send for Injector<T> {} 1308 unsafe impl<T: Send> Sync for Injector<T> {} 1309 1310 impl<T> Default for Injector<T> { default() -> Self1311 fn default() -> Self { 1312 let block = Box::into_raw(Box::new(Block::<T>::new())); 1313 Self { 1314 head: CachePadded::new(Position { 1315 block: AtomicPtr::new(block), 1316 index: AtomicUsize::new(0), 1317 }), 1318 tail: CachePadded::new(Position { 1319 block: AtomicPtr::new(block), 1320 index: AtomicUsize::new(0), 1321 }), 1322 _marker: PhantomData, 1323 } 1324 } 1325 } 1326 1327 impl<T> Injector<T> { 1328 /// Creates a new injector queue. 1329 /// 1330 /// # Examples 1331 /// 1332 /// ``` 1333 /// use crossbeam_deque::Injector; 1334 /// 1335 /// let q = Injector::<i32>::new(); 1336 /// ``` new() -> Injector<T>1337 pub fn new() -> Injector<T> { 1338 Self::default() 1339 } 1340 1341 /// Pushes a task into the queue. 1342 /// 1343 /// # Examples 1344 /// 1345 /// ``` 1346 /// use crossbeam_deque::Injector; 1347 /// 1348 /// let w = Injector::new(); 1349 /// w.push(1); 1350 /// w.push(2); 1351 /// ``` push(&self, task: T)1352 pub fn push(&self, task: T) { 1353 let backoff = Backoff::new(); 1354 let mut tail = self.tail.index.load(Ordering::Acquire); 1355 let mut block = self.tail.block.load(Ordering::Acquire); 1356 let mut next_block = None; 1357 1358 loop { 1359 // Calculate the offset of the index into the block. 1360 let offset = (tail >> SHIFT) % LAP; 1361 1362 // If we reached the end of the block, wait until the next one is installed. 1363 if offset == BLOCK_CAP { 1364 backoff.snooze(); 1365 tail = self.tail.index.load(Ordering::Acquire); 1366 block = self.tail.block.load(Ordering::Acquire); 1367 continue; 1368 } 1369 1370 // If we're going to have to install the next block, allocate it in advance in order to 1371 // make the wait for other threads as short as possible. 1372 if offset + 1 == BLOCK_CAP && next_block.is_none() { 1373 next_block = Some(Box::new(Block::<T>::new())); 1374 } 1375 1376 let new_tail = tail + (1 << SHIFT); 1377 1378 // Try advancing the tail forward. 1379 match self.tail.index.compare_exchange_weak( 1380 tail, 1381 new_tail, 1382 Ordering::SeqCst, 1383 Ordering::Acquire, 1384 ) { 1385 Ok(_) => unsafe { 1386 // If we've reached the end of the block, install the next one. 1387 if offset + 1 == BLOCK_CAP { 1388 let next_block = Box::into_raw(next_block.unwrap()); 1389 let next_index = new_tail.wrapping_add(1 << SHIFT); 1390 1391 self.tail.block.store(next_block, Ordering::Release); 1392 self.tail.index.store(next_index, Ordering::Release); 1393 (*block).next.store(next_block, Ordering::Release); 1394 } 1395 1396 // Write the task into the slot. 1397 let slot = (*block).slots.get_unchecked(offset); 1398 slot.task.get().write(MaybeUninit::new(task)); 1399 slot.state.fetch_or(WRITE, Ordering::Release); 1400 1401 return; 1402 }, 1403 Err(t) => { 1404 tail = t; 1405 block = self.tail.block.load(Ordering::Acquire); 1406 backoff.spin(); 1407 } 1408 } 1409 } 1410 } 1411 1412 /// Steals a task from the queue. 1413 /// 1414 /// # Examples 1415 /// 1416 /// ``` 1417 /// use crossbeam_deque::{Injector, Steal}; 1418 /// 1419 /// let q = Injector::new(); 1420 /// q.push(1); 1421 /// q.push(2); 1422 /// 1423 /// assert_eq!(q.steal(), Steal::Success(1)); 1424 /// assert_eq!(q.steal(), Steal::Success(2)); 1425 /// assert_eq!(q.steal(), Steal::Empty); 1426 /// ``` steal(&self) -> Steal<T>1427 pub fn steal(&self) -> Steal<T> { 1428 let mut head; 1429 let mut block; 1430 let mut offset; 1431 1432 let backoff = Backoff::new(); 1433 loop { 1434 head = self.head.index.load(Ordering::Acquire); 1435 block = self.head.block.load(Ordering::Acquire); 1436 1437 // Calculate the offset of the index into the block. 1438 offset = (head >> SHIFT) % LAP; 1439 1440 // If we reached the end of the block, wait until the next one is installed. 1441 if offset == BLOCK_CAP { 1442 backoff.snooze(); 1443 } else { 1444 break; 1445 } 1446 } 1447 1448 let mut new_head = head + (1 << SHIFT); 1449 1450 if new_head & HAS_NEXT == 0 { 1451 atomic::fence(Ordering::SeqCst); 1452 let tail = self.tail.index.load(Ordering::Relaxed); 1453 1454 // If the tail equals the head, that means the queue is empty. 1455 if head >> SHIFT == tail >> SHIFT { 1456 return Steal::Empty; 1457 } 1458 1459 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1460 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1461 new_head |= HAS_NEXT; 1462 } 1463 } 1464 1465 // Try moving the head index forward. 1466 if self 1467 .head 1468 .index 1469 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1470 .is_err() 1471 { 1472 return Steal::Retry; 1473 } 1474 1475 unsafe { 1476 // If we've reached the end of the block, move to the next one. 1477 if offset + 1 == BLOCK_CAP { 1478 let next = (*block).wait_next(); 1479 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1480 if !(*next).next.load(Ordering::Relaxed).is_null() { 1481 next_index |= HAS_NEXT; 1482 } 1483 1484 self.head.block.store(next, Ordering::Release); 1485 self.head.index.store(next_index, Ordering::Release); 1486 } 1487 1488 // Read the task. 1489 let slot = (*block).slots.get_unchecked(offset); 1490 slot.wait_write(); 1491 let task = slot.task.get().read().assume_init(); 1492 1493 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1494 // but couldn't because we were busy reading from the slot. 1495 if (offset + 1 == BLOCK_CAP) 1496 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) 1497 { 1498 Block::destroy(block, offset); 1499 } 1500 1501 Steal::Success(task) 1502 } 1503 } 1504 1505 /// Steals a batch of tasks and pushes them into a worker. 1506 /// 1507 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1508 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1509 /// 1510 /// # Examples 1511 /// 1512 /// ``` 1513 /// use crossbeam_deque::{Injector, Worker}; 1514 /// 1515 /// let q = Injector::new(); 1516 /// q.push(1); 1517 /// q.push(2); 1518 /// q.push(3); 1519 /// q.push(4); 1520 /// 1521 /// let w = Worker::new_fifo(); 1522 /// let _ = q.steal_batch(&w); 1523 /// assert_eq!(w.pop(), Some(1)); 1524 /// assert_eq!(w.pop(), Some(2)); 1525 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>1526 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 1527 self.steal_batch_with_limit(dest, MAX_BATCH) 1528 } 1529 1530 /// Steals no more than of tasks and pushes them into a worker. 1531 /// 1532 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1533 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1534 /// 1535 /// # Examples 1536 /// 1537 /// ``` 1538 /// use crossbeam_deque::{Injector, Worker}; 1539 /// 1540 /// let q = Injector::new(); 1541 /// q.push(1); 1542 /// q.push(2); 1543 /// q.push(3); 1544 /// q.push(4); 1545 /// q.push(5); 1546 /// q.push(6); 1547 /// 1548 /// let w = Worker::new_fifo(); 1549 /// let _ = q.steal_batch_with_limit(&w, 2); 1550 /// assert_eq!(w.pop(), Some(1)); 1551 /// assert_eq!(w.pop(), Some(2)); 1552 /// assert_eq!(w.pop(), None); 1553 /// 1554 /// q.push(7); 1555 /// q.push(8); 1556 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 1557 /// // half of the elements are currently popped, but the number of popped elements is considered 1558 /// // an implementation detail that may be changed in the future. 1559 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); 1560 /// assert_eq!(w.len(), 3); 1561 /// ``` steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()>1562 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { 1563 assert!(limit > 0); 1564 let mut head; 1565 let mut block; 1566 let mut offset; 1567 1568 let backoff = Backoff::new(); 1569 loop { 1570 head = self.head.index.load(Ordering::Acquire); 1571 block = self.head.block.load(Ordering::Acquire); 1572 1573 // Calculate the offset of the index into the block. 1574 offset = (head >> SHIFT) % LAP; 1575 1576 // If we reached the end of the block, wait until the next one is installed. 1577 if offset == BLOCK_CAP { 1578 backoff.snooze(); 1579 } else { 1580 break; 1581 } 1582 } 1583 1584 let mut new_head = head; 1585 let advance; 1586 1587 if new_head & HAS_NEXT == 0 { 1588 atomic::fence(Ordering::SeqCst); 1589 let tail = self.tail.index.load(Ordering::Relaxed); 1590 1591 // If the tail equals the head, that means the queue is empty. 1592 if head >> SHIFT == tail >> SHIFT { 1593 return Steal::Empty; 1594 } 1595 1596 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate 1597 // the right batch size to steal. 1598 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1599 new_head |= HAS_NEXT; 1600 // We can steal all tasks till the end of the block. 1601 advance = (BLOCK_CAP - offset).min(limit); 1602 } else { 1603 let len = (tail - head) >> SHIFT; 1604 // Steal half of the available tasks. 1605 advance = ((len + 1) / 2).min(limit); 1606 } 1607 } else { 1608 // We can steal all tasks till the end of the block. 1609 advance = (BLOCK_CAP - offset).min(limit); 1610 } 1611 1612 new_head += advance << SHIFT; 1613 let new_offset = offset + advance; 1614 1615 // Try moving the head index forward. 1616 if self 1617 .head 1618 .index 1619 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1620 .is_err() 1621 { 1622 return Steal::Retry; 1623 } 1624 1625 // Reserve capacity for the stolen batch. 1626 let batch_size = new_offset - offset; 1627 dest.reserve(batch_size); 1628 1629 // Get the destination buffer and back index. 1630 let dest_buffer = dest.buffer.get(); 1631 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1632 1633 unsafe { 1634 // If we've reached the end of the block, move to the next one. 1635 if new_offset == BLOCK_CAP { 1636 let next = (*block).wait_next(); 1637 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1638 if !(*next).next.load(Ordering::Relaxed).is_null() { 1639 next_index |= HAS_NEXT; 1640 } 1641 1642 self.head.block.store(next, Ordering::Release); 1643 self.head.index.store(next_index, Ordering::Release); 1644 } 1645 1646 // Copy values from the injector into the destination queue. 1647 match dest.flavor { 1648 Flavor::Fifo => { 1649 for i in 0..batch_size { 1650 // Read the task. 1651 let slot = (*block).slots.get_unchecked(offset + i); 1652 slot.wait_write(); 1653 let task = slot.task.get().read(); 1654 1655 // Write it into the destination queue. 1656 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1657 } 1658 } 1659 1660 Flavor::Lifo => { 1661 for i in 0..batch_size { 1662 // Read the task. 1663 let slot = (*block).slots.get_unchecked(offset + i); 1664 slot.wait_write(); 1665 let task = slot.task.get().read(); 1666 1667 // Write it into the destination queue. 1668 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1669 } 1670 } 1671 } 1672 1673 atomic::fence(Ordering::Release); 1674 1675 // Update the back index in the destination queue. 1676 // 1677 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1678 // data races because it doesn't understand fences. 1679 dest.inner 1680 .back 1681 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1682 1683 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1684 // but couldn't because we were busy reading from the slot. 1685 if new_offset == BLOCK_CAP { 1686 Block::destroy(block, offset); 1687 } else { 1688 for i in offset..new_offset { 1689 let slot = (*block).slots.get_unchecked(i); 1690 1691 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1692 Block::destroy(block, offset); 1693 break; 1694 } 1695 } 1696 } 1697 1698 Steal::Success(()) 1699 } 1700 } 1701 1702 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker. 1703 /// 1704 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1705 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1706 /// 1707 /// # Examples 1708 /// 1709 /// ``` 1710 /// use crossbeam_deque::{Injector, Steal, Worker}; 1711 /// 1712 /// let q = Injector::new(); 1713 /// q.push(1); 1714 /// q.push(2); 1715 /// q.push(3); 1716 /// q.push(4); 1717 /// 1718 /// let w = Worker::new_fifo(); 1719 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1)); 1720 /// assert_eq!(w.pop(), Some(2)); 1721 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>1722 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 1723 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly 1724 // better, but we may change it in the future to be compatible with the same method in Stealer. 1725 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) 1726 } 1727 1728 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. 1729 /// 1730 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1731 /// steal around half of the tasks in the queue, but also not more than the given limit. 1732 /// 1733 /// # Examples 1734 /// 1735 /// ``` 1736 /// use crossbeam_deque::{Injector, Steal, Worker}; 1737 /// 1738 /// let q = Injector::new(); 1739 /// q.push(1); 1740 /// q.push(2); 1741 /// q.push(3); 1742 /// q.push(4); 1743 /// q.push(5); 1744 /// q.push(6); 1745 /// 1746 /// let w = Worker::new_fifo(); 1747 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); 1748 /// assert_eq!(w.pop(), Some(2)); 1749 /// assert_eq!(w.pop(), None); 1750 /// 1751 /// q.push(7); 1752 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 1753 /// // half of the elements are currently popped, but the number of popped elements is considered 1754 /// // an implementation detail that may be changed in the future. 1755 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); 1756 /// assert_eq!(w.pop(), Some(4)); 1757 /// assert_eq!(w.pop(), Some(5)); 1758 /// assert_eq!(w.pop(), None); 1759 /// ``` steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T>1760 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { 1761 assert!(limit > 0); 1762 let mut head; 1763 let mut block; 1764 let mut offset; 1765 1766 let backoff = Backoff::new(); 1767 loop { 1768 head = self.head.index.load(Ordering::Acquire); 1769 block = self.head.block.load(Ordering::Acquire); 1770 1771 // Calculate the offset of the index into the block. 1772 offset = (head >> SHIFT) % LAP; 1773 1774 // If we reached the end of the block, wait until the next one is installed. 1775 if offset == BLOCK_CAP { 1776 backoff.snooze(); 1777 } else { 1778 break; 1779 } 1780 } 1781 1782 let mut new_head = head; 1783 let advance; 1784 1785 if new_head & HAS_NEXT == 0 { 1786 atomic::fence(Ordering::SeqCst); 1787 let tail = self.tail.index.load(Ordering::Relaxed); 1788 1789 // If the tail equals the head, that means the queue is empty. 1790 if head >> SHIFT == tail >> SHIFT { 1791 return Steal::Empty; 1792 } 1793 1794 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1795 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1796 new_head |= HAS_NEXT; 1797 // We can steal all tasks till the end of the block. 1798 advance = (BLOCK_CAP - offset).min(limit); 1799 } else { 1800 let len = (tail - head) >> SHIFT; 1801 // Steal half of the available tasks. 1802 advance = ((len + 1) / 2).min(limit); 1803 } 1804 } else { 1805 // We can steal all tasks till the end of the block. 1806 advance = (BLOCK_CAP - offset).min(limit); 1807 } 1808 1809 new_head += advance << SHIFT; 1810 let new_offset = offset + advance; 1811 1812 // Try moving the head index forward. 1813 if self 1814 .head 1815 .index 1816 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1817 .is_err() 1818 { 1819 return Steal::Retry; 1820 } 1821 1822 // Reserve capacity for the stolen batch. 1823 let batch_size = new_offset - offset - 1; 1824 dest.reserve(batch_size); 1825 1826 // Get the destination buffer and back index. 1827 let dest_buffer = dest.buffer.get(); 1828 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1829 1830 unsafe { 1831 // If we've reached the end of the block, move to the next one. 1832 if new_offset == BLOCK_CAP { 1833 let next = (*block).wait_next(); 1834 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1835 if !(*next).next.load(Ordering::Relaxed).is_null() { 1836 next_index |= HAS_NEXT; 1837 } 1838 1839 self.head.block.store(next, Ordering::Release); 1840 self.head.index.store(next_index, Ordering::Release); 1841 } 1842 1843 // Read the task. 1844 let slot = (*block).slots.get_unchecked(offset); 1845 slot.wait_write(); 1846 let task = slot.task.get().read(); 1847 1848 match dest.flavor { 1849 Flavor::Fifo => { 1850 // Copy values from the injector into the destination queue. 1851 for i in 0..batch_size { 1852 // Read the task. 1853 let slot = (*block).slots.get_unchecked(offset + i + 1); 1854 slot.wait_write(); 1855 let task = slot.task.get().read(); 1856 1857 // Write it into the destination queue. 1858 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1859 } 1860 } 1861 1862 Flavor::Lifo => { 1863 // Copy values from the injector into the destination queue. 1864 for i in 0..batch_size { 1865 // Read the task. 1866 let slot = (*block).slots.get_unchecked(offset + i + 1); 1867 slot.wait_write(); 1868 let task = slot.task.get().read(); 1869 1870 // Write it into the destination queue. 1871 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1872 } 1873 } 1874 } 1875 1876 atomic::fence(Ordering::Release); 1877 1878 // Update the back index in the destination queue. 1879 // 1880 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1881 // data races because it doesn't understand fences. 1882 dest.inner 1883 .back 1884 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1885 1886 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1887 // but couldn't because we were busy reading from the slot. 1888 if new_offset == BLOCK_CAP { 1889 Block::destroy(block, offset); 1890 } else { 1891 for i in offset..new_offset { 1892 let slot = (*block).slots.get_unchecked(i); 1893 1894 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1895 Block::destroy(block, offset); 1896 break; 1897 } 1898 } 1899 } 1900 1901 Steal::Success(task.assume_init()) 1902 } 1903 } 1904 1905 /// Returns `true` if the queue is empty. 1906 /// 1907 /// # Examples 1908 /// 1909 /// ``` 1910 /// use crossbeam_deque::Injector; 1911 /// 1912 /// let q = Injector::new(); 1913 /// 1914 /// assert!(q.is_empty()); 1915 /// q.push(1); 1916 /// assert!(!q.is_empty()); 1917 /// ``` is_empty(&self) -> bool1918 pub fn is_empty(&self) -> bool { 1919 let head = self.head.index.load(Ordering::SeqCst); 1920 let tail = self.tail.index.load(Ordering::SeqCst); 1921 head >> SHIFT == tail >> SHIFT 1922 } 1923 1924 /// Returns the number of tasks in the queue. 1925 /// 1926 /// # Examples 1927 /// 1928 /// ``` 1929 /// use crossbeam_deque::Injector; 1930 /// 1931 /// let q = Injector::new(); 1932 /// 1933 /// assert_eq!(q.len(), 0); 1934 /// q.push(1); 1935 /// assert_eq!(q.len(), 1); 1936 /// q.push(1); 1937 /// assert_eq!(q.len(), 2); 1938 /// ``` len(&self) -> usize1939 pub fn len(&self) -> usize { 1940 loop { 1941 // Load the tail index, then load the head index. 1942 let mut tail = self.tail.index.load(Ordering::SeqCst); 1943 let mut head = self.head.index.load(Ordering::SeqCst); 1944 1945 // If the tail index didn't change, we've got consistent indices to work with. 1946 if self.tail.index.load(Ordering::SeqCst) == tail { 1947 // Erase the lower bits. 1948 tail &= !((1 << SHIFT) - 1); 1949 head &= !((1 << SHIFT) - 1); 1950 1951 // Fix up indices if they fall onto block ends. 1952 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { 1953 tail = tail.wrapping_add(1 << SHIFT); 1954 } 1955 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { 1956 head = head.wrapping_add(1 << SHIFT); 1957 } 1958 1959 // Rotate indices so that head falls into the first block. 1960 let lap = (head >> SHIFT) / LAP; 1961 tail = tail.wrapping_sub((lap * LAP) << SHIFT); 1962 head = head.wrapping_sub((lap * LAP) << SHIFT); 1963 1964 // Remove the lower bits. 1965 tail >>= SHIFT; 1966 head >>= SHIFT; 1967 1968 // Return the difference minus the number of blocks between tail and head. 1969 return tail - head - tail / LAP; 1970 } 1971 } 1972 } 1973 } 1974 1975 impl<T> Drop for Injector<T> { drop(&mut self)1976 fn drop(&mut self) { 1977 let mut head = *self.head.index.get_mut(); 1978 let mut tail = *self.tail.index.get_mut(); 1979 let mut block = *self.head.block.get_mut(); 1980 1981 // Erase the lower bits. 1982 head &= !((1 << SHIFT) - 1); 1983 tail &= !((1 << SHIFT) - 1); 1984 1985 unsafe { 1986 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. 1987 while head != tail { 1988 let offset = (head >> SHIFT) % LAP; 1989 1990 if offset < BLOCK_CAP { 1991 // Drop the task in the slot. 1992 let slot = (*block).slots.get_unchecked(offset); 1993 let p = &mut *slot.task.get(); 1994 p.as_mut_ptr().drop_in_place(); 1995 } else { 1996 // Deallocate the block and move to the next one. 1997 let next = *(*block).next.get_mut(); 1998 drop(Box::from_raw(block)); 1999 block = next; 2000 } 2001 2002 head = head.wrapping_add(1 << SHIFT); 2003 } 2004 2005 // Deallocate the last remaining block. 2006 drop(Box::from_raw(block)); 2007 } 2008 } 2009 } 2010 2011 impl<T> fmt::Debug for Injector<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result2012 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 2013 f.pad("Worker { .. }") 2014 } 2015 } 2016 2017 /// Possible outcomes of a steal operation. 2018 /// 2019 /// # Examples 2020 /// 2021 /// There are lots of ways to chain results of steal operations together: 2022 /// 2023 /// ``` 2024 /// use crossbeam_deque::Steal::{self, Empty, Retry, Success}; 2025 /// 2026 /// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>(); 2027 /// 2028 /// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty); 2029 /// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry); 2030 /// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1)); 2031 /// 2032 /// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry); 2033 /// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1)); 2034 /// ``` 2035 #[must_use] 2036 #[derive(PartialEq, Eq, Copy, Clone)] 2037 pub enum Steal<T> { 2038 /// The queue was empty at the time of stealing. 2039 Empty, 2040 2041 /// At least one task was successfully stolen. 2042 Success(T), 2043 2044 /// The steal operation needs to be retried. 2045 Retry, 2046 } 2047 2048 impl<T> Steal<T> { 2049 /// Returns `true` if the queue was empty at the time of stealing. 2050 /// 2051 /// # Examples 2052 /// 2053 /// ``` 2054 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2055 /// 2056 /// assert!(!Success(7).is_empty()); 2057 /// assert!(!Retry::<i32>.is_empty()); 2058 /// 2059 /// assert!(Empty::<i32>.is_empty()); 2060 /// ``` is_empty(&self) -> bool2061 pub fn is_empty(&self) -> bool { 2062 match self { 2063 Steal::Empty => true, 2064 _ => false, 2065 } 2066 } 2067 2068 /// Returns `true` if at least one task was stolen. 2069 /// 2070 /// # Examples 2071 /// 2072 /// ``` 2073 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2074 /// 2075 /// assert!(!Empty::<i32>.is_success()); 2076 /// assert!(!Retry::<i32>.is_success()); 2077 /// 2078 /// assert!(Success(7).is_success()); 2079 /// ``` is_success(&self) -> bool2080 pub fn is_success(&self) -> bool { 2081 match self { 2082 Steal::Success(_) => true, 2083 _ => false, 2084 } 2085 } 2086 2087 /// Returns `true` if the steal operation needs to be retried. 2088 /// 2089 /// # Examples 2090 /// 2091 /// ``` 2092 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2093 /// 2094 /// assert!(!Empty::<i32>.is_retry()); 2095 /// assert!(!Success(7).is_retry()); 2096 /// 2097 /// assert!(Retry::<i32>.is_retry()); 2098 /// ``` is_retry(&self) -> bool2099 pub fn is_retry(&self) -> bool { 2100 match self { 2101 Steal::Retry => true, 2102 _ => false, 2103 } 2104 } 2105 2106 /// Returns the result of the operation, if successful. 2107 /// 2108 /// # Examples 2109 /// 2110 /// ``` 2111 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2112 /// 2113 /// assert_eq!(Empty::<i32>.success(), None); 2114 /// assert_eq!(Retry::<i32>.success(), None); 2115 /// 2116 /// assert_eq!(Success(7).success(), Some(7)); 2117 /// ``` success(self) -> Option<T>2118 pub fn success(self) -> Option<T> { 2119 match self { 2120 Steal::Success(res) => Some(res), 2121 _ => None, 2122 } 2123 } 2124 2125 /// If no task was stolen, attempts another steal operation. 2126 /// 2127 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then: 2128 /// 2129 /// * If the second steal resulted in `Success`, it is returned. 2130 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned. 2131 /// * If both resulted in `None`, then `None` is returned. 2132 /// 2133 /// # Examples 2134 /// 2135 /// ``` 2136 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2137 /// 2138 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1)); 2139 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2)); 2140 /// 2141 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>); 2142 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>); 2143 /// 2144 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>); 2145 /// ``` or_else<F>(self, f: F) -> Steal<T> where F: FnOnce() -> Steal<T>,2146 pub fn or_else<F>(self, f: F) -> Steal<T> 2147 where 2148 F: FnOnce() -> Steal<T>, 2149 { 2150 match self { 2151 Steal::Empty => f(), 2152 Steal::Success(_) => self, 2153 Steal::Retry => { 2154 if let Steal::Success(res) = f() { 2155 Steal::Success(res) 2156 } else { 2157 Steal::Retry 2158 } 2159 } 2160 } 2161 } 2162 } 2163 2164 impl<T> fmt::Debug for Steal<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result2165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 2166 match self { 2167 Steal::Empty => f.pad("Empty"), 2168 Steal::Success(_) => f.pad("Success(..)"), 2169 Steal::Retry => f.pad("Retry"), 2170 } 2171 } 2172 } 2173 2174 impl<T> FromIterator<Steal<T>> for Steal<T> { 2175 /// Consumes items until a `Success` is found and returns it. 2176 /// 2177 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`. 2178 /// Otherwise, `Empty` is returned. from_iter<I>(iter: I) -> Steal<T> where I: IntoIterator<Item = Steal<T>>,2179 fn from_iter<I>(iter: I) -> Steal<T> 2180 where 2181 I: IntoIterator<Item = Steal<T>>, 2182 { 2183 let mut retry = false; 2184 for s in iter { 2185 match &s { 2186 Steal::Empty => {} 2187 Steal::Success(_) => return s, 2188 Steal::Retry => retry = true, 2189 } 2190 } 2191 2192 if retry { 2193 Steal::Retry 2194 } else { 2195 Steal::Empty 2196 } 2197 } 2198 } 2199