1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 2 //! 3 //! Source: 4 //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> 5 6 use alloc::boxed::Box; 7 use core::cell::UnsafeCell; 8 use core::fmt; 9 use core::mem::MaybeUninit; 10 use core::sync::atomic::{self, AtomicUsize, Ordering}; 11 12 use crossbeam_utils::{Backoff, CachePadded}; 13 14 /// A slot in a queue. 15 struct Slot<T> { 16 /// The current stamp. 17 /// 18 /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, 19 /// this node will be next read from. 20 stamp: AtomicUsize, 21 22 /// The value in this slot. 23 value: UnsafeCell<MaybeUninit<T>>, 24 } 25 26 /// A bounded multi-producer multi-consumer queue. 27 /// 28 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed 29 /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an 30 /// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit 31 /// faster than [`SegQueue`]. 32 /// 33 /// [`SegQueue`]: super::SegQueue 34 /// 35 /// # Examples 36 /// 37 /// ``` 38 /// use crossbeam_queue::ArrayQueue; 39 /// 40 /// let q = ArrayQueue::new(2); 41 /// 42 /// assert_eq!(q.push('a'), Ok(())); 43 /// assert_eq!(q.push('b'), Ok(())); 44 /// assert_eq!(q.push('c'), Err('c')); 45 /// assert_eq!(q.pop(), Some('a')); 46 /// ``` 47 pub struct ArrayQueue<T> { 48 /// The head of the queue. 49 /// 50 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 51 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 52 /// 53 /// Elements are popped from the head of the queue. 54 head: CachePadded<AtomicUsize>, 55 56 /// The tail of the queue. 57 /// 58 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 59 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 60 /// 61 /// Elements are pushed into the tail of the queue. 62 tail: CachePadded<AtomicUsize>, 63 64 /// The buffer holding slots. 65 buffer: Box<[Slot<T>]>, 66 67 /// The queue capacity. 68 cap: usize, 69 70 /// A stamp with the value of `{ lap: 1, index: 0 }`. 71 one_lap: usize, 72 } 73 74 unsafe impl<T: Send> Sync for ArrayQueue<T> {} 75 unsafe impl<T: Send> Send for ArrayQueue<T> {} 76 77 impl<T> ArrayQueue<T> { 78 /// Creates a new bounded queue with the given capacity. 79 /// 80 /// # Panics 81 /// 82 /// Panics if the capacity is zero. 83 /// 84 /// # Examples 85 /// 86 /// ``` 87 /// use crossbeam_queue::ArrayQueue; 88 /// 89 /// let q = ArrayQueue::<i32>::new(100); 90 /// ``` new(cap: usize) -> ArrayQueue<T>91 pub fn new(cap: usize) -> ArrayQueue<T> { 92 assert!(cap > 0, "capacity must be non-zero"); 93 94 // Head is initialized to `{ lap: 0, index: 0 }`. 95 // Tail is initialized to `{ lap: 0, index: 0 }`. 96 let head = 0; 97 let tail = 0; 98 99 // Allocate a buffer of `cap` slots initialized 100 // with stamps. 101 let buffer: Box<[Slot<T>]> = (0..cap) 102 .map(|i| { 103 // Set the stamp to `{ lap: 0, index: i }`. 104 Slot { 105 stamp: AtomicUsize::new(i), 106 value: UnsafeCell::new(MaybeUninit::uninit()), 107 } 108 }) 109 .collect(); 110 111 // One lap is the smallest power of two greater than `cap`. 112 let one_lap = (cap + 1).next_power_of_two(); 113 114 ArrayQueue { 115 buffer, 116 cap, 117 one_lap, 118 head: CachePadded::new(AtomicUsize::new(head)), 119 tail: CachePadded::new(AtomicUsize::new(tail)), 120 } 121 } 122 123 /// Attempts to push an element into the queue. 124 /// 125 /// If the queue is full, the element is returned back as an error. 126 /// 127 /// # Examples 128 /// 129 /// ``` 130 /// use crossbeam_queue::ArrayQueue; 131 /// 132 /// let q = ArrayQueue::new(1); 133 /// 134 /// assert_eq!(q.push(10), Ok(())); 135 /// assert_eq!(q.push(20), Err(20)); 136 /// ``` push(&self, value: T) -> Result<(), T>137 pub fn push(&self, value: T) -> Result<(), T> { 138 let backoff = Backoff::new(); 139 let mut tail = self.tail.load(Ordering::Relaxed); 140 141 loop { 142 // Deconstruct the tail. 143 let index = tail & (self.one_lap - 1); 144 let lap = tail & !(self.one_lap - 1); 145 146 // Inspect the corresponding slot. 147 debug_assert!(index < self.buffer.len()); 148 let slot = unsafe { self.buffer.get_unchecked(index) }; 149 let stamp = slot.stamp.load(Ordering::Acquire); 150 151 // If the tail and the stamp match, we may attempt to push. 152 if tail == stamp { 153 let new_tail = if index + 1 < self.cap { 154 // Same lap, incremented index. 155 // Set to `{ lap: lap, index: index + 1 }`. 156 tail + 1 157 } else { 158 // One lap forward, index wraps around to zero. 159 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 160 lap.wrapping_add(self.one_lap) 161 }; 162 163 // Try moving the tail. 164 match self.tail.compare_exchange_weak( 165 tail, 166 new_tail, 167 Ordering::SeqCst, 168 Ordering::Relaxed, 169 ) { 170 Ok(_) => { 171 // Write the value into the slot and update the stamp. 172 unsafe { 173 slot.value.get().write(MaybeUninit::new(value)); 174 } 175 slot.stamp.store(tail + 1, Ordering::Release); 176 return Ok(()); 177 } 178 Err(t) => { 179 tail = t; 180 backoff.spin(); 181 } 182 } 183 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 184 atomic::fence(Ordering::SeqCst); 185 let head = self.head.load(Ordering::Relaxed); 186 187 // If the head lags one lap behind the tail as well... 188 if head.wrapping_add(self.one_lap) == tail { 189 // ...then the queue is full. 190 return Err(value); 191 } 192 193 backoff.spin(); 194 tail = self.tail.load(Ordering::Relaxed); 195 } else { 196 // Snooze because we need to wait for the stamp to get updated. 197 backoff.snooze(); 198 tail = self.tail.load(Ordering::Relaxed); 199 } 200 } 201 } 202 203 /// Attempts to pop an element from the queue. 204 /// 205 /// If the queue is empty, `None` is returned. 206 /// 207 /// # Examples 208 /// 209 /// ``` 210 /// use crossbeam_queue::ArrayQueue; 211 /// 212 /// let q = ArrayQueue::new(1); 213 /// assert_eq!(q.push(10), Ok(())); 214 /// 215 /// assert_eq!(q.pop(), Some(10)); 216 /// assert!(q.pop().is_none()); 217 /// ``` pop(&self) -> Option<T>218 pub fn pop(&self) -> Option<T> { 219 let backoff = Backoff::new(); 220 let mut head = self.head.load(Ordering::Relaxed); 221 222 loop { 223 // Deconstruct the head. 224 let index = head & (self.one_lap - 1); 225 let lap = head & !(self.one_lap - 1); 226 227 // Inspect the corresponding slot. 228 debug_assert!(index < self.buffer.len()); 229 let slot = unsafe { self.buffer.get_unchecked(index) }; 230 let stamp = slot.stamp.load(Ordering::Acquire); 231 232 // If the the stamp is ahead of the head by 1, we may attempt to pop. 233 if head + 1 == stamp { 234 let new = if index + 1 < self.cap { 235 // Same lap, incremented index. 236 // Set to `{ lap: lap, index: index + 1 }`. 237 head + 1 238 } else { 239 // One lap forward, index wraps around to zero. 240 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 241 lap.wrapping_add(self.one_lap) 242 }; 243 244 // Try moving the head. 245 match self.head.compare_exchange_weak( 246 head, 247 new, 248 Ordering::SeqCst, 249 Ordering::Relaxed, 250 ) { 251 Ok(_) => { 252 // Read the value from the slot and update the stamp. 253 let msg = unsafe { slot.value.get().read().assume_init() }; 254 slot.stamp 255 .store(head.wrapping_add(self.one_lap), Ordering::Release); 256 return Some(msg); 257 } 258 Err(h) => { 259 head = h; 260 backoff.spin(); 261 } 262 } 263 } else if stamp == head { 264 atomic::fence(Ordering::SeqCst); 265 let tail = self.tail.load(Ordering::Relaxed); 266 267 // If the tail equals the head, that means the channel is empty. 268 if tail == head { 269 return None; 270 } 271 272 backoff.spin(); 273 head = self.head.load(Ordering::Relaxed); 274 } else { 275 // Snooze because we need to wait for the stamp to get updated. 276 backoff.snooze(); 277 head = self.head.load(Ordering::Relaxed); 278 } 279 } 280 } 281 282 /// Returns the capacity of the queue. 283 /// 284 /// # Examples 285 /// 286 /// ``` 287 /// use crossbeam_queue::ArrayQueue; 288 /// 289 /// let q = ArrayQueue::<i32>::new(100); 290 /// 291 /// assert_eq!(q.capacity(), 100); 292 /// ``` capacity(&self) -> usize293 pub fn capacity(&self) -> usize { 294 self.cap 295 } 296 297 /// Returns `true` if the queue is empty. 298 /// 299 /// # Examples 300 /// 301 /// ``` 302 /// use crossbeam_queue::ArrayQueue; 303 /// 304 /// let q = ArrayQueue::new(100); 305 /// 306 /// assert!(q.is_empty()); 307 /// q.push(1).unwrap(); 308 /// assert!(!q.is_empty()); 309 /// ``` is_empty(&self) -> bool310 pub fn is_empty(&self) -> bool { 311 let head = self.head.load(Ordering::SeqCst); 312 let tail = self.tail.load(Ordering::SeqCst); 313 314 // Is the tail lagging one lap behind head? 315 // Is the tail equal to the head? 316 // 317 // Note: If the head changes just before we load the tail, that means there was a moment 318 // when the channel was not empty, so it is safe to just return `false`. 319 tail == head 320 } 321 322 /// Returns `true` if the queue is full. 323 /// 324 /// # Examples 325 /// 326 /// ``` 327 /// use crossbeam_queue::ArrayQueue; 328 /// 329 /// let q = ArrayQueue::new(1); 330 /// 331 /// assert!(!q.is_full()); 332 /// q.push(1).unwrap(); 333 /// assert!(q.is_full()); 334 /// ``` is_full(&self) -> bool335 pub fn is_full(&self) -> bool { 336 let tail = self.tail.load(Ordering::SeqCst); 337 let head = self.head.load(Ordering::SeqCst); 338 339 // Is the head lagging one lap behind tail? 340 // 341 // Note: If the tail changes just before we load the head, that means there was a moment 342 // when the queue was not full, so it is safe to just return `false`. 343 head.wrapping_add(self.one_lap) == tail 344 } 345 346 /// Returns the number of elements in the queue. 347 /// 348 /// # Examples 349 /// 350 /// ``` 351 /// use crossbeam_queue::ArrayQueue; 352 /// 353 /// let q = ArrayQueue::new(100); 354 /// assert_eq!(q.len(), 0); 355 /// 356 /// q.push(10).unwrap(); 357 /// assert_eq!(q.len(), 1); 358 /// 359 /// q.push(20).unwrap(); 360 /// assert_eq!(q.len(), 2); 361 /// ``` len(&self) -> usize362 pub fn len(&self) -> usize { 363 loop { 364 // Load the tail, then load the head. 365 let tail = self.tail.load(Ordering::SeqCst); 366 let head = self.head.load(Ordering::SeqCst); 367 368 // If the tail didn't change, we've got consistent values to work with. 369 if self.tail.load(Ordering::SeqCst) == tail { 370 let hix = head & (self.one_lap - 1); 371 let tix = tail & (self.one_lap - 1); 372 373 return if hix < tix { 374 tix - hix 375 } else if hix > tix { 376 self.cap - hix + tix 377 } else if tail == head { 378 0 379 } else { 380 self.cap 381 }; 382 } 383 } 384 } 385 } 386 387 impl<T> Drop for ArrayQueue<T> { drop(&mut self)388 fn drop(&mut self) { 389 // Get the index of the head. 390 let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1); 391 392 // Loop over all slots that hold a message and drop them. 393 for i in 0..self.len() { 394 // Compute the index of the next slot holding a message. 395 let index = if hix + i < self.cap { 396 hix + i 397 } else { 398 hix + i - self.cap 399 }; 400 401 unsafe { 402 debug_assert!(index < self.buffer.len()); 403 let slot = self.buffer.get_unchecked_mut(index); 404 let value = &mut *slot.value.get(); 405 value.as_mut_ptr().drop_in_place(); 406 } 407 } 408 } 409 } 410 411 impl<T> fmt::Debug for ArrayQueue<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 413 f.pad("ArrayQueue { .. }") 414 } 415 } 416 417 impl<T> IntoIterator for ArrayQueue<T> { 418 type Item = T; 419 420 type IntoIter = IntoIter<T>; 421 into_iter(self) -> Self::IntoIter422 fn into_iter(self) -> Self::IntoIter { 423 IntoIter { value: self } 424 } 425 } 426 427 #[derive(Debug)] 428 pub struct IntoIter<T> { 429 value: ArrayQueue<T>, 430 } 431 432 impl<T> Iterator for IntoIter<T> { 433 type Item = T; 434 next(&mut self) -> Option<Self::Item>435 fn next(&mut self) -> Option<Self::Item> { 436 let value = &mut self.value; 437 let head = *value.head.get_mut(); 438 if value.head.get_mut() != value.tail.get_mut() { 439 let index = head & (value.one_lap - 1); 440 let lap = head & !(value.one_lap - 1); 441 // SAFETY: We have mutable access to this, so we can read without 442 // worrying about concurrency. Furthermore, we know this is 443 // initialized because it is the value pointed at by `value.head` 444 // and this is a non-empty queue. 445 let val = unsafe { 446 debug_assert!(index < value.buffer.len()); 447 let slot = value.buffer.get_unchecked_mut(index); 448 slot.value.get().read().assume_init() 449 }; 450 let new = if index + 1 < value.cap { 451 // Same lap, incremented index. 452 // Set to `{ lap: lap, index: index + 1 }`. 453 head + 1 454 } else { 455 // One lap forward, index wraps around to zero. 456 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 457 lap.wrapping_add(value.one_lap) 458 }; 459 *value.head.get_mut() = new; 460 Option::Some(val) 461 } else { 462 Option::None 463 } 464 } 465 } 466