1 //! A channel for sending a single message between asynchronous tasks.
2 //!
3 //! This is a single-producer, single-consumer channel.
4
5 use alloc::sync::Arc;
6 use core::fmt;
7 use core::pin::Pin;
8 use core::sync::atomic::AtomicBool;
9 use core::sync::atomic::Ordering::SeqCst;
10 use futures_core::future::{Future, FusedFuture};
11 use futures_core::task::{Context, Poll, Waker};
12
13 use crate::lock::Lock;
14
15 /// A future for a value that will be provided by another asynchronous task.
16 ///
17 /// This is created by the [`channel`](channel) function.
18 #[must_use = "futures do nothing unless you `.await` or poll them"]
19 #[derive(Debug)]
20 pub struct Receiver<T> {
21 inner: Arc<Inner<T>>,
22 }
23
24 /// A means of transmitting a single value to another task.
25 ///
26 /// This is created by the [`channel`](channel) function.
27 #[derive(Debug)]
28 pub struct Sender<T> {
29 inner: Arc<Inner<T>>,
30 }
31
32 // The channels do not ever project Pin to the inner T
33 impl<T> Unpin for Receiver<T> {}
34 impl<T> Unpin for Sender<T> {}
35
36 /// Internal state of the `Receiver`/`Sender` pair above. This is all used as
37 /// the internal synchronization between the two for send/recv operations.
38 #[derive(Debug)]
39 struct Inner<T> {
40 /// Indicates whether this oneshot is complete yet. This is filled in both
41 /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
42 /// appropriately.
43 ///
44 /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
45 /// unlocked and ready to be inspected.
46 ///
47 /// For `Sender` if this is `true` then the oneshot has gone away and it
48 /// can return ready from `poll_canceled`.
49 complete: AtomicBool,
50
51 /// The actual data being transferred as part of this `Receiver`. This is
52 /// filled in by `Sender::complete` and read by `Receiver::poll`.
53 ///
54 /// Note that this is protected by `Lock`, but it is in theory safe to
55 /// replace with an `UnsafeCell` as it's actually protected by `complete`
56 /// above. I wouldn't recommend doing this, however, unless someone is
57 /// supremely confident in the various atomic orderings here and there.
58 data: Lock<Option<T>>,
59
60 /// Field to store the task which is blocked in `Receiver::poll`.
61 ///
62 /// This is filled in when a oneshot is polled but not ready yet. Note that
63 /// the `Lock` here, unlike in `data` above, is important to resolve races.
64 /// Both the `Receiver` and the `Sender` halves understand that if they
65 /// can't acquire the lock then some important interference is happening.
66 rx_task: Lock<Option<Waker>>,
67
68 /// Like `rx_task` above, except for the task blocked in
69 /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
70 tx_task: Lock<Option<Waker>>,
71 }
72
73 /// Creates a new one-shot channel for sending a single value across asynchronous tasks.
74 ///
75 /// The channel works for a spsc (single-producer, single-consumer) scheme.
76 ///
77 /// This function is similar to Rust's channel constructor found in the standard
78 /// library. Two halves are returned, the first of which is a `Sender` handle,
79 /// used to signal the end of a computation and provide its value. The second
80 /// half is a `Receiver` which implements the `Future` trait, resolving to the
81 /// value that was given to the `Sender` handle.
82 ///
83 /// Each half can be separately owned and sent across tasks.
84 ///
85 /// # Examples
86 ///
87 /// ```
88 /// use futures::channel::oneshot;
89 /// use std::{thread, time::Duration};
90 ///
91 /// let (sender, receiver) = oneshot::channel::<i32>();
92 ///
93 /// thread::spawn(|| {
94 /// println!("THREAD: sleeping zzz...");
95 /// thread::sleep(Duration::from_millis(1000));
96 /// println!("THREAD: i'm awake! sending.");
97 /// sender.send(3).unwrap();
98 /// });
99 ///
100 /// println!("MAIN: doing some useful stuff");
101 ///
102 /// futures::executor::block_on(async {
103 /// println!("MAIN: waiting for msg...");
104 /// println!("MAIN: got: {:?}", receiver.await)
105 /// });
106 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)107 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
108 let inner = Arc::new(Inner::new());
109 let receiver = Receiver {
110 inner: inner.clone(),
111 };
112 let sender = Sender {
113 inner,
114 };
115 (sender, receiver)
116 }
117
118 impl<T> Inner<T> {
new() -> Self119 fn new() -> Self {
120 Self {
121 complete: AtomicBool::new(false),
122 data: Lock::new(None),
123 rx_task: Lock::new(None),
124 tx_task: Lock::new(None),
125 }
126 }
127
send(&self, t: T) -> Result<(), T>128 fn send(&self, t: T) -> Result<(), T> {
129 if self.complete.load(SeqCst) {
130 return Err(t)
131 }
132
133 // Note that this lock acquisition may fail if the receiver
134 // is closed and sets the `complete` flag to `true`, whereupon
135 // the receiver may call `poll()`.
136 if let Some(mut slot) = self.data.try_lock() {
137 assert!(slot.is_none());
138 *slot = Some(t);
139 drop(slot);
140
141 // If the receiver called `close()` between the check at the
142 // start of the function, and the lock being released, then
143 // the receiver may not be around to receive it, so try to
144 // pull it back out.
145 if self.complete.load(SeqCst) {
146 // If lock acquisition fails, then receiver is actually
147 // receiving it, so we're good.
148 if let Some(mut slot) = self.data.try_lock() {
149 if let Some(t) = slot.take() {
150 return Err(t);
151 }
152 }
153 }
154 Ok(())
155 } else {
156 // Must have been closed
157 Err(t)
158 }
159 }
160
poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()>161 fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
162 // Fast path up first, just read the flag and see if our other half is
163 // gone. This flag is set both in our destructor and the oneshot
164 // destructor, but our destructor hasn't run yet so if it's set then the
165 // oneshot is gone.
166 if self.complete.load(SeqCst) {
167 return Poll::Ready(())
168 }
169
170 // If our other half is not gone then we need to park our current task
171 // and move it into the `tx_task` slot to get notified when it's
172 // actually gone.
173 //
174 // If `try_lock` fails, then the `Receiver` is in the process of using
175 // it, so we can deduce that it's now in the process of going away and
176 // hence we're canceled. If it succeeds then we just store our handle.
177 //
178 // Crucially we then check `complete` *again* before we return.
179 // While we were storing our handle inside `tx_task` the
180 // `Receiver` may have been dropped. The first thing it does is set the
181 // flag, and if it fails to acquire the lock it assumes that we'll see
182 // the flag later on. So... we then try to see the flag later on!
183 let handle = cx.waker().clone();
184 match self.tx_task.try_lock() {
185 Some(mut p) => *p = Some(handle),
186 None => return Poll::Ready(()),
187 }
188 if self.complete.load(SeqCst) {
189 Poll::Ready(())
190 } else {
191 Poll::Pending
192 }
193 }
194
is_canceled(&self) -> bool195 fn is_canceled(&self) -> bool {
196 self.complete.load(SeqCst)
197 }
198
drop_tx(&self)199 fn drop_tx(&self) {
200 // Flag that we're a completed `Sender` and try to wake up a receiver.
201 // Whether or not we actually stored any data will get picked up and
202 // translated to either an item or cancellation.
203 //
204 // Note that if we fail to acquire the `rx_task` lock then that means
205 // we're in one of two situations:
206 //
207 // 1. The receiver is trying to block in `poll`
208 // 2. The receiver is being dropped
209 //
210 // In the first case it'll check the `complete` flag after it's done
211 // blocking to see if it succeeded. In the latter case we don't need to
212 // wake up anyone anyway. So in both cases it's ok to ignore the `None`
213 // case of `try_lock` and bail out.
214 //
215 // The first case crucially depends on `Lock` using `SeqCst` ordering
216 // under the hood. If it instead used `Release` / `Acquire` ordering,
217 // then it would not necessarily synchronize with `inner.complete`
218 // and deadlock might be possible, as was observed in
219 // https://github.com/rust-lang/futures-rs/pull/219.
220 self.complete.store(true, SeqCst);
221
222 if let Some(mut slot) = self.rx_task.try_lock() {
223 if let Some(task) = slot.take() {
224 drop(slot);
225 task.wake();
226 }
227 }
228
229 // If we registered a task for cancel notification drop it to reduce
230 // spurious wakeups
231 if let Some(mut slot) = self.tx_task.try_lock() {
232 drop(slot.take());
233 }
234 }
235
close_rx(&self)236 fn close_rx(&self) {
237 // Flag our completion and then attempt to wake up the sender if it's
238 // blocked. See comments in `drop` below for more info
239 self.complete.store(true, SeqCst);
240 if let Some(mut handle) = self.tx_task.try_lock() {
241 if let Some(task) = handle.take() {
242 drop(handle);
243 task.wake()
244 }
245 }
246 }
247
try_recv(&self) -> Result<Option<T>, Canceled>248 fn try_recv(&self) -> Result<Option<T>, Canceled> {
249 // If we're complete, either `::close_rx` or `::drop_tx` was called.
250 // We can assume a successful send if data is present.
251 if self.complete.load(SeqCst) {
252 if let Some(mut slot) = self.data.try_lock() {
253 if let Some(data) = slot.take() {
254 return Ok(Some(data));
255 }
256 }
257 Err(Canceled)
258 } else {
259 Ok(None)
260 }
261 }
262
recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>>263 fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
264 // Check to see if some data has arrived. If it hasn't then we need to
265 // block our task.
266 //
267 // Note that the acquisition of the `rx_task` lock might fail below, but
268 // the only situation where this can happen is during `Sender::drop`
269 // when we are indeed completed already. If that's happening then we
270 // know we're completed so keep going.
271 let done = if self.complete.load(SeqCst) {
272 true
273 } else {
274 let task = cx.waker().clone();
275 match self.rx_task.try_lock() {
276 Some(mut slot) => { *slot = Some(task); false },
277 None => true,
278 }
279 };
280
281 // If we're `done` via one of the paths above, then look at the data and
282 // figure out what the answer is. If, however, we stored `rx_task`
283 // successfully above we need to check again if we're completed in case
284 // a message was sent while `rx_task` was locked and couldn't notify us
285 // otherwise.
286 //
287 // If we're not done, and we're not complete, though, then we've
288 // successfully blocked our task and we return `Pending`.
289 if done || self.complete.load(SeqCst) {
290 // If taking the lock fails, the sender will realise that the we're
291 // `done` when it checks the `complete` flag on the way out, and
292 // will treat the send as a failure.
293 if let Some(mut slot) = self.data.try_lock() {
294 if let Some(data) = slot.take() {
295 return Poll::Ready(Ok(data));
296 }
297 }
298 Poll::Ready(Err(Canceled))
299 } else {
300 Poll::Pending
301 }
302 }
303
drop_rx(&self)304 fn drop_rx(&self) {
305 // Indicate to the `Sender` that we're done, so any future calls to
306 // `poll_canceled` are weeded out.
307 self.complete.store(true, SeqCst);
308
309 // If we've blocked a task then there's no need for it to stick around,
310 // so we need to drop it. If this lock acquisition fails, though, then
311 // it's just because our `Sender` is trying to take the task, so we
312 // let them take care of that.
313 if let Some(mut slot) = self.rx_task.try_lock() {
314 let task = slot.take();
315 drop(slot);
316 drop(task);
317 }
318
319 // Finally, if our `Sender` wants to get notified of us going away, it
320 // would have stored something in `tx_task`. Here we try to peel that
321 // out and unpark it.
322 //
323 // Note that the `try_lock` here may fail, but only if the `Sender` is
324 // in the process of filling in the task. If that happens then we
325 // already flagged `complete` and they'll pick that up above.
326 if let Some(mut handle) = self.tx_task.try_lock() {
327 if let Some(task) = handle.take() {
328 drop(handle);
329 task.wake()
330 }
331 }
332 }
333 }
334
335 impl<T> Sender<T> {
336 /// Completes this oneshot with a successful result.
337 ///
338 /// This function will consume `self` and indicate to the other end, the
339 /// [`Receiver`](Receiver), that the value provided is the result of the
340 /// computation this represents.
341 ///
342 /// If the value is successfully enqueued for the remote end to receive,
343 /// then `Ok(())` is returned. If the receiving end was dropped before
344 /// this function was called, however, then `Err(t)` is returned.
send(self, t: T) -> Result<(), T>345 pub fn send(self, t: T) -> Result<(), T> {
346 self.inner.send(t)
347 }
348
349 /// Polls this `Sender` half to detect whether its associated
350 /// [`Receiver`](Receiver) has been dropped.
351 ///
352 /// # Return values
353 ///
354 /// If `Ready(())` is returned then the associated `Receiver` has been
355 /// dropped, which means any work required for sending should be canceled.
356 ///
357 /// If `Pending` is returned then the associated `Receiver` is still
358 /// alive and may be able to receive a message if sent. The current task,
359 /// however, is scheduled to receive a notification if the corresponding
360 /// `Receiver` goes away.
poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()>361 pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
362 self.inner.poll_canceled(cx)
363 }
364
365 /// Creates a future that resolves when this `Sender`'s corresponding
366 /// [`Receiver`](Receiver) half has hung up.
367 ///
368 /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
369 /// to expose a [`Future`](core::future::Future).
cancellation(&mut self) -> Cancellation<'_, T>370 pub fn cancellation(&mut self) -> Cancellation<'_, T> {
371 Cancellation { inner: self }
372 }
373
374 /// Tests to see whether this `Sender`'s corresponding `Receiver`
375 /// has been dropped.
376 ///
377 /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
378 /// enqueue a task for wakeup upon cancellation, but merely reports the
379 /// current state, which may be subject to concurrent modification.
is_canceled(&self) -> bool380 pub fn is_canceled(&self) -> bool {
381 self.inner.is_canceled()
382 }
383
384 /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
385 /// they were created by the same call to `channel`.
is_connected_to(&self, receiver: &Receiver<T>) -> bool386 pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
387 Arc::ptr_eq(&self.inner, &receiver.inner)
388 }
389 }
390
391 impl<T> Drop for Sender<T> {
drop(&mut self)392 fn drop(&mut self) {
393 self.inner.drop_tx()
394 }
395 }
396
397 /// A future that resolves when the receiving end of a channel has hung up.
398 ///
399 /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
400 #[must_use = "futures do nothing unless you `.await` or poll them"]
401 #[derive(Debug)]
402 pub struct Cancellation<'a, T> {
403 inner: &'a mut Sender<T>,
404 }
405
406 impl<T> Future for Cancellation<'_, T> {
407 type Output = ();
408
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>409 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
410 self.inner.poll_canceled(cx)
411 }
412 }
413
/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message `oneshot canceled`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for Canceled {}
427
428 impl<T> Receiver<T> {
429 /// Gracefully close this receiver, preventing any subsequent attempts to
430 /// send to it.
431 ///
432 /// Any `send` operation which happens after this method returns is
433 /// guaranteed to fail. After calling this method, you can use
434 /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
435 /// message had previously been sent.
close(&mut self)436 pub fn close(&mut self) {
437 self.inner.close_rx()
438 }
439
440 /// Attempts to receive a message outside of the context of a task.
441 ///
442 /// Does not schedule a task wakeup or have any other side effects.
443 ///
444 /// A return value of `None` must be considered immediately stale (out of
445 /// date) unless [`close`](Receiver::close) has been called first.
446 ///
447 /// Returns an error if the sender was dropped.
try_recv(&mut self) -> Result<Option<T>, Canceled>448 pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
449 self.inner.try_recv()
450 }
451 }
452
453 impl<T> Future for Receiver<T> {
454 type Output = Result<T, Canceled>;
455
poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<T, Canceled>>456 fn poll(
457 self: Pin<&mut Self>,
458 cx: &mut Context<'_>,
459 ) -> Poll<Result<T, Canceled>> {
460 self.inner.recv(cx)
461 }
462 }
463
464 impl<T> FusedFuture for Receiver<T> {
is_terminated(&self) -> bool465 fn is_terminated(&self) -> bool {
466 if self.inner.complete.load(SeqCst) {
467 if let Some(slot) = self.inner.data.try_lock() {
468 if slot.is_some() {
469 return false;
470 }
471 }
472 true
473 } else {
474 false
475 }
476 }
477 }
478
479 impl<T> Drop for Receiver<T> {
drop(&mut self)480 fn drop(&mut self) {
481 self.inner.drop_rx()
482 }
483 }
484