• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! A collection of tasks spawned on a Tokio runtime.
//!
//! This module provides the [`JoinSet`] type, a collection which stores a set
//! of spawned tasks and allows asynchronously awaiting the output of those
//! tasks as they complete. See the documentation for the [`JoinSet`] type for
//! details.
7 use std::fmt;
8 use std::future::Future;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11 
12 use crate::runtime::Handle;
13 #[cfg(tokio_unstable)]
14 use crate::task::Id;
15 use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
16 use crate::util::IdleNotifiedSet;
17 
18 /// A collection of tasks spawned on a Tokio runtime.
19 ///
20 /// A `JoinSet` can be used to await the completion of some or all of the tasks
21 /// in the set. The set is not ordered, and the tasks will be returned in the
22 /// order they complete.
23 ///
24 /// All of the tasks must have the same return type `T`.
25 ///
26 /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
27 ///
28 /// # Examples
29 ///
30 /// Spawn multiple tasks and wait for them.
31 ///
32 /// ```
33 /// use tokio::task::JoinSet;
34 ///
35 /// #[tokio::main]
36 /// async fn main() {
37 ///     let mut set = JoinSet::new();
38 ///
39 ///     for i in 0..10 {
40 ///         set.spawn(async move { i });
41 ///     }
42 ///
43 ///     let mut seen = [false; 10];
44 ///     while let Some(res) = set.join_next().await {
45 ///         let idx = res.unwrap();
46 ///         seen[idx] = true;
47 ///     }
48 ///
49 ///     for i in 0..10 {
50 ///         assert!(seen[i]);
51 ///     }
52 /// }
53 /// ```
54 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
55 pub struct JoinSet<T> {
56     inner: IdleNotifiedSet<JoinHandle<T>>,
57 }
58 
/// A [`task::Builder`] counterpart whose spawned tasks are stored in a
/// [`JoinSet`] instead of being spawned on the current default runtime.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
    // The set that receives the handle of the task once it is spawned.
    joinset: &'a mut JoinSet<T>,
    // The underlying task builder that holds the configured settings.
    builder: super::Builder<'a>,
}
70 
71 impl<T> JoinSet<T> {
72     /// Create a new `JoinSet`.
new() -> Self73     pub fn new() -> Self {
74         Self {
75             inner: IdleNotifiedSet::new(),
76         }
77     }
78 
79     /// Returns the number of tasks currently in the `JoinSet`.
len(&self) -> usize80     pub fn len(&self) -> usize {
81         self.inner.len()
82     }
83 
84     /// Returns whether the `JoinSet` is empty.
is_empty(&self) -> bool85     pub fn is_empty(&self) -> bool {
86         self.inner.is_empty()
87     }
88 }
89 
90 impl<T: 'static> JoinSet<T> {
91     /// Returns a [`Builder`] that can be used to configure a task prior to
92     /// spawning it on this `JoinSet`.
93     ///
94     /// # Examples
95     ///
96     /// ```
97     /// use tokio::task::JoinSet;
98     ///
99     /// #[tokio::main]
100     /// async fn main() -> std::io::Result<()> {
101     ///     let mut set = JoinSet::new();
102     ///
103     ///     // Use the builder to configure a task's name before spawning it.
104     ///     set.build_task()
105     ///         .name("my_task")
106     ///         .spawn(async { /* ... */ })?;
107     ///
108     ///     Ok(())
109     /// }
110     /// ```
111     #[cfg(all(tokio_unstable, feature = "tracing"))]
112     #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
build_task(&mut self) -> Builder<'_, T>113     pub fn build_task(&mut self) -> Builder<'_, T> {
114         Builder {
115             builder: super::Builder::new(),
116             joinset: self,
117         }
118     }
119 
120     /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
121     /// that can be used to remotely cancel the task.
122     ///
123     /// The provided future will start running in the background immediately
124     /// when this method is called, even if you don't await anything on this
125     /// `JoinSet`.
126     ///
127     /// # Panics
128     ///
129     /// This method panics if called outside of a Tokio runtime.
130     ///
131     /// [`AbortHandle`]: crate::task::AbortHandle
132     #[track_caller]
spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,133     pub fn spawn<F>(&mut self, task: F) -> AbortHandle
134     where
135         F: Future<Output = T>,
136         F: Send + 'static,
137         T: Send,
138     {
139         self.insert(crate::spawn(task))
140     }
141 
142     /// Spawn the provided task on the provided runtime and store it in this
143     /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
144     /// cancel the task.
145     ///
146     /// The provided future will start running in the background immediately
147     /// when this method is called, even if you don't await anything on this
148     /// `JoinSet`.
149     ///
150     /// [`AbortHandle`]: crate::task::AbortHandle
151     #[track_caller]
spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,152     pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
153     where
154         F: Future<Output = T>,
155         F: Send + 'static,
156         T: Send,
157     {
158         self.insert(handle.spawn(task))
159     }
160 
161     /// Spawn the provided task on the current [`LocalSet`] and store it in this
162     /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
163     /// cancel the task.
164     ///
165     /// The provided future will start running in the background immediately
166     /// when this method is called, even if you don't await anything on this
167     /// `JoinSet`.
168     ///
169     /// # Panics
170     ///
171     /// This method panics if it is called outside of a `LocalSet`.
172     ///
173     /// [`LocalSet`]: crate::task::LocalSet
174     /// [`AbortHandle`]: crate::task::AbortHandle
175     #[track_caller]
spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: 'static,176     pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
177     where
178         F: Future<Output = T>,
179         F: 'static,
180     {
181         self.insert(crate::task::spawn_local(task))
182     }
183 
184     /// Spawn the provided task on the provided [`LocalSet`] and store it in
185     /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
186     /// remotely cancel the task.
187     ///
188     /// Unlike the [`spawn_local`] method, this method may be used to spawn local
189     /// tasks on a `LocalSet` that is _not_ currently running. The provided
190     /// future will start running whenever the `LocalSet` is next started.
191     ///
192     /// [`LocalSet`]: crate::task::LocalSet
193     /// [`AbortHandle`]: crate::task::AbortHandle
194     /// [`spawn_local`]: Self::spawn_local
195     #[track_caller]
spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle where F: Future<Output = T>, F: 'static,196     pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
197     where
198         F: Future<Output = T>,
199         F: 'static,
200     {
201         self.insert(local_set.spawn_local(task))
202     }
203 
204     /// Spawn the blocking code on the blocking threadpool and store
205     /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
206     /// used to remotely cancel the task.
207     ///
208     /// # Examples
209     ///
210     /// Spawn multiple blocking tasks and wait for them.
211     ///
212     /// ```
213     /// use tokio::task::JoinSet;
214     ///
215     /// #[tokio::main]
216     /// async fn main() {
217     ///     let mut set = JoinSet::new();
218     ///
219     ///     for i in 0..10 {
220     ///         set.spawn_blocking(move || { i });
221     ///     }
222     ///
223     ///     let mut seen = [false; 10];
224     ///     while let Some(res) = set.join_next().await {
225     ///         let idx = res.unwrap();
226     ///         seen[idx] = true;
227     ///     }
228     ///
229     ///     for i in 0..10 {
230     ///         assert!(seen[i]);
231     ///     }
232     /// }
233     /// ```
234     ///
235     /// # Panics
236     ///
237     /// This method panics if called outside of a Tokio runtime.
238     ///
239     /// [`AbortHandle`]: crate::task::AbortHandle
240     #[track_caller]
spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,241     pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
242     where
243         F: FnOnce() -> T,
244         F: Send + 'static,
245         T: Send,
246     {
247         self.insert(crate::runtime::spawn_blocking(f))
248     }
249 
250     /// Spawn the blocking code on the blocking threadpool of the
251     /// provided runtime and store it in this `JoinSet`, returning an
252     /// [`AbortHandle`] that can be used to remotely cancel the task.
253     ///
254     /// [`AbortHandle`]: crate::task::AbortHandle
255     #[track_caller]
spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,256     pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
257     where
258         F: FnOnce() -> T,
259         F: Send + 'static,
260         T: Send,
261     {
262         self.insert(handle.spawn_blocking(f))
263     }
264 
insert(&mut self, jh: JoinHandle<T>) -> AbortHandle265     fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
266         let abort = jh.abort_handle();
267         let mut entry = self.inner.insert_idle(jh);
268 
269         // Set the waker that is notified when the task completes.
270         entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
271         abort
272     }
273 
274     /// Waits until one of the tasks in the set completes and returns its output.
275     ///
276     /// Returns `None` if the set is empty.
277     ///
278     /// # Cancel Safety
279     ///
280     /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
281     /// statement and some other branch completes first, it is guaranteed that no tasks were
282     /// removed from this `JoinSet`.
join_next(&mut self) -> Option<Result<T, JoinError>>283     pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
284         crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
285     }
286 
287     /// Waits until one of the tasks in the set completes and returns its
288     /// output, along with the [task ID] of the completed task.
289     ///
290     /// Returns `None` if the set is empty.
291     ///
292     /// When this method returns an error, then the id of the task that failed can be accessed
293     /// using the [`JoinError::id`] method.
294     ///
295     /// # Cancel Safety
296     ///
297     /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
298     /// statement and some other branch completes first, it is guaranteed that no tasks were
299     /// removed from this `JoinSet`.
300     ///
301     /// [task ID]: crate::task::Id
302     /// [`JoinError::id`]: fn@crate::task::JoinError::id
303     #[cfg(tokio_unstable)]
304     #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>305     pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
306         crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
307     }
308 
309     /// Aborts all tasks and waits for them to finish shutting down.
310     ///
311     /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
312     /// a loop until it returns `None`.
313     ///
314     /// This method ignores any panics in the tasks shutting down. When this call returns, the
315     /// `JoinSet` will be empty.
316     ///
317     /// [`abort_all`]: fn@Self::abort_all
318     /// [`join_next`]: fn@Self::join_next
shutdown(&mut self)319     pub async fn shutdown(&mut self) {
320         self.abort_all();
321         while self.join_next().await.is_some() {}
322     }
323 
324     /// Aborts all tasks on this `JoinSet`.
325     ///
326     /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
327     /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
abort_all(&mut self)328     pub fn abort_all(&mut self) {
329         self.inner.for_each(|jh| jh.abort());
330     }
331 
332     /// Removes all tasks from this `JoinSet` without aborting them.
333     ///
334     /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
335     /// is dropped.
detach_all(&mut self)336     pub fn detach_all(&mut self) {
337         self.inner.drain(drop);
338     }
339 
340     /// Polls for one of the tasks in the set to complete.
341     ///
342     /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
343     ///
344     /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
345     /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
346     /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
347     /// scheduled to receive a wakeup.
348     ///
349     /// # Returns
350     ///
351     /// This function returns:
352     ///
353     ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
354     ///     available right now.
355     ///  * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
356     ///     The `value` is the return value of one of the tasks that completed.
357     ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
358     ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
359     ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
360     ///
361     /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
362     /// This can happen if the [coop budget] is reached.
363     ///
364     /// [coop budget]: crate::task#cooperative-scheduling
poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>>365     pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
366         // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
367         // the `notified` list if the waker is notified in the `poll` call below.
368         let mut entry = match self.inner.pop_notified(cx.waker()) {
369             Some(entry) => entry,
370             None => {
371                 if self.is_empty() {
372                     return Poll::Ready(None);
373                 } else {
374                     // The waker was set by `pop_notified`.
375                     return Poll::Pending;
376                 }
377             }
378         };
379 
380         let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
381 
382         if let Poll::Ready(res) = res {
383             let _entry = entry.remove();
384             Poll::Ready(Some(res))
385         } else {
386             // A JoinHandle generally won't emit a wakeup without being ready unless
387             // the coop limit has been reached. We yield to the executor in this
388             // case.
389             cx.waker().wake_by_ref();
390             Poll::Pending
391         }
392     }
393 
394     /// Polls for one of the tasks in the set to complete.
395     ///
396     /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
397     ///
398     /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
399     /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
400     /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
401     /// scheduled to receive a wakeup.
402     ///
403     /// # Returns
404     ///
405     /// This function returns:
406     ///
407     ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
408     ///     available right now.
409     ///  * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
410     ///     The `value` is the return value of one of the tasks that completed, and
411     ///    `id` is the [task ID] of that task.
412     ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
413     ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
414     ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
415     ///
416     /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
417     /// This can happen if the [coop budget] is reached.
418     ///
419     /// [coop budget]: crate::task#cooperative-scheduling
420     /// [task ID]: crate::task::Id
421     #[cfg(tokio_unstable)]
422     #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
poll_join_next_with_id( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Id, T), JoinError>>>423     pub fn poll_join_next_with_id(
424         &mut self,
425         cx: &mut Context<'_>,
426     ) -> Poll<Option<Result<(Id, T), JoinError>>> {
427         // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
428         // the `notified` list if the waker is notified in the `poll` call below.
429         let mut entry = match self.inner.pop_notified(cx.waker()) {
430             Some(entry) => entry,
431             None => {
432                 if self.is_empty() {
433                     return Poll::Ready(None);
434                 } else {
435                     // The waker was set by `pop_notified`.
436                     return Poll::Pending;
437                 }
438             }
439         };
440 
441         let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
442 
443         if let Poll::Ready(res) = res {
444             let entry = entry.remove();
445             // If the task succeeded, add the task ID to the output. Otherwise, the
446             // `JoinError` will already have the task's ID.
447             Poll::Ready(Some(res.map(|output| (entry.id(), output))))
448         } else {
449             // A JoinHandle generally won't emit a wakeup without being ready unless
450             // the coop limit has been reached. We yield to the executor in this
451             // case.
452             cx.waker().wake_by_ref();
453             Poll::Pending
454         }
455     }
456 }
457 
458 impl<T> Drop for JoinSet<T> {
drop(&mut self)459     fn drop(&mut self) {
460         self.inner.drain(|join_handle| join_handle.abort());
461     }
462 }
463 
464 impl<T> fmt::Debug for JoinSet<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result465     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466         f.debug_struct("JoinSet").field("len", &self.len()).finish()
467     }
468 }
469 
470 impl<T> Default for JoinSet<T> {
default() -> Self471     fn default() -> Self {
472         Self::new()
473     }
474 }
475 
476 // === impl Builder ===
477 
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T: 'static> Builder<'a, T> {
    /// Sets the name of the task that this builder will spawn.
    pub fn name(self, name: &'a str) -> Self {
        Self {
            builder: self.builder.name(name),
            joinset: self.joinset,
        }
    }

    /// Spawns `future` using the settings of this builder and stores the task
    /// in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// # Panics
    ///
    /// This method panics if called outside of a Tokio runtime.
    ///
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        // A failed spawn is returned to the caller and nothing is added to the set.
        let join_handle = self.builder.spawn(future)?;
        Ok(self.joinset.insert(join_handle))
    }

    /// Spawns `future` on the given [runtime handle] using the settings of
    /// this builder and stores the task in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// [`AbortHandle`]: crate::task::AbortHandle
    /// [runtime handle]: crate::runtime::Handle
    #[track_caller]
    pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        // A failed spawn is returned to the caller and nothing is added to the set.
        let join_handle = self.builder.spawn_on(future, handle)?;
        Ok(self.joinset.insert(join_handle))
    }

    /// Spawns `future` on the current [`LocalSet`] using the settings of this
    /// builder and stores the task in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// # Panics
    ///
    /// This method panics if it is called outside of a `LocalSet`.
    ///
    /// [`LocalSet`]: crate::task::LocalSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: 'static,
    {
        // A failed spawn is returned to the caller and nothing is added to the set.
        let join_handle = self.builder.spawn_local(future)?;
        Ok(self.joinset.insert(join_handle))
    }

    /// Spawns `future` on the given [`LocalSet`] using the settings of this
    /// builder and stores the task in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// [`LocalSet`]: crate::task::LocalSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: 'static,
    {
        // A failed spawn is returned to the caller and nothing is added to the set.
        let join_handle = self.builder.spawn_local_on(future, local_set)?;
        Ok(self.joinset.insert(join_handle))
    }
}
572 
// `Debug` is implemented by hand rather than derived so that `Builder` is
// `Debug` whether or not `T` is `Debug`.
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T> fmt::Debug for Builder<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("join_set::Builder");
        out.field("joinset", &self.joinset);
        out.field("builder", &self.builder);
        out.finish()
    }
}
585