1 use crate::runtime::task::RawTask; 2 3 use std::fmt; 4 use std::future::Future; 5 use std::marker::PhantomData; 6 use std::pin::Pin; 7 use std::task::{Context, Poll}; 8 9 cfg_rt! { 10 /// An owned permission to join on a task (await its termination). 11 /// 12 /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for 13 /// a task rather than a thread. 14 /// 15 /// A `JoinHandle` *detaches* the associated task when it is dropped, which 16 /// means that there is no longer any handle to the task, and no way to `join` 17 /// on it. 18 /// 19 /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] 20 /// functions. 21 /// 22 /// # Examples 23 /// 24 /// Creation from [`task::spawn`]: 25 /// 26 /// ``` 27 /// use tokio::task; 28 /// 29 /// # async fn doc() { 30 /// let join_handle: task::JoinHandle<_> = task::spawn(async { 31 /// // some work here 32 /// }); 33 /// # } 34 /// ``` 35 /// 36 /// Creation from [`task::spawn_blocking`]: 37 /// 38 /// ``` 39 /// use tokio::task; 40 /// 41 /// # async fn doc() { 42 /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { 43 /// // some blocking work here 44 /// }); 45 /// # } 46 /// ``` 47 /// 48 /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task. 49 /// If the return value is an i32, the join handle has type `JoinHandle<i32>`: 50 /// 51 /// ``` 52 /// use tokio::task; 53 /// 54 /// # async fn doc() { 55 /// let join_handle: task::JoinHandle<i32> = task::spawn(async { 56 /// 5 + 3 57 /// }); 58 /// # } 59 /// 60 /// ``` 61 /// 62 /// If the task does not have a return value, the join handle has type `JoinHandle<()>`: 63 /// 64 /// ``` 65 /// use tokio::task; 66 /// 67 /// # async fn doc() { 68 /// let join_handle: task::JoinHandle<()> = task::spawn(async { 69 /// println!("I return nothing."); 70 /// }); 71 /// # } 72 /// ``` 73 /// 74 /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a 75 /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has 76 /// to be double chained to extract the returned value: 77 /// 78 /// ``` 79 /// use tokio::task; 80 /// use std::io; 81 /// 82 /// #[tokio::main] 83 /// async fn main() -> io::Result<()> { 84 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { 85 /// Ok(5 + 3) 86 /// }); 87 /// 88 /// let result = join_handle.await??; 89 /// assert_eq!(result, 8); 90 /// Ok(()) 91 /// } 92 /// ``` 93 /// 94 /// If the task panics, the error is a [`JoinError`] that contains the panic: 95 /// 96 /// ``` 97 /// use tokio::task; 98 /// use std::io; 99 /// use std::panic; 100 /// 101 /// #[tokio::main] 102 /// async fn main() -> io::Result<()> { 103 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { 104 /// panic!("boom"); 105 /// }); 106 /// 107 /// let err = join_handle.await.unwrap_err(); 108 /// assert!(err.is_panic()); 109 /// Ok(()) 110 /// } 111 /// 112 /// ``` 113 /// Child being detached and outliving its parent: 114 /// 115 /// ```no_run 116 /// use tokio::task; 117 /// use tokio::time; 118 /// use std::time::Duration; 119 /// 120 /// # #[tokio::main] async fn main() { 121 /// let original_task = task::spawn(async { 122 /// let _detached_task = task::spawn(async { 123 /// // Here we sleep to make sure that the first task returns before. 124 /// time::sleep(Duration::from_millis(10)).await; 125 /// // This will be called, even though the JoinHandle is dropped. 126 /// println!("♫ Still alive ♫"); 127 /// }); 128 /// }); 129 /// 130 /// original_task.await.expect("The task being joined has panicked"); 131 /// println!("Original task is joined."); 132 /// 133 /// // We make sure that the new task has time to run, before the main 134 /// // task returns. 135 /// 136 /// time::sleep(Duration::from_millis(1000)).await; 137 /// # } 138 /// ``` 139 /// 140 /// [`task::spawn`]: crate::task::spawn() 141 /// [`task::spawn_blocking`]: crate::task::spawn_blocking 142 /// [`std::thread::JoinHandle`]: std::thread::JoinHandle 143 /// [`JoinError`]: crate::task::JoinError 144 pub struct JoinHandle<T> { 145 raw: Option<RawTask>, 146 _p: PhantomData<T>, 147 } 148 } 149 150 unsafe impl<T: Send> Send for JoinHandle<T> {} 151 unsafe impl<T: Send> Sync for JoinHandle<T> {} 152 153 impl<T> JoinHandle<T> { new(raw: RawTask) -> JoinHandle<T>154 pub(super) fn new(raw: RawTask) -> JoinHandle<T> { 155 JoinHandle { 156 raw: Some(raw), 157 _p: PhantomData, 158 } 159 } 160 161 /// Abort the task associated with the handle. 162 /// 163 /// Awaiting a cancelled task might complete as usual if the task was 164 /// already completed at the time it was cancelled, but most likely it 165 /// will fail with a [cancelled] `JoinError`. 166 /// 167 /// ```rust 168 /// use tokio::time; 169 /// 170 /// #[tokio::main] 171 /// async fn main() { 172 /// let mut handles = Vec::new(); 173 /// 174 /// handles.push(tokio::spawn(async { 175 /// time::sleep(time::Duration::from_secs(10)).await; 176 /// true 177 /// })); 178 /// 179 /// handles.push(tokio::spawn(async { 180 /// time::sleep(time::Duration::from_secs(10)).await; 181 /// false 182 /// })); 183 /// 184 /// for handle in &handles { 185 /// handle.abort(); 186 /// } 187 /// 188 /// for handle in handles { 189 /// assert!(handle.await.unwrap_err().is_cancelled()); 190 /// } 191 /// } 192 /// ``` 193 /// [cancelled]: method@super::error::JoinError::is_cancelled abort(&self)194 pub fn abort(&self) { 195 if let Some(raw) = self.raw { 196 raw.remote_abort(); 197 } 198 } 199 } 200 201 impl<T> Unpin for JoinHandle<T> {} 202 203 impl<T> Future for JoinHandle<T> { 204 type Output = super::Result<T>; 205 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>206 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 207 let mut ret = Poll::Pending; 208 209 // Keep track of task budget 210 let coop = ready!(crate::coop::poll_proceed(cx)); 211 212 // Raw should always be set. If it is not, this is due to polling after 213 // completion 214 let raw = self 215 .raw 216 .as_ref() 217 .expect("polling after `JoinHandle` already completed"); 218 219 // Try to read the task output. If the task is not yet complete, the 220 // waker is stored and is notified once the task does complete. 221 // 222 // The function must go via the vtable, which requires erasing generic 223 // types. To do this, the function "return" is placed on the stack 224 // **before** calling the function and is passed into the function using 225 // `*mut ()`. 226 // 227 // Safety: 228 // 229 // The type of `T` must match the task's output type. 230 unsafe { 231 raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); 232 } 233 234 if ret.is_ready() { 235 coop.made_progress(); 236 } 237 238 ret 239 } 240 } 241 242 impl<T> Drop for JoinHandle<T> { drop(&mut self)243 fn drop(&mut self) { 244 if let Some(raw) = self.raw.take() { 245 if raw.header().state.drop_join_handle_fast().is_ok() { 246 return; 247 } 248 249 raw.drop_join_handle_slow(); 250 } 251 } 252 } 253 254 impl<T> fmt::Debug for JoinHandle<T> 255 where 256 T: fmt::Debug, 257 { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result258 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 259 fmt.debug_struct("JoinHandle").finish() 260 } 261 } 262