• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! A channel for sending a single message between asynchronous tasks.
2 //!
3 //! This is a single-producer, single-consumer channel.
4 
5 use alloc::sync::Arc;
6 use core::fmt;
7 use core::pin::Pin;
8 use core::sync::atomic::AtomicBool;
9 use core::sync::atomic::Ordering::SeqCst;
10 use futures_core::future::{Future, FusedFuture};
11 use futures_core::task::{Context, Poll, Waker};
12 
13 use crate::lock::Lock;
14 
15 /// A future for a value that will be provided by another asynchronous task.
16 ///
17 /// This is created by the [`channel`](channel) function.
18 #[must_use = "futures do nothing unless you `.await` or poll them"]
19 #[derive(Debug)]
20 pub struct Receiver<T> {
21     inner: Arc<Inner<T>>,
22 }
23 
24 /// A means of transmitting a single value to another task.
25 ///
26 /// This is created by the [`channel`](channel) function.
27 #[derive(Debug)]
28 pub struct Sender<T> {
29     inner: Arc<Inner<T>>,
30 }
31 
32 // The channels do not ever project Pin to the inner T
33 impl<T> Unpin for Receiver<T> {}
34 impl<T> Unpin for Sender<T> {}
35 
36 /// Internal state of the `Receiver`/`Sender` pair above. This is all used as
37 /// the internal synchronization between the two for send/recv operations.
38 #[derive(Debug)]
39 struct Inner<T> {
40     /// Indicates whether this oneshot is complete yet. This is filled in both
41     /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
42     /// appropriately.
43     ///
44     /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
45     /// unlocked and ready to be inspected.
46     ///
47     /// For `Sender` if this is `true` then the oneshot has gone away and it
48     /// can return ready from `poll_canceled`.
49     complete: AtomicBool,
50 
51     /// The actual data being transferred as part of this `Receiver`. This is
52     /// filled in by `Sender::complete` and read by `Receiver::poll`.
53     ///
54     /// Note that this is protected by `Lock`, but it is in theory safe to
55     /// replace with an `UnsafeCell` as it's actually protected by `complete`
56     /// above. I wouldn't recommend doing this, however, unless someone is
57     /// supremely confident in the various atomic orderings here and there.
58     data: Lock<Option<T>>,
59 
60     /// Field to store the task which is blocked in `Receiver::poll`.
61     ///
62     /// This is filled in when a oneshot is polled but not ready yet. Note that
63     /// the `Lock` here, unlike in `data` above, is important to resolve races.
64     /// Both the `Receiver` and the `Sender` halves understand that if they
65     /// can't acquire the lock then some important interference is happening.
66     rx_task: Lock<Option<Waker>>,
67 
68     /// Like `rx_task` above, except for the task blocked in
69     /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
70     tx_task: Lock<Option<Waker>>,
71 }
72 
73 /// Creates a new one-shot channel for sending a single value across asynchronous tasks.
74 ///
75 /// The channel works for a spsc (single-producer, single-consumer) scheme.
76 ///
77 /// This function is similar to Rust's channel constructor found in the standard
78 /// library. Two halves are returned, the first of which is a `Sender` handle,
79 /// used to signal the end of a computation and provide its value. The second
80 /// half is a `Receiver` which implements the `Future` trait, resolving to the
81 /// value that was given to the `Sender` handle.
82 ///
83 /// Each half can be separately owned and sent across tasks.
84 ///
85 /// # Examples
86 ///
87 /// ```
88 /// use futures::channel::oneshot;
89 /// use std::{thread, time::Duration};
90 ///
91 /// let (sender, receiver) = oneshot::channel::<i32>();
92 ///
93 /// thread::spawn(|| {
94 ///     println!("THREAD: sleeping zzz...");
95 ///     thread::sleep(Duration::from_millis(1000));
96 ///     println!("THREAD: i'm awake! sending.");
97 ///     sender.send(3).unwrap();
98 /// });
99 ///
100 /// println!("MAIN: doing some useful stuff");
101 ///
102 /// futures::executor::block_on(async {
103 ///     println!("MAIN: waiting for msg...");
104 ///     println!("MAIN: got: {:?}", receiver.await)
105 /// });
106 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)107 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
108     let inner = Arc::new(Inner::new());
109     let receiver = Receiver {
110         inner: inner.clone(),
111     };
112     let sender = Sender {
113         inner,
114     };
115     (sender, receiver)
116 }
117 
118 impl<T> Inner<T> {
new() -> Self119     fn new() -> Self {
120         Self {
121             complete: AtomicBool::new(false),
122             data: Lock::new(None),
123             rx_task: Lock::new(None),
124             tx_task: Lock::new(None),
125         }
126     }
127 
send(&self, t: T) -> Result<(), T>128     fn send(&self, t: T) -> Result<(), T> {
129         if self.complete.load(SeqCst) {
130             return Err(t)
131         }
132 
133         // Note that this lock acquisition may fail if the receiver
134         // is closed and sets the `complete` flag to `true`, whereupon
135         // the receiver may call `poll()`.
136         if let Some(mut slot) = self.data.try_lock() {
137             assert!(slot.is_none());
138             *slot = Some(t);
139             drop(slot);
140 
141             // If the receiver called `close()` between the check at the
142             // start of the function, and the lock being released, then
143             // the receiver may not be around to receive it, so try to
144             // pull it back out.
145             if self.complete.load(SeqCst) {
146                 // If lock acquisition fails, then receiver is actually
147                 // receiving it, so we're good.
148                 if let Some(mut slot) = self.data.try_lock() {
149                     if let Some(t) = slot.take() {
150                         return Err(t);
151                     }
152                 }
153             }
154             Ok(())
155         } else {
156             // Must have been closed
157             Err(t)
158         }
159     }
160 
poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()>161     fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
162         // Fast path up first, just read the flag and see if our other half is
163         // gone. This flag is set both in our destructor and the oneshot
164         // destructor, but our destructor hasn't run yet so if it's set then the
165         // oneshot is gone.
166         if self.complete.load(SeqCst) {
167             return Poll::Ready(())
168         }
169 
170         // If our other half is not gone then we need to park our current task
171         // and move it into the `tx_task` slot to get notified when it's
172         // actually gone.
173         //
174         // If `try_lock` fails, then the `Receiver` is in the process of using
175         // it, so we can deduce that it's now in the process of going away and
176         // hence we're canceled. If it succeeds then we just store our handle.
177         //
178         // Crucially we then check `complete` *again* before we return.
179         // While we were storing our handle inside `tx_task` the
180         // `Receiver` may have been dropped. The first thing it does is set the
181         // flag, and if it fails to acquire the lock it assumes that we'll see
182         // the flag later on. So... we then try to see the flag later on!
183         let handle = cx.waker().clone();
184         match self.tx_task.try_lock() {
185             Some(mut p) => *p = Some(handle),
186             None => return Poll::Ready(()),
187         }
188         if self.complete.load(SeqCst) {
189             Poll::Ready(())
190         } else {
191             Poll::Pending
192         }
193     }
194 
is_canceled(&self) -> bool195     fn is_canceled(&self) -> bool {
196         self.complete.load(SeqCst)
197     }
198 
drop_tx(&self)199     fn drop_tx(&self) {
200         // Flag that we're a completed `Sender` and try to wake up a receiver.
201         // Whether or not we actually stored any data will get picked up and
202         // translated to either an item or cancellation.
203         //
204         // Note that if we fail to acquire the `rx_task` lock then that means
205         // we're in one of two situations:
206         //
207         // 1. The receiver is trying to block in `poll`
208         // 2. The receiver is being dropped
209         //
210         // In the first case it'll check the `complete` flag after it's done
211         // blocking to see if it succeeded. In the latter case we don't need to
212         // wake up anyone anyway. So in both cases it's ok to ignore the `None`
213         // case of `try_lock` and bail out.
214         //
215         // The first case crucially depends on `Lock` using `SeqCst` ordering
216         // under the hood. If it instead used `Release` / `Acquire` ordering,
217         // then it would not necessarily synchronize with `inner.complete`
218         // and deadlock might be possible, as was observed in
219         // https://github.com/rust-lang/futures-rs/pull/219.
220         self.complete.store(true, SeqCst);
221 
222         if let Some(mut slot) = self.rx_task.try_lock() {
223             if let Some(task) = slot.take() {
224                 drop(slot);
225                 task.wake();
226             }
227         }
228 
229         // If we registered a task for cancel notification drop it to reduce
230         // spurious wakeups
231         if let Some(mut slot) = self.tx_task.try_lock() {
232             drop(slot.take());
233         }
234     }
235 
close_rx(&self)236     fn close_rx(&self) {
237         // Flag our completion and then attempt to wake up the sender if it's
238         // blocked. See comments in `drop` below for more info
239         self.complete.store(true, SeqCst);
240         if let Some(mut handle) = self.tx_task.try_lock() {
241             if let Some(task) = handle.take() {
242                 drop(handle);
243                 task.wake()
244             }
245         }
246     }
247 
try_recv(&self) -> Result<Option<T>, Canceled>248     fn try_recv(&self) -> Result<Option<T>, Canceled> {
249         // If we're complete, either `::close_rx` or `::drop_tx` was called.
250         // We can assume a successful send if data is present.
251         if self.complete.load(SeqCst) {
252             if let Some(mut slot) = self.data.try_lock() {
253                 if let Some(data) = slot.take() {
254                     return Ok(Some(data));
255                 }
256             }
257             Err(Canceled)
258         } else {
259             Ok(None)
260         }
261     }
262 
recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>>263     fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
264         // Check to see if some data has arrived. If it hasn't then we need to
265         // block our task.
266         //
267         // Note that the acquisition of the `rx_task` lock might fail below, but
268         // the only situation where this can happen is during `Sender::drop`
269         // when we are indeed completed already. If that's happening then we
270         // know we're completed so keep going.
271         let done = if self.complete.load(SeqCst) {
272             true
273         } else {
274             let task = cx.waker().clone();
275             match self.rx_task.try_lock() {
276                 Some(mut slot) => { *slot = Some(task); false },
277                 None => true,
278             }
279         };
280 
281         // If we're `done` via one of the paths above, then look at the data and
282         // figure out what the answer is. If, however, we stored `rx_task`
283         // successfully above we need to check again if we're completed in case
284         // a message was sent while `rx_task` was locked and couldn't notify us
285         // otherwise.
286         //
287         // If we're not done, and we're not complete, though, then we've
288         // successfully blocked our task and we return `Pending`.
289         if done || self.complete.load(SeqCst) {
290             // If taking the lock fails, the sender will realise that the we're
291             // `done` when it checks the `complete` flag on the way out, and
292             // will treat the send as a failure.
293             if let Some(mut slot) = self.data.try_lock() {
294                 if let Some(data) = slot.take() {
295                     return Poll::Ready(Ok(data));
296                 }
297             }
298             Poll::Ready(Err(Canceled))
299         } else {
300             Poll::Pending
301         }
302     }
303 
drop_rx(&self)304     fn drop_rx(&self) {
305         // Indicate to the `Sender` that we're done, so any future calls to
306         // `poll_canceled` are weeded out.
307         self.complete.store(true, SeqCst);
308 
309         // If we've blocked a task then there's no need for it to stick around,
310         // so we need to drop it. If this lock acquisition fails, though, then
311         // it's just because our `Sender` is trying to take the task, so we
312         // let them take care of that.
313         if let Some(mut slot) = self.rx_task.try_lock() {
314             let task = slot.take();
315             drop(slot);
316             drop(task);
317         }
318 
319         // Finally, if our `Sender` wants to get notified of us going away, it
320         // would have stored something in `tx_task`. Here we try to peel that
321         // out and unpark it.
322         //
323         // Note that the `try_lock` here may fail, but only if the `Sender` is
324         // in the process of filling in the task. If that happens then we
325         // already flagged `complete` and they'll pick that up above.
326         if let Some(mut handle) = self.tx_task.try_lock() {
327             if let Some(task) = handle.take() {
328                 drop(handle);
329                 task.wake()
330             }
331         }
332     }
333 }
334 
335 impl<T> Sender<T> {
336     /// Completes this oneshot with a successful result.
337     ///
338     /// This function will consume `self` and indicate to the other end, the
339     /// [`Receiver`](Receiver), that the value provided is the result of the
340     /// computation this represents.
341     ///
342     /// If the value is successfully enqueued for the remote end to receive,
343     /// then `Ok(())` is returned. If the receiving end was dropped before
344     /// this function was called, however, then `Err(t)` is returned.
send(self, t: T) -> Result<(), T>345     pub fn send(self, t: T) -> Result<(), T> {
346         self.inner.send(t)
347     }
348 
349     /// Polls this `Sender` half to detect whether its associated
350     /// [`Receiver`](Receiver) has been dropped.
351     ///
352     /// # Return values
353     ///
354     /// If `Ready(())` is returned then the associated `Receiver` has been
355     /// dropped, which means any work required for sending should be canceled.
356     ///
357     /// If `Pending` is returned then the associated `Receiver` is still
358     /// alive and may be able to receive a message if sent. The current task,
359     /// however, is scheduled to receive a notification if the corresponding
360     /// `Receiver` goes away.
poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()>361     pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
362         self.inner.poll_canceled(cx)
363     }
364 
365     /// Creates a future that resolves when this `Sender`'s corresponding
366     /// [`Receiver`](Receiver) half has hung up.
367     ///
368     /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
369     /// to expose a [`Future`](core::future::Future).
cancellation(&mut self) -> Cancellation<'_, T>370     pub fn cancellation(&mut self) -> Cancellation<'_, T> {
371         Cancellation { inner: self }
372     }
373 
374     /// Tests to see whether this `Sender`'s corresponding `Receiver`
375     /// has been dropped.
376     ///
377     /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
378     /// enqueue a task for wakeup upon cancellation, but merely reports the
379     /// current state, which may be subject to concurrent modification.
is_canceled(&self) -> bool380     pub fn is_canceled(&self) -> bool {
381         self.inner.is_canceled()
382     }
383 
384     /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
385     /// they were created by the same call to `channel`.
is_connected_to(&self, receiver: &Receiver<T>) -> bool386     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
387         Arc::ptr_eq(&self.inner, &receiver.inner)
388     }
389 }
390 
391 impl<T> Drop for Sender<T> {
drop(&mut self)392     fn drop(&mut self) {
393         self.inner.drop_tx()
394     }
395 }
396 
397 /// A future that resolves when the receiving end of a channel has hung up.
398 ///
399 /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
400 #[must_use = "futures do nothing unless you `.await` or poll them"]
401 #[derive(Debug)]
402 pub struct Cancellation<'a, T> {
403     inner: &'a mut Sender<T>,
404 }
405 
406 impl<T> Future for Cancellation<'_, T> {
407     type Output = ();
408 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>409     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
410         self.inner.poll_canceled(cx)
411     }
412 }
413 
414 /// Error returned from a [`Receiver`](Receiver) when the corresponding
415 /// [`Sender`](Sender) is dropped.
416 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
417 pub struct Canceled;
418 
419 impl fmt::Display for Canceled {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result420     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421         write!(f, "oneshot canceled")
422     }
423 }
424 
425 #[cfg(feature = "std")]
426 impl std::error::Error for Canceled {}
427 
428 impl<T> Receiver<T> {
429     /// Gracefully close this receiver, preventing any subsequent attempts to
430     /// send to it.
431     ///
432     /// Any `send` operation which happens after this method returns is
433     /// guaranteed to fail. After calling this method, you can use
434     /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
435     /// message had previously been sent.
close(&mut self)436     pub fn close(&mut self) {
437         self.inner.close_rx()
438     }
439 
440     /// Attempts to receive a message outside of the context of a task.
441     ///
442     /// Does not schedule a task wakeup or have any other side effects.
443     ///
444     /// A return value of `None` must be considered immediately stale (out of
445     /// date) unless [`close`](Receiver::close) has been called first.
446     ///
447     /// Returns an error if the sender was dropped.
try_recv(&mut self) -> Result<Option<T>, Canceled>448     pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
449         self.inner.try_recv()
450     }
451 }
452 
453 impl<T> Future for Receiver<T> {
454     type Output = Result<T, Canceled>;
455 
poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<T, Canceled>>456     fn poll(
457         self: Pin<&mut Self>,
458         cx: &mut Context<'_>,
459     ) -> Poll<Result<T, Canceled>> {
460         self.inner.recv(cx)
461     }
462 }
463 
464 impl<T> FusedFuture for Receiver<T> {
is_terminated(&self) -> bool465     fn is_terminated(&self) -> bool {
466         if self.inner.complete.load(SeqCst) {
467             if let Some(slot) = self.inner.data.try_lock() {
468                 if slot.is_some() {
469                     return false;
470                 }
471             }
472             true
473         } else {
474             false
475         }
476     }
477 }
478 
479 impl<T> Drop for Receiver<T> {
drop(&mut self)480     fn drop(&mut self) {
481         self.inner.drop_rx()
482     }
483 }
484