1 //! A collection of tasks spawned on a Tokio runtime. 2 //! 3 //! This module provides the [`JoinSet`] type, a collection which stores a set 4 //! of spawned tasks and allows asynchronously awaiting the output of those 5 //! tasks as they complete. See the documentation for the [`JoinSet`] type for 6 //! details. 7 use std::fmt; 8 use std::future::Future; 9 use std::pin::Pin; 10 use std::task::{Context, Poll}; 11 12 use crate::runtime::Handle; 13 #[cfg(tokio_unstable)] 14 use crate::task::Id; 15 use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet}; 16 use crate::util::IdleNotifiedSet; 17 18 /// A collection of tasks spawned on a Tokio runtime. 19 /// 20 /// A `JoinSet` can be used to await the completion of some or all of the tasks 21 /// in the set. The set is not ordered, and the tasks will be returned in the 22 /// order they complete. 23 /// 24 /// All of the tasks must have the same return type `T`. 25 /// 26 /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. 27 /// 28 /// # Examples 29 /// 30 /// Spawn multiple tasks and wait for them. 31 /// 32 /// ``` 33 /// use tokio::task::JoinSet; 34 /// 35 /// #[tokio::main] 36 /// async fn main() { 37 /// let mut set = JoinSet::new(); 38 /// 39 /// for i in 0..10 { 40 /// set.spawn(async move { i }); 41 /// } 42 /// 43 /// let mut seen = [false; 10]; 44 /// while let Some(res) = set.join_next().await { 45 /// let idx = res.unwrap(); 46 /// seen[idx] = true; 47 /// } 48 /// 49 /// for i in 0..10 { 50 /// assert!(seen[i]); 51 /// } 52 /// } 53 /// ``` 54 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] 55 pub struct JoinSet<T> { 56 inner: IdleNotifiedSet<JoinHandle<T>>, 57 } 58 59 /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather 60 /// than on the current default runtime. 61 /// 62 /// [`task::Builder`]: crate::task::Builder 63 #[cfg(all(tokio_unstable, feature = "tracing"))] 64 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 65 #[must_use = "builders do nothing unless used to spawn a task"] 66 pub struct Builder<'a, T> { 67 joinset: &'a mut JoinSet<T>, 68 builder: super::Builder<'a>, 69 } 70 71 impl<T> JoinSet<T> { 72 /// Create a new `JoinSet`. new() -> Self73 pub fn new() -> Self { 74 Self { 75 inner: IdleNotifiedSet::new(), 76 } 77 } 78 79 /// Returns the number of tasks currently in the `JoinSet`. len(&self) -> usize80 pub fn len(&self) -> usize { 81 self.inner.len() 82 } 83 84 /// Returns whether the `JoinSet` is empty. is_empty(&self) -> bool85 pub fn is_empty(&self) -> bool { 86 self.inner.is_empty() 87 } 88 } 89 90 impl<T: 'static> JoinSet<T> { 91 /// Returns a [`Builder`] that can be used to configure a task prior to 92 /// spawning it on this `JoinSet`. 93 /// 94 /// # Examples 95 /// 96 /// ``` 97 /// use tokio::task::JoinSet; 98 /// 99 /// #[tokio::main] 100 /// async fn main() -> std::io::Result<()> { 101 /// let mut set = JoinSet::new(); 102 /// 103 /// // Use the builder to configure a task's name before spawning it. 104 /// set.build_task() 105 /// .name("my_task") 106 /// .spawn(async { /* ... */ })?; 107 /// 108 /// Ok(()) 109 /// } 110 /// ``` 111 #[cfg(all(tokio_unstable, feature = "tracing"))] 112 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] build_task(&mut self) -> Builder<'_, T>113 pub fn build_task(&mut self) -> Builder<'_, T> { 114 Builder { 115 builder: super::Builder::new(), 116 joinset: self, 117 } 118 } 119 120 /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] 121 /// that can be used to remotely cancel the task. 122 /// 123 /// The provided future will start running in the background immediately 124 /// when this method is called, even if you don't await anything on this 125 /// `JoinSet`. 126 /// 127 /// # Panics 128 /// 129 /// This method panics if called outside of a Tokio runtime. 130 /// 131 /// [`AbortHandle`]: crate::task::AbortHandle 132 #[track_caller] spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,133 pub fn spawn<F>(&mut self, task: F) -> AbortHandle 134 where 135 F: Future<Output = T>, 136 F: Send + 'static, 137 T: Send, 138 { 139 self.insert(crate::spawn(task)) 140 } 141 142 /// Spawn the provided task on the provided runtime and store it in this 143 /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely 144 /// cancel the task. 145 /// 146 /// The provided future will start running in the background immediately 147 /// when this method is called, even if you don't await anything on this 148 /// `JoinSet`. 149 /// 150 /// [`AbortHandle`]: crate::task::AbortHandle 151 #[track_caller] spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,152 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle 153 where 154 F: Future<Output = T>, 155 F: Send + 'static, 156 T: Send, 157 { 158 self.insert(handle.spawn(task)) 159 } 160 161 /// Spawn the provided task on the current [`LocalSet`] and store it in this 162 /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely 163 /// cancel the task. 164 /// 165 /// The provided future will start running in the background immediately 166 /// when this method is called, even if you don't await anything on this 167 /// `JoinSet`. 168 /// 169 /// # Panics 170 /// 171 /// This method panics if it is called outside of a `LocalSet`. 172 /// 173 /// [`LocalSet`]: crate::task::LocalSet 174 /// [`AbortHandle`]: crate::task::AbortHandle 175 #[track_caller] spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: 'static,176 pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle 177 where 178 F: Future<Output = T>, 179 F: 'static, 180 { 181 self.insert(crate::task::spawn_local(task)) 182 } 183 184 /// Spawn the provided task on the provided [`LocalSet`] and store it in 185 /// this `JoinSet`, returning an [`AbortHandle`] that can be used to 186 /// remotely cancel the task. 187 /// 188 /// Unlike the [`spawn_local`] method, this method may be used to spawn local 189 /// tasks on a `LocalSet` that is _not_ currently running. The provided 190 /// future will start running whenever the `LocalSet` is next started. 191 /// 192 /// [`LocalSet`]: crate::task::LocalSet 193 /// [`AbortHandle`]: crate::task::AbortHandle 194 /// [`spawn_local`]: Self::spawn_local 195 #[track_caller] spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle where F: Future<Output = T>, F: 'static,196 pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle 197 where 198 F: Future<Output = T>, 199 F: 'static, 200 { 201 self.insert(local_set.spawn_local(task)) 202 } 203 204 /// Spawn the blocking code on the blocking threadpool and store 205 /// it in this `JoinSet`, returning an [`AbortHandle`] that can be 206 /// used to remotely cancel the task. 207 /// 208 /// # Examples 209 /// 210 /// Spawn multiple blocking tasks and wait for them. 211 /// 212 /// ``` 213 /// use tokio::task::JoinSet; 214 /// 215 /// #[tokio::main] 216 /// async fn main() { 217 /// let mut set = JoinSet::new(); 218 /// 219 /// for i in 0..10 { 220 /// set.spawn_blocking(move || { i }); 221 /// } 222 /// 223 /// let mut seen = [false; 10]; 224 /// while let Some(res) = set.join_next().await { 225 /// let idx = res.unwrap(); 226 /// seen[idx] = true; 227 /// } 228 /// 229 /// for i in 0..10 { 230 /// assert!(seen[i]); 231 /// } 232 /// } 233 /// ``` 234 /// 235 /// # Panics 236 /// 237 /// This method panics if called outside of a Tokio runtime. 238 /// 239 /// [`AbortHandle`]: crate::task::AbortHandle 240 #[track_caller] spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,241 pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle 242 where 243 F: FnOnce() -> T, 244 F: Send + 'static, 245 T: Send, 246 { 247 self.insert(crate::runtime::spawn_blocking(f)) 248 } 249 250 /// Spawn the blocking code on the blocking threadpool of the 251 /// provided runtime and store it in this `JoinSet`, returning an 252 /// [`AbortHandle`] that can be used to remotely cancel the task. 253 /// 254 /// [`AbortHandle`]: crate::task::AbortHandle 255 #[track_caller] spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,256 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle 257 where 258 F: FnOnce() -> T, 259 F: Send + 'static, 260 T: Send, 261 { 262 self.insert(handle.spawn_blocking(f)) 263 } 264 insert(&mut self, jh: JoinHandle<T>) -> AbortHandle265 fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle { 266 let abort = jh.abort_handle(); 267 let mut entry = self.inner.insert_idle(jh); 268 269 // Set the waker that is notified when the task completes. 270 entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker())); 271 abort 272 } 273 274 /// Waits until one of the tasks in the set completes and returns its output. 275 /// 276 /// Returns `None` if the set is empty. 277 /// 278 /// # Cancel Safety 279 /// 280 /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` 281 /// statement and some other branch completes first, it is guaranteed that no tasks were 282 /// removed from this `JoinSet`. join_next(&mut self) -> Option<Result<T, JoinError>>283 pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> { 284 crate::future::poll_fn(|cx| self.poll_join_next(cx)).await 285 } 286 287 /// Waits until one of the tasks in the set completes and returns its 288 /// output, along with the [task ID] of the completed task. 289 /// 290 /// Returns `None` if the set is empty. 291 /// 292 /// When this method returns an error, then the id of the task that failed can be accessed 293 /// using the [`JoinError::id`] method. 294 /// 295 /// # Cancel Safety 296 /// 297 /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!` 298 /// statement and some other branch completes first, it is guaranteed that no tasks were 299 /// removed from this `JoinSet`. 300 /// 301 /// [task ID]: crate::task::Id 302 /// [`JoinError::id`]: fn@crate::task::JoinError::id 303 #[cfg(tokio_unstable)] 304 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>305 pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> { 306 crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await 307 } 308 309 /// Aborts all tasks and waits for them to finish shutting down. 310 /// 311 /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in 312 /// a loop until it returns `None`. 313 /// 314 /// This method ignores any panics in the tasks shutting down. When this call returns, the 315 /// `JoinSet` will be empty. 316 /// 317 /// [`abort_all`]: fn@Self::abort_all 318 /// [`join_next`]: fn@Self::join_next shutdown(&mut self)319 pub async fn shutdown(&mut self) { 320 self.abort_all(); 321 while self.join_next().await.is_some() {} 322 } 323 324 /// Aborts all tasks on this `JoinSet`. 325 /// 326 /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete 327 /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. abort_all(&mut self)328 pub fn abort_all(&mut self) { 329 self.inner.for_each(|jh| jh.abort()); 330 } 331 332 /// Removes all tasks from this `JoinSet` without aborting them. 333 /// 334 /// The tasks removed by this call will continue to run in the background even if the `JoinSet` 335 /// is dropped. detach_all(&mut self)336 pub fn detach_all(&mut self) { 337 self.inner.drain(drop); 338 } 339 340 /// Polls for one of the tasks in the set to complete. 341 /// 342 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. 343 /// 344 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled 345 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to 346 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is 347 /// scheduled to receive a wakeup. 348 /// 349 /// # Returns 350 /// 351 /// This function returns: 352 /// 353 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is 354 /// available right now. 355 /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. 356 /// The `value` is the return value of one of the tasks that completed. 357 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been 358 /// aborted. The `err` is the `JoinError` from the panicked/aborted task. 359 /// * `Poll::Ready(None)` if the `JoinSet` is empty. 360 /// 361 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. 362 /// This can happen if the [coop budget] is reached. 363 /// 364 /// [coop budget]: crate::task#cooperative-scheduling poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>>365 pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> { 366 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to 367 // the `notified` list if the waker is notified in the `poll` call below. 368 let mut entry = match self.inner.pop_notified(cx.waker()) { 369 Some(entry) => entry, 370 None => { 371 if self.is_empty() { 372 return Poll::Ready(None); 373 } else { 374 // The waker was set by `pop_notified`. 375 return Poll::Pending; 376 } 377 } 378 }; 379 380 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); 381 382 if let Poll::Ready(res) = res { 383 let _entry = entry.remove(); 384 Poll::Ready(Some(res)) 385 } else { 386 // A JoinHandle generally won't emit a wakeup without being ready unless 387 // the coop limit has been reached. We yield to the executor in this 388 // case. 389 cx.waker().wake_by_ref(); 390 Poll::Pending 391 } 392 } 393 394 /// Polls for one of the tasks in the set to complete. 395 /// 396 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. 397 /// 398 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled 399 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to 400 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is 401 /// scheduled to receive a wakeup. 402 /// 403 /// # Returns 404 /// 405 /// This function returns: 406 /// 407 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is 408 /// available right now. 409 /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed. 410 /// The `value` is the return value of one of the tasks that completed, and 411 /// `id` is the [task ID] of that task. 412 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been 413 /// aborted. The `err` is the `JoinError` from the panicked/aborted task. 414 /// * `Poll::Ready(None)` if the `JoinSet` is empty. 415 /// 416 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. 417 /// This can happen if the [coop budget] is reached. 418 /// 419 /// [coop budget]: crate::task#cooperative-scheduling 420 /// [task ID]: crate::task::Id 421 #[cfg(tokio_unstable)] 422 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] poll_join_next_with_id( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Id, T), JoinError>>>423 pub fn poll_join_next_with_id( 424 &mut self, 425 cx: &mut Context<'_>, 426 ) -> Poll<Option<Result<(Id, T), JoinError>>> { 427 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to 428 // the `notified` list if the waker is notified in the `poll` call below. 429 let mut entry = match self.inner.pop_notified(cx.waker()) { 430 Some(entry) => entry, 431 None => { 432 if self.is_empty() { 433 return Poll::Ready(None); 434 } else { 435 // The waker was set by `pop_notified`. 436 return Poll::Pending; 437 } 438 } 439 }; 440 441 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); 442 443 if let Poll::Ready(res) = res { 444 let entry = entry.remove(); 445 // If the task succeeded, add the task ID to the output. Otherwise, the 446 // `JoinError` will already have the task's ID. 447 Poll::Ready(Some(res.map(|output| (entry.id(), output)))) 448 } else { 449 // A JoinHandle generally won't emit a wakeup without being ready unless 450 // the coop limit has been reached. We yield to the executor in this 451 // case. 452 cx.waker().wake_by_ref(); 453 Poll::Pending 454 } 455 } 456 } 457 458 impl<T> Drop for JoinSet<T> { drop(&mut self)459 fn drop(&mut self) { 460 self.inner.drain(|join_handle| join_handle.abort()); 461 } 462 } 463 464 impl<T> fmt::Debug for JoinSet<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 466 f.debug_struct("JoinSet").field("len", &self.len()).finish() 467 } 468 } 469 470 impl<T> Default for JoinSet<T> { default() -> Self471 fn default() -> Self { 472 Self::new() 473 } 474 } 475 476 // === impl Builder === 477 478 #[cfg(all(tokio_unstable, feature = "tracing"))] 479 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 480 impl<'a, T: 'static> Builder<'a, T> { 481 /// Assigns a name to the task which will be spawned. name(self, name: &'a str) -> Self482 pub fn name(self, name: &'a str) -> Self { 483 let builder = self.builder.name(name); 484 Self { builder, ..self } 485 } 486 487 /// Spawn the provided task with this builder's settings and store it in the 488 /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely 489 /// cancel the task. 490 /// 491 /// # Returns 492 /// 493 /// An [`AbortHandle`] that can be used to remotely cancel the task. 494 /// 495 /// # Panics 496 /// 497 /// This method panics if called outside of a Tokio runtime. 498 /// 499 /// [`AbortHandle`]: crate::task::AbortHandle 500 #[track_caller] spawn<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,501 pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle> 502 where 503 F: Future<Output = T>, 504 F: Send + 'static, 505 T: Send, 506 { 507 Ok(self.joinset.insert(self.builder.spawn(future)?)) 508 } 509 510 /// Spawn the provided task on the provided [runtime handle] with this 511 /// builder's settings, and store it in the [`JoinSet`]. 512 /// 513 /// # Returns 514 /// 515 /// An [`AbortHandle`] that can be used to remotely cancel the task. 516 /// 517 /// 518 /// [`AbortHandle`]: crate::task::AbortHandle 519 /// [runtime handle]: crate::runtime::Handle 520 #[track_caller] spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,521 pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> 522 where 523 F: Future<Output = T>, 524 F: Send + 'static, 525 T: Send, 526 { 527 Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?)) 528 } 529 530 /// Spawn the provided task on the current [`LocalSet`] with this builder's 531 /// settings, and store it in the [`JoinSet`]. 532 /// 533 /// # Returns 534 /// 535 /// An [`AbortHandle`] that can be used to remotely cancel the task. 536 /// 537 /// # Panics 538 /// 539 /// This method panics if it is called outside of a `LocalSet`. 540 /// 541 /// [`LocalSet`]: crate::task::LocalSet 542 /// [`AbortHandle`]: crate::task::AbortHandle 543 #[track_caller] spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,544 pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> 545 where 546 F: Future<Output = T>, 547 F: 'static, 548 { 549 Ok(self.joinset.insert(self.builder.spawn_local(future)?)) 550 } 551 552 /// Spawn the provided task on the provided [`LocalSet`] with this builder's 553 /// settings, and store it in the [`JoinSet`]. 554 /// 555 /// # Returns 556 /// 557 /// An [`AbortHandle`] that can be used to remotely cancel the task. 558 /// 559 /// [`LocalSet`]: crate::task::LocalSet 560 /// [`AbortHandle`]: crate::task::AbortHandle 561 #[track_caller] spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,562 pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> 563 where 564 F: Future<Output = T>, 565 F: 'static, 566 { 567 Ok(self 568 .joinset 569 .insert(self.builder.spawn_local_on(future, local_set)?)) 570 } 571 } 572 573 // Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is 574 // `Debug`. 575 #[cfg(all(tokio_unstable, feature = "tracing"))] 576 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 577 impl<'a, T> fmt::Debug for Builder<'a, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result578 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 579 f.debug_struct("join_set::Builder") 580 .field("joinset", &self.joinset) 581 .field("builder", &self.builder) 582 .finish() 583 } 584 } 585