1 //! Zero-capacity channel. 2 //! 3 //! This kind of channel is also known as *rendezvous* channel. 4 5 use std::cell::UnsafeCell; 6 use std::marker::PhantomData; 7 use std::sync::atomic::{AtomicBool, Ordering}; 8 use std::sync::Mutex; 9 use std::time::Instant; 10 use std::{fmt, ptr}; 11 12 use crossbeam_utils::Backoff; 13 14 use crate::context::Context; 15 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 16 use crate::select::{Operation, SelectHandle, Selected, Token}; 17 use crate::waker::Waker; 18 19 /// A pointer to a packet. 20 pub(crate) struct ZeroToken(*mut ()); 21 22 impl Default for ZeroToken { default() -> Self23 fn default() -> Self { 24 Self(ptr::null_mut()) 25 } 26 } 27 28 impl fmt::Debug for ZeroToken { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 fmt::Debug::fmt(&(self.0 as usize), f) 31 } 32 } 33 34 /// A slot for passing one message from a sender to a receiver. 35 struct Packet<T> { 36 /// Equals `true` if the packet is allocated on the stack. 37 on_stack: bool, 38 39 /// Equals `true` once the packet is ready for reading or writing. 40 ready: AtomicBool, 41 42 /// The message. 43 msg: UnsafeCell<Option<T>>, 44 } 45 46 impl<T> Packet<T> { 47 /// Creates an empty packet on the stack. empty_on_stack() -> Packet<T>48 fn empty_on_stack() -> Packet<T> { 49 Packet { 50 on_stack: true, 51 ready: AtomicBool::new(false), 52 msg: UnsafeCell::new(None), 53 } 54 } 55 56 /// Creates an empty packet on the heap. empty_on_heap() -> Box<Packet<T>>57 fn empty_on_heap() -> Box<Packet<T>> { 58 Box::new(Packet { 59 on_stack: false, 60 ready: AtomicBool::new(false), 61 msg: UnsafeCell::new(None), 62 }) 63 } 64 65 /// Creates a packet on the stack, containing a message. message_on_stack(msg: T) -> Packet<T>66 fn message_on_stack(msg: T) -> Packet<T> { 67 Packet { 68 on_stack: true, 69 ready: AtomicBool::new(false), 70 msg: UnsafeCell::new(Some(msg)), 71 } 72 } 73 74 /// Waits until the packet becomes ready for reading or writing. wait_ready(&self)75 fn wait_ready(&self) { 76 let backoff = Backoff::new(); 77 while !self.ready.load(Ordering::Acquire) { 78 backoff.snooze(); 79 } 80 } 81 } 82 83 /// Inner representation of a zero-capacity channel. 84 struct Inner { 85 /// Senders waiting to pair up with a receive operation. 86 senders: Waker, 87 88 /// Receivers waiting to pair up with a send operation. 89 receivers: Waker, 90 91 /// Equals `true` when the channel is disconnected. 92 is_disconnected: bool, 93 } 94 95 /// Zero-capacity channel. 96 pub(crate) struct Channel<T> { 97 /// Inner representation of the channel. 98 inner: Mutex<Inner>, 99 100 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 101 _marker: PhantomData<T>, 102 } 103 104 impl<T> Channel<T> { 105 /// Constructs a new zero-capacity channel. new() -> Self106 pub(crate) fn new() -> Self { 107 Channel { 108 inner: Mutex::new(Inner { 109 senders: Waker::new(), 110 receivers: Waker::new(), 111 is_disconnected: false, 112 }), 113 _marker: PhantomData, 114 } 115 } 116 117 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>118 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 119 Receiver(self) 120 } 121 122 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>123 pub(crate) fn sender(&self) -> Sender<'_, T> { 124 Sender(self) 125 } 126 127 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool128 fn start_send(&self, token: &mut Token) -> bool { 129 let mut inner = self.inner.lock().unwrap(); 130 131 // If there's a waiting receiver, pair up with it. 132 if let Some(operation) = inner.receivers.try_select() { 133 token.zero.0 = operation.packet; 134 true 135 } else if inner.is_disconnected { 136 token.zero.0 = ptr::null_mut(); 137 true 138 } else { 139 false 140 } 141 } 142 143 /// Writes a message into the packet. write(&self, token: &mut Token, msg: T) -> Result<(), T>144 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 145 // If there is no packet, the channel is disconnected. 146 if token.zero.0.is_null() { 147 return Err(msg); 148 } 149 150 let packet = &*(token.zero.0 as *const Packet<T>); 151 packet.msg.get().write(Some(msg)); 152 packet.ready.store(true, Ordering::Release); 153 Ok(()) 154 } 155 156 /// Attempts to pair up with a sender. start_recv(&self, token: &mut Token) -> bool157 fn start_recv(&self, token: &mut Token) -> bool { 158 let mut inner = self.inner.lock().unwrap(); 159 160 // If there's a waiting sender, pair up with it. 161 if let Some(operation) = inner.senders.try_select() { 162 token.zero.0 = operation.packet; 163 true 164 } else if inner.is_disconnected { 165 token.zero.0 = ptr::null_mut(); 166 true 167 } else { 168 false 169 } 170 } 171 172 /// Reads a message from the packet. read(&self, token: &mut Token) -> Result<T, ()>173 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 174 // If there is no packet, the channel is disconnected. 175 if token.zero.0.is_null() { 176 return Err(()); 177 } 178 179 let packet = &*(token.zero.0 as *const Packet<T>); 180 181 if packet.on_stack { 182 // The message has been in the packet from the beginning, so there is no need to wait 183 // for it. However, after reading the message, we need to set `ready` to `true` in 184 // order to signal that the packet can be destroyed. 185 let msg = packet.msg.get().replace(None).unwrap(); 186 packet.ready.store(true, Ordering::Release); 187 Ok(msg) 188 } else { 189 // Wait until the message becomes available, then read it and destroy the 190 // heap-allocated packet. 191 packet.wait_ready(); 192 let msg = packet.msg.get().replace(None).unwrap(); 193 drop(Box::from_raw(token.zero.0.cast::<Packet<T>>())); 194 Ok(msg) 195 } 196 } 197 198 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>199 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 200 let token = &mut Token::default(); 201 let mut inner = self.inner.lock().unwrap(); 202 203 // If there's a waiting receiver, pair up with it. 204 if let Some(operation) = inner.receivers.try_select() { 205 token.zero.0 = operation.packet; 206 drop(inner); 207 unsafe { 208 self.write(token, msg).ok().unwrap(); 209 } 210 Ok(()) 211 } else if inner.is_disconnected { 212 Err(TrySendError::Disconnected(msg)) 213 } else { 214 Err(TrySendError::Full(msg)) 215 } 216 } 217 218 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>219 pub(crate) fn send( 220 &self, 221 msg: T, 222 deadline: Option<Instant>, 223 ) -> Result<(), SendTimeoutError<T>> { 224 let token = &mut Token::default(); 225 let mut inner = self.inner.lock().unwrap(); 226 227 // If there's a waiting receiver, pair up with it. 228 if let Some(operation) = inner.receivers.try_select() { 229 token.zero.0 = operation.packet; 230 drop(inner); 231 unsafe { 232 self.write(token, msg).ok().unwrap(); 233 } 234 return Ok(()); 235 } 236 237 if inner.is_disconnected { 238 return Err(SendTimeoutError::Disconnected(msg)); 239 } 240 241 Context::with(|cx| { 242 // Prepare for blocking until a receiver wakes us up. 243 let oper = Operation::hook(token); 244 let mut packet = Packet::<T>::message_on_stack(msg); 245 inner 246 .senders 247 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); 248 inner.receivers.notify(); 249 drop(inner); 250 251 // Block the current thread. 252 let sel = cx.wait_until(deadline); 253 254 match sel { 255 Selected::Waiting => unreachable!(), 256 Selected::Aborted => { 257 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 258 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 259 Err(SendTimeoutError::Timeout(msg)) 260 } 261 Selected::Disconnected => { 262 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 263 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 264 Err(SendTimeoutError::Disconnected(msg)) 265 } 266 Selected::Operation(_) => { 267 // Wait until the message is read, then drop the packet. 268 packet.wait_ready(); 269 Ok(()) 270 } 271 } 272 }) 273 } 274 275 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>276 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 277 let token = &mut Token::default(); 278 let mut inner = self.inner.lock().unwrap(); 279 280 // If there's a waiting sender, pair up with it. 281 if let Some(operation) = inner.senders.try_select() { 282 token.zero.0 = operation.packet; 283 drop(inner); 284 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 285 } else if inner.is_disconnected { 286 Err(TryRecvError::Disconnected) 287 } else { 288 Err(TryRecvError::Empty) 289 } 290 } 291 292 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>293 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 294 let token = &mut Token::default(); 295 let mut inner = self.inner.lock().unwrap(); 296 297 // If there's a waiting sender, pair up with it. 298 if let Some(operation) = inner.senders.try_select() { 299 token.zero.0 = operation.packet; 300 drop(inner); 301 unsafe { 302 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 303 } 304 } 305 306 if inner.is_disconnected { 307 return Err(RecvTimeoutError::Disconnected); 308 } 309 310 Context::with(|cx| { 311 // Prepare for blocking until a sender wakes us up. 312 let oper = Operation::hook(token); 313 let mut packet = Packet::<T>::empty_on_stack(); 314 inner.receivers.register_with_packet( 315 oper, 316 &mut packet as *mut Packet<T> as *mut (), 317 cx, 318 ); 319 inner.senders.notify(); 320 drop(inner); 321 322 // Block the current thread. 323 let sel = cx.wait_until(deadline); 324 325 match sel { 326 Selected::Waiting => unreachable!(), 327 Selected::Aborted => { 328 self.inner 329 .lock() 330 .unwrap() 331 .receivers 332 .unregister(oper) 333 .unwrap(); 334 Err(RecvTimeoutError::Timeout) 335 } 336 Selected::Disconnected => { 337 self.inner 338 .lock() 339 .unwrap() 340 .receivers 341 .unregister(oper) 342 .unwrap(); 343 Err(RecvTimeoutError::Disconnected) 344 } 345 Selected::Operation(_) => { 346 // Wait until the message is provided, then read it. 347 packet.wait_ready(); 348 unsafe { Ok(packet.msg.get().replace(None).unwrap()) } 349 } 350 } 351 }) 352 } 353 354 /// Disconnects the channel and wakes up all blocked senders and receivers. 355 /// 356 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool357 pub(crate) fn disconnect(&self) -> bool { 358 let mut inner = self.inner.lock().unwrap(); 359 360 if !inner.is_disconnected { 361 inner.is_disconnected = true; 362 inner.senders.disconnect(); 363 inner.receivers.disconnect(); 364 true 365 } else { 366 false 367 } 368 } 369 370 /// Returns the current number of messages inside the channel. len(&self) -> usize371 pub(crate) fn len(&self) -> usize { 372 0 373 } 374 375 /// Returns the capacity of the channel. capacity(&self) -> Option<usize>376 pub(crate) fn capacity(&self) -> Option<usize> { 377 Some(0) 378 } 379 380 /// Returns `true` if the channel is empty. is_empty(&self) -> bool381 pub(crate) fn is_empty(&self) -> bool { 382 true 383 } 384 385 /// Returns `true` if the channel is full. is_full(&self) -> bool386 pub(crate) fn is_full(&self) -> bool { 387 true 388 } 389 } 390 391 /// Receiver handle to a channel. 392 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 393 394 /// Sender handle to a channel. 395 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 396 397 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool398 fn try_select(&self, token: &mut Token) -> bool { 399 self.0.start_recv(token) 400 } 401 deadline(&self) -> Option<Instant>402 fn deadline(&self) -> Option<Instant> { 403 None 404 } 405 register(&self, oper: Operation, cx: &Context) -> bool406 fn register(&self, oper: Operation, cx: &Context) -> bool { 407 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 408 409 let mut inner = self.0.inner.lock().unwrap(); 410 inner 411 .receivers 412 .register_with_packet(oper, packet.cast::<()>(), cx); 413 inner.senders.notify(); 414 inner.senders.can_select() || inner.is_disconnected 415 } 416 unregister(&self, oper: Operation)417 fn unregister(&self, oper: Operation) { 418 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { 419 unsafe { 420 drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); 421 } 422 } 423 } 424 accept(&self, token: &mut Token, cx: &Context) -> bool425 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 426 token.zero.0 = cx.wait_packet(); 427 true 428 } 429 is_ready(&self) -> bool430 fn is_ready(&self) -> bool { 431 let inner = self.0.inner.lock().unwrap(); 432 inner.senders.can_select() || inner.is_disconnected 433 } 434 watch(&self, oper: Operation, cx: &Context) -> bool435 fn watch(&self, oper: Operation, cx: &Context) -> bool { 436 let mut inner = self.0.inner.lock().unwrap(); 437 inner.receivers.watch(oper, cx); 438 inner.senders.can_select() || inner.is_disconnected 439 } 440 unwatch(&self, oper: Operation)441 fn unwatch(&self, oper: Operation) { 442 let mut inner = self.0.inner.lock().unwrap(); 443 inner.receivers.unwatch(oper); 444 } 445 } 446 447 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool448 fn try_select(&self, token: &mut Token) -> bool { 449 self.0.start_send(token) 450 } 451 deadline(&self) -> Option<Instant>452 fn deadline(&self) -> Option<Instant> { 453 None 454 } 455 register(&self, oper: Operation, cx: &Context) -> bool456 fn register(&self, oper: Operation, cx: &Context) -> bool { 457 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 458 459 let mut inner = self.0.inner.lock().unwrap(); 460 inner 461 .senders 462 .register_with_packet(oper, packet.cast::<()>(), cx); 463 inner.receivers.notify(); 464 inner.receivers.can_select() || inner.is_disconnected 465 } 466 unregister(&self, oper: Operation)467 fn unregister(&self, oper: Operation) { 468 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { 469 unsafe { 470 drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); 471 } 472 } 473 } 474 accept(&self, token: &mut Token, cx: &Context) -> bool475 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 476 token.zero.0 = cx.wait_packet(); 477 true 478 } 479 is_ready(&self) -> bool480 fn is_ready(&self) -> bool { 481 let inner = self.0.inner.lock().unwrap(); 482 inner.receivers.can_select() || inner.is_disconnected 483 } 484 watch(&self, oper: Operation, cx: &Context) -> bool485 fn watch(&self, oper: Operation, cx: &Context) -> bool { 486 let mut inner = self.0.inner.lock().unwrap(); 487 inner.senders.watch(oper, cx); 488 inner.receivers.can_select() || inner.is_disconnected 489 } 490 unwatch(&self, oper: Operation)491 fn unwatch(&self, oper: Operation) { 492 let mut inner = self.0.inner.lock().unwrap(); 493 inner.senders.unwatch(oper); 494 } 495 } 496