• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as a *rendezvous* channel.
4 
5 use std::cell::UnsafeCell;
6 use std::marker::PhantomData;
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::time::Instant;
9 use std::{fmt, ptr};
10 
11 use crossbeam_utils::Backoff;
12 
13 use crate::context::Context;
14 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
15 use crate::select::{Operation, SelectHandle, Selected, Token};
16 use crate::utils::Spinlock;
17 use crate::waker::Waker;
18 
/// A type-erased raw pointer to the packet chosen for an operation.
///
/// The pointer is null until an operation stores a packet address in it.
pub struct ZeroToken(*mut ());

impl Default for ZeroToken {
    /// Builds a token that does not point at any packet (null pointer).
    fn default() -> Self {
        ZeroToken(ptr::null_mut())
    }
}

impl fmt::Debug for ZeroToken {
    /// Prints the packet address as a plain `usize`, honoring the formatter's flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as usize;
        fmt::Debug::fmt(&address, f)
    }
}
33 
/// A one-message hand-off slot shared between a sender and a receiver.
struct Packet<T> {
    /// `true` when the packet was created on the stack, `false` when it was boxed on the heap.
    on_stack: bool,

    /// Becomes `true` once the packet is ready for reading or writing.
    ready: AtomicBool,

    /// Storage for the message in transit; `None` while no message is present.
    msg: UnsafeCell<Option<T>>,
}
45 
46 impl<T> Packet<T> {
47     /// Creates an empty packet on the stack.
empty_on_stack() -> Packet<T>48     fn empty_on_stack() -> Packet<T> {
49         Packet {
50             on_stack: true,
51             ready: AtomicBool::new(false),
52             msg: UnsafeCell::new(None),
53         }
54     }
55 
56     /// Creates an empty packet on the heap.
empty_on_heap() -> Box<Packet<T>>57     fn empty_on_heap() -> Box<Packet<T>> {
58         Box::new(Packet {
59             on_stack: false,
60             ready: AtomicBool::new(false),
61             msg: UnsafeCell::new(None),
62         })
63     }
64 
65     /// Creates a packet on the stack, containing a message.
message_on_stack(msg: T) -> Packet<T>66     fn message_on_stack(msg: T) -> Packet<T> {
67         Packet {
68             on_stack: true,
69             ready: AtomicBool::new(false),
70             msg: UnsafeCell::new(Some(msg)),
71         }
72     }
73 
74     /// Waits until the packet becomes ready for reading or writing.
wait_ready(&self)75     fn wait_ready(&self) {
76         let backoff = Backoff::new();
77         while !self.ready.load(Ordering::Acquire) {
78             backoff.snooze();
79         }
80     }
81 }
82 
83 /// Inner representation of a zero-capacity channel.
84 struct Inner {
85     /// Senders waiting to pair up with a receive operation.
86     senders: Waker,
87 
88     /// Receivers waiting to pair up with a send operation.
89     receivers: Waker,
90 
91     /// Equals `true` when the channel is disconnected.
92     is_disconnected: bool,
93 }
94 
95 /// Zero-capacity channel.
96 pub(crate) struct Channel<T> {
97     /// Inner representation of the channel.
98     inner: Spinlock<Inner>,
99 
100     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
101     _marker: PhantomData<T>,
102 }
103 
104 impl<T> Channel<T> {
105     /// Constructs a new zero-capacity channel.
new() -> Self106     pub(crate) fn new() -> Self {
107         Channel {
108             inner: Spinlock::new(Inner {
109                 senders: Waker::new(),
110                 receivers: Waker::new(),
111                 is_disconnected: false,
112             }),
113             _marker: PhantomData,
114         }
115     }
116 
117     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>118     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
119         Receiver(self)
120     }
121 
122     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>123     pub(crate) fn sender(&self) -> Sender<'_, T> {
124         Sender(self)
125     }
126 
127     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool128     fn start_send(&self, token: &mut Token) -> bool {
129         let mut inner = self.inner.lock();
130 
131         // If there's a waiting receiver, pair up with it.
132         if let Some(operation) = inner.receivers.try_select() {
133             token.zero.0 = operation.packet;
134             true
135         } else if inner.is_disconnected {
136             token.zero.0 = ptr::null_mut();
137             true
138         } else {
139             false
140         }
141     }
142 
143     /// Writes a message into the packet.
write(&self, token: &mut Token, msg: T) -> Result<(), T>144     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
145         // If there is no packet, the channel is disconnected.
146         if token.zero.0.is_null() {
147             return Err(msg);
148         }
149 
150         let packet = &*(token.zero.0 as *const Packet<T>);
151         packet.msg.get().write(Some(msg));
152         packet.ready.store(true, Ordering::Release);
153         Ok(())
154     }
155 
156     /// Attempts to pair up with a sender.
start_recv(&self, token: &mut Token) -> bool157     fn start_recv(&self, token: &mut Token) -> bool {
158         let mut inner = self.inner.lock();
159 
160         // If there's a waiting sender, pair up with it.
161         if let Some(operation) = inner.senders.try_select() {
162             token.zero.0 = operation.packet;
163             true
164         } else if inner.is_disconnected {
165             token.zero.0 = ptr::null_mut();
166             true
167         } else {
168             false
169         }
170     }
171 
172     /// Reads a message from the packet.
read(&self, token: &mut Token) -> Result<T, ()>173     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
174         // If there is no packet, the channel is disconnected.
175         if token.zero.0.is_null() {
176             return Err(());
177         }
178 
179         let packet = &*(token.zero.0 as *const Packet<T>);
180 
181         if packet.on_stack {
182             // The message has been in the packet from the beginning, so there is no need to wait
183             // for it. However, after reading the message, we need to set `ready` to `true` in
184             // order to signal that the packet can be destroyed.
185             let msg = packet.msg.get().replace(None).unwrap();
186             packet.ready.store(true, Ordering::Release);
187             Ok(msg)
188         } else {
189             // Wait until the message becomes available, then read it and destroy the
190             // heap-allocated packet.
191             packet.wait_ready();
192             let msg = packet.msg.get().replace(None).unwrap();
193             drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
194             Ok(msg)
195         }
196     }
197 
198     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>199     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
200         let token = &mut Token::default();
201         let mut inner = self.inner.lock();
202 
203         // If there's a waiting receiver, pair up with it.
204         if let Some(operation) = inner.receivers.try_select() {
205             token.zero.0 = operation.packet;
206             drop(inner);
207             unsafe {
208                 self.write(token, msg).ok().unwrap();
209             }
210             Ok(())
211         } else if inner.is_disconnected {
212             Err(TrySendError::Disconnected(msg))
213         } else {
214             Err(TrySendError::Full(msg))
215         }
216     }
217 
218     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>219     pub(crate) fn send(
220         &self,
221         msg: T,
222         deadline: Option<Instant>,
223     ) -> Result<(), SendTimeoutError<T>> {
224         let token = &mut Token::default();
225         let mut inner = self.inner.lock();
226 
227         // If there's a waiting receiver, pair up with it.
228         if let Some(operation) = inner.receivers.try_select() {
229             token.zero.0 = operation.packet;
230             drop(inner);
231             unsafe {
232                 self.write(token, msg).ok().unwrap();
233             }
234             return Ok(());
235         }
236 
237         if inner.is_disconnected {
238             return Err(SendTimeoutError::Disconnected(msg));
239         }
240 
241         Context::with(|cx| {
242             // Prepare for blocking until a receiver wakes us up.
243             let oper = Operation::hook(token);
244             let mut packet = Packet::<T>::message_on_stack(msg);
245             inner
246                 .senders
247                 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
248             inner.receivers.notify();
249             drop(inner);
250 
251             // Block the current thread.
252             let sel = cx.wait_until(deadline);
253 
254             match sel {
255                 Selected::Waiting => unreachable!(),
256                 Selected::Aborted => {
257                     self.inner.lock().senders.unregister(oper).unwrap();
258                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
259                     Err(SendTimeoutError::Timeout(msg))
260                 }
261                 Selected::Disconnected => {
262                     self.inner.lock().senders.unregister(oper).unwrap();
263                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
264                     Err(SendTimeoutError::Disconnected(msg))
265                 }
266                 Selected::Operation(_) => {
267                     // Wait until the message is read, then drop the packet.
268                     packet.wait_ready();
269                     Ok(())
270                 }
271             }
272         })
273     }
274 
275     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>276     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
277         let token = &mut Token::default();
278         let mut inner = self.inner.lock();
279 
280         // If there's a waiting sender, pair up with it.
281         if let Some(operation) = inner.senders.try_select() {
282             token.zero.0 = operation.packet;
283             drop(inner);
284             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
285         } else if inner.is_disconnected {
286             Err(TryRecvError::Disconnected)
287         } else {
288             Err(TryRecvError::Empty)
289         }
290     }
291 
292     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>293     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
294         let token = &mut Token::default();
295         let mut inner = self.inner.lock();
296 
297         // If there's a waiting sender, pair up with it.
298         if let Some(operation) = inner.senders.try_select() {
299             token.zero.0 = operation.packet;
300             drop(inner);
301             unsafe {
302                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
303             }
304         }
305 
306         if inner.is_disconnected {
307             return Err(RecvTimeoutError::Disconnected);
308         }
309 
310         Context::with(|cx| {
311             // Prepare for blocking until a sender wakes us up.
312             let oper = Operation::hook(token);
313             let mut packet = Packet::<T>::empty_on_stack();
314             inner.receivers.register_with_packet(
315                 oper,
316                 &mut packet as *mut Packet<T> as *mut (),
317                 cx,
318             );
319             inner.senders.notify();
320             drop(inner);
321 
322             // Block the current thread.
323             let sel = cx.wait_until(deadline);
324 
325             match sel {
326                 Selected::Waiting => unreachable!(),
327                 Selected::Aborted => {
328                     self.inner.lock().receivers.unregister(oper).unwrap();
329                     Err(RecvTimeoutError::Timeout)
330                 }
331                 Selected::Disconnected => {
332                     self.inner.lock().receivers.unregister(oper).unwrap();
333                     Err(RecvTimeoutError::Disconnected)
334                 }
335                 Selected::Operation(_) => {
336                     // Wait until the message is provided, then read it.
337                     packet.wait_ready();
338                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
339                 }
340             }
341         })
342     }
343 
344     /// Disconnects the channel and wakes up all blocked senders and receivers.
345     ///
346     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool347     pub(crate) fn disconnect(&self) -> bool {
348         let mut inner = self.inner.lock();
349 
350         if !inner.is_disconnected {
351             inner.is_disconnected = true;
352             inner.senders.disconnect();
353             inner.receivers.disconnect();
354             true
355         } else {
356             false
357         }
358     }
359 
360     /// Returns the current number of messages inside the channel.
len(&self) -> usize361     pub(crate) fn len(&self) -> usize {
362         0
363     }
364 
365     /// Returns the capacity of the channel.
366     #[allow(clippy::unnecessary_wraps)] // This is intentional.
capacity(&self) -> Option<usize>367     pub(crate) fn capacity(&self) -> Option<usize> {
368         Some(0)
369     }
370 
371     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool372     pub(crate) fn is_empty(&self) -> bool {
373         true
374     }
375 
376     /// Returns `true` if the channel is full.
is_full(&self) -> bool377     pub(crate) fn is_full(&self) -> bool {
378         true
379     }
380 }
381 
382 /// Receiver handle to a channel.
383 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
384 
385 /// Sender handle to a channel.
386 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
387 
388 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool389     fn try_select(&self, token: &mut Token) -> bool {
390         self.0.start_recv(token)
391     }
392 
deadline(&self) -> Option<Instant>393     fn deadline(&self) -> Option<Instant> {
394         None
395     }
396 
register(&self, oper: Operation, cx: &Context) -> bool397     fn register(&self, oper: Operation, cx: &Context) -> bool {
398         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
399 
400         let mut inner = self.0.inner.lock();
401         inner
402             .receivers
403             .register_with_packet(oper, packet as *mut (), cx);
404         inner.senders.notify();
405         inner.senders.can_select() || inner.is_disconnected
406     }
407 
unregister(&self, oper: Operation)408     fn unregister(&self, oper: Operation) {
409         if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
410             unsafe {
411                 drop(Box::from_raw(operation.packet as *mut Packet<T>));
412             }
413         }
414     }
415 
accept(&self, token: &mut Token, cx: &Context) -> bool416     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
417         token.zero.0 = cx.wait_packet();
418         true
419     }
420 
is_ready(&self) -> bool421     fn is_ready(&self) -> bool {
422         let inner = self.0.inner.lock();
423         inner.senders.can_select() || inner.is_disconnected
424     }
425 
watch(&self, oper: Operation, cx: &Context) -> bool426     fn watch(&self, oper: Operation, cx: &Context) -> bool {
427         let mut inner = self.0.inner.lock();
428         inner.receivers.watch(oper, cx);
429         inner.senders.can_select() || inner.is_disconnected
430     }
431 
unwatch(&self, oper: Operation)432     fn unwatch(&self, oper: Operation) {
433         let mut inner = self.0.inner.lock();
434         inner.receivers.unwatch(oper);
435     }
436 }
437 
438 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool439     fn try_select(&self, token: &mut Token) -> bool {
440         self.0.start_send(token)
441     }
442 
deadline(&self) -> Option<Instant>443     fn deadline(&self) -> Option<Instant> {
444         None
445     }
446 
register(&self, oper: Operation, cx: &Context) -> bool447     fn register(&self, oper: Operation, cx: &Context) -> bool {
448         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
449 
450         let mut inner = self.0.inner.lock();
451         inner
452             .senders
453             .register_with_packet(oper, packet as *mut (), cx);
454         inner.receivers.notify();
455         inner.receivers.can_select() || inner.is_disconnected
456     }
457 
unregister(&self, oper: Operation)458     fn unregister(&self, oper: Operation) {
459         if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
460             unsafe {
461                 drop(Box::from_raw(operation.packet as *mut Packet<T>));
462             }
463         }
464     }
465 
accept(&self, token: &mut Token, cx: &Context) -> bool466     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
467         token.zero.0 = cx.wait_packet();
468         true
469     }
470 
is_ready(&self) -> bool471     fn is_ready(&self) -> bool {
472         let inner = self.0.inner.lock();
473         inner.receivers.can_select() || inner.is_disconnected
474     }
475 
watch(&self, oper: Operation, cx: &Context) -> bool476     fn watch(&self, oper: Operation, cx: &Context) -> bool {
477         let mut inner = self.0.inner.lock();
478         inner.senders.watch(oper, cx);
479         inner.receivers.can_select() || inner.is_disconnected
480     }
481 
unwatch(&self, oper: Operation)482     fn unwatch(&self, oper: Operation) {
483         let mut inner = self.0.inner.lock();
484         inner.senders.unwatch(oper);
485     }
486 }
487