• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! An asynchronously awaitable `CancellationToken`.
//! The token allows signalling a cancellation request to one or more tasks.
3 pub(crate) mod guard;
4 mod tree_node;
5 
6 use crate::loom::sync::Arc;
7 use core::future::Future;
8 use core::pin::Pin;
9 use core::task::{Context, Poll};
10 
11 use guard::DropGuard;
12 use pin_project_lite::pin_project;
13 
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
///
/// Tasks can call [`CancellationToken::cancelled()`] in order to
/// obtain a Future which will be resolved when cancellation is requested.
///
/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
///
/// Cloning a token produces another handle to the same underlying node, so
/// a cancellation requested through any clone is observed by all of them.
/// A token created through [`CancellationToken::child_token`] is additionally
/// cancelled whenever the token it was derived from is cancelled.
///
/// # Examples
///
/// ```no_run
/// use tokio::select;
/// use tokio_util::sync::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
///     let token = CancellationToken::new();
///     let cloned_token = token.clone();
///
///     let join_handle = tokio::spawn(async move {
///         // Wait for either cancellation or a very long time
///         select! {
///             _ = cloned_token.cancelled() => {
///                 // The token was cancelled
///                 5
///             }
///             _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
///                 99
///             }
///         }
///     });
///
///     tokio::spawn(async move {
///         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
///         token.cancel();
///     });
///
///     assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub struct CancellationToken {
    // Shared node that every clone of this token points at. All operations
    // on the token (`cancel`, `is_cancelled`, `child_token`, ...) are
    // forwarded to the `tree_node` module with this node as argument.
    inner: Arc<tree_node::TreeNode>,
}
57 
// `CancellationToken` is explicitly declared unwind safe, both when held by
// value (`UnwindSafe`) and when held behind a shared reference
// (`RefUnwindSafe`).
// NOTE(review): presumably these manual impls are needed because
// `tree_node::TreeNode` does not receive the auto traits on its own — confirm
// against the `tree_node` module.
impl std::panic::UnwindSafe for CancellationToken {}
impl std::panic::RefUnwindSafe for CancellationToken {}
60 
pin_project! {
    /// A Future that is resolved once the corresponding [`CancellationToken`]
    /// is cancelled.
    ///
    /// Returned by [`CancellationToken::cancelled`]; it borrows the token for
    /// the lifetime `'a`.
    #[must_use = "futures do nothing unless polled"]
    pub struct WaitForCancellationFuture<'a> {
        // The token being observed. `poll` consults its cancelled flag before
        // waiting on the notification below.
        cancellation_token: &'a CancellationToken,
        // Notification future obtained from the token's inner node. `poll`
        // replaces it with a fresh one whenever it completes while the token
        // is still not cancelled.
        #[pin]
        future: tokio::sync::futures::Notified<'a>,
    }
}
71 
pin_project! {
    /// A Future that is resolved once the corresponding [`CancellationToken`]
    /// is cancelled.
    ///
    /// This is the counterpart to [`WaitForCancellationFuture`] that takes
    /// [`CancellationToken`] by value instead of using a reference.
    ///
    /// Returned by [`CancellationToken::cancelled_owned`].
    #[must_use = "futures do nothing unless polled"]
    pub struct WaitForCancellationFutureOwned {
        // Since `future` is the first field, it is dropped before the
        // cancellation_token field. This ensures that the reference inside the
        // `Notified` remains valid.
        //
        // The `'static` lifetime is not a real one: the `Notified` is created
        // from a raw pointer into the node owned by `cancellation_token` (see
        // `new_future`), so these two fields must never be reordered.
        #[pin]
        future: tokio::sync::futures::Notified<'static>,
        // The owned token. It keeps alive the node that `future` points into.
        cancellation_token: CancellationToken,
    }
}
88 
89 // ===== impl CancellationToken =====
90 
91 impl core::fmt::Debug for CancellationToken {
fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result92     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
93         f.debug_struct("CancellationToken")
94             .field("is_cancelled", &self.is_cancelled())
95             .finish()
96     }
97 }
98 
99 impl Clone for CancellationToken {
clone(&self) -> Self100     fn clone(&self) -> Self {
101         tree_node::increase_handle_refcount(&self.inner);
102         CancellationToken {
103             inner: self.inner.clone(),
104         }
105     }
106 }
107 
108 impl Drop for CancellationToken {
drop(&mut self)109     fn drop(&mut self) {
110         tree_node::decrease_handle_refcount(&self.inner);
111     }
112 }
113 
114 impl Default for CancellationToken {
default() -> CancellationToken115     fn default() -> CancellationToken {
116         CancellationToken::new()
117     }
118 }
119 
120 impl CancellationToken {
121     /// Creates a new CancellationToken in the non-cancelled state.
new() -> CancellationToken122     pub fn new() -> CancellationToken {
123         CancellationToken {
124             inner: Arc::new(tree_node::TreeNode::new()),
125         }
126     }
127 
128     /// Creates a `CancellationToken` which will get cancelled whenever the
129     /// current token gets cancelled.
130     ///
131     /// If the current token is already cancelled, the child token will get
132     /// returned in cancelled state.
133     ///
134     /// # Examples
135     ///
136     /// ```no_run
137     /// use tokio::select;
138     /// use tokio_util::sync::CancellationToken;
139     ///
140     /// #[tokio::main]
141     /// async fn main() {
142     ///     let token = CancellationToken::new();
143     ///     let child_token = token.child_token();
144     ///
145     ///     let join_handle = tokio::spawn(async move {
146     ///         // Wait for either cancellation or a very long time
147     ///         select! {
148     ///             _ = child_token.cancelled() => {
149     ///                 // The token was cancelled
150     ///                 5
151     ///             }
152     ///             _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
153     ///                 99
154     ///             }
155     ///         }
156     ///     });
157     ///
158     ///     tokio::spawn(async move {
159     ///         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
160     ///         token.cancel();
161     ///     });
162     ///
163     ///     assert_eq!(5, join_handle.await.unwrap());
164     /// }
165     /// ```
child_token(&self) -> CancellationToken166     pub fn child_token(&self) -> CancellationToken {
167         CancellationToken {
168             inner: tree_node::child_node(&self.inner),
169         }
170     }
171 
172     /// Cancel the [`CancellationToken`] and all child tokens which had been
173     /// derived from it.
174     ///
175     /// This will wake up all tasks which are waiting for cancellation.
176     ///
177     /// Be aware that cancellation is not an atomic operation. It is possible
178     /// for another thread running in parallel with a call to `cancel` to first
179     /// receive `true` from `is_cancelled` on one child node, and then receive
180     /// `false` from `is_cancelled` on another child node. However, once the
181     /// call to `cancel` returns, all child nodes have been fully cancelled.
cancel(&self)182     pub fn cancel(&self) {
183         tree_node::cancel(&self.inner);
184     }
185 
186     /// Returns `true` if the `CancellationToken` is cancelled.
is_cancelled(&self) -> bool187     pub fn is_cancelled(&self) -> bool {
188         tree_node::is_cancelled(&self.inner)
189     }
190 
191     /// Returns a `Future` that gets fulfilled when cancellation is requested.
192     ///
193     /// The future will complete immediately if the token is already cancelled
194     /// when this method is called.
195     ///
196     /// # Cancel safety
197     ///
198     /// This method is cancel safe.
cancelled(&self) -> WaitForCancellationFuture<'_>199     pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
200         WaitForCancellationFuture {
201             cancellation_token: self,
202             future: self.inner.notified(),
203         }
204     }
205 
206     /// Returns a `Future` that gets fulfilled when cancellation is requested.
207     ///
208     /// The future will complete immediately if the token is already cancelled
209     /// when this method is called.
210     ///
211     /// The function takes self by value and returns a future that owns the
212     /// token.
213     ///
214     /// # Cancel safety
215     ///
216     /// This method is cancel safe.
cancelled_owned(self) -> WaitForCancellationFutureOwned217     pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
218         WaitForCancellationFutureOwned::new(self)
219     }
220 
221     /// Creates a `DropGuard` for this token.
222     ///
223     /// Returned guard will cancel this token (and all its children) on drop
224     /// unless disarmed.
drop_guard(self) -> DropGuard225     pub fn drop_guard(self) -> DropGuard {
226         DropGuard { inner: Some(self) }
227     }
228 }
229 
230 // ===== impl WaitForCancellationFuture =====
231 
232 impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result233     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
234         f.debug_struct("WaitForCancellationFuture").finish()
235     }
236 }
237 
238 impl<'a> Future for WaitForCancellationFuture<'a> {
239     type Output = ();
240 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>241     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
242         let mut this = self.project();
243         loop {
244             if this.cancellation_token.is_cancelled() {
245                 return Poll::Ready(());
246             }
247 
248             // No wakeups can be lost here because there is always a call to
249             // `is_cancelled` between the creation of the future and the call to
250             // `poll`, and the code that sets the cancelled flag does so before
251             // waking the `Notified`.
252             if this.future.as_mut().poll(cx).is_pending() {
253                 return Poll::Pending;
254             }
255 
256             this.future.set(this.cancellation_token.inner.notified());
257         }
258     }
259 }
260 
261 // ===== impl WaitForCancellationFutureOwned =====
262 
263 impl core::fmt::Debug for WaitForCancellationFutureOwned {
fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result264     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265         f.debug_struct("WaitForCancellationFutureOwned").finish()
266     }
267 }
268 
269 impl WaitForCancellationFutureOwned {
new(cancellation_token: CancellationToken) -> Self270     fn new(cancellation_token: CancellationToken) -> Self {
271         WaitForCancellationFutureOwned {
272             // cancellation_token holds a heap allocation and is guaranteed to have a
273             // stable deref, thus it would be ok to move the cancellation_token while
274             // the future holds a reference to it.
275             //
276             // # Safety
277             //
278             // cancellation_token is dropped after future due to the field ordering.
279             future: unsafe { Self::new_future(&cancellation_token) },
280             cancellation_token,
281         }
282     }
283 
284     /// # Safety
285     /// The returned future must be destroyed before the cancellation token is
286     /// destroyed.
new_future( cancellation_token: &CancellationToken, ) -> tokio::sync::futures::Notified<'static>287     unsafe fn new_future(
288         cancellation_token: &CancellationToken,
289     ) -> tokio::sync::futures::Notified<'static> {
290         let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
291         // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains
292         // valid until the strong count of the Arc drops to zero, and the caller
293         // guarantees that they will drop the future before that happens.
294         (*inner_ptr).notified()
295     }
296 }
297 
298 impl Future for WaitForCancellationFutureOwned {
299     type Output = ();
300 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>301     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
302         let mut this = self.project();
303 
304         loop {
305             if this.cancellation_token.is_cancelled() {
306                 return Poll::Ready(());
307             }
308 
309             // No wakeups can be lost here because there is always a call to
310             // `is_cancelled` between the creation of the future and the call to
311             // `poll`, and the code that sets the cancelled flag does so before
312             // waking the `Notified`.
313             if this.future.as_mut().poll(cx).is_pending() {
314                 return Poll::Pending;
315             }
316 
317             // # Safety
318             //
319             // cancellation_token is dropped after future due to the field ordering.
320             this.future
321                 .set(unsafe { Self::new_future(this.cancellation_token) });
322         }
323     }
324 }
325