1 use core::fmt; 2 use core::future::Future; 3 use core::marker::{PhantomData, Unpin}; 4 use core::mem; 5 use core::pin::Pin; 6 use core::ptr::NonNull; 7 use core::sync::atomic::Ordering; 8 use core::task::{Context, Poll}; 9 10 use crate::header::Header; 11 use crate::state::*; 12 13 /// A spawned task. 14 /// 15 /// A [`Task`] can be awaited to retrieve the output of its future. 16 /// 17 /// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the 18 /// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a 19 /// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] 20 /// method. 21 /// 22 /// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor 23 /// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking 24 /// [`run()`][`super::Runnable::run()`]. 25 /// 26 /// # Examples 27 /// 28 /// ``` 29 /// use smol::{future, Executor}; 30 /// use std::thread; 31 /// 32 /// let ex = Executor::new(); 33 /// 34 /// // Spawn a future onto the executor. 35 /// let task = ex.spawn(async { 36 /// println!("Hello from a task!"); 37 /// 1 + 2 38 /// }); 39 /// 40 /// // Run an executor thread. 41 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 42 /// 43 /// // Wait for the task's output. 44 /// assert_eq!(future::block_on(task), 3); 45 /// ``` 46 #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] 47 pub struct Task<T> { 48 /// A raw task pointer. 49 pub(crate) ptr: NonNull<()>, 50 51 /// A marker capturing generic type `T`. 52 pub(crate) _marker: PhantomData<T>, 53 } 54 55 unsafe impl<T: Send> Send for Task<T> {} 56 unsafe impl<T> Sync for Task<T> {} 57 58 impl<T> Unpin for Task<T> {} 59 60 #[cfg(feature = "std")] 61 impl<T> std::panic::UnwindSafe for Task<T> {} 62 #[cfg(feature = "std")] 63 impl<T> std::panic::RefUnwindSafe for Task<T> {} 64 65 impl<T> Task<T> { 66 /// Detaches the task to let it keep running in the background. 67 /// 68 /// # Examples 69 /// 70 /// ``` 71 /// use smol::{Executor, Timer}; 72 /// use std::time::Duration; 73 /// 74 /// let ex = Executor::new(); 75 /// 76 /// // Spawn a deamon future. 77 /// ex.spawn(async { 78 /// loop { 79 /// println!("I'm a daemon task looping forever."); 80 /// Timer::after(Duration::from_secs(1)).await; 81 /// } 82 /// }) 83 /// .detach(); 84 /// ``` detach(self)85 pub fn detach(self) { 86 let mut this = self; 87 let _out = this.set_detached(); 88 mem::forget(this); 89 } 90 91 /// Cancels the task and waits for it to stop running. 92 /// 93 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if 94 /// it didn't complete. 95 /// 96 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of 97 /// canceling because it also waits for the task to stop running. 98 /// 99 /// # Examples 100 /// 101 /// ``` 102 /// # if cfg!(miri) { return; } // Miri does not support epoll 103 /// use smol::{future, Executor, Timer}; 104 /// use std::thread; 105 /// use std::time::Duration; 106 /// 107 /// let ex = Executor::new(); 108 /// 109 /// // Spawn a deamon future. 110 /// let task = ex.spawn(async { 111 /// loop { 112 /// println!("Even though I'm in an infinite loop, you can still cancel me!"); 113 /// Timer::after(Duration::from_secs(1)).await; 114 /// } 115 /// }); 116 /// 117 /// // Run an executor thread. 118 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 119 /// 120 /// future::block_on(async { 121 /// Timer::after(Duration::from_secs(3)).await; 122 /// task.cancel().await; 123 /// }); 124 /// ``` cancel(self) -> Option<T>125 pub async fn cancel(self) -> Option<T> { 126 let mut this = self; 127 this.set_canceled(); 128 this.fallible().await 129 } 130 131 /// Converts this task into a [`FallibleTask`]. 132 /// 133 /// Like [`Task`], a fallible task will poll the task's output until it is 134 /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being 135 /// dropped without being run. Resolves to the task's output when completed, 136 /// or [`None`] if it didn't complete. 137 /// 138 /// # Examples 139 /// 140 /// ``` 141 /// use smol::{future, Executor}; 142 /// use std::thread; 143 /// 144 /// let ex = Executor::new(); 145 /// 146 /// // Spawn a future onto the executor. 147 /// let task = ex.spawn(async { 148 /// println!("Hello from a task!"); 149 /// 1 + 2 150 /// }) 151 /// .fallible(); 152 /// 153 /// // Run an executor thread. 154 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 155 /// 156 /// // Wait for the task's output. 157 /// assert_eq!(future::block_on(task), Some(3)); 158 /// ``` 159 /// 160 /// ``` 161 /// use smol::future; 162 /// 163 /// // Schedule function which drops the runnable without running it. 164 /// let schedule = move |runnable| drop(runnable); 165 /// 166 /// // Create a task with the future and the schedule function. 167 /// let (runnable, task) = async_task::spawn(async { 168 /// println!("Hello from a task!"); 169 /// 1 + 2 170 /// }, schedule); 171 /// runnable.schedule(); 172 /// 173 /// // Wait for the task's output. 174 /// assert_eq!(future::block_on(task.fallible()), None); 175 /// ``` fallible(self) -> FallibleTask<T>176 pub fn fallible(self) -> FallibleTask<T> { 177 FallibleTask { task: self } 178 } 179 180 /// Puts the task in canceled state. set_canceled(&mut self)181 fn set_canceled(&mut self) { 182 let ptr = self.ptr.as_ptr(); 183 let header = ptr as *const Header; 184 185 unsafe { 186 let mut state = (*header).state.load(Ordering::Acquire); 187 188 loop { 189 // If the task has been completed or closed, it can't be canceled. 190 if state & (COMPLETED | CLOSED) != 0 { 191 break; 192 } 193 194 // If the task is not scheduled nor running, we'll need to schedule it. 195 let new = if state & (SCHEDULED | RUNNING) == 0 { 196 (state | SCHEDULED | CLOSED) + REFERENCE 197 } else { 198 state | CLOSED 199 }; 200 201 // Mark the task as closed. 202 match (*header).state.compare_exchange_weak( 203 state, 204 new, 205 Ordering::AcqRel, 206 Ordering::Acquire, 207 ) { 208 Ok(_) => { 209 // If the task is not scheduled nor running, schedule it one more time so 210 // that its future gets dropped by the executor. 211 if state & (SCHEDULED | RUNNING) == 0 { 212 ((*header).vtable.schedule)(ptr); 213 } 214 215 // Notify the awaiter that the task has been closed. 216 if state & AWAITER != 0 { 217 (*header).notify(None); 218 } 219 220 break; 221 } 222 Err(s) => state = s, 223 } 224 } 225 } 226 } 227 228 /// Puts the task in detached state. set_detached(&mut self) -> Option<T>229 fn set_detached(&mut self) -> Option<T> { 230 let ptr = self.ptr.as_ptr(); 231 let header = ptr as *const Header; 232 233 unsafe { 234 // A place where the output will be stored in case it needs to be dropped. 235 let mut output = None; 236 237 // Optimistically assume the `Task` is being detached just after creating the task. 238 // This is a common case so if the `Task` is datached, the overhead of it is only one 239 // compare-exchange operation. 240 if let Err(mut state) = (*header).state.compare_exchange_weak( 241 SCHEDULED | TASK | REFERENCE, 242 SCHEDULED | REFERENCE, 243 Ordering::AcqRel, 244 Ordering::Acquire, 245 ) { 246 loop { 247 // If the task has been completed but not yet closed, that means its output 248 // must be dropped. 249 if state & COMPLETED != 0 && state & CLOSED == 0 { 250 // Mark the task as closed in order to grab its output. 251 match (*header).state.compare_exchange_weak( 252 state, 253 state | CLOSED, 254 Ordering::AcqRel, 255 Ordering::Acquire, 256 ) { 257 Ok(_) => { 258 // Read the output. 259 output = 260 Some((((*header).vtable.get_output)(ptr) as *mut T).read()); 261 262 // Update the state variable because we're continuing the loop. 263 state |= CLOSED; 264 } 265 Err(s) => state = s, 266 } 267 } else { 268 // If this is the last reference to the task and it's not closed, then 269 // close it and schedule one more time so that its future gets dropped by 270 // the executor. 271 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { 272 SCHEDULED | CLOSED | REFERENCE 273 } else { 274 state & !TASK 275 }; 276 277 // Unset the `TASK` flag. 278 match (*header).state.compare_exchange_weak( 279 state, 280 new, 281 Ordering::AcqRel, 282 Ordering::Acquire, 283 ) { 284 Ok(_) => { 285 // If this is the last reference to the task, we need to either 286 // schedule dropping its future or destroy it. 287 if state & !(REFERENCE - 1) == 0 { 288 if state & CLOSED == 0 { 289 ((*header).vtable.schedule)(ptr); 290 } else { 291 ((*header).vtable.destroy)(ptr); 292 } 293 } 294 295 break; 296 } 297 Err(s) => state = s, 298 } 299 } 300 } 301 } 302 303 output 304 } 305 } 306 307 /// Polls the task to retrieve its output. 308 /// 309 /// Returns `Some` if the task has completed or `None` if it was closed. 310 /// 311 /// A task becomes closed in the following cases: 312 /// 313 /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. 314 /// 2. Its output gets awaited by the `Task`. 315 /// 3. It panics while polling the future. 316 /// 4. It is completed and the `Task` gets dropped. poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>317 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { 318 let ptr = self.ptr.as_ptr(); 319 let header = ptr as *const Header; 320 321 unsafe { 322 let mut state = (*header).state.load(Ordering::Acquire); 323 324 loop { 325 // If the task has been closed, notify the awaiter and return `None`. 326 if state & CLOSED != 0 { 327 // If the task is scheduled or running, we need to wait until its future is 328 // dropped. 329 if state & (SCHEDULED | RUNNING) != 0 { 330 // Replace the waker with one associated with the current task. 331 (*header).register(cx.waker()); 332 333 // Reload the state after registering. It is possible changes occurred just 334 // before registration so we need to check for that. 335 state = (*header).state.load(Ordering::Acquire); 336 337 // If the task is still scheduled or running, we need to wait because its 338 // future is not dropped yet. 339 if state & (SCHEDULED | RUNNING) != 0 { 340 return Poll::Pending; 341 } 342 } 343 344 // Even though the awaiter is most likely the current task, it could also be 345 // another task. 346 (*header).notify(Some(cx.waker())); 347 return Poll::Ready(None); 348 } 349 350 // If the task is not completed, register the current task. 351 if state & COMPLETED == 0 { 352 // Replace the waker with one associated with the current task. 353 (*header).register(cx.waker()); 354 355 // Reload the state after registering. It is possible that the task became 356 // completed or closed just before registration so we need to check for that. 357 state = (*header).state.load(Ordering::Acquire); 358 359 // If the task has been closed, restart. 360 if state & CLOSED != 0 { 361 continue; 362 } 363 364 // If the task is still not completed, we're blocked on it. 365 if state & COMPLETED == 0 { 366 return Poll::Pending; 367 } 368 } 369 370 // Since the task is now completed, mark it as closed in order to grab its output. 371 match (*header).state.compare_exchange( 372 state, 373 state | CLOSED, 374 Ordering::AcqRel, 375 Ordering::Acquire, 376 ) { 377 Ok(_) => { 378 // Notify the awaiter. Even though the awaiter is most likely the current 379 // task, it could also be another task. 380 if state & AWAITER != 0 { 381 (*header).notify(Some(cx.waker())); 382 } 383 384 // Take the output from the task. 385 let output = ((*header).vtable.get_output)(ptr) as *mut T; 386 return Poll::Ready(Some(output.read())); 387 } 388 Err(s) => state = s, 389 } 390 } 391 } 392 } 393 header(&self) -> &Header394 fn header(&self) -> &Header { 395 let ptr = self.ptr.as_ptr(); 396 let header = ptr as *const Header; 397 unsafe { &*header } 398 } 399 400 /// Returns `true` if the current task is finished. 401 /// 402 /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. is_finished(&self) -> bool403 pub fn is_finished(&self) -> bool { 404 let ptr = self.ptr.as_ptr(); 405 let header = ptr as *const Header; 406 407 unsafe { 408 let state = (*header).state.load(Ordering::Acquire); 409 state & (CLOSED | COMPLETED) != 0 410 } 411 } 412 } 413 414 impl<T> Drop for Task<T> { drop(&mut self)415 fn drop(&mut self) { 416 self.set_canceled(); 417 self.set_detached(); 418 } 419 } 420 421 impl<T> Future for Task<T> { 422 type Output = T; 423 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>424 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 425 match self.poll_task(cx) { 426 Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), 427 Poll::Pending => Poll::Pending, 428 } 429 } 430 } 431 432 impl<T> fmt::Debug for Task<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 434 f.debug_struct("Task") 435 .field("header", self.header()) 436 .finish() 437 } 438 } 439 440 /// A spawned task with a fallible response. 441 /// 442 /// This type behaves like [`Task`], however it produces an `Option<T>` when 443 /// polled and will return `None` if the executor dropped its 444 /// [`Runnable`][`super::Runnable`] without being run. 445 /// 446 /// This can be useful to avoid the panic produced when polling the `Task` 447 /// future if the executor dropped its `Runnable`. 448 #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] 449 pub struct FallibleTask<T> { 450 task: Task<T>, 451 } 452 453 impl<T> FallibleTask<T> { 454 /// Detaches the task to let it keep running in the background. 455 /// 456 /// # Examples 457 /// 458 /// ``` 459 /// use smol::{Executor, Timer}; 460 /// use std::time::Duration; 461 /// 462 /// let ex = Executor::new(); 463 /// 464 /// // Spawn a deamon future. 465 /// ex.spawn(async { 466 /// loop { 467 /// println!("I'm a daemon task looping forever."); 468 /// Timer::after(Duration::from_secs(1)).await; 469 /// } 470 /// }) 471 /// .fallible() 472 /// .detach(); 473 /// ``` detach(self)474 pub fn detach(self) { 475 self.task.detach() 476 } 477 478 /// Cancels the task and waits for it to stop running. 479 /// 480 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if 481 /// it didn't complete. 482 /// 483 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of 484 /// canceling because it also waits for the task to stop running. 485 /// 486 /// # Examples 487 /// 488 /// ``` 489 /// # if cfg!(miri) { return; } // Miri does not support epoll 490 /// use smol::{future, Executor, Timer}; 491 /// use std::thread; 492 /// use std::time::Duration; 493 /// 494 /// let ex = Executor::new(); 495 /// 496 /// // Spawn a deamon future. 497 /// let task = ex.spawn(async { 498 /// loop { 499 /// println!("Even though I'm in an infinite loop, you can still cancel me!"); 500 /// Timer::after(Duration::from_secs(1)).await; 501 /// } 502 /// }) 503 /// .fallible(); 504 /// 505 /// // Run an executor thread. 506 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 507 /// 508 /// future::block_on(async { 509 /// Timer::after(Duration::from_secs(3)).await; 510 /// task.cancel().await; 511 /// }); 512 /// ``` cancel(self) -> Option<T>513 pub async fn cancel(self) -> Option<T> { 514 self.task.cancel().await 515 } 516 } 517 518 impl<T> Future for FallibleTask<T> { 519 type Output = Option<T>; 520 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>521 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 522 self.task.poll_task(cx) 523 } 524 } 525 526 impl<T> fmt::Debug for FallibleTask<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 528 f.debug_struct("FallibleTask") 529 .field("header", self.task.header()) 530 .finish() 531 } 532 } 533