• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::task::RawTask;
2 
3 use std::fmt;
4 use std::future::Future;
5 use std::marker::PhantomData;
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8 
cfg_rt! {
    /// An owned permission to join on a task (await its termination).
    ///
    /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
    /// a task rather than a thread.
    ///
    /// A `JoinHandle` *detaches* the associated task when it is dropped, which
    /// means that there is no longer any handle to the task, and no way to `join`
    /// on it.
    ///
    /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
    /// functions.
    ///
    /// # Examples
    ///
    /// Creation from [`task::spawn`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn(async {
    ///     // some work here
    /// });
    /// # }
    /// ```
    ///
    /// Creation from [`task::spawn_blocking`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
    ///     // some blocking work here
    /// });
    /// # }
    /// ```
    ///
    /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
    /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
    ///     5 + 3
    /// });
    /// # }
    /// ```
    ///
    /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<()> = task::spawn(async {
    ///     println!("I return nothing.");
    /// });
    /// # }
    /// ```
    ///
    /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
    /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
    /// to be double chained to extract the returned value:
    ///
    /// ```
    /// use tokio::task;
    /// use std::io;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///         Ok(5 + 3)
    ///     });
    ///
    ///     let result = join_handle.await??;
    ///     assert_eq!(result, 8);
    ///     Ok(())
    /// }
    /// ```
    ///
    /// If the task panics, the error is a [`JoinError`] that contains the panic:
    ///
    /// ```
    /// use tokio::task;
    /// use std::io;
    /// use std::panic;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///         panic!("boom");
    ///     });
    ///
    ///     let err = join_handle.await.unwrap_err();
    ///     assert!(err.is_panic());
    ///     Ok(())
    /// }
    /// ```
    ///
    /// Child being detached and outliving its parent:
    ///
    /// ```no_run
    /// use tokio::task;
    /// use tokio::time;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main] async fn main() {
    /// let original_task = task::spawn(async {
    ///     let _detached_task = task::spawn(async {
    ///         // Here we sleep to make sure that the first task returns before.
    ///         time::sleep(Duration::from_millis(10)).await;
    ///         // This will be called, even though the JoinHandle is dropped.
    ///         println!("♫ Still alive ♫");
    ///     });
    /// });
    ///
    /// original_task.await.expect("The task being joined has panicked");
    /// println!("Original task is joined.");
    ///
    /// // We make sure that the new task has time to run, before the main
    /// // task returns.
    ///
    /// time::sleep(Duration::from_millis(1000)).await;
    /// # }
    /// ```
    ///
    /// [`task::spawn`]: crate::task::spawn()
    /// [`task::spawn_blocking`]: crate::task::spawn_blocking
    /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
    /// [`JoinError`]: crate::task::JoinError
    pub struct JoinHandle<T> {
        // Raw reference to the task. Set to `Some` at construction and taken
        // (left as `None`) when the handle is dropped.
        raw: Option<RawTask>,
        // Records the task's output type `T` without storing a value of it.
        _p: PhantomData<T>,
    }
}
149 
// These manual impls make `JoinHandle<T>` both `Send` and `Sync` whenever the
// task output type `T` is `Send`.
//
// NOTE(review): soundness presumably relies on `RawTask` being safe to use
// from any thread — confirm against the task module before changing these
// bounds.
unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}
152 
153 impl<T> JoinHandle<T> {
new(raw: RawTask) -> JoinHandle<T>154     pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
155         JoinHandle {
156             raw: Some(raw),
157             _p: PhantomData,
158         }
159     }
160 
161     /// Abort the task associated with the handle.
162     ///
163     /// Awaiting a cancelled task might complete as usual if the task was
164     /// already completed at the time it was cancelled, but most likely it
165     /// will fail with a [cancelled] `JoinError`.
166     ///
167     /// ```rust
168     /// use tokio::time;
169     ///
170     /// #[tokio::main]
171     /// async fn main() {
172     ///    let mut handles = Vec::new();
173     ///
174     ///    handles.push(tokio::spawn(async {
175     ///       time::sleep(time::Duration::from_secs(10)).await;
176     ///       true
177     ///    }));
178     ///
179     ///    handles.push(tokio::spawn(async {
180     ///       time::sleep(time::Duration::from_secs(10)).await;
181     ///       false
182     ///    }));
183     ///
184     ///    for handle in &handles {
185     ///        handle.abort();
186     ///    }
187     ///
188     ///    for handle in handles {
189     ///        assert!(handle.await.unwrap_err().is_cancelled());
190     ///    }
191     /// }
192     /// ```
193     /// [cancelled]: method@super::error::JoinError::is_cancelled
abort(&self)194     pub fn abort(&self) {
195         if let Some(raw) = self.raw {
196             raw.remote_abort();
197         }
198     }
199 }
200 
// `JoinHandle` never stores a `T` inline (only `PhantomData<T>`), so it is
// declared `Unpin` for every `T`, including output types that are not `Unpin`.
impl<T> Unpin for JoinHandle<T> {}
202 
203 impl<T> Future for JoinHandle<T> {
204     type Output = super::Result<T>;
205 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>206     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207         let mut ret = Poll::Pending;
208 
209         // Keep track of task budget
210         let coop = ready!(crate::coop::poll_proceed(cx));
211 
212         // Raw should always be set. If it is not, this is due to polling after
213         // completion
214         let raw = self
215             .raw
216             .as_ref()
217             .expect("polling after `JoinHandle` already completed");
218 
219         // Try to read the task output. If the task is not yet complete, the
220         // waker is stored and is notified once the task does complete.
221         //
222         // The function must go via the vtable, which requires erasing generic
223         // types. To do this, the function "return" is placed on the stack
224         // **before** calling the function and is passed into the function using
225         // `*mut ()`.
226         //
227         // Safety:
228         //
229         // The type of `T` must match the task's output type.
230         unsafe {
231             raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
232         }
233 
234         if ret.is_ready() {
235             coop.made_progress();
236         }
237 
238         ret
239     }
240 }
241 
242 impl<T> Drop for JoinHandle<T> {
drop(&mut self)243     fn drop(&mut self) {
244         if let Some(raw) = self.raw.take() {
245             if raw.header().state.drop_join_handle_fast().is_ok() {
246                 return;
247             }
248 
249             raw.drop_join_handle_slow();
250         }
251     }
252 }
253 
254 impl<T> fmt::Debug for JoinHandle<T>
255 where
256     T: fmt::Debug,
257 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result258     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
259         fmt.debug_struct("JoinHandle").finish()
260     }
261 }
262