//! Bounded channel based on a preallocated array.
//!
//! This flavor has a fixed, positive capacity.
//!
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

/// A slot in a channel.
struct Slot<T> {
    /// The current stamp.
    stamp: AtomicUsize,

    /// The message in this slot.
    msg: UnsafeCell<MaybeUninit<T>>,
}

/// The token type for the array flavor.
#[derive(Debug)]
pub struct ArrayToken {
    /// Slot to read from or write to.
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}

impl Default for ArrayToken {
    #[inline]
    fn default() -> Self {
        ArrayToken {
            slot: ptr::null(),
            stamp: 0,
        }
    }
}

/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: *mut Slot<T>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a bounded channel of capacity `cap`.
    pub(crate) fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be positive");

        // Compute constants `mark_bit` and `one_lap`.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
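        // Illustrative aside (not part of the algorithm): with `cap = 3` we get
        // `mark_bit = 4` and `one_lap = 8`, so the index occupies the two lowest
        // bits, the mark bit is bit 2, and the lap uses the remaining bits. Slot
        // `i` below starts with stamp `i`, i.e. `{ lap: 0, mark: 0, index: i }`,
        // so the very first push at `tail = 0` finds `tail == stamp`.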
        let buffer = {
            let boxed: Box<[Slot<T>]> = (0..cap)
                .map(|i| {
                    // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
                    Slot {
                        stamp: AtomicUsize::new(i),
                        msg: UnsafeCell::new(MaybeUninit::uninit()),
                    }
                })
                .collect();
            Box::into_raw(boxed) as *mut Slot<T>
        };

        Channel {
            buffer,
            cap,
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            senders: SyncWaker::new(),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the channel is disconnected.
            if tail & self.mark_bit != 0 {
                token.array.slot = ptr::null();
                token.array.stamp = 0;
                return true;
            }

            // Deconstruct the tail.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `write`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = tail + 1;
                        return true;
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the channel is full.
                    return false;
                }

                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(msg);
        }

        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

        // Write the message into the slot and update the stamp.
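        // (The plain write below is published by the `Release` store of the stamp:
        // a receiver that `Acquire`-loads this stamp in `start_recv` and observes
        // the new value is guaranteed to also see the message.)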
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `read`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = head.wrapping_add(self.one_lap);
                        return true;
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if (tail & !self.mark_bit) == head {
                    // If the channel is disconnected...
                    if tail & self.mark_bit != 0 {
                        // ...then receive an error.
                        token.array.slot = ptr::null();
                        token.array.stamp = 0;
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.array.slot.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

        // Read the message from the slot and update the stamp.
        let msg = slot.msg.get().read().assume_init();
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping sender.
        self.senders.notify();
        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        if self.start_send(token) {
            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel.
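    ///
    /// Blocks until a slot becomes available or the channel is disconnected; if
    /// `deadline` is given and passes first, the message is handed back inside
    /// `SendTimeoutError::Timeout`.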
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        loop {
            // Try sending a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_send(token) {
                    let res = unsafe { self.write(token, msg) };
                    return res.map_err(SendTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(SendTimeoutError::Timeout(msg));
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a receiver wakes us up.
                let oper = Operation::hook(token);
                self.senders.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_full() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.senders.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    let res = unsafe { self.read(token) };
                    return res.map_err(|_| RecvTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a sender wakes us up.
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
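            // (Re-loading the tail and comparing it to the first load checks that the
            // head was read while the tail still held that value; if the check fails,
            // another operation raced with us and we simply retry.)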
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    0
                } else {
                    self.cap
                };
            }
        }
    }

    /// Returns the capacity of the channel.
    #[allow(clippy::unnecessary_wraps)] // This is intentional.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(self.cap)
    }

    /// Disconnects the channel and wakes up all blocked senders and receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

        if tail & self.mark_bit == 0 {
            self.senders.disconnect();
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the channel was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the channel was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        // Get the index of the head.
        let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);

        // Loop over all slots that hold a message and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a message.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                let p = {
                    let slot = &mut *self.buffer.add(index);
                    let msg = &mut *slot.msg.get();
                    msg.as_mut_ptr()
                };
                p.drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            // Create a slice from the buffer to make a fat pointer.
            // Then, use `Box::from_raw` to deallocate it.
            let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
            drop(Box::from_raw(ptr));
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
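///
/// Like `Receiver`, this is a thin borrow of the channel used by the selection
/// machinery through the `SelectHandle` implementation below.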
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.senders.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_full() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.senders.unwatch(oper);
    }
}
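
// A minimal smoke test, sketching how this flavor is exercised directly through
// `try_send`/`try_recv` (the crate's public API normally drives these paths).
// Illustrative only, not an exhaustive test of the lock-free algorithm.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bounded_roundtrip() {
        let chan = Channel::with_capacity(2);

        // Both slots can be filled without blocking.
        assert!(chan.try_send(1).is_ok());
        assert!(chan.try_send(2).is_ok());

        // The channel is now full, so a third send is rejected.
        assert!(matches!(chan.try_send(3), Err(TrySendError::Full(3))));

        // Messages come out in FIFO order.
        assert_eq!(chan.try_recv().unwrap(), 1);
        assert_eq!(chan.try_recv().unwrap(), 2);

        // The channel is empty again.
        assert!(matches!(chan.try_recv(), Err(TryRecvError::Empty)));
    }
}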