use std::cell::{Cell, UnsafeCell};
use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::epoch::{self, Atomic, Owned};
use crate::utils::{Backoff, CachePadded};

// Minimum buffer capacity.
const MIN_CAP: usize = 64;
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
const MAX_BATCH: usize = 32;
// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
// deallocated as soon as possible.
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;

/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its capacity - dropping an instance of this struct
/// will *not* deallocate the buffer.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two.
    cap: usize,
}

unsafe impl<T> Send for Buffer<T> {}

impl<T> Buffer<T> {
    /// Allocates a new buffer with the specified capacity.
    fn alloc(cap: usize) -> Buffer<T> {
        debug_assert_eq!(cap, cap.next_power_of_two());

        let mut v = Vec::with_capacity(cap);
        let ptr = v.as_mut_ptr();
        mem::forget(v);

        Buffer { ptr, cap }
    }

    /// Deallocates the buffer.
    unsafe fn dealloc(self) {
        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
    }

    /// Returns a pointer to the task at the specified `index`.
    unsafe fn at(&self, index: isize) -> *mut T {
        // `self.cap` is always a power of two.
        self.ptr.offset(index & (self.cap - 1) as isize)
    }

    /// Writes `task` into the specified `index`.
    ///
    /// This method might be concurrently called with another `read` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile write instead.
    unsafe fn write(&self, index: isize, task: T) {
        ptr::write_volatile(self.at(index), task)
    }

    /// Reads a task from the specified `index`.
    ///
    /// This method might be concurrently called with another `write` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile read instead.
    unsafe fn read(&self, index: isize) -> T {
        ptr::read_volatile(self.at(index))
    }
}

impl<T> Clone for Buffer<T> {
    fn clone(&self) -> Buffer<T> {
        Buffer {
            ptr: self.ptr,
            cap: self.cap,
        }
    }
}

impl<T> Copy for Buffer<T> {}

/// Internal queue data shared between the worker and stealers.
///
/// The implementation is based on the following work:
///
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
///    PPoPP 2013.][weak-mem]
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
///    atomics. OOPSLA 2013.][checker]
///
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
struct Inner<T> {
    /// The front index.
    front: AtomicIsize,

    /// The back index.
    back: AtomicIsize,

    /// The underlying buffer.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Load the back index, front index, and buffer.
        let b = self.back.load(Ordering::Relaxed);
        let f = self.front.load(Ordering::Relaxed);

        unsafe {
            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());

            // Go through the buffer from front to back and drop all tasks in the queue.
            let mut i = f;
            while i != b {
                buffer.deref().at(i).drop_in_place();
                i = i.wrapping_add(1);
            }

            // Free the memory allocated by the buffer.
            buffer.into_owned().into_box().dealloc();
        }
    }
}

/// Worker queue flavor: FIFO or LIFO.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor.
    Fifo,

    /// The last-in first-out flavor.
    Lifo,
}

/// A worker queue.
///
/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
/// tasks from it. Task schedulers typically create a single worker queue per thread.
///
/// # Examples
///
/// A FIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_fifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(2));
/// assert_eq!(w.pop(), Some(3));
/// ```
///
/// A LIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(3));
/// assert_eq!(w.pop(), Some(2));
/// ```
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// A copy of `inner.buffer` for quick access.
    buffer: Cell<Buffer<T>>,

    /// The flavor of the queue.
    flavor: Flavor,

    /// Indicates that the worker cannot be shared among threads.
    _marker: PhantomData<*mut ()>, // !Send + !Sync
}

unsafe impl<T: Send> Send for Worker<T> {}

impl<T> Worker<T> {
    /// Creates a FIFO worker queue.
    ///
    /// Tasks are pushed and popped from opposite ends.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_fifo();
    /// ```
    pub fn new_fifo() -> Worker<T> {
        let buffer = Buffer::alloc(MIN_CAP);

        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Fifo,
            _marker: PhantomData,
        }
    }

    /// Creates a LIFO worker queue.
233 /// 234 /// Tasks are pushed and popped from the same end. 235 /// 236 /// # Examples 237 /// 238 /// ``` 239 /// use crossbeam_deque::Worker; 240 /// 241 /// let w = Worker::<i32>::new_lifo(); 242 /// ``` new_lifo() -> Worker<T>243 pub fn new_lifo() -> Worker<T> { 244 let buffer = Buffer::alloc(MIN_CAP); 245 246 let inner = Arc::new(CachePadded::new(Inner { 247 front: AtomicIsize::new(0), 248 back: AtomicIsize::new(0), 249 buffer: CachePadded::new(Atomic::new(buffer)), 250 })); 251 252 Worker { 253 inner, 254 buffer: Cell::new(buffer), 255 flavor: Flavor::Lifo, 256 _marker: PhantomData, 257 } 258 } 259 260 /// Creates a stealer for this queue. 261 /// 262 /// The returned stealer can be shared among threads and cloned. 263 /// 264 /// # Examples 265 /// 266 /// ``` 267 /// use crossbeam_deque::Worker; 268 /// 269 /// let w = Worker::<i32>::new_lifo(); 270 /// let s = w.stealer(); 271 /// ``` stealer(&self) -> Stealer<T>272 pub fn stealer(&self) -> Stealer<T> { 273 Stealer { 274 inner: self.inner.clone(), 275 flavor: self.flavor, 276 } 277 } 278 279 /// Resizes the internal buffer to the new capacity of `new_cap`. 280 #[cold] resize(&self, new_cap: usize)281 unsafe fn resize(&self, new_cap: usize) { 282 // Load the back index, front index, and buffer. 283 let b = self.inner.back.load(Ordering::Relaxed); 284 let f = self.inner.front.load(Ordering::Relaxed); 285 let buffer = self.buffer.get(); 286 287 // Allocate a new buffer and copy data from the old buffer to the new one. 288 let new = Buffer::alloc(new_cap); 289 let mut i = f; 290 while i != b { 291 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); 292 i = i.wrapping_add(1); 293 } 294 295 let guard = &epoch::pin(); 296 297 // Replace the old buffer with the new one. 298 self.buffer.replace(new); 299 let old = 300 self.inner 301 .buffer 302 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard); 303 304 // Destroy the old buffer later. 305 guard.defer_unchecked(move || old.into_owned().into_box().dealloc()); 306 307 // If the buffer is very large, then flush the thread-local garbage in order to deallocate 308 // it as soon as possible. 309 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES { 310 guard.flush(); 311 } 312 } 313 314 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the 315 /// buffer. reserve(&self, reserve_cap: usize)316 fn reserve(&self, reserve_cap: usize) { 317 if reserve_cap > 0 { 318 // Compute the current length. 319 let b = self.inner.back.load(Ordering::Relaxed); 320 let f = self.inner.front.load(Ordering::SeqCst); 321 let len = b.wrapping_sub(f) as usize; 322 323 // The current capacity. 324 let cap = self.buffer.get().cap; 325 326 // Is there enough capacity to push `reserve_cap` tasks? 327 if cap - len < reserve_cap { 328 // Keep doubling the capacity as much as is needed. 329 let mut new_cap = cap * 2; 330 while new_cap - len < reserve_cap { 331 new_cap *= 2; 332 } 333 334 // Resize the buffer. 335 unsafe { 336 self.resize(new_cap); 337 } 338 } 339 } 340 } 341 342 /// Returns `true` if the queue is empty. 
343 /// 344 /// ``` 345 /// use crossbeam_deque::Worker; 346 /// 347 /// let w = Worker::new_lifo(); 348 /// 349 /// assert!(w.is_empty()); 350 /// w.push(1); 351 /// assert!(!w.is_empty()); 352 /// ``` is_empty(&self) -> bool353 pub fn is_empty(&self) -> bool { 354 let b = self.inner.back.load(Ordering::Relaxed); 355 let f = self.inner.front.load(Ordering::SeqCst); 356 b.wrapping_sub(f) <= 0 357 } 358 359 /// Returns the number of tasks in the deque. 360 /// 361 /// ``` 362 /// use crossbeam_deque::Worker; 363 /// 364 /// let w = Worker::new_lifo(); 365 /// 366 /// assert_eq!(w.len(), 0); 367 /// w.push(1); 368 /// assert_eq!(w.len(), 1); 369 /// w.push(1); 370 /// assert_eq!(w.len(), 2); 371 /// ``` len(&self) -> usize372 pub fn len(&self) -> usize { 373 let b = self.inner.back.load(Ordering::Relaxed); 374 let f = self.inner.front.load(Ordering::SeqCst); 375 b.wrapping_sub(f).max(0) as usize 376 } 377 378 /// Pushes a task into the queue. 379 /// 380 /// # Examples 381 /// 382 /// ``` 383 /// use crossbeam_deque::Worker; 384 /// 385 /// let w = Worker::new_lifo(); 386 /// w.push(1); 387 /// w.push(2); 388 /// ``` push(&self, task: T)389 pub fn push(&self, task: T) { 390 // Load the back index, front index, and buffer. 391 let b = self.inner.back.load(Ordering::Relaxed); 392 let f = self.inner.front.load(Ordering::Acquire); 393 let mut buffer = self.buffer.get(); 394 395 // Calculate the length of the queue. 396 let len = b.wrapping_sub(f); 397 398 // Is the queue full? 399 if len >= buffer.cap as isize { 400 // Yes. Grow the underlying buffer. 401 unsafe { 402 self.resize(2 * buffer.cap); 403 } 404 buffer = self.buffer.get(); 405 } 406 407 // Write `task` into the slot. 408 unsafe { 409 buffer.write(b, task); 410 } 411 412 atomic::fence(Ordering::Release); 413 414 // Increment the back index. 415 // 416 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 417 // races because it doesn't understand fences. 418 self.inner.back.store(b.wrapping_add(1), Ordering::Release); 419 } 420 421 /// Pops a task from the queue. 422 /// 423 /// # Examples 424 /// 425 /// ``` 426 /// use crossbeam_deque::Worker; 427 /// 428 /// let w = Worker::new_fifo(); 429 /// w.push(1); 430 /// w.push(2); 431 /// 432 /// assert_eq!(w.pop(), Some(1)); 433 /// assert_eq!(w.pop(), Some(2)); 434 /// assert_eq!(w.pop(), None); 435 /// ``` pop(&self) -> Option<T>436 pub fn pop(&self) -> Option<T> { 437 // Load the back and front index. 438 let b = self.inner.back.load(Ordering::Relaxed); 439 let f = self.inner.front.load(Ordering::Relaxed); 440 441 // Calculate the length of the queue. 442 let len = b.wrapping_sub(f); 443 444 // Is the queue empty? 445 if len <= 0 { 446 return None; 447 } 448 449 match self.flavor { 450 // Pop from the front of the queue. 451 Flavor::Fifo => { 452 // Try incrementing the front index to pop the task. 453 let f = self.inner.front.fetch_add(1, Ordering::SeqCst); 454 let new_f = f.wrapping_add(1); 455 456 if b.wrapping_sub(new_f) < 0 { 457 self.inner.front.store(f, Ordering::Relaxed); 458 return None; 459 } 460 461 unsafe { 462 // Read the popped task. 463 let buffer = self.buffer.get(); 464 let task = buffer.read(f); 465 466 // Shrink the buffer if `len - 1` is less than one fourth of the capacity. 467 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { 468 self.resize(buffer.cap / 2); 469 } 470 471 Some(task) 472 } 473 } 474 475 // Pop from the back of the queue. 476 Flavor::Lifo => { 477 // Decrement the back index. 
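                // A concurrent stealer may be racing for the same task: publish the
                // decremented back index first, then re-check the front index after the
                // `SeqCst` fence below. If only one task remains, ownership is decided by a
                // compare-exchange on the front index.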
478 let b = b.wrapping_sub(1); 479 self.inner.back.store(b, Ordering::Relaxed); 480 481 atomic::fence(Ordering::SeqCst); 482 483 // Load the front index. 484 let f = self.inner.front.load(Ordering::Relaxed); 485 486 // Compute the length after the back index was decremented. 487 let len = b.wrapping_sub(f); 488 489 if len < 0 { 490 // The queue is empty. Restore the back index to the original task. 491 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 492 None 493 } else { 494 // Read the task to be popped. 495 let buffer = self.buffer.get(); 496 let mut task = unsafe { Some(buffer.read(b)) }; 497 498 // Are we popping the last task from the queue? 499 if len == 0 { 500 // Try incrementing the front index. 501 if self 502 .inner 503 .front 504 .compare_exchange( 505 f, 506 f.wrapping_add(1), 507 Ordering::SeqCst, 508 Ordering::Relaxed, 509 ) 510 .is_err() 511 { 512 // Failed. We didn't pop anything. 513 mem::forget(task.take()); 514 } 515 516 // Restore the back index to the original task. 517 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 518 } else { 519 // Shrink the buffer if `len` is less than one fourth of the capacity. 520 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 { 521 unsafe { 522 self.resize(buffer.cap / 2); 523 } 524 } 525 } 526 527 task 528 } 529 } 530 } 531 } 532 } 533 534 impl<T> fmt::Debug for Worker<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result535 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 536 f.pad("Worker { .. }") 537 } 538 } 539 540 /// A stealer handle of a worker queue. 541 /// 542 /// Stealers can be shared among threads. 543 /// 544 /// Task schedulers typically have a single worker queue per worker thread. 545 /// 546 /// # Examples 547 /// 548 /// ``` 549 /// use crossbeam_deque::{Steal, Worker}; 550 /// 551 /// let w = Worker::new_lifo(); 552 /// w.push(1); 553 /// w.push(2); 554 /// 555 /// let s = w.stealer(); 556 /// assert_eq!(s.steal(), Steal::Success(1)); 557 /// assert_eq!(s.steal(), Steal::Success(2)); 558 /// assert_eq!(s.steal(), Steal::Empty); 559 /// ``` 560 pub struct Stealer<T> { 561 /// A reference to the inner representation of the queue. 562 inner: Arc<CachePadded<Inner<T>>>, 563 564 /// The flavor of the queue. 565 flavor: Flavor, 566 } 567 568 unsafe impl<T: Send> Send for Stealer<T> {} 569 unsafe impl<T: Send> Sync for Stealer<T> {} 570 571 impl<T> Stealer<T> { 572 /// Returns `true` if the queue is empty. 573 /// 574 /// ``` 575 /// use crossbeam_deque::Worker; 576 /// 577 /// let w = Worker::new_lifo(); 578 /// let s = w.stealer(); 579 /// 580 /// assert!(s.is_empty()); 581 /// w.push(1); 582 /// assert!(!s.is_empty()); 583 /// ``` is_empty(&self) -> bool584 pub fn is_empty(&self) -> bool { 585 let f = self.inner.front.load(Ordering::Acquire); 586 atomic::fence(Ordering::SeqCst); 587 let b = self.inner.back.load(Ordering::Acquire); 588 b.wrapping_sub(f) <= 0 589 } 590 591 /// Returns the number of tasks in the deque. 592 /// 593 /// ``` 594 /// use crossbeam_deque::Worker; 595 /// 596 /// let w = Worker::new_lifo(); 597 /// let s = w.stealer(); 598 /// 599 /// assert_eq!(s.len(), 0); 600 /// w.push(1); 601 /// assert_eq!(s.len(), 1); 602 /// w.push(2); 603 /// assert_eq!(s.len(), 2); 604 /// ``` len(&self) -> usize605 pub fn len(&self) -> usize { 606 let f = self.inner.front.load(Ordering::Acquire); 607 atomic::fence(Ordering::SeqCst); 608 let b = self.inner.back.load(Ordering::Acquire); 609 b.wrapping_sub(f).max(0) as usize 610 } 611 612 /// Steals a task from the queue. 
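    ///
    /// If a concurrent steal or pop operation gets in the way, `Steal::Retry` is returned and
    /// the operation should be retried.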
613 /// 614 /// # Examples 615 /// 616 /// ``` 617 /// use crossbeam_deque::{Steal, Worker}; 618 /// 619 /// let w = Worker::new_lifo(); 620 /// w.push(1); 621 /// w.push(2); 622 /// 623 /// let s = w.stealer(); 624 /// assert_eq!(s.steal(), Steal::Success(1)); 625 /// assert_eq!(s.steal(), Steal::Success(2)); 626 /// ``` steal(&self) -> Steal<T>627 pub fn steal(&self) -> Steal<T> { 628 // Load the front index. 629 let f = self.inner.front.load(Ordering::Acquire); 630 631 // A SeqCst fence is needed here. 632 // 633 // If the current thread is already pinned (reentrantly), we must manually issue the 634 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 635 // have to. 636 if epoch::is_pinned() { 637 atomic::fence(Ordering::SeqCst); 638 } 639 640 let guard = &epoch::pin(); 641 642 // Load the back index. 643 let b = self.inner.back.load(Ordering::Acquire); 644 645 // Is the queue empty? 646 if b.wrapping_sub(f) <= 0 { 647 return Steal::Empty; 648 } 649 650 // Load the buffer and read the task at the front. 651 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 652 let task = unsafe { buffer.deref().read(f) }; 653 654 // Try incrementing the front index to steal the task. 655 // If the buffer has been swapped or the increment fails, we retry. 656 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 657 || self 658 .inner 659 .front 660 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 661 .is_err() 662 { 663 // We didn't steal this task, forget it. 664 mem::forget(task); 665 return Steal::Retry; 666 } 667 668 // Return the stolen task. 669 Steal::Success(task) 670 } 671 672 /// Steals a batch of tasks and pushes them into another worker. 673 /// 674 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 675 /// steal around half of the tasks in the queue, but also not more than some constant limit. 676 /// 677 /// # Examples 678 /// 679 /// ``` 680 /// use crossbeam_deque::Worker; 681 /// 682 /// let w1 = Worker::new_fifo(); 683 /// w1.push(1); 684 /// w1.push(2); 685 /// w1.push(3); 686 /// w1.push(4); 687 /// 688 /// let s = w1.stealer(); 689 /// let w2 = Worker::new_fifo(); 690 /// 691 /// let _ = s.steal_batch(&w2); 692 /// assert_eq!(w2.pop(), Some(1)); 693 /// assert_eq!(w2.pop(), Some(2)); 694 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>695 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 696 if Arc::ptr_eq(&self.inner, &dest.inner) { 697 if dest.is_empty() { 698 return Steal::Empty; 699 } else { 700 return Steal::Success(()); 701 } 702 } 703 704 // Load the front index. 705 let mut f = self.inner.front.load(Ordering::Acquire); 706 707 // A SeqCst fence is needed here. 708 // 709 // If the current thread is already pinned (reentrantly), we must manually issue the 710 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 711 // have to. 712 if epoch::is_pinned() { 713 atomic::fence(Ordering::SeqCst); 714 } 715 716 let guard = &epoch::pin(); 717 718 // Load the back index. 719 let b = self.inner.back.load(Ordering::Acquire); 720 721 // Is the queue empty? 722 let len = b.wrapping_sub(f); 723 if len <= 0 { 724 return Steal::Empty; 725 } 726 727 // Reserve capacity for the stolen batch. 728 let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH); 729 dest.reserve(batch_size); 730 let mut batch_size = batch_size as isize; 731 732 // Get the destination buffer and back index. 
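        // Stolen tasks are written into `dest_buffer` first; `dest.inner.back` is only
        // published at the very end, so the batch does not become visible to stealers of
        // `dest` until the copy is complete.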
733 let dest_buffer = dest.buffer.get(); 734 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 735 736 // Load the buffer. 737 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 738 739 match self.flavor { 740 // Steal a batch of tasks from the front at once. 741 Flavor::Fifo => { 742 // Copy the batch from the source to the destination buffer. 743 match dest.flavor { 744 Flavor::Fifo => { 745 for i in 0..batch_size { 746 unsafe { 747 let task = buffer.deref().read(f.wrapping_add(i)); 748 dest_buffer.write(dest_b.wrapping_add(i), task); 749 } 750 } 751 } 752 Flavor::Lifo => { 753 for i in 0..batch_size { 754 unsafe { 755 let task = buffer.deref().read(f.wrapping_add(i)); 756 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 757 } 758 } 759 } 760 } 761 762 // Try incrementing the front index to steal the batch. 763 // If the buffer has been swapped or the increment fails, we retry. 764 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 765 || self 766 .inner 767 .front 768 .compare_exchange( 769 f, 770 f.wrapping_add(batch_size), 771 Ordering::SeqCst, 772 Ordering::Relaxed, 773 ) 774 .is_err() 775 { 776 return Steal::Retry; 777 } 778 779 dest_b = dest_b.wrapping_add(batch_size); 780 } 781 782 // Steal a batch of tasks from the front one by one. 783 Flavor::Lifo => { 784 // This loop may modify the batch_size, which triggers a clippy lint warning. 785 // Use a new variable to avoid the warning, and to make it clear we aren't 786 // modifying the loop exit condition during iteration. 787 let original_batch_size = batch_size; 788 789 for i in 0..original_batch_size { 790 // If this is not the first steal, check whether the queue is empty. 791 if i > 0 { 792 // We've already got the current front index. Now execute the fence to 793 // synchronize with other threads. 794 atomic::fence(Ordering::SeqCst); 795 796 // Load the back index. 797 let b = self.inner.back.load(Ordering::Acquire); 798 799 // Is the queue empty? 800 if b.wrapping_sub(f) <= 0 { 801 batch_size = i; 802 break; 803 } 804 } 805 806 // Read the task at the front. 807 let task = unsafe { buffer.deref().read(f) }; 808 809 // Try incrementing the front index to steal the task. 810 // If the buffer has been swapped or the increment fails, we retry. 811 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 812 || self 813 .inner 814 .front 815 .compare_exchange( 816 f, 817 f.wrapping_add(1), 818 Ordering::SeqCst, 819 Ordering::Relaxed, 820 ) 821 .is_err() 822 { 823 // We didn't steal this task, forget it and break from the loop. 824 mem::forget(task); 825 batch_size = i; 826 break; 827 } 828 829 // Write the stolen task into the destination buffer. 830 unsafe { 831 dest_buffer.write(dest_b, task); 832 } 833 834 // Move the source front index and the destination back index one step forward. 835 f = f.wrapping_add(1); 836 dest_b = dest_b.wrapping_add(1); 837 } 838 839 // If we didn't steal anything, the operation needs to be retried. 840 if batch_size == 0 { 841 return Steal::Retry; 842 } 843 844 // If stealing into a FIFO queue, stolen tasks need to be reversed. 
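                // The tasks were written into `dest_buffer` in the order they were stolen
                // (front of the source queue first). Reversing them for a FIFO destination
                // makes `dest` yield the batch in the same order in which the LIFO source
                // worker would have popped it.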
845 if dest.flavor == Flavor::Fifo { 846 for i in 0..batch_size / 2 { 847 unsafe { 848 let i1 = dest_b.wrapping_sub(batch_size - i); 849 let i2 = dest_b.wrapping_sub(i + 1); 850 let t1 = dest_buffer.read(i1); 851 let t2 = dest_buffer.read(i2); 852 dest_buffer.write(i1, t2); 853 dest_buffer.write(i2, t1); 854 } 855 } 856 } 857 } 858 } 859 860 atomic::fence(Ordering::Release); 861 862 // Update the back index in the destination queue. 863 // 864 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 865 // races because it doesn't understand fences. 866 dest.inner.back.store(dest_b, Ordering::Release); 867 868 // Return with success. 869 Steal::Success(()) 870 } 871 872 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. 873 /// 874 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 875 /// steal around half of the tasks in the queue, but also not more than some constant limit. 876 /// 877 /// # Examples 878 /// 879 /// ``` 880 /// use crossbeam_deque::{Steal, Worker}; 881 /// 882 /// let w1 = Worker::new_fifo(); 883 /// w1.push(1); 884 /// w1.push(2); 885 /// w1.push(3); 886 /// w1.push(4); 887 /// 888 /// let s = w1.stealer(); 889 /// let w2 = Worker::new_fifo(); 890 /// 891 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); 892 /// assert_eq!(w2.pop(), Some(2)); 893 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>894 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 895 if Arc::ptr_eq(&self.inner, &dest.inner) { 896 match dest.pop() { 897 None => return Steal::Empty, 898 Some(task) => return Steal::Success(task), 899 } 900 } 901 902 // Load the front index. 903 let mut f = self.inner.front.load(Ordering::Acquire); 904 905 // A SeqCst fence is needed here. 906 // 907 // If the current thread is already pinned (reentrantly), we must manually issue the 908 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 909 // have to. 910 if epoch::is_pinned() { 911 atomic::fence(Ordering::SeqCst); 912 } 913 914 let guard = &epoch::pin(); 915 916 // Load the back index. 917 let b = self.inner.back.load(Ordering::Acquire); 918 919 // Is the queue empty? 920 let len = b.wrapping_sub(f); 921 if len <= 0 { 922 return Steal::Empty; 923 } 924 925 // Reserve capacity for the stolen batch. 926 let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1); 927 dest.reserve(batch_size); 928 let mut batch_size = batch_size as isize; 929 930 // Get the destination buffer and back index. 931 let dest_buffer = dest.buffer.get(); 932 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 933 934 // Load the buffer 935 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 936 937 // Read the task at the front. 938 let mut task = unsafe { buffer.deref().read(f) }; 939 940 match self.flavor { 941 // Steal a batch of tasks from the front at once. 942 Flavor::Fifo => { 943 // Copy the batch from the source to the destination buffer. 
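                // The task at the front index `f` was already read into `task` above and will
                // be returned to the caller, so the copied batch starts at `f + 1`.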
944 match dest.flavor { 945 Flavor::Fifo => { 946 for i in 0..batch_size { 947 unsafe { 948 let task = buffer.deref().read(f.wrapping_add(i + 1)); 949 dest_buffer.write(dest_b.wrapping_add(i), task); 950 } 951 } 952 } 953 Flavor::Lifo => { 954 for i in 0..batch_size { 955 unsafe { 956 let task = buffer.deref().read(f.wrapping_add(i + 1)); 957 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 958 } 959 } 960 } 961 } 962 963 // Try incrementing the front index to steal the task. 964 // If the buffer has been swapped or the increment fails, we retry. 965 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 966 || self 967 .inner 968 .front 969 .compare_exchange( 970 f, 971 f.wrapping_add(batch_size + 1), 972 Ordering::SeqCst, 973 Ordering::Relaxed, 974 ) 975 .is_err() 976 { 977 // We didn't steal this task, forget it. 978 mem::forget(task); 979 return Steal::Retry; 980 } 981 982 dest_b = dest_b.wrapping_add(batch_size); 983 } 984 985 // Steal a batch of tasks from the front one by one. 986 Flavor::Lifo => { 987 // Try incrementing the front index to steal the task. 988 if self 989 .inner 990 .front 991 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 992 .is_err() 993 { 994 // We didn't steal this task, forget it. 995 mem::forget(task); 996 return Steal::Retry; 997 } 998 999 // Move the front index one step forward. 1000 f = f.wrapping_add(1); 1001 1002 // Repeat the same procedure for the batch steals. 1003 // 1004 // This loop may modify the batch_size, which triggers a clippy lint warning. 1005 // Use a new variable to avoid the warning, and to make it clear we aren't 1006 // modifying the loop exit condition during iteration. 1007 let original_batch_size = batch_size; 1008 for i in 0..original_batch_size { 1009 // We've already got the current front index. Now execute the fence to 1010 // synchronize with other threads. 1011 atomic::fence(Ordering::SeqCst); 1012 1013 // Load the back index. 1014 let b = self.inner.back.load(Ordering::Acquire); 1015 1016 // Is the queue empty? 1017 if b.wrapping_sub(f) <= 0 { 1018 batch_size = i; 1019 break; 1020 } 1021 1022 // Read the task at the front. 1023 let tmp = unsafe { buffer.deref().read(f) }; 1024 1025 // Try incrementing the front index to steal the task. 1026 // If the buffer has been swapped or the increment fails, we retry. 1027 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 1028 || self 1029 .inner 1030 .front 1031 .compare_exchange( 1032 f, 1033 f.wrapping_add(1), 1034 Ordering::SeqCst, 1035 Ordering::Relaxed, 1036 ) 1037 .is_err() 1038 { 1039 // We didn't steal this task, forget it and break from the loop. 1040 mem::forget(tmp); 1041 batch_size = i; 1042 break; 1043 } 1044 1045 // Write the previously stolen task into the destination buffer. 1046 unsafe { 1047 dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); 1048 } 1049 1050 // Move the source front index and the destination back index one step forward. 1051 f = f.wrapping_add(1); 1052 dest_b = dest_b.wrapping_add(1); 1053 } 1054 1055 // If stealing into a FIFO queue, stolen tasks need to be reversed. 
1056 if dest.flavor == Flavor::Fifo { 1057 for i in 0..batch_size / 2 { 1058 unsafe { 1059 let i1 = dest_b.wrapping_sub(batch_size - i); 1060 let i2 = dest_b.wrapping_sub(i + 1); 1061 let t1 = dest_buffer.read(i1); 1062 let t2 = dest_buffer.read(i2); 1063 dest_buffer.write(i1, t2); 1064 dest_buffer.write(i2, t1); 1065 } 1066 } 1067 } 1068 } 1069 } 1070 1071 atomic::fence(Ordering::Release); 1072 1073 // Update the back index in the destination queue. 1074 // 1075 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 1076 // races because it doesn't understand fences. 1077 dest.inner.back.store(dest_b, Ordering::Release); 1078 1079 // Return with success. 1080 Steal::Success(task) 1081 } 1082 } 1083 1084 impl<T> Clone for Stealer<T> { clone(&self) -> Stealer<T>1085 fn clone(&self) -> Stealer<T> { 1086 Stealer { 1087 inner: self.inner.clone(), 1088 flavor: self.flavor, 1089 } 1090 } 1091 } 1092 1093 impl<T> fmt::Debug for Stealer<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1094 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1095 f.pad("Stealer { .. }") 1096 } 1097 } 1098 1099 // Bits indicating the state of a slot: 1100 // * If a task has been written into the slot, `WRITE` is set. 1101 // * If a task has been read from the slot, `READ` is set. 1102 // * If the block is being destroyed, `DESTROY` is set. 1103 const WRITE: usize = 1; 1104 const READ: usize = 2; 1105 const DESTROY: usize = 4; 1106 1107 // Each block covers one "lap" of indices. 1108 const LAP: usize = 64; 1109 // The maximum number of values a block can hold. 1110 const BLOCK_CAP: usize = LAP - 1; 1111 // How many lower bits are reserved for metadata. 1112 const SHIFT: usize = 1; 1113 // Indicates that the block is not the last one. 1114 const HAS_NEXT: usize = 1; 1115 1116 /// A slot in a block. 1117 struct Slot<T> { 1118 /// The task. 1119 task: UnsafeCell<MaybeUninit<T>>, 1120 1121 /// The state of the slot. 1122 state: AtomicUsize, 1123 } 1124 1125 impl<T> Slot<T> { 1126 /// Waits until a task is written into the slot. wait_write(&self)1127 fn wait_write(&self) { 1128 let backoff = Backoff::new(); 1129 while self.state.load(Ordering::Acquire) & WRITE == 0 { 1130 backoff.snooze(); 1131 } 1132 } 1133 } 1134 1135 /// A block in a linked list. 1136 /// 1137 /// Each block in the list can hold up to `BLOCK_CAP` values. 1138 struct Block<T> { 1139 /// The next block in the linked list. 1140 next: AtomicPtr<Block<T>>, 1141 1142 /// Slots for values. 1143 slots: [Slot<T>; BLOCK_CAP], 1144 } 1145 1146 impl<T> Block<T> { 1147 /// Creates an empty block that starts at `start_index`. new() -> Block<T>1148 fn new() -> Block<T> { 1149 // SAFETY: This is safe because: 1150 // [1] `Block::next` (AtomicPtr) may be safely zero initialized. 1151 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. 1152 // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it 1153 // holds a MaybeUninit. 1154 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. 1155 unsafe { MaybeUninit::zeroed().assume_init() } 1156 } 1157 1158 /// Waits until the next pointer is set. wait_next(&self) -> *mut Block<T>1159 fn wait_next(&self) -> *mut Block<T> { 1160 let backoff = Backoff::new(); 1161 loop { 1162 let next = self.next.load(Ordering::Acquire); 1163 if !next.is_null() { 1164 return next; 1165 } 1166 backoff.snooze(); 1167 } 1168 } 1169 1170 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. 
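    ///
    /// Slots `0..count` may still be in use by concurrent readers. If one of them is, its
    /// `DESTROY` bit is set and that reader takes over the destruction; otherwise the block
    /// is freed here.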
destroy(this: *mut Block<T>, count: usize)1171 unsafe fn destroy(this: *mut Block<T>, count: usize) { 1172 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has 1173 // begun destruction of the block. 1174 for i in (0..count).rev() { 1175 let slot = (*this).slots.get_unchecked(i); 1176 1177 // Mark the `DESTROY` bit if a thread is still using the slot. 1178 if slot.state.load(Ordering::Acquire) & READ == 0 1179 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 1180 { 1181 // If a thread is still using the slot, it will continue destruction of the block. 1182 return; 1183 } 1184 } 1185 1186 // No thread is using the block, now it is safe to destroy it. 1187 drop(Box::from_raw(this)); 1188 } 1189 } 1190 1191 /// A position in a queue. 1192 struct Position<T> { 1193 /// The index in the queue. 1194 index: AtomicUsize, 1195 1196 /// The block in the linked list. 1197 block: AtomicPtr<Block<T>>, 1198 } 1199 1200 /// An injector queue. 1201 /// 1202 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have 1203 /// a single injector queue, which is the entry point for new tasks. 1204 /// 1205 /// # Examples 1206 /// 1207 /// ``` 1208 /// use crossbeam_deque::{Injector, Steal}; 1209 /// 1210 /// let q = Injector::new(); 1211 /// q.push(1); 1212 /// q.push(2); 1213 /// 1214 /// assert_eq!(q.steal(), Steal::Success(1)); 1215 /// assert_eq!(q.steal(), Steal::Success(2)); 1216 /// assert_eq!(q.steal(), Steal::Empty); 1217 /// ``` 1218 pub struct Injector<T> { 1219 /// The head of the queue. 1220 head: CachePadded<Position<T>>, 1221 1222 /// The tail of the queue. 1223 tail: CachePadded<Position<T>>, 1224 1225 /// Indicates that dropping a `Injector<T>` may drop values of type `T`. 1226 _marker: PhantomData<T>, 1227 } 1228 1229 unsafe impl<T: Send> Send for Injector<T> {} 1230 unsafe impl<T: Send> Sync for Injector<T> {} 1231 1232 impl<T> Default for Injector<T> { default() -> Self1233 fn default() -> Self { 1234 let block = Box::into_raw(Box::new(Block::<T>::new())); 1235 Self { 1236 head: CachePadded::new(Position { 1237 block: AtomicPtr::new(block), 1238 index: AtomicUsize::new(0), 1239 }), 1240 tail: CachePadded::new(Position { 1241 block: AtomicPtr::new(block), 1242 index: AtomicUsize::new(0), 1243 }), 1244 _marker: PhantomData, 1245 } 1246 } 1247 } 1248 1249 impl<T> Injector<T> { 1250 /// Creates a new injector queue. 1251 /// 1252 /// # Examples 1253 /// 1254 /// ``` 1255 /// use crossbeam_deque::Injector; 1256 /// 1257 /// let q = Injector::<i32>::new(); 1258 /// ``` new() -> Injector<T>1259 pub fn new() -> Injector<T> { 1260 Self::default() 1261 } 1262 1263 /// Pushes a task into the queue. 1264 /// 1265 /// # Examples 1266 /// 1267 /// ``` 1268 /// use crossbeam_deque::Injector; 1269 /// 1270 /// let w = Injector::new(); 1271 /// w.push(1); 1272 /// w.push(2); 1273 /// ``` push(&self, task: T)1274 pub fn push(&self, task: T) { 1275 let backoff = Backoff::new(); 1276 let mut tail = self.tail.index.load(Ordering::Acquire); 1277 let mut block = self.tail.block.load(Ordering::Acquire); 1278 let mut next_block = None; 1279 1280 loop { 1281 // Calculate the offset of the index into the block. 1282 let offset = (tail >> SHIFT) % LAP; 1283 1284 // If we reached the end of the block, wait until the next one is installed. 
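            // The thread that claims the last regular slot installs the next block and then
            // advances the tail index past it, so wait here and reload the tail.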
1285 if offset == BLOCK_CAP { 1286 backoff.snooze(); 1287 tail = self.tail.index.load(Ordering::Acquire); 1288 block = self.tail.block.load(Ordering::Acquire); 1289 continue; 1290 } 1291 1292 // If we're going to have to install the next block, allocate it in advance in order to 1293 // make the wait for other threads as short as possible. 1294 if offset + 1 == BLOCK_CAP && next_block.is_none() { 1295 next_block = Some(Box::new(Block::<T>::new())); 1296 } 1297 1298 let new_tail = tail + (1 << SHIFT); 1299 1300 // Try advancing the tail forward. 1301 match self.tail.index.compare_exchange_weak( 1302 tail, 1303 new_tail, 1304 Ordering::SeqCst, 1305 Ordering::Acquire, 1306 ) { 1307 Ok(_) => unsafe { 1308 // If we've reached the end of the block, install the next one. 1309 if offset + 1 == BLOCK_CAP { 1310 let next_block = Box::into_raw(next_block.unwrap()); 1311 let next_index = new_tail.wrapping_add(1 << SHIFT); 1312 1313 self.tail.block.store(next_block, Ordering::Release); 1314 self.tail.index.store(next_index, Ordering::Release); 1315 (*block).next.store(next_block, Ordering::Release); 1316 } 1317 1318 // Write the task into the slot. 1319 let slot = (*block).slots.get_unchecked(offset); 1320 slot.task.get().write(MaybeUninit::new(task)); 1321 slot.state.fetch_or(WRITE, Ordering::Release); 1322 1323 return; 1324 }, 1325 Err(t) => { 1326 tail = t; 1327 block = self.tail.block.load(Ordering::Acquire); 1328 backoff.spin(); 1329 } 1330 } 1331 } 1332 } 1333 1334 /// Steals a task from the queue. 1335 /// 1336 /// # Examples 1337 /// 1338 /// ``` 1339 /// use crossbeam_deque::{Injector, Steal}; 1340 /// 1341 /// let q = Injector::new(); 1342 /// q.push(1); 1343 /// q.push(2); 1344 /// 1345 /// assert_eq!(q.steal(), Steal::Success(1)); 1346 /// assert_eq!(q.steal(), Steal::Success(2)); 1347 /// assert_eq!(q.steal(), Steal::Empty); 1348 /// ``` steal(&self) -> Steal<T>1349 pub fn steal(&self) -> Steal<T> { 1350 let mut head; 1351 let mut block; 1352 let mut offset; 1353 1354 let backoff = Backoff::new(); 1355 loop { 1356 head = self.head.index.load(Ordering::Acquire); 1357 block = self.head.block.load(Ordering::Acquire); 1358 1359 // Calculate the offset of the index into the block. 1360 offset = (head >> SHIFT) % LAP; 1361 1362 // If we reached the end of the block, wait until the next one is installed. 1363 if offset == BLOCK_CAP { 1364 backoff.snooze(); 1365 } else { 1366 break; 1367 } 1368 } 1369 1370 let mut new_head = head + (1 << SHIFT); 1371 1372 if new_head & HAS_NEXT == 0 { 1373 atomic::fence(Ordering::SeqCst); 1374 let tail = self.tail.index.load(Ordering::Relaxed); 1375 1376 // If the tail equals the head, that means the queue is empty. 1377 if head >> SHIFT == tail >> SHIFT { 1378 return Steal::Empty; 1379 } 1380 1381 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1382 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1383 new_head |= HAS_NEXT; 1384 } 1385 } 1386 1387 // Try moving the head index forward. 1388 if self 1389 .head 1390 .index 1391 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1392 .is_err() 1393 { 1394 return Steal::Retry; 1395 } 1396 1397 unsafe { 1398 // If we've reached the end of the block, move to the next one. 
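            // Only the head block and index are advanced here; the old block itself is freed
            // by `Block::destroy` once every slot in it has been read.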
1399 if offset + 1 == BLOCK_CAP { 1400 let next = (*block).wait_next(); 1401 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1402 if !(*next).next.load(Ordering::Relaxed).is_null() { 1403 next_index |= HAS_NEXT; 1404 } 1405 1406 self.head.block.store(next, Ordering::Release); 1407 self.head.index.store(next_index, Ordering::Release); 1408 } 1409 1410 // Read the task. 1411 let slot = (*block).slots.get_unchecked(offset); 1412 slot.wait_write(); 1413 let task = slot.task.get().read().assume_init(); 1414 1415 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1416 // but couldn't because we were busy reading from the slot. 1417 if (offset + 1 == BLOCK_CAP) 1418 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) 1419 { 1420 Block::destroy(block, offset); 1421 } 1422 1423 Steal::Success(task) 1424 } 1425 } 1426 1427 /// Steals a batch of tasks and pushes them into a worker. 1428 /// 1429 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1430 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1431 /// 1432 /// # Examples 1433 /// 1434 /// ``` 1435 /// use crossbeam_deque::{Injector, Worker}; 1436 /// 1437 /// let q = Injector::new(); 1438 /// q.push(1); 1439 /// q.push(2); 1440 /// q.push(3); 1441 /// q.push(4); 1442 /// 1443 /// let w = Worker::new_fifo(); 1444 /// let _ = q.steal_batch(&w); 1445 /// assert_eq!(w.pop(), Some(1)); 1446 /// assert_eq!(w.pop(), Some(2)); 1447 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>1448 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 1449 let mut head; 1450 let mut block; 1451 let mut offset; 1452 1453 let backoff = Backoff::new(); 1454 loop { 1455 head = self.head.index.load(Ordering::Acquire); 1456 block = self.head.block.load(Ordering::Acquire); 1457 1458 // Calculate the offset of the index into the block. 1459 offset = (head >> SHIFT) % LAP; 1460 1461 // If we reached the end of the block, wait until the next one is installed. 1462 if offset == BLOCK_CAP { 1463 backoff.snooze(); 1464 } else { 1465 break; 1466 } 1467 } 1468 1469 let mut new_head = head; 1470 let advance; 1471 1472 if new_head & HAS_NEXT == 0 { 1473 atomic::fence(Ordering::SeqCst); 1474 let tail = self.tail.index.load(Ordering::Relaxed); 1475 1476 // If the tail equals the head, that means the queue is empty. 1477 if head >> SHIFT == tail >> SHIFT { 1478 return Steal::Empty; 1479 } 1480 1481 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate 1482 // the right batch size to steal. 1483 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1484 new_head |= HAS_NEXT; 1485 // We can steal all tasks till the end of the block. 1486 advance = (BLOCK_CAP - offset).min(MAX_BATCH); 1487 } else { 1488 let len = (tail - head) >> SHIFT; 1489 // Steal half of the available tasks. 1490 advance = ((len + 1) / 2).min(MAX_BATCH); 1491 } 1492 } else { 1493 // We can steal all tasks till the end of the block. 1494 advance = (BLOCK_CAP - offset).min(MAX_BATCH); 1495 } 1496 1497 new_head += advance << SHIFT; 1498 let new_offset = offset + advance; 1499 1500 // Try moving the head index forward. 1501 if self 1502 .head 1503 .index 1504 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1505 .is_err() 1506 { 1507 return Steal::Retry; 1508 } 1509 1510 // Reserve capacity for the stolen batch. 
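        // The compare-exchange above claimed slots `offset..new_offset` exclusively for this
        // thread.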
1511 let batch_size = new_offset - offset; 1512 dest.reserve(batch_size); 1513 1514 // Get the destination buffer and back index. 1515 let dest_buffer = dest.buffer.get(); 1516 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1517 1518 unsafe { 1519 // If we've reached the end of the block, move to the next one. 1520 if new_offset == BLOCK_CAP { 1521 let next = (*block).wait_next(); 1522 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1523 if !(*next).next.load(Ordering::Relaxed).is_null() { 1524 next_index |= HAS_NEXT; 1525 } 1526 1527 self.head.block.store(next, Ordering::Release); 1528 self.head.index.store(next_index, Ordering::Release); 1529 } 1530 1531 // Copy values from the injector into the destination queue. 1532 match dest.flavor { 1533 Flavor::Fifo => { 1534 for i in 0..batch_size { 1535 // Read the task. 1536 let slot = (*block).slots.get_unchecked(offset + i); 1537 slot.wait_write(); 1538 let task = slot.task.get().read().assume_init(); 1539 1540 // Write it into the destination queue. 1541 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1542 } 1543 } 1544 1545 Flavor::Lifo => { 1546 for i in 0..batch_size { 1547 // Read the task. 1548 let slot = (*block).slots.get_unchecked(offset + i); 1549 slot.wait_write(); 1550 let task = slot.task.get().read().assume_init(); 1551 1552 // Write it into the destination queue. 1553 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1554 } 1555 } 1556 } 1557 1558 atomic::fence(Ordering::Release); 1559 1560 // Update the back index in the destination queue. 1561 // 1562 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1563 // data races because it doesn't understand fences. 1564 dest.inner 1565 .back 1566 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1567 1568 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1569 // but couldn't because we were busy reading from the slot. 1570 if new_offset == BLOCK_CAP { 1571 Block::destroy(block, offset); 1572 } else { 1573 for i in offset..new_offset { 1574 let slot = (*block).slots.get_unchecked(i); 1575 1576 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1577 Block::destroy(block, offset); 1578 break; 1579 } 1580 } 1581 } 1582 1583 Steal::Success(()) 1584 } 1585 } 1586 1587 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker. 1588 /// 1589 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1590 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1591 /// 1592 /// # Examples 1593 /// 1594 /// ``` 1595 /// use crossbeam_deque::{Injector, Steal, Worker}; 1596 /// 1597 /// let q = Injector::new(); 1598 /// q.push(1); 1599 /// q.push(2); 1600 /// q.push(3); 1601 /// q.push(4); 1602 /// 1603 /// let w = Worker::new_fifo(); 1604 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1)); 1605 /// assert_eq!(w.pop(), Some(2)); 1606 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>1607 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 1608 let mut head; 1609 let mut block; 1610 let mut offset; 1611 1612 let backoff = Backoff::new(); 1613 loop { 1614 head = self.head.index.load(Ordering::Acquire); 1615 block = self.head.block.load(Ordering::Acquire); 1616 1617 // Calculate the offset of the index into the block. 
1618 offset = (head >> SHIFT) % LAP; 1619 1620 // If we reached the end of the block, wait until the next one is installed. 1621 if offset == BLOCK_CAP { 1622 backoff.snooze(); 1623 } else { 1624 break; 1625 } 1626 } 1627 1628 let mut new_head = head; 1629 let advance; 1630 1631 if new_head & HAS_NEXT == 0 { 1632 atomic::fence(Ordering::SeqCst); 1633 let tail = self.tail.index.load(Ordering::Relaxed); 1634 1635 // If the tail equals the head, that means the queue is empty. 1636 if head >> SHIFT == tail >> SHIFT { 1637 return Steal::Empty; 1638 } 1639 1640 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1641 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1642 new_head |= HAS_NEXT; 1643 // We can steal all tasks till the end of the block. 1644 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); 1645 } else { 1646 let len = (tail - head) >> SHIFT; 1647 // Steal half of the available tasks. 1648 advance = ((len + 1) / 2).min(MAX_BATCH + 1); 1649 } 1650 } else { 1651 // We can steal all tasks till the end of the block. 1652 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); 1653 } 1654 1655 new_head += advance << SHIFT; 1656 let new_offset = offset + advance; 1657 1658 // Try moving the head index forward. 1659 if self 1660 .head 1661 .index 1662 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1663 .is_err() 1664 { 1665 return Steal::Retry; 1666 } 1667 1668 // Reserve capacity for the stolen batch. 1669 let batch_size = new_offset - offset - 1; 1670 dest.reserve(batch_size); 1671 1672 // Get the destination buffer and back index. 1673 let dest_buffer = dest.buffer.get(); 1674 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1675 1676 unsafe { 1677 // If we've reached the end of the block, move to the next one. 1678 if new_offset == BLOCK_CAP { 1679 let next = (*block).wait_next(); 1680 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1681 if !(*next).next.load(Ordering::Relaxed).is_null() { 1682 next_index |= HAS_NEXT; 1683 } 1684 1685 self.head.block.store(next, Ordering::Release); 1686 self.head.index.store(next_index, Ordering::Release); 1687 } 1688 1689 // Read the task. 1690 let slot = (*block).slots.get_unchecked(offset); 1691 slot.wait_write(); 1692 let task = slot.task.get().read().assume_init(); 1693 1694 match dest.flavor { 1695 Flavor::Fifo => { 1696 // Copy values from the injector into the destination queue. 1697 for i in 0..batch_size { 1698 // Read the task. 1699 let slot = (*block).slots.get_unchecked(offset + i + 1); 1700 slot.wait_write(); 1701 let task = slot.task.get().read().assume_init(); 1702 1703 // Write it into the destination queue. 1704 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1705 } 1706 } 1707 1708 Flavor::Lifo => { 1709 // Copy values from the injector into the destination queue. 1710 for i in 0..batch_size { 1711 // Read the task. 1712 let slot = (*block).slots.get_unchecked(offset + i + 1); 1713 slot.wait_write(); 1714 let task = slot.task.get().read().assume_init(); 1715 1716 // Write it into the destination queue. 1717 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1718 } 1719 } 1720 } 1721 1722 atomic::fence(Ordering::Release); 1723 1724 // Update the back index in the destination queue. 1725 // 1726 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1727 // data races because it doesn't understand fences. 
1728 dest.inner 1729 .back 1730 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1731 1732 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1733 // but couldn't because we were busy reading from the slot. 1734 if new_offset == BLOCK_CAP { 1735 Block::destroy(block, offset); 1736 } else { 1737 for i in offset..new_offset { 1738 let slot = (*block).slots.get_unchecked(i); 1739 1740 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1741 Block::destroy(block, offset); 1742 break; 1743 } 1744 } 1745 } 1746 1747 Steal::Success(task) 1748 } 1749 } 1750 1751 /// Returns `true` if the queue is empty. 1752 /// 1753 /// # Examples 1754 /// 1755 /// ``` 1756 /// use crossbeam_deque::Injector; 1757 /// 1758 /// let q = Injector::new(); 1759 /// 1760 /// assert!(q.is_empty()); 1761 /// q.push(1); 1762 /// assert!(!q.is_empty()); 1763 /// ``` is_empty(&self) -> bool1764 pub fn is_empty(&self) -> bool { 1765 let head = self.head.index.load(Ordering::SeqCst); 1766 let tail = self.tail.index.load(Ordering::SeqCst); 1767 head >> SHIFT == tail >> SHIFT 1768 } 1769 1770 /// Returns the number of tasks in the queue. 1771 /// 1772 /// # Examples 1773 /// 1774 /// ``` 1775 /// use crossbeam_deque::Injector; 1776 /// 1777 /// let q = Injector::new(); 1778 /// 1779 /// assert_eq!(q.len(), 0); 1780 /// q.push(1); 1781 /// assert_eq!(q.len(), 1); 1782 /// q.push(1); 1783 /// assert_eq!(q.len(), 2); 1784 /// ``` len(&self) -> usize1785 pub fn len(&self) -> usize { 1786 loop { 1787 // Load the tail index, then load the head index. 1788 let mut tail = self.tail.index.load(Ordering::SeqCst); 1789 let mut head = self.head.index.load(Ordering::SeqCst); 1790 1791 // If the tail index didn't change, we've got consistent indices to work with. 1792 if self.tail.index.load(Ordering::SeqCst) == tail { 1793 // Erase the lower bits. 1794 tail &= !((1 << SHIFT) - 1); 1795 head &= !((1 << SHIFT) - 1); 1796 1797 // Fix up indices if they fall onto block ends. 1798 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { 1799 tail = tail.wrapping_add(1 << SHIFT); 1800 } 1801 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { 1802 head = head.wrapping_add(1 << SHIFT); 1803 } 1804 1805 // Rotate indices so that head falls into the first block. 1806 let lap = (head >> SHIFT) / LAP; 1807 tail = tail.wrapping_sub((lap * LAP) << SHIFT); 1808 head = head.wrapping_sub((lap * LAP) << SHIFT); 1809 1810 // Remove the lower bits. 1811 tail >>= SHIFT; 1812 head >>= SHIFT; 1813 1814 // Return the difference minus the number of blocks between tail and head. 1815 return tail - head - tail / LAP; 1816 } 1817 } 1818 } 1819 } 1820 1821 impl<T> Drop for Injector<T> { drop(&mut self)1822 fn drop(&mut self) { 1823 let mut head = self.head.index.load(Ordering::Relaxed); 1824 let mut tail = self.tail.index.load(Ordering::Relaxed); 1825 let mut block = self.head.block.load(Ordering::Relaxed); 1826 1827 // Erase the lower bits. 1828 head &= !((1 << SHIFT) - 1); 1829 tail &= !((1 << SHIFT) - 1); 1830 1831 unsafe { 1832 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. 1833 while head != tail { 1834 let offset = (head >> SHIFT) % LAP; 1835 1836 if offset < BLOCK_CAP { 1837 // Drop the task in the slot. 1838 let slot = (*block).slots.get_unchecked(offset); 1839 let p = &mut *slot.task.get(); 1840 p.as_mut_ptr().drop_in_place(); 1841 } else { 1842 // Deallocate the block and move to the next one. 
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            drop(Box::from_raw(block));
        }
    }
}

impl<T> fmt::Debug for Injector<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Injector { .. }")
    }
}

/// Possible outcomes of a steal operation.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match self {
            Steal::Empty => true,
            _ => false,
        }
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        match self {
            Steal::Success(_) => true,
            _ => false,
        }
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        match self {
            Steal::Retry => true,
            _ => false,
        }
    }

    /// Returns the result of the operation, if successful.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            _ => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both resulted in `Empty`, then `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            Steal::Empty => f(),
            Steal::Success(_) => self,
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        let mut retry = false;
        for s in iter {
            match &s {
                Steal::Empty => {}
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}
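
// A minimal usage sketch of the public API defined above, written as an in-module test. It
// assumes it sits at the bottom of this module so that `super::*` resolves to the items above;
// the module name, test names, and task values are illustrative, not part of the library.
#[cfg(test)]
mod usage_sketch {
    use super::{Injector, Steal, Worker};

    #[test]
    fn lifo_worker_with_stealer() {
        // The owner pushes and pops at the back; the stealer takes from the front.
        let w = Worker::new_lifo();
        let s = w.stealer();

        w.push(1);
        w.push(2);
        w.push(3);

        assert_eq!(s.steal(), Steal::Success(1)); // oldest task
        assert_eq!(w.pop(), Some(3)); // newest task
        assert_eq!(w.pop(), Some(2));
        assert_eq!(w.pop(), None);
    }

    #[test]
    fn injector_feeds_a_worker() {
        // New tasks enter through the injector; a worker refills itself in batches.
        let q = Injector::new();
        for i in 0..8 {
            q.push(i);
        }

        let w = Worker::new_fifo();
        assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(0));
        // The rest of the stolen batch lands in `w` in FIFO order behind the popped task.
        assert_eq!(w.pop(), Some(1));
    }
}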