/// Waits on multiple concurrent branches, returning when **all** branches
/// complete.
///
/// The `join!` macro must be used inside of async functions, closures, and
/// blocks.
///
/// The `join!` macro takes a list of async expressions and evaluates them
/// concurrently on the same task. Each async expression evaluates to a future
/// and the futures from each expression are multiplexed on the current task.
///
/// The macro evaluates to a tuple holding the output of every branch, in the
/// order the expressions were written. A trailing comma after the last
/// expression is accepted, and an empty invocation (`join!()`) expands to
/// `async {}.await`.
///
/// When working with async expressions returning `Result`, `join!` will wait
/// for **all** branches to complete regardless of whether any complete with
/// `Err`. Use [`try_join!`] to return early when `Err` is encountered.
///
/// [`try_join!`]: crate::try_join
///
/// # Notes
///
/// The supplied futures are stored inline and do not require allocating a
/// `Vec`.
///
/// ### Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
/// expressions are run on the same thread and if one branch blocks the thread,
/// all other expressions will be unable to continue. If parallelism is
/// required, spawn each async expression using [`tokio::spawn`] and pass the
/// join handle to `join!`.
///
/// [`tokio::spawn`]: crate::spawn
///
/// # Examples
///
/// Basic join with two branches:
///
/// ```
/// async fn do_stuff_async() {
///     // async work
/// }
///
/// async fn more_async_work() {
///     // more here
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let (first, second) = tokio::join!(
///         do_stuff_async(),
///         more_async_work());
///
///     // do something with the values
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! join {
    // ===== Internal: fully normalized input =====
    //
    // Reached only through the normalize arm below, once every user
    // expression has been moved into the `@{ ... }` accumulator.
    (@ {
        // One `_` for each branch in the `join!` macro. This is not used once
        // normalization is complete.
        ( $($count:tt)* )

        // The expression `0+1+1+ ... +1` equal to the number of branches.
        ( $($total:tt)* )

        // Normalized join! branches. Each branch carries its own `$skip`
        // list: one `_` per branch that precedes it, later used as
        // placeholder patterns to reach this branch's field in the tuple.
        $( ( $($skip:tt)* ) $e:expr, )*

    }) => {{
        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
        use $crate::macros::support::Poll::{Ready, Pending};

        // Safety: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        //
        // We can't use the `pin!` macro for this because `futures` is a tuple
        // and the standard library provides no way to pin-project to the fields
        // of a tuple.
        let mut futures = ( $( maybe_done($e), )* );

        // This assignment makes sure that the `poll_fn` closure only has a
        // reference to the futures, instead of taking ownership of them. This
        // mitigates the issue described in
        // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
        let mut futures = &mut futures;

        // Each time the future created by poll_fn is polled, a different future will be polled first
        // to ensure every future passed to join! gets a chance to make progress even if
        // one of the futures consumes the whole budget.
        //
        // This is the number of futures that will be skipped in the first loop
        // iteration the next time.
        let mut skip_next_time: u32 = 0;

        poll_fn(move |cx| {
            // Number of branches, evaluated from the `0 + 1 + ... + 1`
            // expression built up during normalization.
            const COUNT: u32 = $($total)*;

            // Set as soon as any branch reports `Pending` during this call.
            let mut is_pending = false;

            // Number of futures that still have to be polled during this call.
            let mut to_run = COUNT;

            // The number of futures that will be skipped in the first loop iteration.
            let mut skip = skip_next_time;

            // Move the starting position forward by one for the next call,
            // wrapping back to the first future after the last one.
            skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };

            // This loop runs twice and the first `skip` futures
            // are not polled in the first iteration. The second pass wraps
            // around to the futures that were skipped, and the `to_run`
            // counter reaching zero is what terminates the loop.
            loop {
                $(
                    if skip == 0 {
                        if to_run == 0 {
                            // Every future has been polled
                            break;
                        }
                        to_run -= 1;

                        // Extract the future for this branch from the tuple.
                        // The `_` placeholders in `$skip` step over the
                        // fields belonging to earlier branches.
                        let ( $($skip,)* fut, .. ) = &mut *futures;

                        // Safety: future is stored on the stack above
                        // and never moved.
                        let mut fut = unsafe { Pin::new_unchecked(fut) };

                        // Try polling
                        if fut.poll(cx).is_pending() {
                            is_pending = true;
                        }
                    } else {
                        // Future skipped, one less future to skip in the next iteration
                        skip -= 1;
                    }
                )*
            }

            if is_pending {
                // At least one branch is not finished yet.
                Pending
            } else {
                // No branch reported `Pending` during this call: gather the
                // outputs into a tuple, in branch order.
                Ready(($({
                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut futures;

                    // Safety: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    // NOTE(review): `take_output` presumably yields a value
                    // once the wrapped future has completed; the helper is
                    // defined outside this macro — confirm against
                    // `maybe_done`.
                    fut.take_output().expect("expected completed future")
                },)*))
            }
        }).await
    }};

    // ===== Normalize =====
    //
    // Moves one expression at a time from the remaining input into the
    // `@{ ... }` accumulator. For each expression consumed this appends a
    // `_` to the count list, appends `+ 1` to the total, and records the
    // expression together with the count list as it was *before* this
    // branch (i.e. one `_` per preceding branch).

    (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
        $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====
    //
    // One or more comma-separated expressions, with an optional trailing
    // comma. Starts normalization with an empty count list and a total of
    // `0`, re-emitting every expression with a trailing comma so that the
    // normalize arm can match it.

    ( $($e:expr),+ $(,)?) => {
        $crate::join!(@{ () (0) } $($e,)*)
    };

    // Empty invocation: there is nothing to join.
    () => { async {}.await }
}