• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::future::Future;
3 use core::marker::{PhantomData, Unpin};
4 use core::mem;
5 use core::pin::Pin;
6 use core::ptr::NonNull;
7 use core::sync::atomic::Ordering;
8 use core::task::{Context, Poll};
9 
10 use crate::header::Header;
11 use crate::state::*;
12 
/// A spawned task.
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][`Task::cancel()`]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules it one last time. Then, the
/// executor can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by
/// invoking [`run()`][`super::Runnable::run()`].
///
/// # Panics
///
/// Awaiting a [`Task`] panics with the message `task has failed` if the task was closed without
/// producing an output. Use [`fallible()`][`Task::fallible()`] to receive [`None`] instead.
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T> {
    /// A raw, type-erased task pointer.
    ///
    /// The methods of this type cast it to `*const Header` to reach the task's state word and
    /// vtable.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic type `T`, the type of the task's output.
    pub(crate) _marker: PhantomData<T>,
}
54 
// `Task` stores a `NonNull<()>`, which is neither `Send` nor `Sync`, so both traits are asserted
// by hand.
//
// NOTE(review): `Send` is gated on `T: Send`, presumably because the output `T` is moved out on
// whichever thread owns the handle (`poll_task` and `set_detached` read it) — confirm against the
// task allocation code.
unsafe impl<T: Send> Send for Task<T> {}
// NOTE(review): `Sync` is unconditional. The `&self` methods visible in this file (`header`,
// `is_finished`, `Debug`) only read the header and never touch `T` — confirm that no other
// `&self` API exposes the output.
unsafe impl<T> Sync for Task<T> {}

// The handle is `Unpin` regardless of `T`; `Future::poll` relies on this to reach `&mut Task`
// through `Pin<&mut Self>`.
impl<T> Unpin for Task<T> {}

// With `std` available, the handle is declared unwind-safe regardless of `T`.
#[cfg(feature = "std")]
impl<T> std::panic::UnwindSafe for Task<T> {}
#[cfg(feature = "std")]
impl<T> std::panic::RefUnwindSafe for Task<T> {}
64 
65 impl<T> Task<T> {
66     /// Detaches the task to let it keep running in the background.
67     ///
68     /// # Examples
69     ///
70     /// ```
71     /// use smol::{Executor, Timer};
72     /// use std::time::Duration;
73     ///
74     /// let ex = Executor::new();
75     ///
76     /// // Spawn a deamon future.
77     /// ex.spawn(async {
78     ///     loop {
79     ///         println!("I'm a daemon task looping forever.");
80     ///         Timer::after(Duration::from_secs(1)).await;
81     ///     }
82     /// })
83     /// .detach();
84     /// ```
detach(self)85     pub fn detach(self) {
86         let mut this = self;
87         let _out = this.set_detached();
88         mem::forget(this);
89     }
90 
91     /// Cancels the task and waits for it to stop running.
92     ///
93     /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
94     /// it didn't complete.
95     ///
96     /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
97     /// canceling because it also waits for the task to stop running.
98     ///
99     /// # Examples
100     ///
101     /// ```
102     /// # if cfg!(miri) { return; } // Miri does not support epoll
103     /// use smol::{future, Executor, Timer};
104     /// use std::thread;
105     /// use std::time::Duration;
106     ///
107     /// let ex = Executor::new();
108     ///
109     /// // Spawn a deamon future.
110     /// let task = ex.spawn(async {
111     ///     loop {
112     ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
113     ///         Timer::after(Duration::from_secs(1)).await;
114     ///     }
115     /// });
116     ///
117     /// // Run an executor thread.
118     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
119     ///
120     /// future::block_on(async {
121     ///     Timer::after(Duration::from_secs(3)).await;
122     ///     task.cancel().await;
123     /// });
124     /// ```
cancel(self) -> Option<T>125     pub async fn cancel(self) -> Option<T> {
126         let mut this = self;
127         this.set_canceled();
128         this.fallible().await
129     }
130 
131     /// Converts this task into a [`FallibleTask`].
132     ///
133     /// Like [`Task`], a fallible task will poll the task's output until it is
134     /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
135     /// dropped without being run. Resolves to the task's output when completed,
136     /// or [`None`] if it didn't complete.
137     ///
138     /// # Examples
139     ///
140     /// ```
141     /// use smol::{future, Executor};
142     /// use std::thread;
143     ///
144     /// let ex = Executor::new();
145     ///
146     /// // Spawn a future onto the executor.
147     /// let task = ex.spawn(async {
148     ///     println!("Hello from a task!");
149     ///     1 + 2
150     /// })
151     /// .fallible();
152     ///
153     /// // Run an executor thread.
154     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
155     ///
156     /// // Wait for the task's output.
157     /// assert_eq!(future::block_on(task), Some(3));
158     /// ```
159     ///
160     /// ```
161     /// use smol::future;
162     ///
163     /// // Schedule function which drops the runnable without running it.
164     /// let schedule = move |runnable| drop(runnable);
165     ///
166     /// // Create a task with the future and the schedule function.
167     /// let (runnable, task) = async_task::spawn(async {
168     ///     println!("Hello from a task!");
169     ///     1 + 2
170     /// }, schedule);
171     /// runnable.schedule();
172     ///
173     /// // Wait for the task's output.
174     /// assert_eq!(future::block_on(task.fallible()), None);
175     /// ```
fallible(self) -> FallibleTask<T>176     pub fn fallible(self) -> FallibleTask<T> {
177         FallibleTask { task: self }
178     }
179 
180     /// Puts the task in canceled state.
set_canceled(&mut self)181     fn set_canceled(&mut self) {
182         let ptr = self.ptr.as_ptr();
183         let header = ptr as *const Header;
184 
185         unsafe {
186             let mut state = (*header).state.load(Ordering::Acquire);
187 
188             loop {
189                 // If the task has been completed or closed, it can't be canceled.
190                 if state & (COMPLETED | CLOSED) != 0 {
191                     break;
192                 }
193 
194                 // If the task is not scheduled nor running, we'll need to schedule it.
195                 let new = if state & (SCHEDULED | RUNNING) == 0 {
196                     (state | SCHEDULED | CLOSED) + REFERENCE
197                 } else {
198                     state | CLOSED
199                 };
200 
201                 // Mark the task as closed.
202                 match (*header).state.compare_exchange_weak(
203                     state,
204                     new,
205                     Ordering::AcqRel,
206                     Ordering::Acquire,
207                 ) {
208                     Ok(_) => {
209                         // If the task is not scheduled nor running, schedule it one more time so
210                         // that its future gets dropped by the executor.
211                         if state & (SCHEDULED | RUNNING) == 0 {
212                             ((*header).vtable.schedule)(ptr);
213                         }
214 
215                         // Notify the awaiter that the task has been closed.
216                         if state & AWAITER != 0 {
217                             (*header).notify(None);
218                         }
219 
220                         break;
221                     }
222                     Err(s) => state = s,
223                 }
224             }
225         }
226     }
227 
    /// Puts the task in detached state.
    ///
    /// Clears the `TASK` flag in the task's state word. Returns the task's output if the task
    /// had already completed but was not yet closed, so that the caller decides where that
    /// output gets dropped; returns `None` otherwise.
    ///
    /// If this handle turns out to be the last reference, the task is either scheduled one more
    /// time (so the executor drops its future) or destroyed right away.
    fn set_detached(&mut self) -> Option<T> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

        unsafe {
            // A place where the output will be stored in case it needs to be dropped.
            let mut output = None;

            // Optimistically assume the `Task` is being detached just after creating the task.
            // This is a common case so if the `Task` is detached, the overhead of it is only one
            // compare-exchange operation.
            //
            // The weak compare-exchange may fail spuriously; in that case the loop below simply
            // handles the state it observed.
            if let Err(mut state) = (*header).state.compare_exchange_weak(
                SCHEDULED | TASK | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                loop {
                    // If the task has been completed but not yet closed, that means its output
                    // must be dropped.
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // Mark the task as closed in order to grab its output.
                        match (*header).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // Read the output.
                                output =
                                    Some((((*header).vtable.get_output)(ptr) as *mut T).read());

                                // Update the state variable because we're continuing the loop.
                                // The next iteration takes the `else` branch and clears `TASK`.
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // If this is the last reference to the task and it's not closed, then
                        // close it and schedule one more time so that its future gets dropped by
                        // the executor.
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            state & !TASK
                        };

                        // Unset the `TASK` flag.
                        match (*header).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // If this is the last reference to the task, we need to either
                                // schedule dropping its future or destroy it.
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        ((*header).vtable.schedule)(ptr);
                                    } else {
                                        ((*header).vtable.destroy)(ptr);
                                    }
                                }

                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }

            output
        }
    }
306 
    /// Polls the task to retrieve its output.
    ///
    /// Returns `Some` if the task has completed or `None` if it was closed. Returns
    /// `Poll::Pending` while the task is still in progress, or while a closed task is still
    /// scheduled or running (its future has not been dropped yet); in both cases the waker from
    /// `cx` has been registered with the task first.
    ///
    /// A task becomes closed in the following cases:
    ///
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
    /// 2. Its output gets awaited by the `Task`.
    /// 3. It panics while polling the future.
    /// 4. It is completed and the `Task` gets dropped.
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been closed, notify the awaiter and return `None`.
                if state & CLOSED != 0 {
                    // If the task is scheduled or running, we need to wait until its future is
                    // dropped.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        // Replace the waker with one associated with the current task.
                        (*header).register(cx.waker());

                        // Reload the state after registering. It is possible changes occurred just
                        // before registration so we need to check for that.
                        state = (*header).state.load(Ordering::Acquire);

                        // If the task is still scheduled or running, we need to wait because its
                        // future is not dropped yet.
                        if state & (SCHEDULED | RUNNING) != 0 {
                            return Poll::Pending;
                        }
                    }

                    // Even though the awaiter is most likely the current task, it could also be
                    // another task.
                    (*header).notify(Some(cx.waker()));
                    return Poll::Ready(None);
                }

                // If the task is not completed, register the current task.
                if state & COMPLETED == 0 {
                    // Replace the waker with one associated with the current task.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible that the task became
                    // completed or closed just before registration so we need to check for that.
                    state = (*header).state.load(Ordering::Acquire);

                    // If the task has been closed, restart.
                    if state & CLOSED != 0 {
                        continue;
                    }

                    // If the task is still not completed, we're blocked on it.
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // Since the task is now completed, mark it as closed in order to grab its output.
                // On failure the loop restarts with the state that was actually observed.
                match (*header).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Notify the awaiter. Even though the awaiter is most likely the current
                        // task, it could also be another task.
                        if state & AWAITER != 0 {
                            (*header).notify(Some(cx.waker()));
                        }

                        // Take the output from the task. Winning the transition to `CLOSED`
                        // above is what entitles this call to move the value out.
                        let output = ((*header).vtable.get_output)(ptr) as *mut T;
                        return Poll::Ready(Some(output.read()));
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
393 
header(&self) -> &Header394     fn header(&self) -> &Header {
395         let ptr = self.ptr.as_ptr();
396         let header = ptr as *const Header;
397         unsafe { &*header }
398     }
399 
400     /// Returns `true` if the current task is finished.
401     ///
402     /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
is_finished(&self) -> bool403     pub fn is_finished(&self) -> bool {
404         let ptr = self.ptr.as_ptr();
405         let header = ptr as *const Header;
406 
407         unsafe {
408             let state = (*header).state.load(Ordering::Acquire);
409             state & (CLOSED | COMPLETED) != 0
410         }
411     }
412 }
413 
414 impl<T> Drop for Task<T> {
drop(&mut self)415     fn drop(&mut self) {
416         self.set_canceled();
417         self.set_detached();
418     }
419 }
420 
421 impl<T> Future for Task<T> {
422     type Output = T;
423 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>424     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425         match self.poll_task(cx) {
426             Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
427             Poll::Pending => Poll::Pending,
428         }
429     }
430 }
431 
432 impl<T> fmt::Debug for Task<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result433     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434         f.debug_struct("Task")
435             .field("header", self.header())
436             .finish()
437     }
438 }
439 
/// A spawned task with a fallible response.
///
/// This type behaves like [`Task`], however it produces an `Option<T>` when
/// polled and will return `None` if the executor dropped its
/// [`Runnable`][`super::Runnable`] without being run.
///
/// This can be useful to avoid the panic produced when polling the `Task`
/// future if the executor dropped its `Runnable`.
///
/// Created with [`Task::fallible()`].
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct FallibleTask<T> {
    /// The wrapped task handle; every operation on this type is forwarded to it.
    task: Task<T>,
}
452 
453 impl<T> FallibleTask<T> {
454     /// Detaches the task to let it keep running in the background.
455     ///
456     /// # Examples
457     ///
458     /// ```
459     /// use smol::{Executor, Timer};
460     /// use std::time::Duration;
461     ///
462     /// let ex = Executor::new();
463     ///
464     /// // Spawn a deamon future.
465     /// ex.spawn(async {
466     ///     loop {
467     ///         println!("I'm a daemon task looping forever.");
468     ///         Timer::after(Duration::from_secs(1)).await;
469     ///     }
470     /// })
471     /// .fallible()
472     /// .detach();
473     /// ```
detach(self)474     pub fn detach(self) {
475         self.task.detach()
476     }
477 
478     /// Cancels the task and waits for it to stop running.
479     ///
480     /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
481     /// it didn't complete.
482     ///
483     /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
484     /// canceling because it also waits for the task to stop running.
485     ///
486     /// # Examples
487     ///
488     /// ```
489     /// # if cfg!(miri) { return; } // Miri does not support epoll
490     /// use smol::{future, Executor, Timer};
491     /// use std::thread;
492     /// use std::time::Duration;
493     ///
494     /// let ex = Executor::new();
495     ///
496     /// // Spawn a deamon future.
497     /// let task = ex.spawn(async {
498     ///     loop {
499     ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
500     ///         Timer::after(Duration::from_secs(1)).await;
501     ///     }
502     /// })
503     /// .fallible();
504     ///
505     /// // Run an executor thread.
506     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
507     ///
508     /// future::block_on(async {
509     ///     Timer::after(Duration::from_secs(3)).await;
510     ///     task.cancel().await;
511     /// });
512     /// ```
cancel(self) -> Option<T>513     pub async fn cancel(self) -> Option<T> {
514         self.task.cancel().await
515     }
516 }
517 
518 impl<T> Future for FallibleTask<T> {
519     type Output = Option<T>;
520 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>521     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
522         self.task.poll_task(cx)
523     }
524 }
525 
526 impl<T> fmt::Debug for FallibleTask<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result527     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528         f.debug_struct("FallibleTask")
529             .field("header", self.task.header())
530             .finish()
531     }
532 }
533