1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3 //! A one-shot channel is used for sending a single message between
4 //! asynchronous tasks. The [`channel`] function is used to create a
5 //! [`Sender`] and [`Receiver`] handle pair that form the channel.
6 //!
7 //! The `Sender` handle is used by the producer to send the value.
8 //! The `Receiver` handle is used by the consumer to receive the value.
9 //!
10 //! Each handle can be used on separate tasks.
11 //!
12 //! Since the `send` method is not async, it can be used anywhere. This includes
13 //! sending between two runtimes, and using it from non-async code.
14 //!
15 //! If the [`Receiver`] is closed before receiving a message which has already
16 //! been sent, the message will remain in the channel until the receiver is
17 //! dropped, at which point the message will be dropped immediately.
18 //!
19 //! # Examples
20 //!
21 //! ```
22 //! use tokio::sync::oneshot;
23 //!
24 //! #[tokio::main]
25 //! async fn main() {
26 //! let (tx, rx) = oneshot::channel();
27 //!
28 //! tokio::spawn(async move {
29 //! if let Err(_) = tx.send(3) {
30 //! println!("the receiver dropped");
31 //! }
32 //! });
33 //!
34 //! match rx.await {
35 //! Ok(v) => println!("got = {:?}", v),
36 //! Err(_) => println!("the sender dropped"),
37 //! }
38 //! }
39 //! ```
40 //!
41 //! If the sender is dropped without sending, the receiver will fail with
42 //! [`error::RecvError`]:
43 //!
44 //! ```
45 //! use tokio::sync::oneshot;
46 //!
47 //! #[tokio::main]
48 //! async fn main() {
49 //! let (tx, rx) = oneshot::channel::<u32>();
50 //!
51 //! tokio::spawn(async move {
52 //! drop(tx);
53 //! });
54 //!
55 //! match rx.await {
56 //! Ok(_) => panic!("This doesn't happen"),
57 //! Err(_) => println!("the sender dropped"),
58 //! }
59 //! }
60 //! ```
61 //!
62 //! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of
63 //! the channel.
64 //!
65 //! ```
66 //! use tokio::sync::oneshot;
67 //! use tokio::time::{interval, sleep, Duration};
68 //!
69 //! #[tokio::main]
70 //! # async fn _doc() {}
71 //! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72 //! async fn main() {
73 //! let (send, mut recv) = oneshot::channel();
74 //! let mut interval = interval(Duration::from_millis(100));
75 //!
76 //! # let handle =
77 //! tokio::spawn(async move {
78 //! sleep(Duration::from_secs(1)).await;
79 //! send.send("shut down").unwrap();
80 //! });
81 //!
82 //! loop {
83 //! tokio::select! {
84 //! _ = interval.tick() => println!("Another 100ms"),
85 //! msg = &mut recv => {
86 //! println!("Got message: {}", msg.unwrap());
87 //! break;
88 //! }
89 //! }
90 //! }
91 //! # handle.await.unwrap();
92 //! }
93 //! ```
94 //!
95 //! To use a `Sender` from a destructor, put it in an [`Option`] and call
96 //! [`Option::take`].
97 //!
98 //! ```
99 //! use tokio::sync::oneshot;
100 //!
101 //! struct SendOnDrop {
102 //! sender: Option<oneshot::Sender<&'static str>>,
103 //! }
104 //! impl Drop for SendOnDrop {
105 //! fn drop(&mut self) {
106 //! if let Some(sender) = self.sender.take() {
107 //! // Using `let _ =` to ignore send errors.
108 //! let _ = sender.send("I got dropped!");
109 //! }
110 //! }
111 //! }
112 //!
113 //! #[tokio::main]
114 //! # async fn _doc() {}
115 //! # #[tokio::main(flavor = "current_thread")]
116 //! async fn main() {
117 //! let (send, recv) = oneshot::channel();
118 //!
119 //! let send_on_drop = SendOnDrop { sender: Some(send) };
120 //! drop(send_on_drop);
121 //!
122 //! assert_eq!(recv.await, Ok("I got dropped!"));
123 //! }
124 //! ```
125
126 use crate::loom::cell::UnsafeCell;
127 use crate::loom::sync::atomic::AtomicUsize;
128 use crate::loom::sync::Arc;
129 #[cfg(all(tokio_unstable, feature = "tracing"))]
130 use crate::util::trace;
131
132 use std::fmt;
133 use std::future::Future;
134 use std::mem::MaybeUninit;
135 use std::pin::Pin;
136 use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137 use std::task::Poll::{Pending, Ready};
138 use std::task::{Context, Poll, Waker};
139
140 /// Sends a value to the associated [`Receiver`].
141 ///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// use tokio::sync::oneshot;
149 ///
150 /// #[tokio::main]
151 /// async fn main() {
152 /// let (tx, rx) = oneshot::channel();
153 ///
154 /// tokio::spawn(async move {
155 /// if let Err(_) = tx.send(3) {
156 /// println!("the receiver dropped");
157 /// }
158 /// });
159 ///
160 /// match rx.await {
161 /// Ok(v) => println!("got = {:?}", v),
162 /// Err(_) => println!("the sender dropped"),
163 /// }
164 /// }
165 /// ```
166 ///
167 /// If the sender is dropped without sending, the receiver will fail with
168 /// [`error::RecvError`]:
169 ///
170 /// ```
171 /// use tokio::sync::oneshot;
172 ///
173 /// #[tokio::main]
174 /// async fn main() {
175 /// let (tx, rx) = oneshot::channel::<u32>();
176 ///
177 /// tokio::spawn(async move {
178 /// drop(tx);
179 /// });
180 ///
181 /// match rx.await {
182 /// Ok(_) => panic!("This doesn't happen"),
183 /// Err(_) => println!("the sender dropped"),
184 /// }
185 /// }
186 /// ```
187 ///
188 /// To use a `Sender` from a destructor, put it in an [`Option`] and call
189 /// [`Option::take`].
190 ///
191 /// ```
192 /// use tokio::sync::oneshot;
193 ///
194 /// struct SendOnDrop {
195 /// sender: Option<oneshot::Sender<&'static str>>,
196 /// }
197 /// impl Drop for SendOnDrop {
198 /// fn drop(&mut self) {
199 /// if let Some(sender) = self.sender.take() {
200 /// // Using `let _ =` to ignore send errors.
201 /// let _ = sender.send("I got dropped!");
202 /// }
203 /// }
204 /// }
205 ///
206 /// #[tokio::main]
207 /// # async fn _doc() {}
208 /// # #[tokio::main(flavor = "current_thread")]
209 /// async fn main() {
210 /// let (send, recv) = oneshot::channel();
211 ///
212 /// let send_on_drop = SendOnDrop { sender: Some(send) };
213 /// drop(send_on_drop);
214 ///
215 /// assert_eq!(recv.await, Ok("I got dropped!"));
216 /// }
217 /// ```
218 ///
219 /// [`Option`]: std::option::Option
220 /// [`Option::take`]: std::option::Option::take
221 #[derive(Debug)]
222 pub struct Sender<T> {
223 inner: Option<Arc<Inner<T>>>,
224 #[cfg(all(tokio_unstable, feature = "tracing"))]
225 resource_span: tracing::Span,
226 }
227
228 /// Receives a value from the associated [`Sender`].
229 ///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
232 ///
233 /// This channel has no `recv` method because the receiver itself implements the
234 /// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235 ///
236 /// The `poll` method on the `Future` trait is allowed to spuriously return
237 /// `Poll::Pending` even if the message has been sent. If such a spurious
238 /// failure happens, then the caller will be woken when the spurious failure has
239 /// been resolved so that the caller can attempt to receive the message again.
240 /// Note that receiving such a wakeup does not guarantee that the next call will
241 /// succeed — it could fail with another spurious failure. (A spurious failure
242 /// does not mean that the message is lost. It is just delayed.)
243 ///
244 /// [`Future`]: trait@std::future::Future
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use tokio::sync::oneshot;
250 ///
251 /// #[tokio::main]
252 /// async fn main() {
253 /// let (tx, rx) = oneshot::channel();
254 ///
255 /// tokio::spawn(async move {
256 /// if let Err(_) = tx.send(3) {
257 /// println!("the receiver dropped");
258 /// }
259 /// });
260 ///
261 /// match rx.await {
262 /// Ok(v) => println!("got = {:?}", v),
263 /// Err(_) => println!("the sender dropped"),
264 /// }
265 /// }
266 /// ```
267 ///
268 /// If the sender is dropped without sending, the receiver will fail with
269 /// [`error::RecvError`]:
270 ///
271 /// ```
272 /// use tokio::sync::oneshot;
273 ///
274 /// #[tokio::main]
275 /// async fn main() {
276 /// let (tx, rx) = oneshot::channel::<u32>();
277 ///
278 /// tokio::spawn(async move {
279 /// drop(tx);
280 /// });
281 ///
282 /// match rx.await {
283 /// Ok(_) => panic!("This doesn't happen"),
284 /// Err(_) => println!("the sender dropped"),
285 /// }
286 /// }
287 /// ```
288 ///
289 /// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290 /// channel.
291 ///
292 /// ```
293 /// use tokio::sync::oneshot;
294 /// use tokio::time::{interval, sleep, Duration};
295 ///
296 /// #[tokio::main]
297 /// # async fn _doc() {}
298 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299 /// async fn main() {
300 /// let (send, mut recv) = oneshot::channel();
301 /// let mut interval = interval(Duration::from_millis(100));
302 ///
303 /// # let handle =
304 /// tokio::spawn(async move {
305 /// sleep(Duration::from_secs(1)).await;
306 /// send.send("shut down").unwrap();
307 /// });
308 ///
309 /// loop {
310 /// tokio::select! {
311 /// _ = interval.tick() => println!("Another 100ms"),
312 /// msg = &mut recv => {
313 /// println!("Got message: {}", msg.unwrap());
314 /// break;
315 /// }
316 /// }
317 /// }
318 /// # handle.await.unwrap();
319 /// }
320 /// ```
321 #[derive(Debug)]
322 pub struct Receiver<T> {
323 inner: Option<Arc<Inner<T>>>,
324 #[cfg(all(tokio_unstable, feature = "tracing"))]
325 resource_span: tracing::Span,
326 #[cfg(all(tokio_unstable, feature = "tracing"))]
327 async_op_span: tracing::Span,
328 #[cfg(all(tokio_unstable, feature = "tracing"))]
329 async_op_poll_span: tracing::Span,
330 }
331
pub mod error {
    //! Oneshot error types.

    use std::fmt;

    /// Error returned by the `Future` implementation for `Receiver`.
    ///
    /// This error is returned by the receiver when the sender is dropped without sending.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub struct RecvError(pub(super) ());

    /// Error returned by the `try_recv` function on `Receiver`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum TryRecvError {
        /// The send half of the channel has not yet sent a value.
        Empty,

        /// The send half of the channel was dropped without sending a value,
        /// or the value has already been received.
        Closed,
    }

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        /// Writes the fixed message `channel closed`.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // A plain literal needs no formatting machinery.
            f.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    // ===== impl TryRecvError =====

    impl fmt::Display for TryRecvError {
        /// Writes `channel empty` or `channel closed` depending on the variant.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str(match self {
                TryRecvError::Empty => "channel empty",
                TryRecvError::Closed => "channel closed",
            })
        }
    }

    impl std::error::Error for TryRecvError {}
}
376
377 use self::error::*;
378
379 struct Inner<T> {
380 /// Manages the state of the inner cell.
381 state: AtomicUsize,
382
383 /// The value. This is set by `Sender` and read by `Receiver`. The state of
384 /// the cell is tracked by `state`.
385 value: UnsafeCell<Option<T>>,
386
387 /// The task to notify when the receiver drops without consuming the value.
388 ///
389 /// ## Safety
390 ///
391 /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392 /// initialized. If that bit is unset, this field may be uninitialized.
393 tx_task: Task,
394
395 /// The task to notify when the value is sent.
396 ///
397 /// ## Safety
398 ///
399 /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400 /// initialized. If that bit is unset, this field may be uninitialized.
401 rx_task: Task,
402 }
403
404 struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406 impl Task {
will_wake(&self, cx: &mut Context<'_>) -> bool407 unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408 self.with_task(|w| w.will_wake(cx.waker()))
409 }
410
with_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,411 unsafe fn with_task<F, R>(&self, f: F) -> R
412 where
413 F: FnOnce(&Waker) -> R,
414 {
415 self.0.with(|ptr| {
416 let waker: *const Waker = (*ptr).as_ptr();
417 f(&*waker)
418 })
419 }
420
drop_task(&self)421 unsafe fn drop_task(&self) {
422 self.0.with_mut(|ptr| {
423 let ptr: *mut Waker = (*ptr).as_mut_ptr();
424 ptr.drop_in_place();
425 });
426 }
427
set_task(&self, cx: &mut Context<'_>)428 unsafe fn set_task(&self, cx: &mut Context<'_>) {
429 self.0.with_mut(|ptr| {
430 let ptr: *mut Waker = (*ptr).as_mut_ptr();
431 ptr.write(cx.waker().clone());
432 });
433 }
434 }
435
/// Copyable snapshot of the channel's state word, i.e. the `usize` kept in
/// `Inner::state`.
#[derive(Clone, Copy)]
struct State(usize);
438
439 /// Creates a new one-shot channel for sending single values across asynchronous
440 /// tasks.
441 ///
442 /// The function returns separate "send" and "receive" handles. The `Sender`
443 /// handle is used by the producer to send the value. The `Receiver` handle is
444 /// used by the consumer to receive the value.
445 ///
446 /// Each handle can be used on separate tasks.
447 ///
448 /// # Examples
449 ///
450 /// ```
451 /// use tokio::sync::oneshot;
452 ///
453 /// #[tokio::main]
454 /// async fn main() {
455 /// let (tx, rx) = oneshot::channel();
456 ///
457 /// tokio::spawn(async move {
458 /// if let Err(_) = tx.send(3) {
459 /// println!("the receiver dropped");
460 /// }
461 /// });
462 ///
463 /// match rx.await {
464 /// Ok(v) => println!("got = {:?}", v),
465 /// Err(_) => println!("the sender dropped"),
466 /// }
467 /// }
468 /// ```
469 #[track_caller]
channel<T>() -> (Sender<T>, Receiver<T>)470 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471 #[cfg(all(tokio_unstable, feature = "tracing"))]
472 let resource_span = {
473 let location = std::panic::Location::caller();
474
475 let resource_span = tracing::trace_span!(
476 "runtime.resource",
477 concrete_type = "Sender|Receiver",
478 kind = "Sync",
479 loc.file = location.file(),
480 loc.line = location.line(),
481 loc.col = location.column(),
482 );
483
484 resource_span.in_scope(|| {
485 tracing::trace!(
486 target: "runtime::resource::state_update",
487 tx_dropped = false,
488 tx_dropped.op = "override",
489 )
490 });
491
492 resource_span.in_scope(|| {
493 tracing::trace!(
494 target: "runtime::resource::state_update",
495 rx_dropped = false,
496 rx_dropped.op = "override",
497 )
498 });
499
500 resource_span.in_scope(|| {
501 tracing::trace!(
502 target: "runtime::resource::state_update",
503 value_sent = false,
504 value_sent.op = "override",
505 )
506 });
507
508 resource_span.in_scope(|| {
509 tracing::trace!(
510 target: "runtime::resource::state_update",
511 value_received = false,
512 value_received.op = "override",
513 )
514 });
515
516 resource_span
517 };
518
519 let inner = Arc::new(Inner {
520 state: AtomicUsize::new(State::new().as_usize()),
521 value: UnsafeCell::new(None),
522 tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
523 rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524 });
525
526 let tx = Sender {
527 inner: Some(inner.clone()),
528 #[cfg(all(tokio_unstable, feature = "tracing"))]
529 resource_span: resource_span.clone(),
530 };
531
532 #[cfg(all(tokio_unstable, feature = "tracing"))]
533 let async_op_span = resource_span
534 .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
535
536 #[cfg(all(tokio_unstable, feature = "tracing"))]
537 let async_op_poll_span =
538 async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
539
540 let rx = Receiver {
541 inner: Some(inner),
542 #[cfg(all(tokio_unstable, feature = "tracing"))]
543 resource_span,
544 #[cfg(all(tokio_unstable, feature = "tracing"))]
545 async_op_span,
546 #[cfg(all(tokio_unstable, feature = "tracing"))]
547 async_op_poll_span,
548 };
549
550 (tx, rx)
551 }
552
553 impl<T> Sender<T> {
554 /// Attempts to send a value on this channel, returning it back if it could
555 /// not be sent.
556 ///
557 /// This method consumes `self` as only one value may ever be sent on a oneshot
/// channel. It is not marked async because sending a message to a oneshot
/// channel never requires any form of waiting. Because of this, the `send`
560 /// method can be used in both synchronous and asynchronous code without
561 /// problems.
562 ///
563 /// A successful send occurs when it is determined that the other end of the
564 /// channel has not hung up already. An unsuccessful send would be one where
565 /// the corresponding receiver has already been deallocated. Note that a
566 /// return value of `Err` means that the data will never be received, but
567 /// a return value of `Ok` does *not* mean that the data will be received.
568 /// It is possible for the corresponding receiver to hang up immediately
569 /// after this function returns `Ok`.
570 ///
571 /// # Examples
572 ///
573 /// Send a value to another task
574 ///
575 /// ```
576 /// use tokio::sync::oneshot;
577 ///
578 /// #[tokio::main]
579 /// async fn main() {
580 /// let (tx, rx) = oneshot::channel();
581 ///
582 /// tokio::spawn(async move {
583 /// if let Err(_) = tx.send(3) {
584 /// println!("the receiver dropped");
585 /// }
586 /// });
587 ///
588 /// match rx.await {
589 /// Ok(v) => println!("got = {:?}", v),
590 /// Err(_) => println!("the sender dropped"),
591 /// }
592 /// }
593 /// ```
send(mut self, t: T) -> Result<(), T>594 pub fn send(mut self, t: T) -> Result<(), T> {
595 let inner = self.inner.take().unwrap();
596
597 inner.value.with_mut(|ptr| unsafe {
598 // SAFETY: The receiver will not access the `UnsafeCell` unless the
599 // channel has been marked as "complete" (the `VALUE_SENT` state bit
600 // is set).
601 // That bit is only set by the sender later on in this method, and
602 // calling this method consumes `self`. Therefore, if it was possible to
603 // call this method, we know that the `VALUE_SENT` bit is unset, and
604 // the receiver is not currently accessing the `UnsafeCell`.
605 *ptr = Some(t);
606 });
607
608 if !inner.complete() {
609 unsafe {
610 // SAFETY: The receiver will not access the `UnsafeCell` unless
611 // the channel has been marked as "complete". Calling
612 // `complete()` will return true if this bit is set, and false
613 // if it is not set. Thus, if `complete()` returned false, it is
614 // safe for us to access the value, because we know that the
615 // receiver will not.
616 return Err(inner.consume_value().unwrap());
617 }
618 }
619
620 #[cfg(all(tokio_unstable, feature = "tracing"))]
621 self.resource_span.in_scope(|| {
622 tracing::trace!(
623 target: "runtime::resource::state_update",
624 value_sent = true,
625 value_sent.op = "override",
626 )
627 });
628
629 Ok(())
630 }
631
632 /// Waits for the associated [`Receiver`] handle to close.
633 ///
/// A [`Receiver`] is closed either by calling [`close`] explicitly or by
/// dropping the [`Receiver`] value.
636 ///
637 /// This function is useful when paired with `select!` to abort a
638 /// computation when the receiver is no longer interested in the result.
639 ///
640 /// # Return
641 ///
642 /// Returns a `Future` which must be awaited on.
643 ///
644 /// [`Receiver`]: Receiver
645 /// [`close`]: Receiver::close
646 ///
647 /// # Examples
648 ///
649 /// Basic usage
650 ///
651 /// ```
652 /// use tokio::sync::oneshot;
653 ///
654 /// #[tokio::main]
655 /// async fn main() {
656 /// let (mut tx, rx) = oneshot::channel::<()>();
657 ///
658 /// tokio::spawn(async move {
659 /// drop(rx);
660 /// });
661 ///
662 /// tx.closed().await;
663 /// println!("the receiver dropped");
664 /// }
665 /// ```
666 ///
667 /// Paired with select
668 ///
669 /// ```
670 /// use tokio::sync::oneshot;
671 /// use tokio::time::{self, Duration};
672 ///
673 /// async fn compute() -> String {
674 /// // Complex computation returning a `String`
675 /// # "hello".to_string()
676 /// }
677 ///
678 /// #[tokio::main]
679 /// async fn main() {
680 /// let (mut tx, rx) = oneshot::channel();
681 ///
682 /// tokio::spawn(async move {
683 /// tokio::select! {
684 /// _ = tx.closed() => {
685 /// // The receiver dropped, no need to do any further work
686 /// }
687 /// value = compute() => {
688 /// // The send can fail if the channel was closed at the exact same
689 /// // time as when compute() finished, so just ignore the failure.
690 /// let _ = tx.send(value);
691 /// }
692 /// }
693 /// });
694 ///
695 /// // Wait for up to 10 seconds
696 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
697 /// }
698 /// ```
closed(&mut self)699 pub async fn closed(&mut self) {
700 use crate::future::poll_fn;
701
702 #[cfg(all(tokio_unstable, feature = "tracing"))]
703 let resource_span = self.resource_span.clone();
704 #[cfg(all(tokio_unstable, feature = "tracing"))]
705 let closed = trace::async_op(
706 || poll_fn(|cx| self.poll_closed(cx)),
707 resource_span,
708 "Sender::closed",
709 "poll_closed",
710 false,
711 );
712 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
713 let closed = poll_fn(|cx| self.poll_closed(cx));
714
715 closed.await
716 }
717
/// Returns `true` if the associated [`Receiver`] handle has been closed.
///
/// A [`Receiver`] is closed either by calling [`close`] explicitly or by
/// dropping the [`Receiver`] value.
722 ///
723 /// If `true` is returned, a call to `send` will always result in an error.
724 ///
725 /// [`Receiver`]: Receiver
726 /// [`close`]: Receiver::close
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// use tokio::sync::oneshot;
732 ///
733 /// #[tokio::main]
734 /// async fn main() {
735 /// let (tx, rx) = oneshot::channel();
736 ///
737 /// assert!(!tx.is_closed());
738 ///
739 /// drop(rx);
740 ///
741 /// assert!(tx.is_closed());
742 /// assert!(tx.send("never received").is_err());
743 /// }
744 /// ```
is_closed(&self) -> bool745 pub fn is_closed(&self) -> bool {
746 let inner = self.inner.as_ref().unwrap();
747
748 let state = State::load(&inner.state, Acquire);
749 state.is_closed()
750 }
751
752 /// Checks whether the oneshot channel has been closed, and if not, schedules the
753 /// `Waker` in the provided `Context` to receive a notification when the channel is
754 /// closed.
755 ///
756 /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
757 /// [`Receiver`] value is dropped.
758 ///
759 /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
760 /// to the most recent call will be scheduled to receive a wakeup.
761 ///
762 /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
763 /// [`close`]: fn@crate::sync::oneshot::Receiver::close
764 ///
765 /// # Return value
766 ///
767 /// This function returns:
768 ///
769 /// * `Poll::Pending` if the channel is still open.
770 /// * `Poll::Ready(())` if the channel is closed.
771 ///
772 /// # Examples
773 ///
774 /// ```
775 /// use tokio::sync::oneshot;
776 ///
777 /// use futures::future::poll_fn;
778 ///
779 /// #[tokio::main]
780 /// async fn main() {
781 /// let (mut tx, mut rx) = oneshot::channel::<()>();
782 ///
783 /// tokio::spawn(async move {
784 /// rx.close();
785 /// });
786 ///
787 /// poll_fn(|cx| tx.poll_closed(cx)).await;
788 ///
789 /// println!("the receiver dropped");
790 /// }
791 /// ```
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>792 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
793 ready!(crate::trace::trace_leaf(cx));
794
795 // Keep track of task budget
796 let coop = ready!(crate::runtime::coop::poll_proceed(cx));
797
798 let inner = self.inner.as_ref().unwrap();
799
800 let mut state = State::load(&inner.state, Acquire);
801
802 if state.is_closed() {
803 coop.made_progress();
804 return Ready(());
805 }
806
807 if state.is_tx_task_set() {
808 let will_notify = unsafe { inner.tx_task.will_wake(cx) };
809
810 if !will_notify {
811 state = State::unset_tx_task(&inner.state);
812
813 if state.is_closed() {
814 // Set the flag again so that the waker is released in drop
815 State::set_tx_task(&inner.state);
816 coop.made_progress();
817 return Ready(());
818 } else {
819 unsafe { inner.tx_task.drop_task() };
820 }
821 }
822 }
823
824 if !state.is_tx_task_set() {
825 // Attempt to set the task
826 unsafe {
827 inner.tx_task.set_task(cx);
828 }
829
830 // Update the state
831 state = State::set_tx_task(&inner.state);
832
833 if state.is_closed() {
834 coop.made_progress();
835 return Ready(());
836 }
837 }
838
839 Pending
840 }
841 }
842
843 impl<T> Drop for Sender<T> {
drop(&mut self)844 fn drop(&mut self) {
845 if let Some(inner) = self.inner.as_ref() {
846 inner.complete();
847 #[cfg(all(tokio_unstable, feature = "tracing"))]
848 self.resource_span.in_scope(|| {
849 tracing::trace!(
850 target: "runtime::resource::state_update",
851 tx_dropped = true,
852 tx_dropped.op = "override",
853 )
854 });
855 }
856 }
857 }
858
859 impl<T> Receiver<T> {
860 /// Prevents the associated [`Sender`] handle from sending a value.
861 ///
862 /// Any `send` operation which happens after calling `close` is guaranteed
863 /// to fail. After calling `close`, [`try_recv`] should be called to
864 /// receive a value if one was sent **before** the call to `close`
865 /// completed.
866 ///
867 /// This function is useful to perform a graceful shutdown and ensure that a
868 /// value will not be sent into the channel and never received.
869 ///
/// `close` is a no-op if a message has already been received or the channel
/// is already closed.
872 ///
873 /// [`Sender`]: Sender
874 /// [`try_recv`]: Receiver::try_recv
875 ///
876 /// # Examples
877 ///
878 /// Prevent a value from being sent
879 ///
880 /// ```
881 /// use tokio::sync::oneshot;
882 /// use tokio::sync::oneshot::error::TryRecvError;
883 ///
884 /// #[tokio::main]
885 /// async fn main() {
886 /// let (tx, mut rx) = oneshot::channel();
887 ///
888 /// assert!(!tx.is_closed());
889 ///
890 /// rx.close();
891 ///
892 /// assert!(tx.is_closed());
893 /// assert!(tx.send("never received").is_err());
894 ///
895 /// match rx.try_recv() {
896 /// Err(TryRecvError::Closed) => {}
897 /// _ => unreachable!(),
898 /// }
899 /// }
900 /// ```
901 ///
902 /// Receive a value sent **before** calling `close`
903 ///
904 /// ```
905 /// use tokio::sync::oneshot;
906 ///
907 /// #[tokio::main]
908 /// async fn main() {
909 /// let (tx, mut rx) = oneshot::channel();
910 ///
911 /// assert!(tx.send("will receive").is_ok());
912 ///
913 /// rx.close();
914 ///
915 /// let msg = rx.try_recv().unwrap();
916 /// assert_eq!(msg, "will receive");
917 /// }
918 /// ```
close(&mut self)919 pub fn close(&mut self) {
920 if let Some(inner) = self.inner.as_ref() {
921 inner.close();
922 #[cfg(all(tokio_unstable, feature = "tracing"))]
923 self.resource_span.in_scope(|| {
924 tracing::trace!(
925 target: "runtime::resource::state_update",
926 rx_dropped = true,
927 rx_dropped.op = "override",
928 )
929 });
930 }
931 }
932
933 /// Attempts to receive a value.
934 ///
935 /// If a pending value exists in the channel, it is returned. If no value
936 /// has been sent, the current task **will not** be registered for
937 /// future notification.
938 ///
939 /// This function is useful to call from outside the context of an
940 /// asynchronous task.
941 ///
942 /// Note that unlike the `poll` method, the `try_recv` method cannot fail
943 /// spuriously. Any send or close event that happens before this call to
944 /// `try_recv` will be correctly returned to the caller.
945 ///
946 /// # Return
947 ///
948 /// - `Ok(T)` if a value is pending in the channel.
949 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
950 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
951 /// a value, or if the message has already been received.
952 ///
953 /// # Examples
954 ///
955 /// `try_recv` before a value is sent, then after.
956 ///
957 /// ```
958 /// use tokio::sync::oneshot;
959 /// use tokio::sync::oneshot::error::TryRecvError;
960 ///
961 /// #[tokio::main]
962 /// async fn main() {
963 /// let (tx, mut rx) = oneshot::channel();
964 ///
965 /// match rx.try_recv() {
966 /// // The channel is currently empty
967 /// Err(TryRecvError::Empty) => {}
968 /// _ => unreachable!(),
969 /// }
970 ///
971 /// // Send a value
972 /// tx.send("hello").unwrap();
973 ///
974 /// match rx.try_recv() {
975 /// Ok(value) => assert_eq!(value, "hello"),
976 /// _ => unreachable!(),
977 /// }
978 /// }
979 /// ```
980 ///
981 /// `try_recv` when the sender dropped before sending a value
982 ///
983 /// ```
984 /// use tokio::sync::oneshot;
985 /// use tokio::sync::oneshot::error::TryRecvError;
986 ///
987 /// #[tokio::main]
988 /// async fn main() {
989 /// let (tx, mut rx) = oneshot::channel::<()>();
990 ///
991 /// drop(tx);
992 ///
993 /// match rx.try_recv() {
994 /// // The channel will never receive a value.
995 /// Err(TryRecvError::Closed) => {}
996 /// _ => unreachable!(),
997 /// }
998 /// }
999 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>1000 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1001 let result = if let Some(inner) = self.inner.as_ref() {
1002 let state = State::load(&inner.state, Acquire);
1003
1004 if state.is_complete() {
1005 // SAFETY: If `state.is_complete()` returns true, then the
1006 // `VALUE_SENT` bit has been set and the sender side of the
1007 // channel will no longer attempt to access the inner
1008 // `UnsafeCell`. Therefore, it is now safe for us to access the
1009 // cell.
1010 match unsafe { inner.consume_value() } {
1011 Some(value) => {
1012 #[cfg(all(tokio_unstable, feature = "tracing"))]
1013 self.resource_span.in_scope(|| {
1014 tracing::trace!(
1015 target: "runtime::resource::state_update",
1016 value_received = true,
1017 value_received.op = "override",
1018 )
1019 });
1020 Ok(value)
1021 }
1022 None => Err(TryRecvError::Closed),
1023 }
1024 } else if state.is_closed() {
1025 Err(TryRecvError::Closed)
1026 } else {
1027 // Not ready, this does not clear `inner`
1028 return Err(TryRecvError::Empty);
1029 }
1030 } else {
1031 Err(TryRecvError::Closed)
1032 };
1033
1034 self.inner = None;
1035 result
1036 }
1037
1038 /// Blocking receive to call outside of asynchronous contexts.
1039 ///
1040 /// # Panics
1041 ///
1042 /// This function panics if called within an asynchronous execution
1043 /// context.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```
1048 /// use std::thread;
1049 /// use tokio::sync::oneshot;
1050 ///
1051 /// #[tokio::main]
1052 /// async fn main() {
1053 /// let (tx, rx) = oneshot::channel::<u8>();
1054 ///
1055 /// let sync_code = thread::spawn(move || {
1056 /// assert_eq!(Ok(10), rx.blocking_recv());
1057 /// });
1058 ///
1059 /// let _ = tx.send(10);
1060 /// sync_code.join().unwrap();
1061 /// }
1062 /// ```
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(self) -> Result<T, RecvError> {
    // The receiver is itself the future; drive it to completion on the
    // current thread.
    crate::future::block_on(self)
}
1069 }
1070
1071 impl<T> Drop for Receiver<T> {
drop(&mut self)1072 fn drop(&mut self) {
1073 if let Some(inner) = self.inner.as_ref() {
1074 inner.close();
1075 #[cfg(all(tokio_unstable, feature = "tracing"))]
1076 self.resource_span.in_scope(|| {
1077 tracing::trace!(
1078 target: "runtime::resource::state_update",
1079 rx_dropped = true,
1080 rx_dropped.op = "override",
1081 )
1082 });
1083 }
1084 }
1085 }
1086
1087 impl<T> Future for Receiver<T> {
1088 type Output = Result<T, RecvError>;
1089
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1090 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1091 // If `inner` is `None`, then `poll()` has already completed.
1092 #[cfg(all(tokio_unstable, feature = "tracing"))]
1093 let _res_span = self.resource_span.clone().entered();
1094 #[cfg(all(tokio_unstable, feature = "tracing"))]
1095 let _ao_span = self.async_op_span.clone().entered();
1096 #[cfg(all(tokio_unstable, feature = "tracing"))]
1097 let _ao_poll_span = self.async_op_poll_span.clone().entered();
1098
1099 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1100 #[cfg(all(tokio_unstable, feature = "tracing"))]
1101 let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1102
1103 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1104 let res = ready!(inner.poll_recv(cx))?;
1105
1106 res
1107 } else {
1108 panic!("called after complete");
1109 };
1110
1111 self.inner = None;
1112 Ready(Ok(ret))
1113 }
1114 }
1115
impl<T> Inner<T> {
    /// Marks the channel as complete and wakes the receiver if it has
    /// registered a waker.
    ///
    /// Returns `false` if the channel had already been closed (in which case
    /// `State::set_complete` does not set `VALUE_SENT`), and `true` otherwise.
    fn complete(&self) -> bool {
        // `prev` is the state observed by `set_complete`: either the closed
        // state, or the state just before `VALUE_SENT` was set.
        let prev = State::set_complete(&self.state);

        if prev.is_closed() {
            return false;
        }

        if prev.is_rx_task_set() {
            // TODO: Consume waker?
            // `RX_TASK_SET` was observed, so `rx_task` holds a waker.
            unsafe {
                self.rx_task.with_task(Waker::wake_by_ref);
            }
        }

        true
    }

    /// Polls the channel from the receiving side.
    ///
    /// Resolves to `Ok(value)` when the channel is complete and holds a value,
    /// and to `Err(RecvError)` when it is complete without a value or has been
    /// closed. Otherwise the waker from `cx` is stored in `rx_task` (replacing
    /// a stale one if necessary) and `Pending` is returned.
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        ready!(crate::trace::trace_leaf(cx));
        // Keep track of task budget
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        // Load the state
        let mut state = State::load(&self.state, Acquire);

        if state.is_complete() {
            coop.made_progress();
            // `VALUE_SENT` is set, so only the receiver may access the value
            // cell.
            match unsafe { self.consume_value() } {
                Some(value) => Ready(Ok(value)),
                None => Ready(Err(RecvError(()))),
            }
        } else if state.is_closed() {
            coop.made_progress();
            Ready(Err(RecvError(())))
        } else {
            if state.is_rx_task_set() {
                // `RX_TASK_SET` is set, so `rx_task` holds a waker to compare
                // against.
                let will_notify = unsafe { self.rx_task.will_wake(cx) };

                // Check if the task is still the same
                if !will_notify {
                    // Unset the task
                    state = State::unset_rx_task(&self.state);
                    if state.is_complete() {
                        // Set the flag again so that the waker is released in drop
                        State::set_rx_task(&self.state);

                        coop.made_progress();
                        // SAFETY: If `state.is_complete()` returns true, then the
                        // `VALUE_SENT` bit has been set and the sender side of the
                        // channel will no longer attempt to access the inner
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
                        // cell.
                        return match unsafe { self.consume_value() } {
                            Some(value) => Ready(Ok(value)),
                            None => Ready(Err(RecvError(()))),
                        };
                    } else {
                        // Still pending and the flag is now cleared: drop the
                        // stale waker so a fresh one can be stored below.
                        unsafe { self.rx_task.drop_task() };
                    }
                }
            }

            if !state.is_rx_task_set() {
                // Attempt to set the task
                unsafe {
                    self.rx_task.set_task(cx);
                }

                // Update the state
                state = State::set_rx_task(&self.state);

                // `complete` only wakes the receiver when it observed
                // `RX_TASK_SET`, so a completion that raced with the
                // registration above has to be picked up here.
                if state.is_complete() {
                    coop.made_progress();
                    match unsafe { self.consume_value() } {
                        Some(value) => Ready(Ok(value)),
                        None => Ready(Err(RecvError(()))),
                    }
                } else {
                    Pending
                }
            } else {
                // The stored waker already wakes the current task.
                Pending
            }
        }
    }

    /// Called by `Receiver` to indicate that the value will never be received.
    ///
    /// Sets the `CLOSED` bit and, if a sender waker is registered and no value
    /// has been sent yet, wakes it.
    fn close(&self) {
        let prev = State::set_closed(&self.state);

        if prev.is_tx_task_set() && !prev.is_complete() {
            // `TX_TASK_SET` was observed, so `tx_task` holds a waker.
            unsafe {
                self.tx_task.with_task(Waker::wake_by_ref);
            }
        }
    }

    /// Consumes the value. This function does not check `state`.
    ///
    /// Returns `None` if the cell holds no value, e.g. because it was already
    /// taken by an earlier call.
    ///
    /// # Safety
    ///
    /// Calling this method concurrently on multiple threads will result in a
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
    /// sender *or* the receiver will call this method at a given point in time.
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
    /// if it is set, then only the receiver may call this method.
    unsafe fn consume_value(&self) -> Option<T> {
        // `take` leaves `None` in the cell.
        self.value.with_mut(|ptr| (*ptr).take())
    }
}
1227
// NOTE(review): the `Inner` struct definition is outside this view. Its
// `value`, `rx_task` and `tx_task` cells are only accessed according to the
// `VALUE_SENT`, `RX_TASK_SET` and `TX_TASK_SET` bits of the atomic `state`,
// which is presumably what makes sharing `Inner<T>` across threads sound for
// `T: Send` — confirm against the struct definition.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
1230
/// Reads the current value of `this` through a mutable reference.
///
/// The `&mut` borrow gives the caller exclusive access, so no memory ordering
/// is passed; the read goes through the atomic type's `with_mut` accessor.
fn mut_load(this: &mut AtomicUsize) -> usize {
    this.with_mut(|v| *v)
}
1234
1235 impl<T> Drop for Inner<T> {
drop(&mut self)1236 fn drop(&mut self) {
1237 let state = State(mut_load(&mut self.state));
1238
1239 if state.is_rx_task_set() {
1240 unsafe {
1241 self.rx_task.drop_task();
1242 }
1243 }
1244
1245 if state.is_tx_task_set() {
1246 unsafe {
1247 self.tx_task.drop_task();
1248 }
1249 }
1250 }
1251 }
1252
1253 impl<T: fmt::Debug> fmt::Debug for Inner<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1254 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1255 use std::sync::atomic::Ordering::Relaxed;
1256
1257 fmt.debug_struct("Inner")
1258 .field("state", &State::load(&self.state, Relaxed))
1259 .finish()
1260 }
1261 }
1262
/// Indicates that a waker for the receiving task has been set.
///
/// # Safety
///
/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit controls which side of the channel is permitted to access the
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
/// the sender.
const VALUE_SENT: usize = 0b00010;
/// Indicates that the channel has been closed through `Inner::close`, i.e.
/// that the value will never be received.
///
/// Once this bit is set, `State::set_complete` no longer sets `VALUE_SENT`.
const CLOSED: usize = 0b00100;

/// Indicates that a waker for the sending task has been set.
///
/// # Safety
///
/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;
1286
1287 impl State {
new() -> State1288 fn new() -> State {
1289 State(0)
1290 }
1291
is_complete(self) -> bool1292 fn is_complete(self) -> bool {
1293 self.0 & VALUE_SENT == VALUE_SENT
1294 }
1295
set_complete(cell: &AtomicUsize) -> State1296 fn set_complete(cell: &AtomicUsize) -> State {
1297 // This method is a compare-and-swap loop rather than a fetch-or like
1298 // other `set_$WHATEVER` methods on `State`. This is because we must
1299 // check if the state has been closed before setting the `VALUE_SENT`
1300 // bit.
1301 //
1302 // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1303 // bit is already set, because `VALUE_SENT` will tell the receiver that
1304 // it's okay to access the inner `UnsafeCell`. Immediately after calling
1305 // `set_complete`, if the channel was closed, the sender will _also_
1306 // access the `UnsafeCell` to take the value back out, so if a
1307 // `poll_recv` or `try_recv` call is occurring concurrently, both
1308 // threads may try to access the `UnsafeCell` if we were to set the
1309 // `VALUE_SENT` bit on a closed channel.
1310 let mut state = cell.load(Ordering::Relaxed);
1311 loop {
1312 if State(state).is_closed() {
1313 break;
1314 }
1315 // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1316 // the `RX_TASK_SET` flag is set. However, `loom` does not support
1317 // fences yet.
1318 match cell.compare_exchange_weak(
1319 state,
1320 state | VALUE_SENT,
1321 Ordering::AcqRel,
1322 Ordering::Acquire,
1323 ) {
1324 Ok(_) => break,
1325 Err(actual) => state = actual,
1326 }
1327 }
1328 State(state)
1329 }
1330
is_rx_task_set(self) -> bool1331 fn is_rx_task_set(self) -> bool {
1332 self.0 & RX_TASK_SET == RX_TASK_SET
1333 }
1334
set_rx_task(cell: &AtomicUsize) -> State1335 fn set_rx_task(cell: &AtomicUsize) -> State {
1336 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1337 State(val | RX_TASK_SET)
1338 }
1339
unset_rx_task(cell: &AtomicUsize) -> State1340 fn unset_rx_task(cell: &AtomicUsize) -> State {
1341 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1342 State(val & !RX_TASK_SET)
1343 }
1344
is_closed(self) -> bool1345 fn is_closed(self) -> bool {
1346 self.0 & CLOSED == CLOSED
1347 }
1348
set_closed(cell: &AtomicUsize) -> State1349 fn set_closed(cell: &AtomicUsize) -> State {
1350 // Acquire because we want all later writes (attempting to poll) to be
1351 // ordered after this.
1352 let val = cell.fetch_or(CLOSED, Acquire);
1353 State(val)
1354 }
1355
set_tx_task(cell: &AtomicUsize) -> State1356 fn set_tx_task(cell: &AtomicUsize) -> State {
1357 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1358 State(val | TX_TASK_SET)
1359 }
1360
unset_tx_task(cell: &AtomicUsize) -> State1361 fn unset_tx_task(cell: &AtomicUsize) -> State {
1362 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1363 State(val & !TX_TASK_SET)
1364 }
1365
is_tx_task_set(self) -> bool1366 fn is_tx_task_set(self) -> bool {
1367 self.0 & TX_TASK_SET == TX_TASK_SET
1368 }
1369
as_usize(self) -> usize1370 fn as_usize(self) -> usize {
1371 self.0
1372 }
1373
load(cell: &AtomicUsize, order: Ordering) -> State1374 fn load(cell: &AtomicUsize, order: Ordering) -> State {
1375 let val = cell.load(order);
1376 State(val)
1377 }
1378 }
1379
1380 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1381 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1382 fmt.debug_struct("State")
1383 .field("is_complete", &self.is_complete())
1384 .field("is_closed", &self.is_closed())
1385 .field("is_rx_task_set", &self.is_rx_task_set())
1386 .field("is_tx_task_set", &self.is_tx_task_set())
1387 .finish()
1388 }
1389 }
1390