• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::stream::{FuturesUnordered, StreamExt};
2 use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
3 use core::cmp::Ordering;
4 use core::fmt::{self, Debug};
5 use core::iter::FromIterator;
6 use core::pin::Pin;
7 use futures_core::future::Future;
8 use futures_core::ready;
9 use futures_core::stream::Stream;
10 use futures_core::{
11     task::{Context, Poll},
12     FusedStream,
13 };
14 use pin_project_lite::pin_project;
15 
16 pin_project! {
17     #[must_use = "futures do nothing unless you `.await` or poll them"]
18     #[derive(Debug)]
19     struct OrderWrapper<T> {
20         #[pin]
21         data: T, // A future or a future's output
22         index: usize,
23     }
24 }
25 
26 impl<T> PartialEq for OrderWrapper<T> {
eq(&self, other: &Self) -> bool27     fn eq(&self, other: &Self) -> bool {
28         self.index == other.index
29     }
30 }
31 
32 impl<T> Eq for OrderWrapper<T> {}
33 
34 impl<T> PartialOrd for OrderWrapper<T> {
partial_cmp(&self, other: &Self) -> Option<Ordering>35     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
36         Some(self.cmp(other))
37     }
38 }
39 
40 impl<T> Ord for OrderWrapper<T> {
cmp(&self, other: &Self) -> Ordering41     fn cmp(&self, other: &Self) -> Ordering {
42         // BinaryHeap is a max heap, so compare backwards here.
43         other.index.cmp(&self.index)
44     }
45 }
46 
47 impl<T> Future for OrderWrapper<T>
48 where
49     T: Future,
50 {
51     type Output = OrderWrapper<T::Output>;
52 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54         let index = self.index;
55         self.project().data.poll(cx).map(|output| OrderWrapper {
56             data: output,
57             index,
58         })
59     }
60 }
61 
62 /// An unbounded queue of futures.
63 ///
64 /// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
65 /// on top of the set of futures. While futures in the set will race to
66 /// completion in parallel, results will only be returned in the order their
67 /// originating futures were added to the queue.
68 ///
69 /// Futures are pushed into this queue and their realized values are yielded in
70 /// order. This structure is optimized to manage a large number of futures.
71 /// Futures managed by `FuturesOrdered` will only be polled when they generate
72 /// notifications. This reduces the required amount of work needed to coordinate
73 /// large numbers of futures.
74 ///
75 /// When a `FuturesOrdered` is first created, it does not contain any futures.
76 /// Calling `poll` in this state will result in `Poll::Ready(None))` to be
77 /// returned. Futures are submitted to the queue using `push`; however, the
78 /// future will **not** be polled at this point. `FuturesOrdered` will only
79 /// poll managed futures when `FuturesOrdered::poll` is called. As such, it
80 /// is important to call `poll` after pushing new futures.
81 ///
82 /// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that
83 /// the queue is currently not managing any futures. A future may be submitted
84 /// to the queue at a later time. At that point, a call to
85 /// `FuturesOrdered::poll` will either return the future's resolved value
86 /// **or** `Poll::Pending` if the future has not yet completed. When
87 /// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
88 /// return `Poll::Pending` until the first future completes, even if
89 /// some of the later futures have already completed.
90 ///
91 /// Note that you can create a ready-made `FuturesOrdered` via the
92 /// [`collect`](Iterator::collect) method, or you can start with an empty queue
93 /// with the `FuturesOrdered::new` constructor.
94 ///
95 /// This type is only available when the `std` or `alloc` feature of this
96 /// library is activated, and it is activated by default.
97 #[must_use = "streams do nothing unless polled"]
98 pub struct FuturesOrdered<T: Future> {
99     in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
100     queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
101     next_incoming_index: usize,
102     next_outgoing_index: usize,
103 }
104 
105 impl<T: Future> Unpin for FuturesOrdered<T> {}
106 
107 impl<Fut: Future> FuturesOrdered<Fut> {
108     /// Constructs a new, empty `FuturesOrdered`
109     ///
110     /// The returned `FuturesOrdered` does not contain any futures and, in this
111     /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
new() -> Self112     pub fn new() -> Self {
113         Self {
114             in_progress_queue: FuturesUnordered::new(),
115             queued_outputs: BinaryHeap::new(),
116             next_incoming_index: 0,
117             next_outgoing_index: 0,
118         }
119     }
120 
121     /// Returns the number of futures contained in the queue.
122     ///
123     /// This represents the total number of in-flight futures, both
124     /// those currently processing and those that have completed but
125     /// which are waiting for earlier futures to complete.
len(&self) -> usize126     pub fn len(&self) -> usize {
127         self.in_progress_queue.len() + self.queued_outputs.len()
128     }
129 
130     /// Returns `true` if the queue contains no futures
is_empty(&self) -> bool131     pub fn is_empty(&self) -> bool {
132         self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
133     }
134 
135     /// Push a future into the queue.
136     ///
137     /// This function submits the given future to the internal set for managing.
138     /// This function will not call `poll` on the submitted future. The caller
139     /// must ensure that `FuturesOrdered::poll` is called in order to receive
140     /// task notifications.
push(&mut self, future: Fut)141     pub fn push(&mut self, future: Fut) {
142         let wrapped = OrderWrapper {
143             data: future,
144             index: self.next_incoming_index,
145         };
146         self.next_incoming_index += 1;
147         self.in_progress_queue.push(wrapped);
148     }
149 }
150 
151 impl<Fut: Future> Default for FuturesOrdered<Fut> {
default() -> Self152     fn default() -> Self {
153         Self::new()
154     }
155 }
156 
157 impl<Fut: Future> Stream for FuturesOrdered<Fut> {
158     type Item = Fut::Output;
159 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>160     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161         let this = &mut *self;
162 
163         // Check to see if we've already received the next value
164         if let Some(next_output) = this.queued_outputs.peek_mut() {
165             if next_output.index == this.next_outgoing_index {
166                 this.next_outgoing_index += 1;
167                 return Poll::Ready(Some(PeekMut::pop(next_output).data));
168             }
169         }
170 
171         loop {
172             match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
173                 Some(output) => {
174                     if output.index == this.next_outgoing_index {
175                         this.next_outgoing_index += 1;
176                         return Poll::Ready(Some(output.data));
177                     } else {
178                         this.queued_outputs.push(output)
179                     }
180                 }
181                 None => return Poll::Ready(None),
182             }
183         }
184     }
185 
size_hint(&self) -> (usize, Option<usize>)186     fn size_hint(&self) -> (usize, Option<usize>) {
187         let len = self.len();
188         (len, Some(len))
189     }
190 }
191 
192 impl<Fut: Future> Debug for FuturesOrdered<Fut> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result193     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194         write!(f, "FuturesOrdered {{ ... }}")
195     }
196 }
197 
198 impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = Fut>,199     fn from_iter<T>(iter: T) -> Self
200     where
201         T: IntoIterator<Item = Fut>,
202     {
203         let acc = Self::new();
204         iter.into_iter().fold(acc, |mut acc, item| {
205             acc.push(item);
206             acc
207         })
208     }
209 }
210 
211 impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
is_terminated(&self) -> bool212     fn is_terminated(&self) -> bool {
213         self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
214     }
215 }
216 
217 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,218     fn extend<I>(&mut self, iter: I)
219     where
220         I: IntoIterator<Item = Fut>,
221     {
222         for item in iter {
223             self.push(item);
224         }
225     }
226 }
227