• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as *rendezvous* channel.
4 
5 use std::cell::UnsafeCell;
6 use std::marker::PhantomData;
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::sync::Mutex;
9 use std::time::Instant;
10 use std::{fmt, ptr};
11 
12 use crossbeam_utils::Backoff;
13 
14 use crate::context::Context;
15 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16 use crate::select::{Operation, SelectHandle, Selected, Token};
17 use crate::waker::Waker;
18 
19 /// A pointer to a packet.
20 pub(crate) struct ZeroToken(*mut ());
21 
22 impl Default for ZeroToken {
default() -> Self23     fn default() -> Self {
24         Self(ptr::null_mut())
25     }
26 }
27 
28 impl fmt::Debug for ZeroToken {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30         fmt::Debug::fmt(&(self.0 as usize), f)
31     }
32 }
33 
34 /// A slot for passing one message from a sender to a receiver.
35 struct Packet<T> {
36     /// Equals `true` if the packet is allocated on the stack.
37     on_stack: bool,
38 
39     /// Equals `true` once the packet is ready for reading or writing.
40     ready: AtomicBool,
41 
42     /// The message.
43     msg: UnsafeCell<Option<T>>,
44 }
45 
46 impl<T> Packet<T> {
47     /// Creates an empty packet on the stack.
empty_on_stack() -> Packet<T>48     fn empty_on_stack() -> Packet<T> {
49         Packet {
50             on_stack: true,
51             ready: AtomicBool::new(false),
52             msg: UnsafeCell::new(None),
53         }
54     }
55 
56     /// Creates an empty packet on the heap.
empty_on_heap() -> Box<Packet<T>>57     fn empty_on_heap() -> Box<Packet<T>> {
58         Box::new(Packet {
59             on_stack: false,
60             ready: AtomicBool::new(false),
61             msg: UnsafeCell::new(None),
62         })
63     }
64 
65     /// Creates a packet on the stack, containing a message.
message_on_stack(msg: T) -> Packet<T>66     fn message_on_stack(msg: T) -> Packet<T> {
67         Packet {
68             on_stack: true,
69             ready: AtomicBool::new(false),
70             msg: UnsafeCell::new(Some(msg)),
71         }
72     }
73 
74     /// Waits until the packet becomes ready for reading or writing.
wait_ready(&self)75     fn wait_ready(&self) {
76         let backoff = Backoff::new();
77         while !self.ready.load(Ordering::Acquire) {
78             backoff.snooze();
79         }
80     }
81 }
82 
83 /// Inner representation of a zero-capacity channel.
84 struct Inner {
85     /// Senders waiting to pair up with a receive operation.
86     senders: Waker,
87 
88     /// Receivers waiting to pair up with a send operation.
89     receivers: Waker,
90 
91     /// Equals `true` when the channel is disconnected.
92     is_disconnected: bool,
93 }
94 
95 /// Zero-capacity channel.
96 pub(crate) struct Channel<T> {
97     /// Inner representation of the channel.
98     inner: Mutex<Inner>,
99 
100     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
101     _marker: PhantomData<T>,
102 }
103 
104 impl<T> Channel<T> {
105     /// Constructs a new zero-capacity channel.
new() -> Self106     pub(crate) fn new() -> Self {
107         Channel {
108             inner: Mutex::new(Inner {
109                 senders: Waker::new(),
110                 receivers: Waker::new(),
111                 is_disconnected: false,
112             }),
113             _marker: PhantomData,
114         }
115     }
116 
117     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>118     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
119         Receiver(self)
120     }
121 
122     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>123     pub(crate) fn sender(&self) -> Sender<'_, T> {
124         Sender(self)
125     }
126 
127     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool128     fn start_send(&self, token: &mut Token) -> bool {
129         let mut inner = self.inner.lock().unwrap();
130 
131         // If there's a waiting receiver, pair up with it.
132         if let Some(operation) = inner.receivers.try_select() {
133             token.zero.0 = operation.packet;
134             true
135         } else if inner.is_disconnected {
136             token.zero.0 = ptr::null_mut();
137             true
138         } else {
139             false
140         }
141     }
142 
143     /// Writes a message into the packet.
write(&self, token: &mut Token, msg: T) -> Result<(), T>144     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
145         // If there is no packet, the channel is disconnected.
146         if token.zero.0.is_null() {
147             return Err(msg);
148         }
149 
150         let packet = &*(token.zero.0 as *const Packet<T>);
151         packet.msg.get().write(Some(msg));
152         packet.ready.store(true, Ordering::Release);
153         Ok(())
154     }
155 
156     /// Attempts to pair up with a sender.
start_recv(&self, token: &mut Token) -> bool157     fn start_recv(&self, token: &mut Token) -> bool {
158         let mut inner = self.inner.lock().unwrap();
159 
160         // If there's a waiting sender, pair up with it.
161         if let Some(operation) = inner.senders.try_select() {
162             token.zero.0 = operation.packet;
163             true
164         } else if inner.is_disconnected {
165             token.zero.0 = ptr::null_mut();
166             true
167         } else {
168             false
169         }
170     }
171 
172     /// Reads a message from the packet.
read(&self, token: &mut Token) -> Result<T, ()>173     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
174         // If there is no packet, the channel is disconnected.
175         if token.zero.0.is_null() {
176             return Err(());
177         }
178 
179         let packet = &*(token.zero.0 as *const Packet<T>);
180 
181         if packet.on_stack {
182             // The message has been in the packet from the beginning, so there is no need to wait
183             // for it. However, after reading the message, we need to set `ready` to `true` in
184             // order to signal that the packet can be destroyed.
185             let msg = packet.msg.get().replace(None).unwrap();
186             packet.ready.store(true, Ordering::Release);
187             Ok(msg)
188         } else {
189             // Wait until the message becomes available, then read it and destroy the
190             // heap-allocated packet.
191             packet.wait_ready();
192             let msg = packet.msg.get().replace(None).unwrap();
193             drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
194             Ok(msg)
195         }
196     }
197 
198     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>199     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
200         let token = &mut Token::default();
201         let mut inner = self.inner.lock().unwrap();
202 
203         // If there's a waiting receiver, pair up with it.
204         if let Some(operation) = inner.receivers.try_select() {
205             token.zero.0 = operation.packet;
206             drop(inner);
207             unsafe {
208                 self.write(token, msg).ok().unwrap();
209             }
210             Ok(())
211         } else if inner.is_disconnected {
212             Err(TrySendError::Disconnected(msg))
213         } else {
214             Err(TrySendError::Full(msg))
215         }
216     }
217 
218     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>219     pub(crate) fn send(
220         &self,
221         msg: T,
222         deadline: Option<Instant>,
223     ) -> Result<(), SendTimeoutError<T>> {
224         let token = &mut Token::default();
225         let mut inner = self.inner.lock().unwrap();
226 
227         // If there's a waiting receiver, pair up with it.
228         if let Some(operation) = inner.receivers.try_select() {
229             token.zero.0 = operation.packet;
230             drop(inner);
231             unsafe {
232                 self.write(token, msg).ok().unwrap();
233             }
234             return Ok(());
235         }
236 
237         if inner.is_disconnected {
238             return Err(SendTimeoutError::Disconnected(msg));
239         }
240 
241         Context::with(|cx| {
242             // Prepare for blocking until a receiver wakes us up.
243             let oper = Operation::hook(token);
244             let mut packet = Packet::<T>::message_on_stack(msg);
245             inner
246                 .senders
247                 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
248             inner.receivers.notify();
249             drop(inner);
250 
251             // Block the current thread.
252             let sel = cx.wait_until(deadline);
253 
254             match sel {
255                 Selected::Waiting => unreachable!(),
256                 Selected::Aborted => {
257                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
258                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
259                     Err(SendTimeoutError::Timeout(msg))
260                 }
261                 Selected::Disconnected => {
262                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
263                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
264                     Err(SendTimeoutError::Disconnected(msg))
265                 }
266                 Selected::Operation(_) => {
267                     // Wait until the message is read, then drop the packet.
268                     packet.wait_ready();
269                     Ok(())
270                 }
271             }
272         })
273     }
274 
275     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>276     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
277         let token = &mut Token::default();
278         let mut inner = self.inner.lock().unwrap();
279 
280         // If there's a waiting sender, pair up with it.
281         if let Some(operation) = inner.senders.try_select() {
282             token.zero.0 = operation.packet;
283             drop(inner);
284             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
285         } else if inner.is_disconnected {
286             Err(TryRecvError::Disconnected)
287         } else {
288             Err(TryRecvError::Empty)
289         }
290     }
291 
292     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>293     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
294         let token = &mut Token::default();
295         let mut inner = self.inner.lock().unwrap();
296 
297         // If there's a waiting sender, pair up with it.
298         if let Some(operation) = inner.senders.try_select() {
299             token.zero.0 = operation.packet;
300             drop(inner);
301             unsafe {
302                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
303             }
304         }
305 
306         if inner.is_disconnected {
307             return Err(RecvTimeoutError::Disconnected);
308         }
309 
310         Context::with(|cx| {
311             // Prepare for blocking until a sender wakes us up.
312             let oper = Operation::hook(token);
313             let mut packet = Packet::<T>::empty_on_stack();
314             inner.receivers.register_with_packet(
315                 oper,
316                 &mut packet as *mut Packet<T> as *mut (),
317                 cx,
318             );
319             inner.senders.notify();
320             drop(inner);
321 
322             // Block the current thread.
323             let sel = cx.wait_until(deadline);
324 
325             match sel {
326                 Selected::Waiting => unreachable!(),
327                 Selected::Aborted => {
328                     self.inner
329                         .lock()
330                         .unwrap()
331                         .receivers
332                         .unregister(oper)
333                         .unwrap();
334                     Err(RecvTimeoutError::Timeout)
335                 }
336                 Selected::Disconnected => {
337                     self.inner
338                         .lock()
339                         .unwrap()
340                         .receivers
341                         .unregister(oper)
342                         .unwrap();
343                     Err(RecvTimeoutError::Disconnected)
344                 }
345                 Selected::Operation(_) => {
346                     // Wait until the message is provided, then read it.
347                     packet.wait_ready();
348                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
349                 }
350             }
351         })
352     }
353 
354     /// Disconnects the channel and wakes up all blocked senders and receivers.
355     ///
356     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool357     pub(crate) fn disconnect(&self) -> bool {
358         let mut inner = self.inner.lock().unwrap();
359 
360         if !inner.is_disconnected {
361             inner.is_disconnected = true;
362             inner.senders.disconnect();
363             inner.receivers.disconnect();
364             true
365         } else {
366             false
367         }
368     }
369 
370     /// Returns the current number of messages inside the channel.
len(&self) -> usize371     pub(crate) fn len(&self) -> usize {
372         0
373     }
374 
375     /// Returns the capacity of the channel.
capacity(&self) -> Option<usize>376     pub(crate) fn capacity(&self) -> Option<usize> {
377         Some(0)
378     }
379 
380     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool381     pub(crate) fn is_empty(&self) -> bool {
382         true
383     }
384 
385     /// Returns `true` if the channel is full.
is_full(&self) -> bool386     pub(crate) fn is_full(&self) -> bool {
387         true
388     }
389 }
390 
391 /// Receiver handle to a channel.
392 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
393 
394 /// Sender handle to a channel.
395 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
396 
397 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool398     fn try_select(&self, token: &mut Token) -> bool {
399         self.0.start_recv(token)
400     }
401 
deadline(&self) -> Option<Instant>402     fn deadline(&self) -> Option<Instant> {
403         None
404     }
405 
register(&self, oper: Operation, cx: &Context) -> bool406     fn register(&self, oper: Operation, cx: &Context) -> bool {
407         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
408 
409         let mut inner = self.0.inner.lock().unwrap();
410         inner
411             .receivers
412             .register_with_packet(oper, packet.cast::<()>(), cx);
413         inner.senders.notify();
414         inner.senders.can_select() || inner.is_disconnected
415     }
416 
unregister(&self, oper: Operation)417     fn unregister(&self, oper: Operation) {
418         if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
419             unsafe {
420                 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
421             }
422         }
423     }
424 
accept(&self, token: &mut Token, cx: &Context) -> bool425     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
426         token.zero.0 = cx.wait_packet();
427         true
428     }
429 
is_ready(&self) -> bool430     fn is_ready(&self) -> bool {
431         let inner = self.0.inner.lock().unwrap();
432         inner.senders.can_select() || inner.is_disconnected
433     }
434 
watch(&self, oper: Operation, cx: &Context) -> bool435     fn watch(&self, oper: Operation, cx: &Context) -> bool {
436         let mut inner = self.0.inner.lock().unwrap();
437         inner.receivers.watch(oper, cx);
438         inner.senders.can_select() || inner.is_disconnected
439     }
440 
unwatch(&self, oper: Operation)441     fn unwatch(&self, oper: Operation) {
442         let mut inner = self.0.inner.lock().unwrap();
443         inner.receivers.unwatch(oper);
444     }
445 }
446 
447 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool448     fn try_select(&self, token: &mut Token) -> bool {
449         self.0.start_send(token)
450     }
451 
deadline(&self) -> Option<Instant>452     fn deadline(&self) -> Option<Instant> {
453         None
454     }
455 
register(&self, oper: Operation, cx: &Context) -> bool456     fn register(&self, oper: Operation, cx: &Context) -> bool {
457         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
458 
459         let mut inner = self.0.inner.lock().unwrap();
460         inner
461             .senders
462             .register_with_packet(oper, packet.cast::<()>(), cx);
463         inner.receivers.notify();
464         inner.receivers.can_select() || inner.is_disconnected
465     }
466 
unregister(&self, oper: Operation)467     fn unregister(&self, oper: Operation) {
468         if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
469             unsafe {
470                 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
471             }
472         }
473     }
474 
accept(&self, token: &mut Token, cx: &Context) -> bool475     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
476         token.zero.0 = cx.wait_packet();
477         true
478     }
479 
is_ready(&self) -> bool480     fn is_ready(&self) -> bool {
481         let inner = self.0.inner.lock().unwrap();
482         inner.receivers.can_select() || inner.is_disconnected
483     }
484 
watch(&self, oper: Operation, cx: &Context) -> bool485     fn watch(&self, oper: Operation, cx: &Context) -> bool {
486         let mut inner = self.0.inner.lock().unwrap();
487         inner.senders.watch(oper, cx);
488         inner.receivers.can_select() || inner.is_disconnected
489     }
490 
unwatch(&self, oper: Operation)491     fn unwatch(&self, oper: Operation) {
492         let mut inner = self.0.inner.lock().unwrap();
493         inner.senders.unwatch(oper);
494     }
495 }
496