// TODO(@jeehoonkang): we mutate `batch_size` inside `for i in 0..batch_size {}`. It is difficult
// to read because we're mutating the range bound.
#![allow(clippy::mut_range_bound)]

use std::cell::{Cell, UnsafeCell};
use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::epoch::{self, Atomic, Owned};
use crate::utils::{Backoff, CachePadded};

// Minimum buffer capacity.
const MIN_CAP: usize = 64;
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
const MAX_BATCH: usize = 32;
// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
// deallocated as soon as possible.
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;

/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
/// *not* deallocate the buffer.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two.
    cap: usize,
}

unsafe impl<T> Send for Buffer<T> {}

impl<T> Buffer<T> {
    /// Allocates a new buffer with the specified capacity.
    fn alloc(cap: usize) -> Buffer<T> {
        debug_assert_eq!(cap, cap.next_power_of_two());

        let mut v = Vec::with_capacity(cap);
        let ptr = v.as_mut_ptr();
        mem::forget(v);

        Buffer { ptr, cap }
    }

    /// Deallocates the buffer.
    unsafe fn dealloc(self) {
        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
    }

    /// Returns a pointer to the task at the specified `index`.
    unsafe fn at(&self, index: isize) -> *mut T {
        // `self.cap` is always a power of two.
        self.ptr.offset(index & (self.cap - 1) as isize)
    }

    /// Writes `task` into the specified `index`.
    ///
    /// This method might be concurrently called with another `read` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile write instead.
    unsafe fn write(&self, index: isize, task: T) {
        ptr::write_volatile(self.at(index), task)
    }

    /// Reads a task from the specified `index`.
    ///
    /// This method might be concurrently called with another `write` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile read instead.
    unsafe fn read(&self, index: isize) -> T {
        ptr::read_volatile(self.at(index))
    }
}

impl<T> Clone for Buffer<T> {
    fn clone(&self) -> Buffer<T> {
        Buffer {
            ptr: self.ptr,
            cap: self.cap,
        }
    }
}

impl<T> Copy for Buffer<T> {}
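
// The circular indexing in `Buffer::at` relies on `cap` being a power of two, so masking with
// `cap - 1` is equivalent to taking the index modulo `cap`. A minimal sketch of that identity
// (illustrative test added for exposition, not part of the original module):
#[cfg(test)]
mod buffer_mask_sketch {
    #[test]
    fn mask_equals_modulo_for_power_of_two_capacity() {
        let cap: usize = 64;
        assert_eq!(cap, cap.next_power_of_two());
        for index in 0..4 * cap {
            // `index & (cap - 1)` wraps around exactly like `index % cap`.
            assert_eq!(index & (cap - 1), index % cap);
        }
    }
}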

/// Internal queue data shared between the worker and stealers.
///
/// The implementation is based on the following work:
///
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
///    PPoPP 2013.][weak-mem]
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
///    atomics. OOPSLA 2013.][checker]
///
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
struct Inner<T> {
    /// The front index.
    front: AtomicIsize,

    /// The back index.
    back: AtomicIsize,

    /// The underlying buffer.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Load the back index, front index, and buffer.
        let b = self.back.load(Ordering::Relaxed);
        let f = self.front.load(Ordering::Relaxed);

        unsafe {
            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());

            // Go through the buffer from front to back and drop all tasks in the queue.
            let mut i = f;
            while i != b {
                buffer.deref().at(i).drop_in_place();
                i = i.wrapping_add(1);
            }

            // Free the memory allocated by the buffer.
            buffer.into_owned().into_box().dealloc();
        }
    }
}

/// Worker queue flavor: FIFO or LIFO.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor.
    Fifo,

    /// The last-in first-out flavor.
    Lifo,
}

/// A worker queue.
///
/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
/// tasks from it. Task schedulers typically create a single worker queue per thread.
///
/// # Examples
///
/// A FIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_fifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(2));
/// assert_eq!(w.pop(), Some(3));
/// ```
///
/// A LIFO worker:
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(w.pop(), Some(3));
/// assert_eq!(w.pop(), Some(2));
/// ```
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// A copy of `inner.buffer` for quick access.
    buffer: Cell<Buffer<T>>,

    /// The flavor of the queue.
    flavor: Flavor,

    /// Indicates that the worker cannot be shared among threads.
    _marker: PhantomData<*mut ()>, // !Send + !Sync
}

unsafe impl<T: Send> Send for Worker<T> {}
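
// A `Worker` stays on its owning thread while cloned `Stealer`s may be sent elsewhere. A short
// usage sketch of that split (illustrative test added for exposition, not part of the original
// module):
#[cfg(test)]
mod worker_stealer_sketch {
    use super::*;

    #[test]
    fn steal_from_another_thread() {
        let w = Worker::new_fifo();
        w.push(1);
        w.push(2);

        // The stealer handle is `Send + Sync`, so it can be moved to a different thread.
        let s = w.stealer();
        let handle = std::thread::spawn(move || s.steal().success());

        assert_eq!(handle.join().unwrap(), Some(1));
        assert_eq!(w.pop(), Some(2));
    }
}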

impl<T> Worker<T> {
    /// Creates a FIFO worker queue.
    ///
    /// Tasks are pushed and popped from opposite ends.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_fifo();
    /// ```
    pub fn new_fifo() -> Worker<T> {
        let buffer = Buffer::alloc(MIN_CAP);

        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Fifo,
            _marker: PhantomData,
        }
    }

    /// Creates a LIFO worker queue.
    ///
    /// Tasks are pushed and popped from the same end.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// ```
    pub fn new_lifo() -> Worker<T> {
        let buffer = Buffer::alloc(MIN_CAP);

        let inner = Arc::new(CachePadded::new(Inner {
            front: AtomicIsize::new(0),
            back: AtomicIsize::new(0),
            buffer: CachePadded::new(Atomic::new(buffer)),
        }));

        Worker {
            inner,
            buffer: Cell::new(buffer),
            flavor: Flavor::Lifo,
            _marker: PhantomData,
        }
    }

    /// Creates a stealer for this queue.
    ///
    /// The returned stealer can be shared among threads and cloned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// let s = w.stealer();
    /// ```
    pub fn stealer(&self) -> Stealer<T> {
        Stealer {
            inner: self.inner.clone(),
            flavor: self.flavor,
        }
    }

    /// Resizes the internal buffer to the new capacity of `new_cap`.
    #[cold]
    unsafe fn resize(&self, new_cap: usize) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        let buffer = self.buffer.get();

        // Allocate a new buffer and copy data from the old buffer to the new one.
        let new = Buffer::alloc(new_cap);
        let mut i = f;
        while i != b {
            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
            i = i.wrapping_add(1);
        }

        let guard = &epoch::pin();

        // Replace the old buffer with the new one.
        self.buffer.replace(new);
        let old =
            self.inner
                .buffer
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);

        // Destroy the old buffer later.
        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());

        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
        // it as soon as possible.
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
            guard.flush();
        }
    }

    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
    /// buffer.
    fn reserve(&self, reserve_cap: usize) {
        if reserve_cap > 0 {
            // Compute the current length.
            let b = self.inner.back.load(Ordering::Relaxed);
            let f = self.inner.front.load(Ordering::SeqCst);
            let len = b.wrapping_sub(f) as usize;

            // The current capacity.
            let cap = self.buffer.get().cap;

            // Is there enough capacity to push `reserve_cap` tasks?
            if cap - len < reserve_cap {
                // Keep doubling the capacity as much as is needed.
                let mut new_cap = cap * 2;
                while new_cap - len < reserve_cap {
                    new_cap *= 2;
                }

                // Resize the buffer.
                unsafe {
                    self.resize(new_cap);
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert!(w.is_empty());
    /// w.push(1);
    /// assert!(!w.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        b.wrapping_sub(f) <= 0
    }

    /// Returns the number of tasks in the deque.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    ///
    /// assert_eq!(w.len(), 0);
    /// w.push(1);
    /// assert_eq!(w.len(), 1);
    /// w.push(1);
    /// assert_eq!(w.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::SeqCst);
        b.wrapping_sub(f).max(0) as usize
    }

    /// Pushes a task into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Acquire);
        let mut buffer = self.buffer.get();

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue full?
        if len >= buffer.cap as isize {
            // Yes. Grow the underlying buffer.
            unsafe {
                self.resize(2 * buffer.cap);
            }
            buffer = self.buffer.get();
        }

        // Write `task` into the slot.
        unsafe {
            buffer.write(b, task);
        }

        atomic::fence(Ordering::Release);

        // Increment the back index.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
    }

    /// Pops a task from the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_fifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Load the back and front index.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue empty?
        if len <= 0 {
            return None;
        }

        match self.flavor {
            // Pop from the front of the queue.
            Flavor::Fifo => {
                // Try incrementing the front index to pop the task.
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
                let new_f = f.wrapping_add(1);

                if b.wrapping_sub(new_f) < 0 {
                    self.inner.front.store(f, Ordering::Relaxed);
                    return None;
                }

                unsafe {
                    // Read the popped task.
                    let buffer = self.buffer.get();
                    let task = buffer.read(f);

                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                        self.resize(buffer.cap / 2);
                    }

                    Some(task)
                }
            }

            // Pop from the back of the queue.
            Flavor::Lifo => {
                // Decrement the back index.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);

                atomic::fence(Ordering::SeqCst);

                // Load the front index.
                let f = self.inner.front.load(Ordering::Relaxed);

                // Compute the length after the back index was decremented.
                let len = b.wrapping_sub(f);

                if len < 0 {
                    // The queue is empty. Restore the back index to the original value.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    None
                } else {
                    // Read the task to be popped.
                    let buffer = self.buffer.get();
                    let mut task = unsafe { Some(buffer.read(b)) };

                    // Are we popping the last task from the queue?
                    if len == 0 {
                        // Try incrementing the front index.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                        {
                            // Failed. We didn't pop anything.
                            mem::forget(task.take());
                        }

                        // Restore the back index to the original value.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    } else {
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                            unsafe {
                                self.resize(buffer.cap / 2);
                            }
                        }
                    }

                    task
                }
            }
        }
    }
}

impl<T> fmt::Debug for Worker<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Worker { .. }")
    }
}

/// A stealer handle of a worker queue.
///
/// Stealers can be shared among threads.
///
/// Task schedulers typically have a single worker queue per worker thread.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// w.push(1);
/// w.push(2);
///
/// let s = w.stealer();
/// assert_eq!(s.steal(), Steal::Success(1));
/// assert_eq!(s.steal(), Steal::Success(2));
/// assert_eq!(s.steal(), Steal::Empty);
/// ```
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue.
    flavor: Flavor,
}

unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
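
// Pop order depends on the flavor: a FIFO worker pops from the front, a LIFO worker pops from
// the back, while stealers always take from the front. A minimal sketch of the difference
// (illustrative test added for exposition, not part of the original module):
#[cfg(test)]
mod flavor_order_sketch {
    use super::*;

    #[test]
    fn fifo_vs_lifo_pop_order() {
        let fifo = Worker::new_fifo();
        let lifo = Worker::new_lifo();
        for i in 1..=3 {
            fifo.push(i);
            lifo.push(i);
        }

        // FIFO: oldest task first.
        assert_eq!((fifo.pop(), fifo.pop(), fifo.pop()), (Some(1), Some(2), Some(3)));
        // LIFO: newest task first.
        assert_eq!((lifo.pop(), lifo.pop(), lifo.pop()), (Some(3), Some(2), Some(1)));
    }
}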

impl<T> Stealer<T> {
    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// let s = w.stealer();
    ///
    /// assert!(s.is_empty());
    /// w.push(1);
    /// assert!(!s.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let f = self.inner.front.load(Ordering::Acquire);
        atomic::fence(Ordering::SeqCst);
        let b = self.inner.back.load(Ordering::Acquire);
        b.wrapping_sub(f) <= 0
    }

    /// Steals a task from the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// let s = w.stealer();
    /// assert_eq!(s.steal(), Steal::Success(1));
    /// assert_eq!(s.steal(), Steal::Success(2));
    /// ```
    pub fn steal(&self) -> Steal<T> {
        // Load the front index.
        let f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        if b.wrapping_sub(f) <= 0 {
            return Steal::Empty;
        }

        // Load the buffer and read the task at the front.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let task = unsafe { buffer.deref().read(f) };

        // Try incrementing the front index to steal the task.
        if self
            .inner
            .front
            .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            // We didn't steal this task, forget it.
            mem::forget(task);
            return Steal::Retry;
        }

        // Return the stolen task.
        Steal::Success(task)
    }

    /// Steals a batch of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch(&w2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            if dest.is_empty() {
                return Steal::Empty;
            } else {
                return Steal::Success(());
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the batch.
                if self
                    .inner
                    .front
                    .compare_exchange(
                        f,
                        f.wrapping_add(batch_size),
                        Ordering::SeqCst,
                        Ordering::Relaxed,
                    )
                    .is_err()
                {
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                for i in 0..batch_size {
                    // If this is not the first steal, check whether the queue is empty.
                    if i > 0 {
                        // We've already got the current front index. Now execute the fence to
                        // synchronize with other threads.
                        atomic::fence(Ordering::SeqCst);

                        // Load the back index.
                        let b = self.inner.back.load(Ordering::Acquire);

                        // Is the queue empty?
                        if b.wrapping_sub(f) <= 0 {
                            batch_size = i;
                            break;
                        }
                    }

                    // Read the task at the front.
                    let task = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    if self
                        .inner
                        .front
                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                        .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        mem::forget(task);
                        batch_size = i;
                        break;
                    }

                    // Write the stolen task into the destination buffer.
                    unsafe {
                        dest_buffer.write(dest_b, task);
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If we didn't steal anything, the operation needs to be retried.
                if batch_size == 0 {
                    return Steal::Retry;
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(())
    }

    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            match dest.pop() {
                None => return Steal::Empty,
                Some(task) => return Steal::Success(task),
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        // Read the task at the front.
        let mut task = unsafe { buffer.deref().read(f) };

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the batch.
                if self
                    .inner
                    .front
                    .compare_exchange(
                        f,
                        f.wrapping_add(batch_size + 1),
                        Ordering::SeqCst,
                        Ordering::Relaxed,
                    )
                    .is_err()
                {
                    // We didn't steal this task, forget it.
                    mem::forget(task);
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // Try incrementing the front index to steal the task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_err()
                {
                    // We didn't steal this task, forget it.
                    mem::forget(task);
                    return Steal::Retry;
                }

                // Move the front index one step forward.
                f = f.wrapping_add(1);

                // Repeat the same procedure for the batch steals.
                for i in 0..batch_size {
                    // We've already got the current front index. Now execute the fence to
                    // synchronize with other threads.
                    atomic::fence(Ordering::SeqCst);

                    // Load the back index.
                    let b = self.inner.back.load(Ordering::Acquire);

                    // Is the queue empty?
                    if b.wrapping_sub(f) <= 0 {
                        batch_size = i;
                        break;
                    }

                    // Read the task at the front.
                    let tmp = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    if self
                        .inner
                        .front
                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                        .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        mem::forget(tmp);
                        batch_size = i;
                        break;
                    }

                    // Write the previously stolen task into the destination buffer.
                    unsafe {
                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(task)
    }
}

impl<T> Clone for Stealer<T> {
    fn clone(&self) -> Stealer<T> {
        Stealer {
            inner: self.inner.clone(),
            flavor: self.flavor,
        }
    }
}

impl<T> fmt::Debug for Stealer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Stealer { .. }")
    }
}

// Bits indicating the state of a slot:
// * If a task has been written into the slot, `WRITE` is set.
// * If a task has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 64;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The task.
    task: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a task is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
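
// Injector indices pack a `HAS_NEXT` flag into the lowest `SHIFT` bits; the slot offset within a
// block is `(index >> SHIFT) % LAP`. A small sketch of that encoding (illustrative test added for
// exposition, not part of the original module):
#[cfg(test)]
mod index_encoding_sketch {
    use super::{BLOCK_CAP, HAS_NEXT, LAP, SHIFT};

    #[test]
    fn offset_and_flag() {
        // Advancing by one task adds `1 << SHIFT`, leaving the flag bit untouched.
        let index: usize = (5 << SHIFT) | HAS_NEXT;
        assert_eq!((index >> SHIFT) % LAP, 5);
        assert_eq!(index & HAS_NEXT, HAS_NEXT);
        // Offsets run from 0 to BLOCK_CAP - 1; offset == BLOCK_CAP marks the end of a block.
        assert_eq!(BLOCK_CAP, LAP - 1);
    }
}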

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
        //      holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in the first `count` slots and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in (0..count).rev() {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```
pub struct Injector<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}

impl<T> Default for Injector<T> {
    fn default() -> Self {
        let block = Box::into_raw(Box::new(Block::<T>::new()));
        Self {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(block),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(block),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }
}
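
// The injector is a FIFO queue shared by reference between threads; `steal` may report `Retry`
// even without contention (it uses a weak compare-exchange), so callers usually drain it in a
// loop. A minimal sketch of that pattern (illustrative test added for exposition, not part of
// the original module):
#[cfg(test)]
mod injector_sketch {
    use super::*;

    #[test]
    fn drain_in_fifo_order() {
        let q = Injector::new();
        for i in 0..4 {
            q.push(i);
        }

        let mut popped = Vec::new();
        loop {
            match q.steal() {
                Steal::Success(task) => popped.push(task),
                Steal::Empty => break,
                // A spurious `Retry` just means "try again".
                Steal::Retry => {}
            }
        }
        assert_eq!(popped, vec![0, 1, 2, 3]);
    }
}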

impl<T> Injector<T> {
    /// Creates a new injector queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::<i32>::new();
    /// ```
    pub fn new() -> Injector<T> {
        Self::default()
    }

    /// Pushes a task into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let w = Injector::new();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(MaybeUninit::new(task));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Steals a task from the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    ///
    /// assert_eq!(q.steal(), Steal::Success(1));
    /// assert_eq!(q.steal(), Steal::Success(2));
    /// assert_eq!(q.steal(), Steal::Empty);
    /// ```
    pub fn steal(&self) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head + (1 << SHIFT);

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if offset + 1 == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if (offset + 1 == BLOCK_CAP)
                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
            {
                Block::destroy(block, offset);
            }

            Steal::Success(task)
        }
    }

    /// Steals a batch of tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch(&w);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                Flavor::Fifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }

    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH + 1);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task)
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of tasks in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert_eq!(q.len(), 0);
    /// q.push(1);
    /// assert_eq!(q.len(), 1);
    /// q.push(1);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for Injector<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the task in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.task.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            drop(Box::from_raw(block));
        }
    }
}

impl<T> fmt::Debug for Injector<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Injector { .. }")
    }
}

/// Possible outcomes of a steal operation.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}
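
// A typical scheduler loop first pops from the local worker, then tries the injector and the
// other workers' stealers, retrying as long as any source reports `Retry`. A condensed sketch of
// that pattern (illustrative test added for exposition, not part of the original module):
#[cfg(test)]
mod find_task_sketch {
    use super::*;
    use std::iter;

    fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
        // Pop a task from the local queue, if not empty.
        local.pop().or_else(|| {
            // Otherwise, look for a task elsewhere, repeating while every source says `Retry`.
            iter::repeat_with(|| {
                global
                    .steal_batch_and_pop(local)
                    .or_else(|| stealers.iter().map(|s| s.steal()).collect())
            })
            .find(|s| !s.is_retry())
            .and_then(|s| s.success())
        })
    }

    #[test]
    fn drains_the_injector() {
        let global = Injector::new();
        let local = Worker::new_fifo();
        let stealers: Vec<Stealer<i32>> = Vec::new();
        for i in 0..4 {
            global.push(i);
        }

        let mut seen = Vec::new();
        while let Some(task) = find_task(&local, &global, &stealers) {
            seen.push(task);
        }
        assert_eq!(seen, vec![0, 1, 2, 3]);
    }
}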

impl<T> Steal<T> {
    /// Returns `true` if the queue was empty at the time of stealing.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Success(7).is_empty());
    /// assert!(!Retry::<i32>.is_empty());
    ///
    /// assert!(Empty::<i32>.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        match self {
            Steal::Empty => true,
            _ => false,
        }
    }

    /// Returns `true` if at least one task was stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_success());
    /// assert!(!Retry::<i32>.is_success());
    ///
    /// assert!(Success(7).is_success());
    /// ```
    pub fn is_success(&self) -> bool {
        match self {
            Steal::Success(_) => true,
            _ => false,
        }
    }

    /// Returns `true` if the steal operation needs to be retried.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert!(!Empty::<i32>.is_retry());
    /// assert!(!Success(7).is_retry());
    ///
    /// assert!(Retry::<i32>.is_retry());
    /// ```
    pub fn is_retry(&self) -> bool {
        match self {
            Steal::Retry => true,
            _ => false,
        }
    }

    /// Returns the result of the operation, if successful.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Empty::<i32>.success(), None);
    /// assert_eq!(Retry::<i32>.success(), None);
    ///
    /// assert_eq!(Success(7).success(), Some(7));
    /// ```
    pub fn success(self) -> Option<T> {
        match self {
            Steal::Success(res) => Some(res),
            _ => None,
        }
    }

    /// If no task was stolen, attempts another steal operation.
    ///
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
    ///
    /// * If the second steal resulted in `Success`, it is returned.
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both resulted in `Empty`, then `Empty` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
    ///
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
    ///
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
    ///
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
    /// ```
    pub fn or_else<F>(self, f: F) -> Steal<T>
    where
        F: FnOnce() -> Steal<T>,
    {
        match self {
            Steal::Empty => f(),
            Steal::Success(_) => self,
            Steal::Retry => {
                if let Steal::Success(res) = f() {
                    Steal::Success(res)
                } else {
                    Steal::Retry
                }
            }
        }
    }
}

impl<T> fmt::Debug for Steal<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Steal::Empty => f.pad("Empty"),
            Steal::Success(_) => f.pad("Success(..)"),
            Steal::Retry => f.pad("Retry"),
        }
    }
}

impl<T> FromIterator<Steal<T>> for Steal<T> {
    /// Consumes items until a `Success` is found and returns it.
    ///
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
    /// Otherwise, `Empty` is returned.
    fn from_iter<I>(iter: I) -> Steal<T>
    where
        I: IntoIterator<Item = Steal<T>>,
    {
        let mut retry = false;
        for s in iter {
            match &s {
                Steal::Empty => {}
                Steal::Success(_) => return s,
                Steal::Retry => retry = true,
            }
        }

        if retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }
}
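
// `Stealer::steal_batch` moves roughly half of the available tasks (up to `MAX_BATCH`) in one
// shot, so the victim is not drained entirely. A small sketch of that behaviour (illustrative
// test added for exposition, not part of the original module; the exact count is unspecified, so
// only bounds are checked):
#[cfg(test)]
mod steal_batch_sketch {
    use super::*;

    #[test]
    fn batch_leaves_work_behind() {
        let w1 = Worker::new_fifo();
        for i in 0..8 {
            w1.push(i);
        }
        let s = w1.stealer();
        let w2 = Worker::new_fifo();

        assert!(s.steal_batch(&w2).is_success());
        // The destination received something, and the source was not emptied.
        assert!(!w2.is_empty());
        assert!(!w1.is_empty());
    }
}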