• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::assert_future;
2 use crate::task::AtomicWaker;
3 use futures_core::future::Future;
4 use futures_core::task::{Context, Poll};
5 use core::fmt;
6 use core::pin::Pin;
7 use core::sync::atomic::{AtomicBool, Ordering};
8 use alloc::sync::Arc;
9 use pin_project_lite::pin_project;
10 
11 pin_project! {
12     /// A future which can be remotely short-circuited using an `AbortHandle`.
13     #[derive(Debug, Clone)]
14     #[must_use = "futures do nothing unless you `.await` or poll them"]
15     pub struct Abortable<Fut> {
16         #[pin]
17         future: Fut,
18         inner: Arc<AbortInner>,
19     }
20 }
21 
22 impl<Fut> Abortable<Fut> where Fut: Future {
23     /// Creates a new `Abortable` future using an existing `AbortRegistration`.
24     /// `AbortRegistration`s can be acquired through `AbortHandle::new`.
25     ///
26     /// When `abort` is called on the handle tied to `reg` or if `abort` has
27     /// already been called, the future will complete immediately without making
28     /// any further progress.
29     ///
30     /// Example:
31     ///
32     /// ```
33     /// # futures::executor::block_on(async {
34     /// use futures::future::{Abortable, AbortHandle, Aborted};
35     ///
36     /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
37     /// let future = Abortable::new(async { 2 }, abort_registration);
38     /// abort_handle.abort();
39     /// assert_eq!(future.await, Err(Aborted));
40     /// # });
41     /// ```
new(future: Fut, reg: AbortRegistration) -> Self42     pub fn new(future: Fut, reg: AbortRegistration) -> Self {
43         assert_future::<Result<Fut::Output, Aborted>, _>(Self {
44             future,
45             inner: reg.inner,
46         })
47     }
48 }
49 
50 /// A registration handle for a `Abortable` future.
51 /// Values of this type can be acquired from `AbortHandle::new` and are used
52 /// in calls to `Abortable::new`.
53 #[derive(Debug)]
54 pub struct AbortRegistration {
55     inner: Arc<AbortInner>,
56 }
57 
58 /// A handle to a `Abortable` future.
59 #[derive(Debug, Clone)]
60 pub struct AbortHandle {
61     inner: Arc<AbortInner>,
62 }
63 
64 impl AbortHandle {
65     /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
66     /// to abort a running future.
67     ///
68     /// This function is usually paired with a call to `Abortable::new`.
69     ///
70     /// Example:
71     ///
72     /// ```
73     /// # futures::executor::block_on(async {
74     /// use futures::future::{Abortable, AbortHandle, Aborted};
75     ///
76     /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
77     /// let future = Abortable::new(async { 2 }, abort_registration);
78     /// abort_handle.abort();
79     /// assert_eq!(future.await, Err(Aborted));
80     /// # });
81     /// ```
new_pair() -> (Self, AbortRegistration)82     pub fn new_pair() -> (Self, AbortRegistration) {
83         let inner = Arc::new(AbortInner {
84             waker: AtomicWaker::new(),
85             cancel: AtomicBool::new(false),
86         });
87 
88         (
89             Self {
90                 inner: inner.clone(),
91             },
92             AbortRegistration {
93                 inner,
94             },
95         )
96     }
97 }
98 
99 // Inner type storing the waker to awaken and a bool indicating that it
100 // should be cancelled.
101 #[derive(Debug)]
102 struct AbortInner {
103     waker: AtomicWaker,
104     cancel: AtomicBool,
105 }
106 
107 /// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it.
108 ///
109 /// This function is a convenient (but less flexible) alternative to calling
110 /// `AbortHandle::new` and `Abortable::new` manually.
111 ///
112 /// This function is only available when the `std` or `alloc` feature of this
113 /// library is activated, and it is activated by default.
abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle) where Fut: Future114 pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
115     where Fut: Future
116 {
117     let (handle, reg) = AbortHandle::new_pair();
118     (
119         Abortable::new(future, reg),
120         handle,
121     )
122 }
123 
124 /// Indicator that the `Abortable` future was aborted.
125 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
126 pub struct Aborted;
127 
128 impl fmt::Display for Aborted {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result129     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130         write!(f, "`Abortable` future has been aborted")
131     }
132 }
133 
134 #[cfg(feature = "std")]
135 impl std::error::Error for Aborted {}
136 
137 impl<Fut> Future for Abortable<Fut> where Fut: Future {
138     type Output = Result<Fut::Output, Aborted>;
139 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>140     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141         // Check if the future has been aborted
142         if self.inner.cancel.load(Ordering::Relaxed) {
143             return Poll::Ready(Err(Aborted))
144         }
145 
146         // attempt to complete the future
147         if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) {
148             return Poll::Ready(Ok(x))
149         }
150 
151         // Register to receive a wakeup if the future is aborted in the... future
152         self.inner.waker.register(cx.waker());
153 
154         // Check to see if the future was aborted between the first check and
155         // registration.
156         // Checking with `Relaxed` is sufficient because `register` introduces an
157         // `AcqRel` barrier.
158         if self.inner.cancel.load(Ordering::Relaxed) {
159             return Poll::Ready(Err(Aborted))
160         }
161 
162         Poll::Pending
163     }
164 }
165 
166 impl AbortHandle {
167     /// Abort the `Abortable` future associated with this handle.
168     ///
169     /// Notifies the Abortable future associated with this handle that it
170     /// should abort. Note that if the future is currently being polled on
171     /// another thread, it will not immediately stop running. Instead, it will
172     /// continue to run until its poll method returns.
abort(&self)173     pub fn abort(&self) {
174         self.inner.cancel.store(true, Ordering::Relaxed);
175         self.inner.waker.wake();
176     }
177 }
178