use core::fmt;
use core::future::Future;
use core::marker::{PhantomData, Unpin};
use core::mem;
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll};

use crate::header::Header;
use crate::state::*;

/// A spawned task.
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
/// [`run()`][`super::Runnable::run()`].
///
/// Awaiting a [`Task`] directly panics if the task was closed without producing an output; use
/// [`cancel()`][Task::cancel()] to observe that case as `None` instead.
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T> {
    /// A raw task pointer.
    ///
    /// Every method in this file casts it to `*const Header` before use.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic type `T`.
    pub(crate) _marker: PhantomData<T>,
}

// A `Task<T>` can hand out a `T` (via `poll`, `cancel()` or by dropping the output), so sending
// the handle to another thread requires `T: Send`.
unsafe impl<T: Send> Send for Task<T> {}
// NOTE(review): `Sync` carries no bound on `T`. In this file the only `&self` method is
// `fmt`, which prints the header and never touches a `T`; everything that reads the output
// takes `self`, `&mut self` or `Pin<&mut Self>`. Re-check this impl before adding any `&self`
// accessor that can reach the output.
unsafe impl<T> Sync for Task<T> {}

// The handle only stores a raw pointer and a marker, so it has no pinning requirements.
impl<T> Unpin for Task<T> {}

#[cfg(feature = "std")]
impl<T> std::panic::UnwindSafe for Task<T> {}
#[cfg(feature = "std")]
impl<T> std::panic::RefUnwindSafe for Task<T> {}

impl<T> Task<T> {
    /// Detaches the task to let it keep running in the background.
    ///
    /// The handle is consumed without running its destructor, so the task is not canceled. If
    /// the task had already completed, its output is dropped before this method returns.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Executor, Timer};
    /// use std::time::Duration;
    ///
    /// let ex = Executor::new();
    ///
    /// // Spawn a daemon future.
    /// ex.spawn(async {
    ///     loop {
    ///         println!("I'm a daemon task looping forever.");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// })
    /// .detach();
    /// ```
    pub fn detach(self) {
        let mut this = self;
        // Keep any output handed back by `set_detached()` alive in `_out` so that it is dropped
        // when this function returns.
        let _out = this.set_detached();
        // Skip `Drop for Task`, which would otherwise cancel the task.
        mem::forget(this);
    }

    /// Cancels the task and waits for it to stop running.
    ///
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
    /// it didn't complete.
    ///
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
    /// canceling because it also waits for the task to stop running.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{future, Executor, Timer};
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let ex = Executor::new();
    ///
    /// // Spawn a daemon future.
    /// let task = ex.spawn(async {
    ///     loop {
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// });
    ///
    /// // Run an executor thread.
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
    ///
    /// future::block_on(async {
    ///     Timer::after(Duration::from_secs(3)).await;
    ///     task.cancel().await;
    /// });
    /// ```
    pub async fn cancel(self) -> Option<T> {
        let mut this = self;
        this.set_canceled();

        // Adapter future that forwards to `poll_task()` directly, so a closed task resolves to
        // `None` instead of panicking the way `<Task<T> as Future>::poll` does.
        struct Fut<T>(Task<T>);

        impl<T> Future for Fut<T> {
            type Output = Option<T>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                self.0.poll_task(cx)
            }
        }

        Fut(this).await
    }

    /// Puts the task in canceled state.
    ///
    /// Does nothing if the task is already `COMPLETED` or `CLOSED`. Otherwise the `CLOSED` flag
    /// is set with a compare-exchange loop. If the task was neither `SCHEDULED` nor `RUNNING` at
    /// that moment, the same state update also sets `SCHEDULED` and adds one `REFERENCE`, and the
    /// task is then passed to the vtable's `schedule` function. If the `AWAITER` flag was set,
    /// the header is notified.
    fn set_canceled(&mut self) {
        let ptr = self.ptr.as_ptr();
        // NOTE(review): assumes `self.ptr` points at a live task allocation that begins with a
        // `Header` — the cast and the dereferences below rely on it; confirm against the code
        // that constructs `Task`.
        let header = ptr as *const Header;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been completed or closed, it can't be canceled.
                if state & (COMPLETED | CLOSED) != 0 {
                    break;
                }

                // If the task is not scheduled nor running, we'll need to schedule it.
                let new = if state & (SCHEDULED | RUNNING) == 0 {
                    (state | SCHEDULED | CLOSED) + REFERENCE
                } else {
                    state | CLOSED
                };

                // Mark the task as closed.
                match (*header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not scheduled nor running, schedule it one more time so
                        // that its future gets dropped by the executor.
                        if state & (SCHEDULED | RUNNING) == 0 {
                            ((*header).vtable.schedule)(ptr);
                        }

                        // Notify the awaiter that the task has been closed.
                        if state & AWAITER != 0 {
                            (*header).notify(None);
                        }

                        break;
                    }
                    // The exchange failed (possibly spuriously, since it is the weak variant);
                    // retry with the state that was actually observed.
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Puts the task in detached state.
    ///
    /// Clears the `TASK` flag in the task's state. Returns `Some(output)` if the task was found
    /// completed but not yet closed, in which case this method closes it and reads the output out
    /// so that the caller can drop it; returns `None` otherwise.
    ///
    /// If, at the moment the `TASK` flag is cleared, the reference-count bits are all zero, the
    /// task is either scheduled one more time (when it was not closed) or destroyed (when it
    /// was closed) through the vtable.
    fn set_detached(&mut self) -> Option<T> {
        let ptr = self.ptr.as_ptr();
        // NOTE(review): assumes `self.ptr` points at a live task allocation that begins with a
        // `Header` — confirm against the code that constructs `Task`.
        let header = ptr as *const Header;

        unsafe {
            // A place where the output will be stored in case it needs to be dropped.
            let mut output = None;

            // Optimistically assume the `Task` is being detached just after creating the task.
            // This is a common case so if the `Task` is detached, the overhead of it is only one
            // compare-exchange operation.
            if let Err(mut state) = (*header).state.compare_exchange_weak(
                SCHEDULED | TASK | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                loop {
                    // If the task has been completed but not yet closed, that means its output
                    // must be dropped.
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // Mark the task as closed in order to grab its output.
                        match (*header).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // Read the output.
                                // NOTE(review): relies on `get_output` returning a pointer to a
                                // valid, initialized `T` once the task is completed — confirm in
                                // the vtable implementation.
                                output =
                                    Some((((*header).vtable.get_output)(ptr) as *mut T).read());

                                // Update the state variable because we're continuing the loop.
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // If this is the last reference to the task and it's not closed, then
                        // close it and schedule one more time so that its future gets dropped by
                        // the executor.
                        //
                        // NOTE(review): presumably `REFERENCE` is a single bit, making
                        // `!(REFERENCE - 1)` the mask of the reference-count bits — confirm in
                        // `crate::state`.
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            state & !TASK
                        };

                        // Unset the `TASK` flag.
                        match (*header).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // If this is the last reference to the task, we need to either
                                // schedule dropping its future or destroy it.
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        ((*header).vtable.schedule)(ptr);
                                    } else {
                                        ((*header).vtable.destroy)(ptr);
                                    }
                                }

                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }

            output
        }
    }

    /// Polls the task to retrieve its output.
    ///
    /// Returns `Some` if the task has completed or `None` if it was closed.
    ///
    /// Returns [`Poll::Pending`] after registering `cx`'s waker in the header when the task is
    /// not completed yet, or when it is closed but still scheduled or running.
    ///
    /// A task becomes closed in the following cases:
    ///
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
    /// 2. Its output gets awaited by the `Task`.
    /// 3. It panics while polling the future.
    /// 4. It is completed and the `Task` gets dropped.
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let ptr = self.ptr.as_ptr();
        // NOTE(review): assumes `self.ptr` points at a live task allocation that begins with a
        // `Header` — confirm against the code that constructs `Task`.
        let header = ptr as *const Header;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been closed, notify the awaiter and return `None`.
                if state & CLOSED != 0 {
                    // If the task is scheduled or running, we need to wait until its future is
                    // dropped.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        // Replace the waker with one associated with the current task.
                        (*header).register(cx.waker());

                        // Reload the state after registering. It is possible changes occurred just
                        // before registration so we need to check for that.
                        state = (*header).state.load(Ordering::Acquire);

                        // If the task is still scheduled or running, we need to wait because its
                        // future is not dropped yet.
                        if state & (SCHEDULED | RUNNING) != 0 {
                            return Poll::Pending;
                        }
                    }

                    // Even though the awaiter is most likely the current task, it could also be
                    // another task.
                    (*header).notify(Some(cx.waker()));
                    return Poll::Ready(None);
                }

                // If the task is not completed, register the current task.
                if state & COMPLETED == 0 {
                    // Replace the waker with one associated with the current task.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible that the task became
                    // completed or closed just before registration so we need to check for that.
                    state = (*header).state.load(Ordering::Acquire);

                    // If the task has been closed, restart.
                    if state & CLOSED != 0 {
                        continue;
                    }

                    // If the task is still not completed, we're blocked on it.
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // Since the task is now completed, mark it as closed in order to grab its output.
                match (*header).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Notify the awaiter. Even though the awaiter is most likely the current
                        // task, it could also be another task.
                        if state & AWAITER != 0 {
                            (*header).notify(Some(cx.waker()));
                        }

                        // Take the output from the task.
                        // NOTE(review): relies on `get_output` returning a pointer to a valid,
                        // initialized `T` once the task is completed — confirm in the vtable
                        // implementation.
                        let output = ((*header).vtable.get_output)(ptr) as *mut T;
                        return Poll::Ready(Some(output.read()));
                    }
                    // The state changed underneath us; go around the loop with the fresh value.
                    Err(s) => state = s,
                }
            }
        }
    }
}

impl<T> Drop for Task<T> {
    /// Cancels the task and then detaches the handle.
    ///
    /// Any output returned by `set_detached()` is discarded here, which drops it.
    fn drop(&mut self) {
        self.set_canceled();
        self.set_detached();
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the task for its output.
    ///
    /// # Panics
    ///
    /// Panics with the message "task has failed" if `poll_task()` resolves to `None`, i.e. the
    /// task was closed without an output being available to this handle.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.poll_task(cx) {
            Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<T> fmt::Debug for Task<T> {
    /// Formats the handle as a `Task` struct with a single `header` field, using the
    /// `Debug` implementation of the task's `Header`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ptr = self.ptr.as_ptr();
        // NOTE(review): assumes `self.ptr` points at a live task allocation that begins with a
        // `Header` — confirm against the code that constructs `Task`.
        let header = ptr as *const Header;

        f.debug_struct("Task")
            .field("header", unsafe { &(*header) })
            .finish()
    }
}