use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //      holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
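
// A note on how `head`/`tail` indices are encoded (see the constants above and
// the `push`/`pop` implementations below):
//
// * The lower `SHIFT` bits of an index hold metadata. In the head index, the
//   `HAS_NEXT` bit records that the head block is not the last one, which lets
//   `pop` skip loading the tail index.
// * `index >> SHIFT` is the queue-global position, and `(index >> SHIFT) % LAP`
//   is the offset within the current block.
// * A lap spans `LAP` (32) positions while a block holds only `BLOCK_CAP` (31)
//   values, so offset `BLOCK_CAP` acts as an "end of block" state in which
//   threads wait for the next block to be installed.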

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                let p = &mut *slot.value.get();
                p.as_mut_ptr().read()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in a lap does not map to a slot, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}
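
// A minimal concurrent smoke-test sketch for the queue above, illustrating the
// multi-producer usage the docs describe. It is not part of the upstream test
// suite: it assumes a `std` feature gate exists and that `std` is linked in
// test builds so that `std::thread` and `std::sync::Arc` are available.
#[cfg(all(test, feature = "std"))]
mod seg_queue_smoke_test {
    use super::SegQueue;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    #[test]
    fn concurrent_push_then_pop() {
        const THREADS: usize = 4;
        const PER_THREAD: usize = 1_000;

        let q = Arc::new(SegQueue::new());

        // Each producer pushes a disjoint range of integers.
        let producers: Vec<_> = (0..THREADS)
            .map(|t| {
                let q = Arc::clone(&q);
                thread::spawn(move || {
                    for i in 0..PER_THREAD {
                        q.push(t * PER_THREAD + i);
                    }
                })
            })
            .collect();
        for p in producers {
            p.join().unwrap();
        }

        // Every pushed value must come back out exactly once.
        let mut popped = 0usize;
        while q.pop().is_some() {
            popped += 1;
        }
        assert_eq!(popped, THREADS * PER_THREAD);
        assert!(q.is_empty());
    }
}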