• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A one-shot channel is used for sending a single message between
4 //! asynchronous tasks. The [`channel`] function is used to create a
5 //! [`Sender`] and [`Receiver`] handle pair that form the channel.
6 //!
7 //! The `Sender` handle is used by the producer to send the value.
8 //! The `Receiver` handle is used by the consumer to receive the value.
9 //!
10 //! Each handle can be used on separate tasks.
11 //!
12 //! Since the `send` method is not async, it can be used anywhere. This includes
13 //! sending between two runtimes, and using it from non-async code.
14 //!
15 //! If the [`Receiver`] is closed before receiving a message which has already
16 //! been sent, the message will remain in the channel until the receiver is
17 //! dropped, at which point the message will be dropped immediately.
18 //!
19 //! # Examples
20 //!
21 //! ```
22 //! use tokio::sync::oneshot;
23 //!
24 //! #[tokio::main]
25 //! async fn main() {
26 //!     let (tx, rx) = oneshot::channel();
27 //!
28 //!     tokio::spawn(async move {
29 //!         if let Err(_) = tx.send(3) {
30 //!             println!("the receiver dropped");
31 //!         }
32 //!     });
33 //!
34 //!     match rx.await {
35 //!         Ok(v) => println!("got = {:?}", v),
36 //!         Err(_) => println!("the sender dropped"),
37 //!     }
38 //! }
39 //! ```
40 //!
41 //! If the sender is dropped without sending, the receiver will fail with
42 //! [`error::RecvError`]:
43 //!
44 //! ```
45 //! use tokio::sync::oneshot;
46 //!
47 //! #[tokio::main]
48 //! async fn main() {
49 //!     let (tx, rx) = oneshot::channel::<u32>();
50 //!
51 //!     tokio::spawn(async move {
52 //!         drop(tx);
53 //!     });
54 //!
55 //!     match rx.await {
56 //!         Ok(_) => panic!("This doesn't happen"),
57 //!         Err(_) => println!("the sender dropped"),
58 //!     }
59 //! }
60 //! ```
61 //!
62 //! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of
63 //! the channel.
64 //!
65 //! ```
66 //! use tokio::sync::oneshot;
67 //! use tokio::time::{interval, sleep, Duration};
68 //!
69 //! #[tokio::main]
70 //! # async fn _doc() {}
71 //! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72 //! async fn main() {
73 //!     let (send, mut recv) = oneshot::channel();
74 //!     let mut interval = interval(Duration::from_millis(100));
75 //!
76 //!     # let handle =
77 //!     tokio::spawn(async move {
78 //!         sleep(Duration::from_secs(1)).await;
79 //!         send.send("shut down").unwrap();
80 //!     });
81 //!
82 //!     loop {
83 //!         tokio::select! {
84 //!             _ = interval.tick() => println!("Another 100ms"),
85 //!             msg = &mut recv => {
86 //!                 println!("Got message: {}", msg.unwrap());
87 //!                 break;
88 //!             }
89 //!         }
90 //!     }
91 //!     # handle.await.unwrap();
92 //! }
93 //! ```
94 //!
95 //! To use a `Sender` from a destructor, put it in an [`Option`] and call
96 //! [`Option::take`].
97 //!
98 //! ```
99 //! use tokio::sync::oneshot;
100 //!
101 //! struct SendOnDrop {
102 //!     sender: Option<oneshot::Sender<&'static str>>,
103 //! }
104 //! impl Drop for SendOnDrop {
105 //!     fn drop(&mut self) {
106 //!         if let Some(sender) = self.sender.take() {
107 //!             // Using `let _ =` to ignore send errors.
108 //!             let _ = sender.send("I got dropped!");
109 //!         }
110 //!     }
111 //! }
112 //!
113 //! #[tokio::main]
114 //! # async fn _doc() {}
115 //! # #[tokio::main(flavor = "current_thread")]
116 //! async fn main() {
117 //!     let (send, recv) = oneshot::channel();
118 //!
119 //!     let send_on_drop = SendOnDrop { sender: Some(send) };
120 //!     drop(send_on_drop);
121 //!
122 //!     assert_eq!(recv.await, Ok("I got dropped!"));
123 //! }
124 //! ```
125 
126 use crate::loom::cell::UnsafeCell;
127 use crate::loom::sync::atomic::AtomicUsize;
128 use crate::loom::sync::Arc;
129 #[cfg(all(tokio_unstable, feature = "tracing"))]
130 use crate::util::trace;
131 
132 use std::fmt;
133 use std::future::Future;
134 use std::mem::MaybeUninit;
135 use std::pin::Pin;
136 use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137 use std::task::Poll::{Pending, Ready};
138 use std::task::{Context, Poll, Waker};
139 
140 /// Sends a value to the associated [`Receiver`].
141 ///
142 /// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143 /// [`channel`](fn@channel) function.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// use tokio::sync::oneshot;
149 ///
150 /// #[tokio::main]
151 /// async fn main() {
152 ///     let (tx, rx) = oneshot::channel();
153 ///
154 ///     tokio::spawn(async move {
155 ///         if let Err(_) = tx.send(3) {
156 ///             println!("the receiver dropped");
157 ///         }
158 ///     });
159 ///
160 ///     match rx.await {
161 ///         Ok(v) => println!("got = {:?}", v),
162 ///         Err(_) => println!("the sender dropped"),
163 ///     }
164 /// }
165 /// ```
166 ///
167 /// If the sender is dropped without sending, the receiver will fail with
168 /// [`error::RecvError`]:
169 ///
170 /// ```
171 /// use tokio::sync::oneshot;
172 ///
173 /// #[tokio::main]
174 /// async fn main() {
175 ///     let (tx, rx) = oneshot::channel::<u32>();
176 ///
177 ///     tokio::spawn(async move {
178 ///         drop(tx);
179 ///     });
180 ///
181 ///     match rx.await {
182 ///         Ok(_) => panic!("This doesn't happen"),
183 ///         Err(_) => println!("the sender dropped"),
184 ///     }
185 /// }
186 /// ```
187 ///
188 /// To use a `Sender` from a destructor, put it in an [`Option`] and call
189 /// [`Option::take`].
190 ///
191 /// ```
192 /// use tokio::sync::oneshot;
193 ///
194 /// struct SendOnDrop {
195 ///     sender: Option<oneshot::Sender<&'static str>>,
196 /// }
197 /// impl Drop for SendOnDrop {
198 ///     fn drop(&mut self) {
199 ///         if let Some(sender) = self.sender.take() {
200 ///             // Using `let _ =` to ignore send errors.
201 ///             let _ = sender.send("I got dropped!");
202 ///         }
203 ///     }
204 /// }
205 ///
206 /// #[tokio::main]
207 /// # async fn _doc() {}
208 /// # #[tokio::main(flavor = "current_thread")]
209 /// async fn main() {
210 ///     let (send, recv) = oneshot::channel();
211 ///
212 ///     let send_on_drop = SendOnDrop { sender: Some(send) };
213 ///     drop(send_on_drop);
214 ///
215 ///     assert_eq!(recv.await, Ok("I got dropped!"));
216 /// }
217 /// ```
218 ///
219 /// [`Option`]: std::option::Option
220 /// [`Option::take`]: std::option::Option::take
221 #[derive(Debug)]
222 pub struct Sender<T> {
223     inner: Option<Arc<Inner<T>>>,
224     #[cfg(all(tokio_unstable, feature = "tracing"))]
225     resource_span: tracing::Span,
226 }
227 
228 /// Receives a value from the associated [`Sender`].
229 ///
230 /// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231 /// [`channel`](fn@channel) function.
232 ///
233 /// This channel has no `recv` method because the receiver itself implements the
234 /// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235 ///
236 /// The `poll` method on the `Future` trait is allowed to spuriously return
237 /// `Poll::Pending` even if the message has been sent. If such a spurious
238 /// failure happens, then the caller will be woken when the spurious failure has
239 /// been resolved so that the caller can attempt to receive the message again.
240 /// Note that receiving such a wakeup does not guarantee that the next call will
241 /// succeed — it could fail with another spurious failure. (A spurious failure
242 /// does not mean that the message is lost. It is just delayed.)
243 ///
244 /// [`Future`]: trait@std::future::Future
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use tokio::sync::oneshot;
250 ///
251 /// #[tokio::main]
252 /// async fn main() {
253 ///     let (tx, rx) = oneshot::channel();
254 ///
255 ///     tokio::spawn(async move {
256 ///         if let Err(_) = tx.send(3) {
257 ///             println!("the receiver dropped");
258 ///         }
259 ///     });
260 ///
261 ///     match rx.await {
262 ///         Ok(v) => println!("got = {:?}", v),
263 ///         Err(_) => println!("the sender dropped"),
264 ///     }
265 /// }
266 /// ```
267 ///
268 /// If the sender is dropped without sending, the receiver will fail with
269 /// [`error::RecvError`]:
270 ///
271 /// ```
272 /// use tokio::sync::oneshot;
273 ///
274 /// #[tokio::main]
275 /// async fn main() {
276 ///     let (tx, rx) = oneshot::channel::<u32>();
277 ///
278 ///     tokio::spawn(async move {
279 ///         drop(tx);
280 ///     });
281 ///
282 ///     match rx.await {
283 ///         Ok(_) => panic!("This doesn't happen"),
284 ///         Err(_) => println!("the sender dropped"),
285 ///     }
286 /// }
287 /// ```
288 ///
289 /// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290 /// channel.
291 ///
292 /// ```
293 /// use tokio::sync::oneshot;
294 /// use tokio::time::{interval, sleep, Duration};
295 ///
296 /// #[tokio::main]
297 /// # async fn _doc() {}
298 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299 /// async fn main() {
300 ///     let (send, mut recv) = oneshot::channel();
301 ///     let mut interval = interval(Duration::from_millis(100));
302 ///
303 ///     # let handle =
304 ///     tokio::spawn(async move {
305 ///         sleep(Duration::from_secs(1)).await;
306 ///         send.send("shut down").unwrap();
307 ///     });
308 ///
309 ///     loop {
310 ///         tokio::select! {
311 ///             _ = interval.tick() => println!("Another 100ms"),
312 ///             msg = &mut recv => {
313 ///                 println!("Got message: {}", msg.unwrap());
314 ///                 break;
315 ///             }
316 ///         }
317 ///     }
318 ///     # handle.await.unwrap();
319 /// }
320 /// ```
321 #[derive(Debug)]
322 pub struct Receiver<T> {
323     inner: Option<Arc<Inner<T>>>,
324     #[cfg(all(tokio_unstable, feature = "tracing"))]
325     resource_span: tracing::Span,
326     #[cfg(all(tokio_unstable, feature = "tracing"))]
327     async_op_span: tracing::Span,
328     #[cfg(all(tokio_unstable, feature = "tracing"))]
329     async_op_poll_span: tracing::Span,
330 }
331 
332 pub mod error {
333     //! Oneshot error types.
334 
335     use std::fmt;
336 
337     /// Error returned by the `Future` implementation for `Receiver`.
338     ///
339     /// This error is returned by the receiver when the sender is dropped without sending.
340     #[derive(Debug, Eq, PartialEq, Clone)]
341     pub struct RecvError(pub(super) ());
342 
343     /// Error returned by the `try_recv` function on `Receiver`.
344     #[derive(Debug, Eq, PartialEq, Clone)]
345     pub enum TryRecvError {
346         /// The send half of the channel has not yet sent a value.
347         Empty,
348 
349         /// The send half of the channel was dropped without sending a value.
350         Closed,
351     }
352 
353     // ===== impl RecvError =====
354 
355     impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result356         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357             write!(fmt, "channel closed")
358         }
359     }
360 
361     impl std::error::Error for RecvError {}
362 
363     // ===== impl TryRecvError =====
364 
365     impl fmt::Display for TryRecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result366         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367             match self {
368                 TryRecvError::Empty => write!(fmt, "channel empty"),
369                 TryRecvError::Closed => write!(fmt, "channel closed"),
370             }
371         }
372     }
373 
374     impl std::error::Error for TryRecvError {}
375 }
376 
377 use self::error::*;
378 
379 struct Inner<T> {
380     /// Manages the state of the inner cell.
381     state: AtomicUsize,
382 
383     /// The value. This is set by `Sender` and read by `Receiver`. The state of
384     /// the cell is tracked by `state`.
385     value: UnsafeCell<Option<T>>,
386 
387     /// The task to notify when the receiver drops without consuming the value.
388     ///
389     /// ## Safety
390     ///
391     /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392     /// initialized. If that bit is unset, this field may be uninitialized.
393     tx_task: Task,
394 
395     /// The task to notify when the value is sent.
396     ///
397     /// ## Safety
398     ///
399     /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400     /// initialized. If that bit is unset, this field may be uninitialized.
401     rx_task: Task,
402 }
403 
404 struct Task(UnsafeCell<MaybeUninit<Waker>>);
405 
406 impl Task {
will_wake(&self, cx: &mut Context<'_>) -> bool407     unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408         self.with_task(|w| w.will_wake(cx.waker()))
409     }
410 
with_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,411     unsafe fn with_task<F, R>(&self, f: F) -> R
412     where
413         F: FnOnce(&Waker) -> R,
414     {
415         self.0.with(|ptr| {
416             let waker: *const Waker = (*ptr).as_ptr();
417             f(&*waker)
418         })
419     }
420 
drop_task(&self)421     unsafe fn drop_task(&self) {
422         self.0.with_mut(|ptr| {
423             let ptr: *mut Waker = (*ptr).as_mut_ptr();
424             ptr.drop_in_place();
425         });
426     }
427 
set_task(&self, cx: &mut Context<'_>)428     unsafe fn set_task(&self, cx: &mut Context<'_>) {
429         self.0.with_mut(|ptr| {
430             let ptr: *mut Waker = (*ptr).as_mut_ptr();
431             ptr.write(cx.waker().clone());
432         });
433     }
434 }
435 
436 #[derive(Clone, Copy)]
437 struct State(usize);
438 
439 /// Creates a new one-shot channel for sending single values across asynchronous
440 /// tasks.
441 ///
442 /// The function returns separate "send" and "receive" handles. The `Sender`
443 /// handle is used by the producer to send the value. The `Receiver` handle is
444 /// used by the consumer to receive the value.
445 ///
446 /// Each handle can be used on separate tasks.
447 ///
448 /// # Examples
449 ///
450 /// ```
451 /// use tokio::sync::oneshot;
452 ///
453 /// #[tokio::main]
454 /// async fn main() {
455 ///     let (tx, rx) = oneshot::channel();
456 ///
457 ///     tokio::spawn(async move {
458 ///         if let Err(_) = tx.send(3) {
459 ///             println!("the receiver dropped");
460 ///         }
461 ///     });
462 ///
463 ///     match rx.await {
464 ///         Ok(v) => println!("got = {:?}", v),
465 ///         Err(_) => println!("the sender dropped"),
466 ///     }
467 /// }
468 /// ```
469 #[track_caller]
channel<T>() -> (Sender<T>, Receiver<T>)470 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471     #[cfg(all(tokio_unstable, feature = "tracing"))]
472     let resource_span = {
473         let location = std::panic::Location::caller();
474 
475         let resource_span = tracing::trace_span!(
476             "runtime.resource",
477             concrete_type = "Sender|Receiver",
478             kind = "Sync",
479             loc.file = location.file(),
480             loc.line = location.line(),
481             loc.col = location.column(),
482         );
483 
484         resource_span.in_scope(|| {
485             tracing::trace!(
486             target: "runtime::resource::state_update",
487             tx_dropped = false,
488             tx_dropped.op = "override",
489             )
490         });
491 
492         resource_span.in_scope(|| {
493             tracing::trace!(
494             target: "runtime::resource::state_update",
495             rx_dropped = false,
496             rx_dropped.op = "override",
497             )
498         });
499 
500         resource_span.in_scope(|| {
501             tracing::trace!(
502             target: "runtime::resource::state_update",
503             value_sent = false,
504             value_sent.op = "override",
505             )
506         });
507 
508         resource_span.in_scope(|| {
509             tracing::trace!(
510             target: "runtime::resource::state_update",
511             value_received = false,
512             value_received.op = "override",
513             )
514         });
515 
516         resource_span
517     };
518 
519     let inner = Arc::new(Inner {
520         state: AtomicUsize::new(State::new().as_usize()),
521         value: UnsafeCell::new(None),
522         tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
523         rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524     });
525 
526     let tx = Sender {
527         inner: Some(inner.clone()),
528         #[cfg(all(tokio_unstable, feature = "tracing"))]
529         resource_span: resource_span.clone(),
530     };
531 
532     #[cfg(all(tokio_unstable, feature = "tracing"))]
533     let async_op_span = resource_span
534         .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
535 
536     #[cfg(all(tokio_unstable, feature = "tracing"))]
537     let async_op_poll_span =
538         async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
539 
540     let rx = Receiver {
541         inner: Some(inner),
542         #[cfg(all(tokio_unstable, feature = "tracing"))]
543         resource_span,
544         #[cfg(all(tokio_unstable, feature = "tracing"))]
545         async_op_span,
546         #[cfg(all(tokio_unstable, feature = "tracing"))]
547         async_op_poll_span,
548     };
549 
550     (tx, rx)
551 }
552 
553 impl<T> Sender<T> {
554     /// Attempts to send a value on this channel, returning it back if it could
555     /// not be sent.
556     ///
557     /// This method consumes `self` as only one value may ever be sent on a oneshot
558     /// channel. It is not marked async because sending a message to an oneshot
559     /// channel never requires any form of waiting.  Because of this, the `send`
560     /// method can be used in both synchronous and asynchronous code without
561     /// problems.
562     ///
563     /// A successful send occurs when it is determined that the other end of the
564     /// channel has not hung up already. An unsuccessful send would be one where
565     /// the corresponding receiver has already been deallocated. Note that a
566     /// return value of `Err` means that the data will never be received, but
567     /// a return value of `Ok` does *not* mean that the data will be received.
568     /// It is possible for the corresponding receiver to hang up immediately
569     /// after this function returns `Ok`.
570     ///
571     /// # Examples
572     ///
573     /// Send a value to another task
574     ///
575     /// ```
576     /// use tokio::sync::oneshot;
577     ///
578     /// #[tokio::main]
579     /// async fn main() {
580     ///     let (tx, rx) = oneshot::channel();
581     ///
582     ///     tokio::spawn(async move {
583     ///         if let Err(_) = tx.send(3) {
584     ///             println!("the receiver dropped");
585     ///         }
586     ///     });
587     ///
588     ///     match rx.await {
589     ///         Ok(v) => println!("got = {:?}", v),
590     ///         Err(_) => println!("the sender dropped"),
591     ///     }
592     /// }
593     /// ```
send(mut self, t: T) -> Result<(), T>594     pub fn send(mut self, t: T) -> Result<(), T> {
595         let inner = self.inner.take().unwrap();
596 
597         inner.value.with_mut(|ptr| unsafe {
598             // SAFETY: The receiver will not access the `UnsafeCell` unless the
599             // channel has been marked as "complete" (the `VALUE_SENT` state bit
600             // is set).
601             // That bit is only set by the sender later on in this method, and
602             // calling this method consumes `self`. Therefore, if it was possible to
603             // call this method, we know that the `VALUE_SENT` bit is unset, and
604             // the receiver is not currently accessing the `UnsafeCell`.
605             *ptr = Some(t);
606         });
607 
608         if !inner.complete() {
609             unsafe {
610                 // SAFETY: The receiver will not access the `UnsafeCell` unless
611                 // the channel has been marked as "complete". Calling
612                 // `complete()` will return true if this bit is set, and false
613                 // if it is not set. Thus, if `complete()` returned false, it is
614                 // safe for us to access the value, because we know that the
615                 // receiver will not.
616                 return Err(inner.consume_value().unwrap());
617             }
618         }
619 
620         #[cfg(all(tokio_unstable, feature = "tracing"))]
621         self.resource_span.in_scope(|| {
622             tracing::trace!(
623             target: "runtime::resource::state_update",
624             value_sent = true,
625             value_sent.op = "override",
626             )
627         });
628 
629         Ok(())
630     }
631 
632     /// Waits for the associated [`Receiver`] handle to close.
633     ///
634     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
635     /// [`Receiver`] value is dropped.
636     ///
637     /// This function is useful when paired with `select!` to abort a
638     /// computation when the receiver is no longer interested in the result.
639     ///
640     /// # Return
641     ///
642     /// Returns a `Future` which must be awaited on.
643     ///
644     /// [`Receiver`]: Receiver
645     /// [`close`]: Receiver::close
646     ///
647     /// # Examples
648     ///
649     /// Basic usage
650     ///
651     /// ```
652     /// use tokio::sync::oneshot;
653     ///
654     /// #[tokio::main]
655     /// async fn main() {
656     ///     let (mut tx, rx) = oneshot::channel::<()>();
657     ///
658     ///     tokio::spawn(async move {
659     ///         drop(rx);
660     ///     });
661     ///
662     ///     tx.closed().await;
663     ///     println!("the receiver dropped");
664     /// }
665     /// ```
666     ///
667     /// Paired with select
668     ///
669     /// ```
670     /// use tokio::sync::oneshot;
671     /// use tokio::time::{self, Duration};
672     ///
673     /// async fn compute() -> String {
674     ///     // Complex computation returning a `String`
675     /// # "hello".to_string()
676     /// }
677     ///
678     /// #[tokio::main]
679     /// async fn main() {
680     ///     let (mut tx, rx) = oneshot::channel();
681     ///
682     ///     tokio::spawn(async move {
683     ///         tokio::select! {
684     ///             _ = tx.closed() => {
685     ///                 // The receiver dropped, no need to do any further work
686     ///             }
687     ///             value = compute() => {
688     ///                 // The send can fail if the channel was closed at the exact same
689     ///                 // time as when compute() finished, so just ignore the failure.
690     ///                 let _ = tx.send(value);
691     ///             }
692     ///         }
693     ///     });
694     ///
695     ///     // Wait for up to 10 seconds
696     ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
697     /// }
698     /// ```
closed(&mut self)699     pub async fn closed(&mut self) {
700         use crate::future::poll_fn;
701 
702         #[cfg(all(tokio_unstable, feature = "tracing"))]
703         let resource_span = self.resource_span.clone();
704         #[cfg(all(tokio_unstable, feature = "tracing"))]
705         let closed = trace::async_op(
706             || poll_fn(|cx| self.poll_closed(cx)),
707             resource_span,
708             "Sender::closed",
709             "poll_closed",
710             false,
711         );
712         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
713         let closed = poll_fn(|cx| self.poll_closed(cx));
714 
715         closed.await
716     }
717 
718     /// Returns `true` if the associated [`Receiver`] handle has been dropped.
719     ///
720     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
721     /// [`Receiver`] value is dropped.
722     ///
723     /// If `true` is returned, a call to `send` will always result in an error.
724     ///
725     /// [`Receiver`]: Receiver
726     /// [`close`]: Receiver::close
727     ///
728     /// # Examples
729     ///
730     /// ```
731     /// use tokio::sync::oneshot;
732     ///
733     /// #[tokio::main]
734     /// async fn main() {
735     ///     let (tx, rx) = oneshot::channel();
736     ///
737     ///     assert!(!tx.is_closed());
738     ///
739     ///     drop(rx);
740     ///
741     ///     assert!(tx.is_closed());
742     ///     assert!(tx.send("never received").is_err());
743     /// }
744     /// ```
is_closed(&self) -> bool745     pub fn is_closed(&self) -> bool {
746         let inner = self.inner.as_ref().unwrap();
747 
748         let state = State::load(&inner.state, Acquire);
749         state.is_closed()
750     }
751 
752     /// Checks whether the oneshot channel has been closed, and if not, schedules the
753     /// `Waker` in the provided `Context` to receive a notification when the channel is
754     /// closed.
755     ///
756     /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
757     /// [`Receiver`] value is dropped.
758     ///
759     /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
760     /// to the most recent call will be scheduled to receive a wakeup.
761     ///
762     /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
763     /// [`close`]: fn@crate::sync::oneshot::Receiver::close
764     ///
765     /// # Return value
766     ///
767     /// This function returns:
768     ///
769     ///  * `Poll::Pending` if the channel is still open.
770     ///  * `Poll::Ready(())` if the channel is closed.
771     ///
772     /// # Examples
773     ///
774     /// ```
775     /// use tokio::sync::oneshot;
776     ///
777     /// use futures::future::poll_fn;
778     ///
779     /// #[tokio::main]
780     /// async fn main() {
781     ///     let (mut tx, mut rx) = oneshot::channel::<()>();
782     ///
783     ///     tokio::spawn(async move {
784     ///         rx.close();
785     ///     });
786     ///
787     ///     poll_fn(|cx| tx.poll_closed(cx)).await;
788     ///
789     ///     println!("the receiver dropped");
790     /// }
791     /// ```
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>792     pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
793         ready!(crate::trace::trace_leaf(cx));
794 
795         // Keep track of task budget
796         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
797 
798         let inner = self.inner.as_ref().unwrap();
799 
800         let mut state = State::load(&inner.state, Acquire);
801 
802         if state.is_closed() {
803             coop.made_progress();
804             return Ready(());
805         }
806 
807         if state.is_tx_task_set() {
808             let will_notify = unsafe { inner.tx_task.will_wake(cx) };
809 
810             if !will_notify {
811                 state = State::unset_tx_task(&inner.state);
812 
813                 if state.is_closed() {
814                     // Set the flag again so that the waker is released in drop
815                     State::set_tx_task(&inner.state);
816                     coop.made_progress();
817                     return Ready(());
818                 } else {
819                     unsafe { inner.tx_task.drop_task() };
820                 }
821             }
822         }
823 
824         if !state.is_tx_task_set() {
825             // Attempt to set the task
826             unsafe {
827                 inner.tx_task.set_task(cx);
828             }
829 
830             // Update the state
831             state = State::set_tx_task(&inner.state);
832 
833             if state.is_closed() {
834                 coop.made_progress();
835                 return Ready(());
836             }
837         }
838 
839         Pending
840     }
841 }
842 
843 impl<T> Drop for Sender<T> {
drop(&mut self)844     fn drop(&mut self) {
845         if let Some(inner) = self.inner.as_ref() {
846             inner.complete();
847             #[cfg(all(tokio_unstable, feature = "tracing"))]
848             self.resource_span.in_scope(|| {
849                 tracing::trace!(
850                 target: "runtime::resource::state_update",
851                 tx_dropped = true,
852                 tx_dropped.op = "override",
853                 )
854             });
855         }
856     }
857 }
858 
859 impl<T> Receiver<T> {
860     /// Prevents the associated [`Sender`] handle from sending a value.
861     ///
862     /// Any `send` operation which happens after calling `close` is guaranteed
863     /// to fail. After calling `close`, [`try_recv`] should be called to
864     /// receive a value if one was sent **before** the call to `close`
865     /// completed.
866     ///
867     /// This function is useful to perform a graceful shutdown and ensure that a
868     /// value will not be sent into the channel and never received.
869     ///
870     /// `close` is no-op if a message is already received or the channel
871     /// is already closed.
872     ///
873     /// [`Sender`]: Sender
874     /// [`try_recv`]: Receiver::try_recv
875     ///
876     /// # Examples
877     ///
878     /// Prevent a value from being sent
879     ///
880     /// ```
881     /// use tokio::sync::oneshot;
882     /// use tokio::sync::oneshot::error::TryRecvError;
883     ///
884     /// #[tokio::main]
885     /// async fn main() {
886     ///     let (tx, mut rx) = oneshot::channel();
887     ///
888     ///     assert!(!tx.is_closed());
889     ///
890     ///     rx.close();
891     ///
892     ///     assert!(tx.is_closed());
893     ///     assert!(tx.send("never received").is_err());
894     ///
895     ///     match rx.try_recv() {
896     ///         Err(TryRecvError::Closed) => {}
897     ///         _ => unreachable!(),
898     ///     }
899     /// }
900     /// ```
901     ///
902     /// Receive a value sent **before** calling `close`
903     ///
904     /// ```
905     /// use tokio::sync::oneshot;
906     ///
907     /// #[tokio::main]
908     /// async fn main() {
909     ///     let (tx, mut rx) = oneshot::channel();
910     ///
911     ///     assert!(tx.send("will receive").is_ok());
912     ///
913     ///     rx.close();
914     ///
915     ///     let msg = rx.try_recv().unwrap();
916     ///     assert_eq!(msg, "will receive");
917     /// }
918     /// ```
close(&mut self)919     pub fn close(&mut self) {
920         if let Some(inner) = self.inner.as_ref() {
921             inner.close();
922             #[cfg(all(tokio_unstable, feature = "tracing"))]
923             self.resource_span.in_scope(|| {
924                 tracing::trace!(
925                 target: "runtime::resource::state_update",
926                 rx_dropped = true,
927                 rx_dropped.op = "override",
928                 )
929             });
930         }
931     }
932 
933     /// Attempts to receive a value.
934     ///
935     /// If a pending value exists in the channel, it is returned. If no value
936     /// has been sent, the current task **will not** be registered for
937     /// future notification.
938     ///
939     /// This function is useful to call from outside the context of an
940     /// asynchronous task.
941     ///
942     /// Note that unlike the `poll` method, the `try_recv` method cannot fail
943     /// spuriously. Any send or close event that happens before this call to
944     /// `try_recv` will be correctly returned to the caller.
945     ///
946     /// # Return
947     ///
948     /// - `Ok(T)` if a value is pending in the channel.
949     /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
950     /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
951     ///   a value, or if the message has already been received.
952     ///
953     /// # Examples
954     ///
955     /// `try_recv` before a value is sent, then after.
956     ///
957     /// ```
958     /// use tokio::sync::oneshot;
959     /// use tokio::sync::oneshot::error::TryRecvError;
960     ///
961     /// #[tokio::main]
962     /// async fn main() {
963     ///     let (tx, mut rx) = oneshot::channel();
964     ///
965     ///     match rx.try_recv() {
966     ///         // The channel is currently empty
967     ///         Err(TryRecvError::Empty) => {}
968     ///         _ => unreachable!(),
969     ///     }
970     ///
971     ///     // Send a value
972     ///     tx.send("hello").unwrap();
973     ///
974     ///     match rx.try_recv() {
975     ///         Ok(value) => assert_eq!(value, "hello"),
976     ///         _ => unreachable!(),
977     ///     }
978     /// }
979     /// ```
980     ///
981     /// `try_recv` when the sender dropped before sending a value
982     ///
983     /// ```
984     /// use tokio::sync::oneshot;
985     /// use tokio::sync::oneshot::error::TryRecvError;
986     ///
987     /// #[tokio::main]
988     /// async fn main() {
989     ///     let (tx, mut rx) = oneshot::channel::<()>();
990     ///
991     ///     drop(tx);
992     ///
993     ///     match rx.try_recv() {
994     ///         // The channel will never receive a value.
995     ///         Err(TryRecvError::Closed) => {}
996     ///         _ => unreachable!(),
997     ///     }
998     /// }
999     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>1000     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1001         let result = if let Some(inner) = self.inner.as_ref() {
1002             let state = State::load(&inner.state, Acquire);
1003 
1004             if state.is_complete() {
1005                 // SAFETY: If `state.is_complete()` returns true, then the
1006                 // `VALUE_SENT` bit has been set and the sender side of the
1007                 // channel will no longer attempt to access the inner
1008                 // `UnsafeCell`. Therefore, it is now safe for us to access the
1009                 // cell.
1010                 match unsafe { inner.consume_value() } {
1011                     Some(value) => {
1012                         #[cfg(all(tokio_unstable, feature = "tracing"))]
1013                         self.resource_span.in_scope(|| {
1014                             tracing::trace!(
1015                             target: "runtime::resource::state_update",
1016                             value_received = true,
1017                             value_received.op = "override",
1018                             )
1019                         });
1020                         Ok(value)
1021                     }
1022                     None => Err(TryRecvError::Closed),
1023                 }
1024             } else if state.is_closed() {
1025                 Err(TryRecvError::Closed)
1026             } else {
1027                 // Not ready, this does not clear `inner`
1028                 return Err(TryRecvError::Empty);
1029             }
1030         } else {
1031             Err(TryRecvError::Closed)
1032         };
1033 
1034         self.inner = None;
1035         result
1036     }
1037 
1038     /// Blocking receive to call outside of asynchronous contexts.
1039     ///
1040     /// # Panics
1041     ///
1042     /// This function panics if called within an asynchronous execution
1043     /// context.
1044     ///
1045     /// # Examples
1046     ///
1047     /// ```
1048     /// use std::thread;
1049     /// use tokio::sync::oneshot;
1050     ///
1051     /// #[tokio::main]
1052     /// async fn main() {
1053     ///     let (tx, rx) = oneshot::channel::<u8>();
1054     ///
1055     ///     let sync_code = thread::spawn(move || {
1056     ///         assert_eq!(Ok(10), rx.blocking_recv());
1057     ///     });
1058     ///
1059     ///     let _ = tx.send(10);
1060     ///     sync_code.join().unwrap();
1061     /// }
1062     /// ```
1063     #[track_caller]
1064     #[cfg(feature = "sync")]
1065     #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
blocking_recv(self) -> Result<T, RecvError>1066     pub fn blocking_recv(self) -> Result<T, RecvError> {
1067         crate::future::block_on(self)
1068     }
1069 }
1070 
1071 impl<T> Drop for Receiver<T> {
drop(&mut self)1072     fn drop(&mut self) {
1073         if let Some(inner) = self.inner.as_ref() {
1074             inner.close();
1075             #[cfg(all(tokio_unstable, feature = "tracing"))]
1076             self.resource_span.in_scope(|| {
1077                 tracing::trace!(
1078                 target: "runtime::resource::state_update",
1079                 rx_dropped = true,
1080                 rx_dropped.op = "override",
1081                 )
1082             });
1083         }
1084     }
1085 }
1086 
1087 impl<T> Future for Receiver<T> {
1088     type Output = Result<T, RecvError>;
1089 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1090     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1091         // If `inner` is `None`, then `poll()` has already completed.
1092         #[cfg(all(tokio_unstable, feature = "tracing"))]
1093         let _res_span = self.resource_span.clone().entered();
1094         #[cfg(all(tokio_unstable, feature = "tracing"))]
1095         let _ao_span = self.async_op_span.clone().entered();
1096         #[cfg(all(tokio_unstable, feature = "tracing"))]
1097         let _ao_poll_span = self.async_op_poll_span.clone().entered();
1098 
1099         let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1100             #[cfg(all(tokio_unstable, feature = "tracing"))]
1101             let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1102 
1103             #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1104             let res = ready!(inner.poll_recv(cx))?;
1105 
1106             res
1107         } else {
1108             panic!("called after complete");
1109         };
1110 
1111         self.inner = None;
1112         Ready(Ok(ret))
1113     }
1114 }
1115 
1116 impl<T> Inner<T> {
complete(&self) -> bool1117     fn complete(&self) -> bool {
1118         let prev = State::set_complete(&self.state);
1119 
1120         if prev.is_closed() {
1121             return false;
1122         }
1123 
1124         if prev.is_rx_task_set() {
1125             // TODO: Consume waker?
1126             unsafe {
1127                 self.rx_task.with_task(Waker::wake_by_ref);
1128             }
1129         }
1130 
1131         true
1132     }
1133 
poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>1134     fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1135         ready!(crate::trace::trace_leaf(cx));
1136         // Keep track of task budget
1137         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
1138 
1139         // Load the state
1140         let mut state = State::load(&self.state, Acquire);
1141 
1142         if state.is_complete() {
1143             coop.made_progress();
1144             match unsafe { self.consume_value() } {
1145                 Some(value) => Ready(Ok(value)),
1146                 None => Ready(Err(RecvError(()))),
1147             }
1148         } else if state.is_closed() {
1149             coop.made_progress();
1150             Ready(Err(RecvError(())))
1151         } else {
1152             if state.is_rx_task_set() {
1153                 let will_notify = unsafe { self.rx_task.will_wake(cx) };
1154 
1155                 // Check if the task is still the same
1156                 if !will_notify {
1157                     // Unset the task
1158                     state = State::unset_rx_task(&self.state);
1159                     if state.is_complete() {
1160                         // Set the flag again so that the waker is released in drop
1161                         State::set_rx_task(&self.state);
1162 
1163                         coop.made_progress();
1164                         // SAFETY: If `state.is_complete()` returns true, then the
1165                         // `VALUE_SENT` bit has been set and the sender side of the
1166                         // channel will no longer attempt to access the inner
1167                         // `UnsafeCell`. Therefore, it is now safe for us to access the
1168                         // cell.
1169                         return match unsafe { self.consume_value() } {
1170                             Some(value) => Ready(Ok(value)),
1171                             None => Ready(Err(RecvError(()))),
1172                         };
1173                     } else {
1174                         unsafe { self.rx_task.drop_task() };
1175                     }
1176                 }
1177             }
1178 
1179             if !state.is_rx_task_set() {
1180                 // Attempt to set the task
1181                 unsafe {
1182                     self.rx_task.set_task(cx);
1183                 }
1184 
1185                 // Update the state
1186                 state = State::set_rx_task(&self.state);
1187 
1188                 if state.is_complete() {
1189                     coop.made_progress();
1190                     match unsafe { self.consume_value() } {
1191                         Some(value) => Ready(Ok(value)),
1192                         None => Ready(Err(RecvError(()))),
1193                     }
1194                 } else {
1195                     Pending
1196                 }
1197             } else {
1198                 Pending
1199             }
1200         }
1201     }
1202 
1203     /// Called by `Receiver` to indicate that the value will never be received.
close(&self)1204     fn close(&self) {
1205         let prev = State::set_closed(&self.state);
1206 
1207         if prev.is_tx_task_set() && !prev.is_complete() {
1208             unsafe {
1209                 self.tx_task.with_task(Waker::wake_by_ref);
1210             }
1211         }
1212     }
1213 
1214     /// Consumes the value. This function does not check `state`.
1215     ///
1216     /// # Safety
1217     ///
1218     /// Calling this method concurrently on multiple threads will result in a
1219     /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1220     /// sender *or* the receiver will call this method at a given point in time.
1221     /// If `VALUE_SENT` is not set, then only the sender may call this method;
1222     /// if it is set, then only the receiver may call this method.
consume_value(&self) -> Option<T>1223     unsafe fn consume_value(&self) -> Option<T> {
1224         self.value.with_mut(|ptr| (*ptr).take())
1225     }
1226 }
1227 
1228 unsafe impl<T: Send> Send for Inner<T> {}
1229 unsafe impl<T: Send> Sync for Inner<T> {}
1230 
mut_load(this: &mut AtomicUsize) -> usize1231 fn mut_load(this: &mut AtomicUsize) -> usize {
1232     this.with_mut(|v| *v)
1233 }
1234 
1235 impl<T> Drop for Inner<T> {
drop(&mut self)1236     fn drop(&mut self) {
1237         let state = State(mut_load(&mut self.state));
1238 
1239         if state.is_rx_task_set() {
1240             unsafe {
1241                 self.rx_task.drop_task();
1242             }
1243         }
1244 
1245         if state.is_tx_task_set() {
1246             unsafe {
1247                 self.tx_task.drop_task();
1248             }
1249         }
1250     }
1251 }
1252 
1253 impl<T: fmt::Debug> fmt::Debug for Inner<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1254     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1255         use std::sync::atomic::Ordering::Relaxed;
1256 
1257         fmt.debug_struct("Inner")
1258             .field("state", &State::load(&self.state, Relaxed))
1259             .finish()
1260     }
1261 }
1262 
1263 /// Indicates that a waker for the receiving task has been set.
1264 ///
1265 /// # Safety
1266 ///
1267 /// If this bit is not set, the `rx_task` field may be uninitialized.
1268 const RX_TASK_SET: usize = 0b00001;
1269 /// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1270 ///
1271 /// # Safety
1272 ///
1273 /// This bit controls which side of the channel is permitted to access the
1274 /// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1275 /// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1276 /// the sender.
1277 const VALUE_SENT: usize = 0b00010;
1278 const CLOSED: usize = 0b00100;
1279 
1280 /// Indicates that a waker for the sending task has been set.
1281 ///
1282 /// # Safety
1283 ///
1284 /// If this bit is not set, the `tx_task` field may be uninitialized.
1285 const TX_TASK_SET: usize = 0b01000;
1286 
1287 impl State {
new() -> State1288     fn new() -> State {
1289         State(0)
1290     }
1291 
is_complete(self) -> bool1292     fn is_complete(self) -> bool {
1293         self.0 & VALUE_SENT == VALUE_SENT
1294     }
1295 
set_complete(cell: &AtomicUsize) -> State1296     fn set_complete(cell: &AtomicUsize) -> State {
1297         // This method is a compare-and-swap loop rather than a fetch-or like
1298         // other `set_$WHATEVER` methods on `State`. This is because we must
1299         // check if the state has been closed before setting the `VALUE_SENT`
1300         // bit.
1301         //
1302         // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1303         // bit is already set, because `VALUE_SENT` will tell the receiver that
1304         // it's okay to access the inner `UnsafeCell`. Immediately after calling
1305         // `set_complete`, if the channel was closed, the sender will _also_
1306         // access the `UnsafeCell` to take the value back out, so if a
1307         // `poll_recv` or `try_recv` call is occurring concurrently, both
1308         // threads may try to access the `UnsafeCell` if we were to set the
1309         // `VALUE_SENT` bit on a closed channel.
1310         let mut state = cell.load(Ordering::Relaxed);
1311         loop {
1312             if State(state).is_closed() {
1313                 break;
1314             }
1315             // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1316             // the `RX_TASK_SET` flag is set. However, `loom` does not support
1317             // fences yet.
1318             match cell.compare_exchange_weak(
1319                 state,
1320                 state | VALUE_SENT,
1321                 Ordering::AcqRel,
1322                 Ordering::Acquire,
1323             ) {
1324                 Ok(_) => break,
1325                 Err(actual) => state = actual,
1326             }
1327         }
1328         State(state)
1329     }
1330 
is_rx_task_set(self) -> bool1331     fn is_rx_task_set(self) -> bool {
1332         self.0 & RX_TASK_SET == RX_TASK_SET
1333     }
1334 
set_rx_task(cell: &AtomicUsize) -> State1335     fn set_rx_task(cell: &AtomicUsize) -> State {
1336         let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1337         State(val | RX_TASK_SET)
1338     }
1339 
unset_rx_task(cell: &AtomicUsize) -> State1340     fn unset_rx_task(cell: &AtomicUsize) -> State {
1341         let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1342         State(val & !RX_TASK_SET)
1343     }
1344 
is_closed(self) -> bool1345     fn is_closed(self) -> bool {
1346         self.0 & CLOSED == CLOSED
1347     }
1348 
set_closed(cell: &AtomicUsize) -> State1349     fn set_closed(cell: &AtomicUsize) -> State {
1350         // Acquire because we want all later writes (attempting to poll) to be
1351         // ordered after this.
1352         let val = cell.fetch_or(CLOSED, Acquire);
1353         State(val)
1354     }
1355 
set_tx_task(cell: &AtomicUsize) -> State1356     fn set_tx_task(cell: &AtomicUsize) -> State {
1357         let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1358         State(val | TX_TASK_SET)
1359     }
1360 
unset_tx_task(cell: &AtomicUsize) -> State1361     fn unset_tx_task(cell: &AtomicUsize) -> State {
1362         let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1363         State(val & !TX_TASK_SET)
1364     }
1365 
is_tx_task_set(self) -> bool1366     fn is_tx_task_set(self) -> bool {
1367         self.0 & TX_TASK_SET == TX_TASK_SET
1368     }
1369 
as_usize(self) -> usize1370     fn as_usize(self) -> usize {
1371         self.0
1372     }
1373 
load(cell: &AtomicUsize, order: Ordering) -> State1374     fn load(cell: &AtomicUsize, order: Ordering) -> State {
1375         let val = cell.load(order);
1376         State(val)
1377     }
1378 }
1379 
1380 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1381     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1382         fmt.debug_struct("State")
1383             .field("is_complete", &self.is_complete())
1384             .field("is_closed", &self.is_closed())
1385             .field("is_rx_task_set", &self.is_rx_task_set())
1386             .field("is_tx_task_set", &self.is_tx_task_set())
1387             .finish()
1388     }
1389 }
1390