• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::future::Future;
3 use core::marker::{PhantomData, Unpin};
4 use core::mem;
5 use core::pin::Pin;
6 use core::ptr::NonNull;
7 use core::sync::atomic::Ordering;
8 use core::task::{Context, Poll};
9 
10 use crate::header::Header;
11 use crate::state::*;
12 
13 /// A spawned task.
14 ///
15 /// A [`Task`] can be awaited to retrieve the output of its future.
16 ///
17 /// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
18 /// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
19 /// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
20 /// method.
21 ///
22 /// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
23 /// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
24 /// [`run()`][`super::Runnable::run()`].
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use smol::{future, Executor};
30 /// use std::thread;
31 ///
32 /// let ex = Executor::new();
33 ///
34 /// // Spawn a future onto the executor.
35 /// let task = ex.spawn(async {
36 ///     println!("Hello from a task!");
37 ///     1 + 2
38 /// });
39 ///
40 /// // Run an executor thread.
41 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
42 ///
43 /// // Wait for the task's output.
44 /// assert_eq!(future::block_on(task), 3);
45 /// ```
46 #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
47 pub struct Task<T> {
48     /// A raw task pointer.
49     pub(crate) ptr: NonNull<()>,
50 
51     /// A marker capturing generic type `T`.
52     pub(crate) _marker: PhantomData<T>,
53 }
54 
55 unsafe impl<T: Send> Send for Task<T> {}
56 unsafe impl<T> Sync for Task<T> {}
57 
58 impl<T> Unpin for Task<T> {}
59 
60 #[cfg(feature = "std")]
61 impl<T> std::panic::UnwindSafe for Task<T> {}
62 #[cfg(feature = "std")]
63 impl<T> std::panic::RefUnwindSafe for Task<T> {}
64 
65 impl<T> Task<T> {
66     /// Detaches the task to let it keep running in the background.
67     ///
68     /// # Examples
69     ///
70     /// ```
71     /// use smol::{Executor, Timer};
72     /// use std::time::Duration;
73     ///
74     /// let ex = Executor::new();
75     ///
76     /// // Spawn a deamon future.
77     /// ex.spawn(async {
78     ///     loop {
79     ///         println!("I'm a daemon task looping forever.");
80     ///         Timer::after(Duration::from_secs(1)).await;
81     ///     }
82     /// })
83     /// .detach();
84     /// ```
detach(self)85     pub fn detach(self) {
86         let mut this = self;
87         let _out = this.set_detached();
88         mem::forget(this);
89     }
90 
91     /// Cancels the task and waits for it to stop running.
92     ///
93     /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
94     /// it didn't complete.
95     ///
96     /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
97     /// canceling because it also waits for the task to stop running.
98     ///
99     /// # Examples
100     ///
101     /// ```
102     /// use smol::{future, Executor, Timer};
103     /// use std::thread;
104     /// use std::time::Duration;
105     ///
106     /// let ex = Executor::new();
107     ///
108     /// // Spawn a deamon future.
109     /// let task = ex.spawn(async {
110     ///     loop {
111     ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
112     ///         Timer::after(Duration::from_secs(1)).await;
113     ///     }
114     /// });
115     ///
116     /// // Run an executor thread.
117     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
118     ///
119     /// future::block_on(async {
120     ///     Timer::after(Duration::from_secs(3)).await;
121     ///     task.cancel().await;
122     /// });
123     /// ```
cancel(self) -> Option<T>124     pub async fn cancel(self) -> Option<T> {
125         let mut this = self;
126         this.set_canceled();
127 
128         struct Fut<T>(Task<T>);
129 
130         impl<T> Future for Fut<T> {
131             type Output = Option<T>;
132 
133             fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134                 self.0.poll_task(cx)
135             }
136         }
137 
138         Fut(this).await
139     }
140 
141     /// Puts the task in canceled state.
set_canceled(&mut self)142     fn set_canceled(&mut self) {
143         let ptr = self.ptr.as_ptr();
144         let header = ptr as *const Header;
145 
146         unsafe {
147             let mut state = (*header).state.load(Ordering::Acquire);
148 
149             loop {
150                 // If the task has been completed or closed, it can't be canceled.
151                 if state & (COMPLETED | CLOSED) != 0 {
152                     break;
153                 }
154 
155                 // If the task is not scheduled nor running, we'll need to schedule it.
156                 let new = if state & (SCHEDULED | RUNNING) == 0 {
157                     (state | SCHEDULED | CLOSED) + REFERENCE
158                 } else {
159                     state | CLOSED
160                 };
161 
162                 // Mark the task as closed.
163                 match (*header).state.compare_exchange_weak(
164                     state,
165                     new,
166                     Ordering::AcqRel,
167                     Ordering::Acquire,
168                 ) {
169                     Ok(_) => {
170                         // If the task is not scheduled nor running, schedule it one more time so
171                         // that its future gets dropped by the executor.
172                         if state & (SCHEDULED | RUNNING) == 0 {
173                             ((*header).vtable.schedule)(ptr);
174                         }
175 
176                         // Notify the awaiter that the task has been closed.
177                         if state & AWAITER != 0 {
178                             (*header).notify(None);
179                         }
180 
181                         break;
182                     }
183                     Err(s) => state = s,
184                 }
185             }
186         }
187     }
188 
189     /// Puts the task in detached state.
set_detached(&mut self) -> Option<T>190     fn set_detached(&mut self) -> Option<T> {
191         let ptr = self.ptr.as_ptr();
192         let header = ptr as *const Header;
193 
194         unsafe {
195             // A place where the output will be stored in case it needs to be dropped.
196             let mut output = None;
197 
198             // Optimistically assume the `Task` is being detached just after creating the task.
199             // This is a common case so if the `Task` is datached, the overhead of it is only one
200             // compare-exchange operation.
201             if let Err(mut state) = (*header).state.compare_exchange_weak(
202                 SCHEDULED | TASK | REFERENCE,
203                 SCHEDULED | REFERENCE,
204                 Ordering::AcqRel,
205                 Ordering::Acquire,
206             ) {
207                 loop {
208                     // If the task has been completed but not yet closed, that means its output
209                     // must be dropped.
210                     if state & COMPLETED != 0 && state & CLOSED == 0 {
211                         // Mark the task as closed in order to grab its output.
212                         match (*header).state.compare_exchange_weak(
213                             state,
214                             state | CLOSED,
215                             Ordering::AcqRel,
216                             Ordering::Acquire,
217                         ) {
218                             Ok(_) => {
219                                 // Read the output.
220                                 output =
221                                     Some((((*header).vtable.get_output)(ptr) as *mut T).read());
222 
223                                 // Update the state variable because we're continuing the loop.
224                                 state |= CLOSED;
225                             }
226                             Err(s) => state = s,
227                         }
228                     } else {
229                         // If this is the last reference to the task and it's not closed, then
230                         // close it and schedule one more time so that its future gets dropped by
231                         // the executor.
232                         let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
233                             SCHEDULED | CLOSED | REFERENCE
234                         } else {
235                             state & !TASK
236                         };
237 
238                         // Unset the `TASK` flag.
239                         match (*header).state.compare_exchange_weak(
240                             state,
241                             new,
242                             Ordering::AcqRel,
243                             Ordering::Acquire,
244                         ) {
245                             Ok(_) => {
246                                 // If this is the last reference to the task, we need to either
247                                 // schedule dropping its future or destroy it.
248                                 if state & !(REFERENCE - 1) == 0 {
249                                     if state & CLOSED == 0 {
250                                         ((*header).vtable.schedule)(ptr);
251                                     } else {
252                                         ((*header).vtable.destroy)(ptr);
253                                     }
254                                 }
255 
256                                 break;
257                             }
258                             Err(s) => state = s,
259                         }
260                     }
261                 }
262             }
263 
264             output
265         }
266     }
267 
268     /// Polls the task to retrieve its output.
269     ///
270     /// Returns `Some` if the task has completed or `None` if it was closed.
271     ///
272     /// A task becomes closed in the following cases:
273     ///
274     /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
275     /// 2. Its output gets awaited by the `Task`.
276     /// 3. It panics while polling the future.
277     /// 4. It is completed and the `Task` gets dropped.
poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>278     fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
279         let ptr = self.ptr.as_ptr();
280         let header = ptr as *const Header;
281 
282         unsafe {
283             let mut state = (*header).state.load(Ordering::Acquire);
284 
285             loop {
286                 // If the task has been closed, notify the awaiter and return `None`.
287                 if state & CLOSED != 0 {
288                     // If the task is scheduled or running, we need to wait until its future is
289                     // dropped.
290                     if state & (SCHEDULED | RUNNING) != 0 {
291                         // Replace the waker with one associated with the current task.
292                         (*header).register(cx.waker());
293 
294                         // Reload the state after registering. It is possible changes occurred just
295                         // before registration so we need to check for that.
296                         state = (*header).state.load(Ordering::Acquire);
297 
298                         // If the task is still scheduled or running, we need to wait because its
299                         // future is not dropped yet.
300                         if state & (SCHEDULED | RUNNING) != 0 {
301                             return Poll::Pending;
302                         }
303                     }
304 
305                     // Even though the awaiter is most likely the current task, it could also be
306                     // another task.
307                     (*header).notify(Some(cx.waker()));
308                     return Poll::Ready(None);
309                 }
310 
311                 // If the task is not completed, register the current task.
312                 if state & COMPLETED == 0 {
313                     // Replace the waker with one associated with the current task.
314                     (*header).register(cx.waker());
315 
316                     // Reload the state after registering. It is possible that the task became
317                     // completed or closed just before registration so we need to check for that.
318                     state = (*header).state.load(Ordering::Acquire);
319 
320                     // If the task has been closed, restart.
321                     if state & CLOSED != 0 {
322                         continue;
323                     }
324 
325                     // If the task is still not completed, we're blocked on it.
326                     if state & COMPLETED == 0 {
327                         return Poll::Pending;
328                     }
329                 }
330 
331                 // Since the task is now completed, mark it as closed in order to grab its output.
332                 match (*header).state.compare_exchange(
333                     state,
334                     state | CLOSED,
335                     Ordering::AcqRel,
336                     Ordering::Acquire,
337                 ) {
338                     Ok(_) => {
339                         // Notify the awaiter. Even though the awaiter is most likely the current
340                         // task, it could also be another task.
341                         if state & AWAITER != 0 {
342                             (*header).notify(Some(cx.waker()));
343                         }
344 
345                         // Take the output from the task.
346                         let output = ((*header).vtable.get_output)(ptr) as *mut T;
347                         return Poll::Ready(Some(output.read()));
348                     }
349                     Err(s) => state = s,
350                 }
351             }
352         }
353     }
354 }
355 
356 impl<T> Drop for Task<T> {
drop(&mut self)357     fn drop(&mut self) {
358         self.set_canceled();
359         self.set_detached();
360     }
361 }
362 
363 impl<T> Future for Task<T> {
364     type Output = T;
365 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>366     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367         match self.poll_task(cx) {
368             Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
369             Poll::Pending => Poll::Pending,
370         }
371     }
372 }
373 
374 impl<T> fmt::Debug for Task<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result375     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376         let ptr = self.ptr.as_ptr();
377         let header = ptr as *const Header;
378 
379         f.debug_struct("Task")
380             .field("header", unsafe { &(*header) })
381             .finish()
382     }
383 }
384