use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //      holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
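            // A plain load is tried first to avoid an unnecessary read-modify-write when the
            // slot has already been read; only then is `DESTROY` published with `fetch_or`.
            // If a reader is still active, it will observe `DESTROY` when it sets `READ` and
            // finish destroying the block itself.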
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
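            // `offset + 1 == BLOCK_CAP` means this push targets the last slot of the current
            // block, and the producer that wins the CAS on the tail index below is the one
            // responsible for installing the pre-allocated block.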
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the front of the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// assert_eq!(q.pop(), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
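            // A successful CAS gives this thread exclusive ownership of the slot at `offset`:
            // it must read the value out of it and, where appropriate, start or continue the
            // destruction of the block (see below).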
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
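        // (The low `SHIFT` bits of an index store the `HAS_NEXT` flag rather than a position,
        // so they are cleared before the indices are compared and advanced.)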
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in each lap has no slot, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}
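
// A minimal concurrent-usage sketch, assuming `std` threads are available in test builds:
// several producers push disjoint ranges while several consumers pop until every value has
// been observed exactly once, exercising the MPMC behaviour described in the `SegQueue`
// docs above. The module name and the constants are illustrative, not part of the crate's
// test suite.
#[cfg(test)]
mod usage_sketch {
    use super::SegQueue;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    #[test]
    fn multi_producer_multi_consumer() {
        const PRODUCERS: usize = 4;
        const CONSUMERS: usize = 4;
        const PER_PRODUCER: usize = 1_000;

        let q = SegQueue::new();
        let popped = AtomicUsize::new(0);

        thread::scope(|s| {
            // Producers: each pushes its own disjoint range of values.
            for p in 0..PRODUCERS {
                let q = &q;
                s.spawn(move || {
                    for i in 0..PER_PRODUCER {
                        q.push(p * PER_PRODUCER + i);
                    }
                });
            }

            // Consumers: keep popping until every pushed value has been taken out.
            for _ in 0..CONSUMERS {
                let q = &q;
                let popped = &popped;
                s.spawn(move || {
                    while popped.load(Ordering::Relaxed) < PRODUCERS * PER_PRODUCER {
                        if q.pop().is_some() {
                            popped.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        });

        // Every value was popped exactly once and the queue is empty again.
        assert_eq!(popped.load(Ordering::Relaxed), PRODUCERS * PER_PRODUCER);
        assert!(q.is_empty());
    }
}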