1 //! Zero-capacity channel. 2 //! 3 //! This kind of channel is also known as *rendezvous* channel. 4 5 use std::cell::UnsafeCell; 6 use std::marker::PhantomData; 7 use std::sync::atomic::{AtomicBool, Ordering}; 8 use std::time::Instant; 9 use std::{fmt, ptr}; 10 11 use crossbeam_utils::Backoff; 12 13 use crate::context::Context; 14 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 15 use crate::select::{Operation, SelectHandle, Selected, Token}; 16 use crate::utils::Spinlock; 17 use crate::waker::Waker; 18 19 /// A pointer to a packet. 20 pub struct ZeroToken(*mut ()); 21 22 impl Default for ZeroToken { default() -> Self23 fn default() -> Self { 24 Self(ptr::null_mut()) 25 } 26 } 27 28 impl fmt::Debug for ZeroToken { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 fmt::Debug::fmt(&(self.0 as usize), f) 31 } 32 } 33 34 /// A slot for passing one message from a sender to a receiver. 35 struct Packet<T> { 36 /// Equals `true` if the packet is allocated on the stack. 37 on_stack: bool, 38 39 /// Equals `true` once the packet is ready for reading or writing. 40 ready: AtomicBool, 41 42 /// The message. 43 msg: UnsafeCell<Option<T>>, 44 } 45 46 impl<T> Packet<T> { 47 /// Creates an empty packet on the stack. empty_on_stack() -> Packet<T>48 fn empty_on_stack() -> Packet<T> { 49 Packet { 50 on_stack: true, 51 ready: AtomicBool::new(false), 52 msg: UnsafeCell::new(None), 53 } 54 } 55 56 /// Creates an empty packet on the heap. empty_on_heap() -> Box<Packet<T>>57 fn empty_on_heap() -> Box<Packet<T>> { 58 Box::new(Packet { 59 on_stack: false, 60 ready: AtomicBool::new(false), 61 msg: UnsafeCell::new(None), 62 }) 63 } 64 65 /// Creates a packet on the stack, containing a message. message_on_stack(msg: T) -> Packet<T>66 fn message_on_stack(msg: T) -> Packet<T> { 67 Packet { 68 on_stack: true, 69 ready: AtomicBool::new(false), 70 msg: UnsafeCell::new(Some(msg)), 71 } 72 } 73 74 /// Waits until the packet becomes ready for reading or writing. wait_ready(&self)75 fn wait_ready(&self) { 76 let backoff = Backoff::new(); 77 while !self.ready.load(Ordering::Acquire) { 78 backoff.snooze(); 79 } 80 } 81 } 82 83 /// Inner representation of a zero-capacity channel. 84 struct Inner { 85 /// Senders waiting to pair up with a receive operation. 86 senders: Waker, 87 88 /// Receivers waiting to pair up with a send operation. 89 receivers: Waker, 90 91 /// Equals `true` when the channel is disconnected. 92 is_disconnected: bool, 93 } 94 95 /// Zero-capacity channel. 96 pub(crate) struct Channel<T> { 97 /// Inner representation of the channel. 98 inner: Spinlock<Inner>, 99 100 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 101 _marker: PhantomData<T>, 102 } 103 104 impl<T> Channel<T> { 105 /// Constructs a new zero-capacity channel. new() -> Self106 pub(crate) fn new() -> Self { 107 Channel { 108 inner: Spinlock::new(Inner { 109 senders: Waker::new(), 110 receivers: Waker::new(), 111 is_disconnected: false, 112 }), 113 _marker: PhantomData, 114 } 115 } 116 117 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>118 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 119 Receiver(self) 120 } 121 122 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>123 pub(crate) fn sender(&self) -> Sender<'_, T> { 124 Sender(self) 125 } 126 127 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool128 fn start_send(&self, token: &mut Token) -> bool { 129 let mut inner = self.inner.lock(); 130 131 // If there's a waiting receiver, pair up with it. 132 if let Some(operation) = inner.receivers.try_select() { 133 token.zero.0 = operation.packet; 134 true 135 } else if inner.is_disconnected { 136 token.zero.0 = ptr::null_mut(); 137 true 138 } else { 139 false 140 } 141 } 142 143 /// Writes a message into the packet. write(&self, token: &mut Token, msg: T) -> Result<(), T>144 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 145 // If there is no packet, the channel is disconnected. 146 if token.zero.0.is_null() { 147 return Err(msg); 148 } 149 150 let packet = &*(token.zero.0 as *const Packet<T>); 151 packet.msg.get().write(Some(msg)); 152 packet.ready.store(true, Ordering::Release); 153 Ok(()) 154 } 155 156 /// Attempts to pair up with a sender. start_recv(&self, token: &mut Token) -> bool157 fn start_recv(&self, token: &mut Token) -> bool { 158 let mut inner = self.inner.lock(); 159 160 // If there's a waiting sender, pair up with it. 161 if let Some(operation) = inner.senders.try_select() { 162 token.zero.0 = operation.packet; 163 true 164 } else if inner.is_disconnected { 165 token.zero.0 = ptr::null_mut(); 166 true 167 } else { 168 false 169 } 170 } 171 172 /// Reads a message from the packet. read(&self, token: &mut Token) -> Result<T, ()>173 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 174 // If there is no packet, the channel is disconnected. 175 if token.zero.0.is_null() { 176 return Err(()); 177 } 178 179 let packet = &*(token.zero.0 as *const Packet<T>); 180 181 if packet.on_stack { 182 // The message has been in the packet from the beginning, so there is no need to wait 183 // for it. However, after reading the message, we need to set `ready` to `true` in 184 // order to signal that the packet can be destroyed. 185 let msg = packet.msg.get().replace(None).unwrap(); 186 packet.ready.store(true, Ordering::Release); 187 Ok(msg) 188 } else { 189 // Wait until the message becomes available, then read it and destroy the 190 // heap-allocated packet. 191 packet.wait_ready(); 192 let msg = packet.msg.get().replace(None).unwrap(); 193 drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); 194 Ok(msg) 195 } 196 } 197 198 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>199 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 200 let token = &mut Token::default(); 201 let mut inner = self.inner.lock(); 202 203 // If there's a waiting receiver, pair up with it. 204 if let Some(operation) = inner.receivers.try_select() { 205 token.zero.0 = operation.packet; 206 drop(inner); 207 unsafe { 208 self.write(token, msg).ok().unwrap(); 209 } 210 Ok(()) 211 } else if inner.is_disconnected { 212 Err(TrySendError::Disconnected(msg)) 213 } else { 214 Err(TrySendError::Full(msg)) 215 } 216 } 217 218 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>219 pub(crate) fn send( 220 &self, 221 msg: T, 222 deadline: Option<Instant>, 223 ) -> Result<(), SendTimeoutError<T>> { 224 let token = &mut Token::default(); 225 let mut inner = self.inner.lock(); 226 227 // If there's a waiting receiver, pair up with it. 228 if let Some(operation) = inner.receivers.try_select() { 229 token.zero.0 = operation.packet; 230 drop(inner); 231 unsafe { 232 self.write(token, msg).ok().unwrap(); 233 } 234 return Ok(()); 235 } 236 237 if inner.is_disconnected { 238 return Err(SendTimeoutError::Disconnected(msg)); 239 } 240 241 Context::with(|cx| { 242 // Prepare for blocking until a receiver wakes us up. 243 let oper = Operation::hook(token); 244 let mut packet = Packet::<T>::message_on_stack(msg); 245 inner 246 .senders 247 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); 248 inner.receivers.notify(); 249 drop(inner); 250 251 // Block the current thread. 252 let sel = cx.wait_until(deadline); 253 254 match sel { 255 Selected::Waiting => unreachable!(), 256 Selected::Aborted => { 257 self.inner.lock().senders.unregister(oper).unwrap(); 258 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 259 Err(SendTimeoutError::Timeout(msg)) 260 } 261 Selected::Disconnected => { 262 self.inner.lock().senders.unregister(oper).unwrap(); 263 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 264 Err(SendTimeoutError::Disconnected(msg)) 265 } 266 Selected::Operation(_) => { 267 // Wait until the message is read, then drop the packet. 268 packet.wait_ready(); 269 Ok(()) 270 } 271 } 272 }) 273 } 274 275 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>276 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 277 let token = &mut Token::default(); 278 let mut inner = self.inner.lock(); 279 280 // If there's a waiting sender, pair up with it. 281 if let Some(operation) = inner.senders.try_select() { 282 token.zero.0 = operation.packet; 283 drop(inner); 284 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 285 } else if inner.is_disconnected { 286 Err(TryRecvError::Disconnected) 287 } else { 288 Err(TryRecvError::Empty) 289 } 290 } 291 292 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>293 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 294 let token = &mut Token::default(); 295 let mut inner = self.inner.lock(); 296 297 // If there's a waiting sender, pair up with it. 298 if let Some(operation) = inner.senders.try_select() { 299 token.zero.0 = operation.packet; 300 drop(inner); 301 unsafe { 302 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 303 } 304 } 305 306 if inner.is_disconnected { 307 return Err(RecvTimeoutError::Disconnected); 308 } 309 310 Context::with(|cx| { 311 // Prepare for blocking until a sender wakes us up. 312 let oper = Operation::hook(token); 313 let mut packet = Packet::<T>::empty_on_stack(); 314 inner.receivers.register_with_packet( 315 oper, 316 &mut packet as *mut Packet<T> as *mut (), 317 cx, 318 ); 319 inner.senders.notify(); 320 drop(inner); 321 322 // Block the current thread. 323 let sel = cx.wait_until(deadline); 324 325 match sel { 326 Selected::Waiting => unreachable!(), 327 Selected::Aborted => { 328 self.inner.lock().receivers.unregister(oper).unwrap(); 329 Err(RecvTimeoutError::Timeout) 330 } 331 Selected::Disconnected => { 332 self.inner.lock().receivers.unregister(oper).unwrap(); 333 Err(RecvTimeoutError::Disconnected) 334 } 335 Selected::Operation(_) => { 336 // Wait until the message is provided, then read it. 337 packet.wait_ready(); 338 unsafe { Ok(packet.msg.get().replace(None).unwrap()) } 339 } 340 } 341 }) 342 } 343 344 /// Disconnects the channel and wakes up all blocked senders and receivers. 345 /// 346 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool347 pub(crate) fn disconnect(&self) -> bool { 348 let mut inner = self.inner.lock(); 349 350 if !inner.is_disconnected { 351 inner.is_disconnected = true; 352 inner.senders.disconnect(); 353 inner.receivers.disconnect(); 354 true 355 } else { 356 false 357 } 358 } 359 360 /// Returns the current number of messages inside the channel. len(&self) -> usize361 pub(crate) fn len(&self) -> usize { 362 0 363 } 364 365 /// Returns the capacity of the channel. 366 #[allow(clippy::unnecessary_wraps)] // This is intentional. capacity(&self) -> Option<usize>367 pub(crate) fn capacity(&self) -> Option<usize> { 368 Some(0) 369 } 370 371 /// Returns `true` if the channel is empty. is_empty(&self) -> bool372 pub(crate) fn is_empty(&self) -> bool { 373 true 374 } 375 376 /// Returns `true` if the channel is full. is_full(&self) -> bool377 pub(crate) fn is_full(&self) -> bool { 378 true 379 } 380 } 381 382 /// Receiver handle to a channel. 383 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 384 385 /// Sender handle to a channel. 386 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 387 388 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool389 fn try_select(&self, token: &mut Token) -> bool { 390 self.0.start_recv(token) 391 } 392 deadline(&self) -> Option<Instant>393 fn deadline(&self) -> Option<Instant> { 394 None 395 } 396 register(&self, oper: Operation, cx: &Context) -> bool397 fn register(&self, oper: Operation, cx: &Context) -> bool { 398 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 399 400 let mut inner = self.0.inner.lock(); 401 inner 402 .receivers 403 .register_with_packet(oper, packet as *mut (), cx); 404 inner.senders.notify(); 405 inner.senders.can_select() || inner.is_disconnected 406 } 407 unregister(&self, oper: Operation)408 fn unregister(&self, oper: Operation) { 409 if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { 410 unsafe { 411 drop(Box::from_raw(operation.packet as *mut Packet<T>)); 412 } 413 } 414 } 415 accept(&self, token: &mut Token, cx: &Context) -> bool416 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 417 token.zero.0 = cx.wait_packet(); 418 true 419 } 420 is_ready(&self) -> bool421 fn is_ready(&self) -> bool { 422 let inner = self.0.inner.lock(); 423 inner.senders.can_select() || inner.is_disconnected 424 } 425 watch(&self, oper: Operation, cx: &Context) -> bool426 fn watch(&self, oper: Operation, cx: &Context) -> bool { 427 let mut inner = self.0.inner.lock(); 428 inner.receivers.watch(oper, cx); 429 inner.senders.can_select() || inner.is_disconnected 430 } 431 unwatch(&self, oper: Operation)432 fn unwatch(&self, oper: Operation) { 433 let mut inner = self.0.inner.lock(); 434 inner.receivers.unwatch(oper); 435 } 436 } 437 438 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool439 fn try_select(&self, token: &mut Token) -> bool { 440 self.0.start_send(token) 441 } 442 deadline(&self) -> Option<Instant>443 fn deadline(&self) -> Option<Instant> { 444 None 445 } 446 register(&self, oper: Operation, cx: &Context) -> bool447 fn register(&self, oper: Operation, cx: &Context) -> bool { 448 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 449 450 let mut inner = self.0.inner.lock(); 451 inner 452 .senders 453 .register_with_packet(oper, packet as *mut (), cx); 454 inner.receivers.notify(); 455 inner.receivers.can_select() || inner.is_disconnected 456 } 457 unregister(&self, oper: Operation)458 fn unregister(&self, oper: Operation) { 459 if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { 460 unsafe { 461 drop(Box::from_raw(operation.packet as *mut Packet<T>)); 462 } 463 } 464 } 465 accept(&self, token: &mut Token, cx: &Context) -> bool466 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 467 token.zero.0 = cx.wait_packet(); 468 true 469 } 470 is_ready(&self) -> bool471 fn is_ready(&self) -> bool { 472 let inner = self.0.inner.lock(); 473 inner.receivers.can_select() || inner.is_disconnected 474 } 475 watch(&self, oper: Operation, cx: &Context) -> bool476 fn watch(&self, oper: Operation, cx: &Context) -> bool { 477 let mut inner = self.0.inner.lock(); 478 inner.senders.watch(oper, cx); 479 inner.receivers.can_select() || inner.is_disconnected 480 } 481 unwatch(&self, oper: Operation)482 fn unwatch(&self, oper: Operation) { 483 let mut inner = self.0.inner.lock(); 484 inner.senders.unwatch(oper); 485 } 486 } 487