1 //! Michael-Scott lock-free queue. 2 //! 3 //! Usable with any number of producers and consumers. 4 //! 5 //! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue 6 //! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106> 7 //! 8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a 9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7> 10 11 use core::mem::MaybeUninit; 12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; 13 14 use crossbeam_utils::CachePadded; 15 16 use crate::{unprotected, Atomic, Guard, Owned, Shared}; 17 18 // The representation here is a singly-linked list, with a sentinel node at the front. In general 19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or 20 // all `Blocked` (requests for data from blocked threads). 21 #[derive(Debug)] 22 pub(crate) struct Queue<T> { 23 head: CachePadded<Atomic<Node<T>>>, 24 tail: CachePadded<Atomic<Node<T>>>, 25 } 26 27 struct Node<T> { 28 /// The slot in which a value of type `T` can be stored. 29 /// 30 /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`. 31 /// For example, the sentinel node in a queue never contains a value: its slot is always empty. 32 /// Other nodes start their life with a push operation and contain a value until it gets popped 33 /// out. After that such empty nodes get added to the collector for destruction. 34 data: MaybeUninit<T>, 35 36 next: Atomic<Node<T>>, 37 } 38 39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`. 40 unsafe impl<T: Send> Sync for Queue<T> {} 41 unsafe impl<T: Send> Send for Queue<T> {} 42 43 impl<T> Queue<T> { 44 /// Create a new, empty queue. new() -> Queue<T>45 pub(crate) fn new() -> Queue<T> { 46 let q = Queue { 47 head: CachePadded::new(Atomic::null()), 48 tail: CachePadded::new(Atomic::null()), 49 }; 50 let sentinel = Owned::new(Node { 51 data: MaybeUninit::uninit(), 52 next: Atomic::null(), 53 }); 54 unsafe { 55 let guard = unprotected(); 56 let sentinel = sentinel.into_shared(guard); 57 q.head.store(sentinel, Relaxed); 58 q.tail.store(sentinel, Relaxed); 59 q 60 } 61 } 62 63 /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on 64 /// success. The queue's `tail` pointer may be updated. 65 #[inline(always)] push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66 fn push_internal( 67 &self, 68 onto: Shared<'_, Node<T>>, 69 new: Shared<'_, Node<T>>, 70 guard: &Guard, 71 ) -> bool { 72 // is `onto` the actual tail? 73 let o = unsafe { onto.deref() }; 74 let next = o.next.load(Acquire, guard); 75 if unsafe { next.as_ref().is_some() } { 76 // if not, try to "help" by moving the tail pointer forward 77 let _ = self 78 .tail 79 .compare_exchange(onto, next, Release, Relaxed, guard); 80 false 81 } else { 82 // looks like the actual tail; attempt to link in `n` 83 let result = o 84 .next 85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard) 86 .is_ok(); 87 if result { 88 // try to move the tail pointer forward 89 let _ = self 90 .tail 91 .compare_exchange(onto, new, Release, Relaxed, guard); 92 } 93 result 94 } 95 } 96 97 /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. push(&self, t: T, guard: &Guard)98 pub(crate) fn push(&self, t: T, guard: &Guard) { 99 let new = Owned::new(Node { 100 data: MaybeUninit::new(t), 101 next: Atomic::null(), 102 }); 103 let new = Owned::into_shared(new, guard); 104 105 loop { 106 // We push onto the tail, so we'll start optimistically by looking there first. 107 let tail = self.tail.load(Acquire, guard); 108 109 // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. 110 if self.push_internal(tail, new, guard) { 111 break; 112 } 113 } 114 } 115 116 /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. 117 #[inline(always)] pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { 119 let head = self.head.load(Acquire, guard); 120 let h = unsafe { head.deref() }; 121 let next = h.next.load(Acquire, guard); 122 match unsafe { next.as_ref() } { 123 Some(n) => unsafe { 124 self.head 125 .compare_exchange(head, next, Release, Relaxed, guard) 126 .map(|_| { 127 let tail = self.tail.load(Relaxed, guard); 128 // Advance the tail so that we don't retire a pointer to a reachable node. 129 if head == tail { 130 let _ = self 131 .tail 132 .compare_exchange(tail, next, Release, Relaxed, guard); 133 } 134 guard.defer_destroy(head); 135 // TODO: Replace with MaybeUninit::read when api is stable 136 Some(n.data.as_ptr().read()) 137 }) 138 .map_err(|_| ()) 139 }, 140 None => Ok(None), 141 } 142 } 143 144 /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue 145 /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. 146 #[inline(always)] pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,147 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> 148 where 149 T: Sync, 150 F: Fn(&T) -> bool, 151 { 152 let head = self.head.load(Acquire, guard); 153 let h = unsafe { head.deref() }; 154 let next = h.next.load(Acquire, guard); 155 match unsafe { next.as_ref() } { 156 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { 157 self.head 158 .compare_exchange(head, next, Release, Relaxed, guard) 159 .map(|_| { 160 let tail = self.tail.load(Relaxed, guard); 161 // Advance the tail so that we don't retire a pointer to a reachable node. 162 if head == tail { 163 let _ = self 164 .tail 165 .compare_exchange(tail, next, Release, Relaxed, guard); 166 } 167 guard.defer_destroy(head); 168 Some(n.data.as_ptr().read()) 169 }) 170 .map_err(|_| ()) 171 }, 172 None | Some(_) => Ok(None), 173 } 174 } 175 176 /// Attempts to dequeue from the front. 177 /// 178 /// Returns `None` if the queue is observed to be empty. try_pop(&self, guard: &Guard) -> Option<T>179 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { 180 loop { 181 if let Ok(head) = self.pop_internal(guard) { 182 return head; 183 } 184 } 185 } 186 187 /// Attempts to dequeue from the front, if the item satisfies the given condition. 188 /// 189 /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given 190 /// condition. try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,191 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> 192 where 193 T: Sync, 194 F: Fn(&T) -> bool, 195 { 196 loop { 197 if let Ok(head) = self.pop_if_internal(&condition, guard) { 198 return head; 199 } 200 } 201 } 202 } 203 204 impl<T> Drop for Queue<T> { drop(&mut self)205 fn drop(&mut self) { 206 unsafe { 207 let guard = unprotected(); 208 209 while self.try_pop(guard).is_some() {} 210 211 // Destroy the remaining sentinel node. 212 let sentinel = self.head.load(Relaxed, guard); 213 drop(sentinel.into_owned()); 214 } 215 } 216 } 217 218 #[cfg(all(test, not(crossbeam_loom)))] 219 mod test { 220 use super::*; 221 use crate::pin; 222 use crossbeam_utils::thread; 223 224 struct Queue<T> { 225 queue: super::Queue<T>, 226 } 227 228 impl<T> Queue<T> { new() -> Queue<T>229 pub(crate) fn new() -> Queue<T> { 230 Queue { 231 queue: super::Queue::new(), 232 } 233 } 234 push(&self, t: T)235 pub(crate) fn push(&self, t: T) { 236 let guard = &pin(); 237 self.queue.push(t, guard); 238 } 239 is_empty(&self) -> bool240 pub(crate) fn is_empty(&self) -> bool { 241 let guard = &pin(); 242 let head = self.queue.head.load(Acquire, guard); 243 let h = unsafe { head.deref() }; 244 h.next.load(Acquire, guard).is_null() 245 } 246 try_pop(&self) -> Option<T>247 pub(crate) fn try_pop(&self) -> Option<T> { 248 let guard = &pin(); 249 self.queue.try_pop(guard) 250 } 251 pop(&self) -> T252 pub(crate) fn pop(&self) -> T { 253 loop { 254 match self.try_pop() { 255 None => continue, 256 Some(t) => return t, 257 } 258 } 259 } 260 } 261 262 #[cfg(miri)] 263 const CONC_COUNT: i64 = 1000; 264 #[cfg(not(miri))] 265 const CONC_COUNT: i64 = 1000000; 266 267 #[test] push_try_pop_1()268 fn push_try_pop_1() { 269 let q: Queue<i64> = Queue::new(); 270 assert!(q.is_empty()); 271 q.push(37); 272 assert!(!q.is_empty()); 273 assert_eq!(q.try_pop(), Some(37)); 274 assert!(q.is_empty()); 275 } 276 277 #[test] push_try_pop_2()278 fn push_try_pop_2() { 279 let q: Queue<i64> = Queue::new(); 280 assert!(q.is_empty()); 281 q.push(37); 282 q.push(48); 283 assert_eq!(q.try_pop(), Some(37)); 284 assert!(!q.is_empty()); 285 assert_eq!(q.try_pop(), Some(48)); 286 assert!(q.is_empty()); 287 } 288 289 #[test] push_try_pop_many_seq()290 fn push_try_pop_many_seq() { 291 let q: Queue<i64> = Queue::new(); 292 assert!(q.is_empty()); 293 for i in 0..200 { 294 q.push(i) 295 } 296 assert!(!q.is_empty()); 297 for i in 0..200 { 298 assert_eq!(q.try_pop(), Some(i)); 299 } 300 assert!(q.is_empty()); 301 } 302 303 #[test] push_pop_1()304 fn push_pop_1() { 305 let q: Queue<i64> = Queue::new(); 306 assert!(q.is_empty()); 307 q.push(37); 308 assert!(!q.is_empty()); 309 assert_eq!(q.pop(), 37); 310 assert!(q.is_empty()); 311 } 312 313 #[test] push_pop_2()314 fn push_pop_2() { 315 let q: Queue<i64> = Queue::new(); 316 q.push(37); 317 q.push(48); 318 assert_eq!(q.pop(), 37); 319 assert_eq!(q.pop(), 48); 320 } 321 322 #[test] push_pop_many_seq()323 fn push_pop_many_seq() { 324 let q: Queue<i64> = Queue::new(); 325 assert!(q.is_empty()); 326 for i in 0..200 { 327 q.push(i) 328 } 329 assert!(!q.is_empty()); 330 for i in 0..200 { 331 assert_eq!(q.pop(), i); 332 } 333 assert!(q.is_empty()); 334 } 335 336 #[test] push_try_pop_many_spsc()337 fn push_try_pop_many_spsc() { 338 let q: Queue<i64> = Queue::new(); 339 assert!(q.is_empty()); 340 341 thread::scope(|scope| { 342 scope.spawn(|_| { 343 let mut next = 0; 344 345 while next < CONC_COUNT { 346 if let Some(elem) = q.try_pop() { 347 assert_eq!(elem, next); 348 next += 1; 349 } 350 } 351 }); 352 353 for i in 0..CONC_COUNT { 354 q.push(i) 355 } 356 }) 357 .unwrap(); 358 } 359 360 #[test] push_try_pop_many_spmc()361 fn push_try_pop_many_spmc() { 362 fn recv(_t: i32, q: &Queue<i64>) { 363 let mut cur = -1; 364 for _i in 0..CONC_COUNT { 365 if let Some(elem) = q.try_pop() { 366 assert!(elem > cur); 367 cur = elem; 368 369 if cur == CONC_COUNT - 1 { 370 break; 371 } 372 } 373 } 374 } 375 376 let q: Queue<i64> = Queue::new(); 377 assert!(q.is_empty()); 378 thread::scope(|scope| { 379 for i in 0..3 { 380 let q = &q; 381 scope.spawn(move |_| recv(i, q)); 382 } 383 384 scope.spawn(|_| { 385 for i in 0..CONC_COUNT { 386 q.push(i); 387 } 388 }); 389 }) 390 .unwrap(); 391 } 392 393 #[test] push_try_pop_many_mpmc()394 fn push_try_pop_many_mpmc() { 395 enum LR { 396 Left(i64), 397 Right(i64), 398 } 399 400 let q: Queue<LR> = Queue::new(); 401 assert!(q.is_empty()); 402 403 thread::scope(|scope| { 404 for _t in 0..2 { 405 scope.spawn(|_| { 406 for i in CONC_COUNT - 1..CONC_COUNT { 407 q.push(LR::Left(i)) 408 } 409 }); 410 scope.spawn(|_| { 411 for i in CONC_COUNT - 1..CONC_COUNT { 412 q.push(LR::Right(i)) 413 } 414 }); 415 scope.spawn(|_| { 416 let mut vl = vec![]; 417 let mut vr = vec![]; 418 for _i in 0..CONC_COUNT { 419 match q.try_pop() { 420 Some(LR::Left(x)) => vl.push(x), 421 Some(LR::Right(x)) => vr.push(x), 422 _ => {} 423 } 424 } 425 426 let mut vl2 = vl.clone(); 427 let mut vr2 = vr.clone(); 428 vl2.sort_unstable(); 429 vr2.sort_unstable(); 430 431 assert_eq!(vl, vl2); 432 assert_eq!(vr, vr2); 433 }); 434 } 435 }) 436 .unwrap(); 437 } 438 439 #[test] push_pop_many_spsc()440 fn push_pop_many_spsc() { 441 let q: Queue<i64> = Queue::new(); 442 443 thread::scope(|scope| { 444 scope.spawn(|_| { 445 let mut next = 0; 446 while next < CONC_COUNT { 447 assert_eq!(q.pop(), next); 448 next += 1; 449 } 450 }); 451 452 for i in 0..CONC_COUNT { 453 q.push(i) 454 } 455 }) 456 .unwrap(); 457 assert!(q.is_empty()); 458 } 459 460 #[test] is_empty_dont_pop()461 fn is_empty_dont_pop() { 462 let q: Queue<i64> = Queue::new(); 463 q.push(20); 464 q.push(20); 465 assert!(!q.is_empty()); 466 assert!(!q.is_empty()); 467 assert!(q.try_pop().is_some()); 468 } 469 } 470