1 use crate::stream::{FuturesUnordered, StreamExt}; 2 use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; 3 use core::cmp::Ordering; 4 use core::fmt::{self, Debug}; 5 use core::iter::FromIterator; 6 use core::pin::Pin; 7 use futures_core::future::Future; 8 use futures_core::ready; 9 use futures_core::stream::Stream; 10 use futures_core::{ 11 task::{Context, Poll}, 12 FusedStream, 13 }; 14 use pin_project_lite::pin_project; 15 16 pin_project! { 17 #[must_use = "futures do nothing unless you `.await` or poll them"] 18 #[derive(Debug)] 19 struct OrderWrapper<T> { 20 #[pin] 21 data: T, // A future or a future's output 22 index: usize, 23 } 24 } 25 26 impl<T> PartialEq for OrderWrapper<T> { eq(&self, other: &Self) -> bool27 fn eq(&self, other: &Self) -> bool { 28 self.index == other.index 29 } 30 } 31 32 impl<T> Eq for OrderWrapper<T> {} 33 34 impl<T> PartialOrd for OrderWrapper<T> { partial_cmp(&self, other: &Self) -> Option<Ordering>35 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 36 Some(self.cmp(other)) 37 } 38 } 39 40 impl<T> Ord for OrderWrapper<T> { cmp(&self, other: &Self) -> Ordering41 fn cmp(&self, other: &Self) -> Ordering { 42 // BinaryHeap is a max heap, so compare backwards here. 43 other.index.cmp(&self.index) 44 } 45 } 46 47 impl<T> Future for OrderWrapper<T> 48 where 49 T: Future, 50 { 51 type Output = OrderWrapper<T::Output>; 52 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 54 let index = self.index; 55 self.project().data.poll(cx).map(|output| OrderWrapper { 56 data: output, 57 index, 58 }) 59 } 60 } 61 62 /// An unbounded queue of futures. 63 /// 64 /// This "combinator" is similar to `FuturesUnordered`, but it imposes an order 65 /// on top of the set of futures. While futures in the set will race to 66 /// completion in parallel, results will only be returned in the order their 67 /// originating futures were added to the queue. 68 /// 69 /// Futures are pushed into this queue and their realized values are yielded in 70 /// order. This structure is optimized to manage a large number of futures. 71 /// Futures managed by `FuturesOrdered` will only be polled when they generate 72 /// notifications. This reduces the required amount of work needed to coordinate 73 /// large numbers of futures. 74 /// 75 /// When a `FuturesOrdered` is first created, it does not contain any futures. 76 /// Calling `poll` in this state will result in `Poll::Ready(None))` to be 77 /// returned. Futures are submitted to the queue using `push`; however, the 78 /// future will **not** be polled at this point. `FuturesOrdered` will only 79 /// poll managed futures when `FuturesOrdered::poll` is called. As such, it 80 /// is important to call `poll` after pushing new futures. 81 /// 82 /// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that 83 /// the queue is currently not managing any futures. A future may be submitted 84 /// to the queue at a later time. At that point, a call to 85 /// `FuturesOrdered::poll` will either return the future's resolved value 86 /// **or** `Poll::Pending` if the future has not yet completed. When 87 /// multiple futures are submitted to the queue, `FuturesOrdered::poll` will 88 /// return `Poll::Pending` until the first future completes, even if 89 /// some of the later futures have already completed. 90 /// 91 /// Note that you can create a ready-made `FuturesOrdered` via the 92 /// [`collect`](Iterator::collect) method, or you can start with an empty queue 93 /// with the `FuturesOrdered::new` constructor. 94 /// 95 /// This type is only available when the `std` or `alloc` feature of this 96 /// library is activated, and it is activated by default. 97 #[must_use = "streams do nothing unless polled"] 98 pub struct FuturesOrdered<T: Future> { 99 in_progress_queue: FuturesUnordered<OrderWrapper<T>>, 100 queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, 101 next_incoming_index: usize, 102 next_outgoing_index: usize, 103 } 104 105 impl<T: Future> Unpin for FuturesOrdered<T> {} 106 107 impl<Fut: Future> FuturesOrdered<Fut> { 108 /// Constructs a new, empty `FuturesOrdered` 109 /// 110 /// The returned `FuturesOrdered` does not contain any futures and, in this 111 /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. new() -> Self112 pub fn new() -> Self { 113 Self { 114 in_progress_queue: FuturesUnordered::new(), 115 queued_outputs: BinaryHeap::new(), 116 next_incoming_index: 0, 117 next_outgoing_index: 0, 118 } 119 } 120 121 /// Returns the number of futures contained in the queue. 122 /// 123 /// This represents the total number of in-flight futures, both 124 /// those currently processing and those that have completed but 125 /// which are waiting for earlier futures to complete. len(&self) -> usize126 pub fn len(&self) -> usize { 127 self.in_progress_queue.len() + self.queued_outputs.len() 128 } 129 130 /// Returns `true` if the queue contains no futures is_empty(&self) -> bool131 pub fn is_empty(&self) -> bool { 132 self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() 133 } 134 135 /// Push a future into the queue. 136 /// 137 /// This function submits the given future to the internal set for managing. 138 /// This function will not call `poll` on the submitted future. The caller 139 /// must ensure that `FuturesOrdered::poll` is called in order to receive 140 /// task notifications. push(&mut self, future: Fut)141 pub fn push(&mut self, future: Fut) { 142 let wrapped = OrderWrapper { 143 data: future, 144 index: self.next_incoming_index, 145 }; 146 self.next_incoming_index += 1; 147 self.in_progress_queue.push(wrapped); 148 } 149 } 150 151 impl<Fut: Future> Default for FuturesOrdered<Fut> { default() -> Self152 fn default() -> Self { 153 Self::new() 154 } 155 } 156 157 impl<Fut: Future> Stream for FuturesOrdered<Fut> { 158 type Item = Fut::Output; 159 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>160 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 161 let this = &mut *self; 162 163 // Check to see if we've already received the next value 164 if let Some(next_output) = this.queued_outputs.peek_mut() { 165 if next_output.index == this.next_outgoing_index { 166 this.next_outgoing_index += 1; 167 return Poll::Ready(Some(PeekMut::pop(next_output).data)); 168 } 169 } 170 171 loop { 172 match ready!(this.in_progress_queue.poll_next_unpin(cx)) { 173 Some(output) => { 174 if output.index == this.next_outgoing_index { 175 this.next_outgoing_index += 1; 176 return Poll::Ready(Some(output.data)); 177 } else { 178 this.queued_outputs.push(output) 179 } 180 } 181 None => return Poll::Ready(None), 182 } 183 } 184 } 185 size_hint(&self) -> (usize, Option<usize>)186 fn size_hint(&self) -> (usize, Option<usize>) { 187 let len = self.len(); 188 (len, Some(len)) 189 } 190 } 191 192 impl<Fut: Future> Debug for FuturesOrdered<Fut> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 194 write!(f, "FuturesOrdered {{ ... }}") 195 } 196 } 197 198 impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = Fut>,199 fn from_iter<T>(iter: T) -> Self 200 where 201 T: IntoIterator<Item = Fut>, 202 { 203 let acc = Self::new(); 204 iter.into_iter().fold(acc, |mut acc, item| { 205 acc.push(item); 206 acc 207 }) 208 } 209 } 210 211 impl<Fut: Future> FusedStream for FuturesOrdered<Fut> { is_terminated(&self) -> bool212 fn is_terminated(&self) -> bool { 213 self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty() 214 } 215 } 216 217 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,218 fn extend<I>(&mut self, iter: I) 219 where 220 I: IntoIterator<Item = Fut>, 221 { 222 for item in iter { 223 self.push(item); 224 } 225 } 226 } 227