1 //! An asynchronously awaitable `CancellationToken`. 2 //! The token allows to signal a cancellation request to one or more tasks. 3 pub(crate) mod guard; 4 mod tree_node; 5 6 use crate::loom::sync::Arc; 7 use core::future::Future; 8 use core::pin::Pin; 9 use core::task::{Context, Poll}; 10 11 use guard::DropGuard; 12 use pin_project_lite::pin_project; 13 14 /// A token which can be used to signal a cancellation request to one or more 15 /// tasks. 16 /// 17 /// Tasks can call [`CancellationToken::cancelled()`] in order to 18 /// obtain a Future which will be resolved when cancellation is requested. 19 /// 20 /// Cancellation can be requested through the [`CancellationToken::cancel`] method. 21 /// 22 /// # Examples 23 /// 24 /// ```no_run 25 /// use tokio::select; 26 /// use tokio_util::sync::CancellationToken; 27 /// 28 /// #[tokio::main] 29 /// async fn main() { 30 /// let token = CancellationToken::new(); 31 /// let cloned_token = token.clone(); 32 /// 33 /// let join_handle = tokio::spawn(async move { 34 /// // Wait for either cancellation or a very long time 35 /// select! { 36 /// _ = cloned_token.cancelled() => { 37 /// // The token was cancelled 38 /// 5 39 /// } 40 /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { 41 /// 99 42 /// } 43 /// } 44 /// }); 45 /// 46 /// tokio::spawn(async move { 47 /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; 48 /// token.cancel(); 49 /// }); 50 /// 51 /// assert_eq!(5, join_handle.await.unwrap()); 52 /// } 53 /// ``` 54 pub struct CancellationToken { 55 inner: Arc<tree_node::TreeNode>, 56 } 57 58 impl std::panic::UnwindSafe for CancellationToken {} 59 impl std::panic::RefUnwindSafe for CancellationToken {} 60 61 pin_project! { 62 /// A Future that is resolved once the corresponding [`CancellationToken`] 63 /// is cancelled. 64 #[must_use = "futures do nothing unless polled"] 65 pub struct WaitForCancellationFuture<'a> { 66 cancellation_token: &'a CancellationToken, 67 #[pin] 68 future: tokio::sync::futures::Notified<'a>, 69 } 70 } 71 72 pin_project! { 73 /// A Future that is resolved once the corresponding [`CancellationToken`] 74 /// is cancelled. 75 /// 76 /// This is the counterpart to [`WaitForCancellationFuture`] that takes 77 /// [`CancellationToken`] by value instead of using a reference. 78 #[must_use = "futures do nothing unless polled"] 79 pub struct WaitForCancellationFutureOwned { 80 // Since `future` is the first field, it is dropped before the 81 // cancellation_token field. This ensures that the reference inside the 82 // `Notified` remains valid. 83 #[pin] 84 future: tokio::sync::futures::Notified<'static>, 85 cancellation_token: CancellationToken, 86 } 87 } 88 89 // ===== impl CancellationToken ===== 90 91 impl core::fmt::Debug for CancellationToken { fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result92 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 93 f.debug_struct("CancellationToken") 94 .field("is_cancelled", &self.is_cancelled()) 95 .finish() 96 } 97 } 98 99 impl Clone for CancellationToken { clone(&self) -> Self100 fn clone(&self) -> Self { 101 tree_node::increase_handle_refcount(&self.inner); 102 CancellationToken { 103 inner: self.inner.clone(), 104 } 105 } 106 } 107 108 impl Drop for CancellationToken { drop(&mut self)109 fn drop(&mut self) { 110 tree_node::decrease_handle_refcount(&self.inner); 111 } 112 } 113 114 impl Default for CancellationToken { default() -> CancellationToken115 fn default() -> CancellationToken { 116 CancellationToken::new() 117 } 118 } 119 120 impl CancellationToken { 121 /// Creates a new CancellationToken in the non-cancelled state. new() -> CancellationToken122 pub fn new() -> CancellationToken { 123 CancellationToken { 124 inner: Arc::new(tree_node::TreeNode::new()), 125 } 126 } 127 128 /// Creates a `CancellationToken` which will get cancelled whenever the 129 /// current token gets cancelled. 130 /// 131 /// If the current token is already cancelled, the child token will get 132 /// returned in cancelled state. 133 /// 134 /// # Examples 135 /// 136 /// ```no_run 137 /// use tokio::select; 138 /// use tokio_util::sync::CancellationToken; 139 /// 140 /// #[tokio::main] 141 /// async fn main() { 142 /// let token = CancellationToken::new(); 143 /// let child_token = token.child_token(); 144 /// 145 /// let join_handle = tokio::spawn(async move { 146 /// // Wait for either cancellation or a very long time 147 /// select! { 148 /// _ = child_token.cancelled() => { 149 /// // The token was cancelled 150 /// 5 151 /// } 152 /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { 153 /// 99 154 /// } 155 /// } 156 /// }); 157 /// 158 /// tokio::spawn(async move { 159 /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; 160 /// token.cancel(); 161 /// }); 162 /// 163 /// assert_eq!(5, join_handle.await.unwrap()); 164 /// } 165 /// ``` child_token(&self) -> CancellationToken166 pub fn child_token(&self) -> CancellationToken { 167 CancellationToken { 168 inner: tree_node::child_node(&self.inner), 169 } 170 } 171 172 /// Cancel the [`CancellationToken`] and all child tokens which had been 173 /// derived from it. 174 /// 175 /// This will wake up all tasks which are waiting for cancellation. 176 /// 177 /// Be aware that cancellation is not an atomic operation. It is possible 178 /// for another thread running in parallel with a call to `cancel` to first 179 /// receive `true` from `is_cancelled` on one child node, and then receive 180 /// `false` from `is_cancelled` on another child node. However, once the 181 /// call to `cancel` returns, all child nodes have been fully cancelled. cancel(&self)182 pub fn cancel(&self) { 183 tree_node::cancel(&self.inner); 184 } 185 186 /// Returns `true` if the `CancellationToken` is cancelled. is_cancelled(&self) -> bool187 pub fn is_cancelled(&self) -> bool { 188 tree_node::is_cancelled(&self.inner) 189 } 190 191 /// Returns a `Future` that gets fulfilled when cancellation is requested. 192 /// 193 /// The future will complete immediately if the token is already cancelled 194 /// when this method is called. 195 /// 196 /// # Cancel safety 197 /// 198 /// This method is cancel safe. cancelled(&self) -> WaitForCancellationFuture<'_>199 pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { 200 WaitForCancellationFuture { 201 cancellation_token: self, 202 future: self.inner.notified(), 203 } 204 } 205 206 /// Returns a `Future` that gets fulfilled when cancellation is requested. 207 /// 208 /// The future will complete immediately if the token is already cancelled 209 /// when this method is called. 210 /// 211 /// The function takes self by value and returns a future that owns the 212 /// token. 213 /// 214 /// # Cancel safety 215 /// 216 /// This method is cancel safe. cancelled_owned(self) -> WaitForCancellationFutureOwned217 pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned { 218 WaitForCancellationFutureOwned::new(self) 219 } 220 221 /// Creates a `DropGuard` for this token. 222 /// 223 /// Returned guard will cancel this token (and all its children) on drop 224 /// unless disarmed. drop_guard(self) -> DropGuard225 pub fn drop_guard(self) -> DropGuard { 226 DropGuard { inner: Some(self) } 227 } 228 } 229 230 // ===== impl WaitForCancellationFuture ===== 231 232 impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result233 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 234 f.debug_struct("WaitForCancellationFuture").finish() 235 } 236 } 237 238 impl<'a> Future for WaitForCancellationFuture<'a> { 239 type Output = (); 240 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>241 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 242 let mut this = self.project(); 243 loop { 244 if this.cancellation_token.is_cancelled() { 245 return Poll::Ready(()); 246 } 247 248 // No wakeups can be lost here because there is always a call to 249 // `is_cancelled` between the creation of the future and the call to 250 // `poll`, and the code that sets the cancelled flag does so before 251 // waking the `Notified`. 252 if this.future.as_mut().poll(cx).is_pending() { 253 return Poll::Pending; 254 } 255 256 this.future.set(this.cancellation_token.inner.notified()); 257 } 258 } 259 } 260 261 // ===== impl WaitForCancellationFutureOwned ===== 262 263 impl core::fmt::Debug for WaitForCancellationFutureOwned { fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result264 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 265 f.debug_struct("WaitForCancellationFutureOwned").finish() 266 } 267 } 268 269 impl WaitForCancellationFutureOwned { new(cancellation_token: CancellationToken) -> Self270 fn new(cancellation_token: CancellationToken) -> Self { 271 WaitForCancellationFutureOwned { 272 // cancellation_token holds a heap allocation and is guaranteed to have a 273 // stable deref, thus it would be ok to move the cancellation_token while 274 // the future holds a reference to it. 275 // 276 // # Safety 277 // 278 // cancellation_token is dropped after future due to the field ordering. 279 future: unsafe { Self::new_future(&cancellation_token) }, 280 cancellation_token, 281 } 282 } 283 284 /// # Safety 285 /// The returned future must be destroyed before the cancellation token is 286 /// destroyed. new_future( cancellation_token: &CancellationToken, ) -> tokio::sync::futures::Notified<'static>287 unsafe fn new_future( 288 cancellation_token: &CancellationToken, 289 ) -> tokio::sync::futures::Notified<'static> { 290 let inner_ptr = Arc::as_ptr(&cancellation_token.inner); 291 // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains 292 // valid until the strong count of the Arc drops to zero, and the caller 293 // guarantees that they will drop the future before that happens. 294 (*inner_ptr).notified() 295 } 296 } 297 298 impl Future for WaitForCancellationFutureOwned { 299 type Output = (); 300 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>301 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 302 let mut this = self.project(); 303 304 loop { 305 if this.cancellation_token.is_cancelled() { 306 return Poll::Ready(()); 307 } 308 309 // No wakeups can be lost here because there is always a call to 310 // `is_cancelled` between the creation of the future and the call to 311 // `poll`, and the code that sets the cancelled flag does so before 312 // waking the `Notified`. 313 if this.future.as_mut().poll(cx).is_pending() { 314 return Poll::Pending; 315 } 316 317 // # Safety 318 // 319 // cancellation_token is dropped after future due to the field ordering. 320 this.future 321 .set(unsafe { Self::new_future(this.cancellation_token) }); 322 } 323 } 324 } 325