• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::assert_stream;
2 use core::{fmt, pin::Pin};
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 use pin_project_lite::pin_project;
6 
7 /// Type to tell [`SelectWithStrategy`] which stream to poll next.
8 #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
9 pub enum PollNext {
10     /// Poll the first stream.
11     Left,
12     /// Poll the second stream.
13     Right,
14 }
15 
16 impl PollNext {
17     /// Toggle the value and return the old one.
toggle(&mut self) -> Self18     pub fn toggle(&mut self) -> Self {
19         let old = *self;
20         *self = self.other();
21         old
22     }
23 
other(&self) -> Self24     fn other(&self) -> Self {
25         match self {
26             Self::Left => Self::Right,
27             Self::Right => Self::Left,
28         }
29     }
30 }
31 
32 impl Default for PollNext {
default() -> Self33     fn default() -> Self {
34         Self::Left
35     }
36 }
37 
38 enum InternalState {
39     Start,
40     LeftFinished,
41     RightFinished,
42     BothFinished,
43 }
44 
45 impl InternalState {
finish(&mut self, ps: PollNext)46     fn finish(&mut self, ps: PollNext) {
47         match (&self, ps) {
48             (Self::Start, PollNext::Left) => {
49                 *self = Self::LeftFinished;
50             }
51             (Self::Start, PollNext::Right) => {
52                 *self = Self::RightFinished;
53             }
54             (Self::LeftFinished, PollNext::Right) | (Self::RightFinished, PollNext::Left) => {
55                 *self = Self::BothFinished;
56             }
57             _ => {}
58         }
59     }
60 }
61 
62 pin_project! {
63     /// Stream for the [`select_with_strategy()`] function. See function docs for details.
64     #[must_use = "streams do nothing unless polled"]
65     #[project = SelectWithStrategyProj]
66     pub struct SelectWithStrategy<St1, St2, Clos, State> {
67         #[pin]
68         stream1: St1,
69         #[pin]
70         stream2: St2,
71         internal_state: InternalState,
72         state: State,
73         clos: Clos,
74     }
75 }
76 
77 #[allow(clippy::too_long_first_doc_paragraph)]
78 /// This function will attempt to pull items from both streams. You provide a
79 /// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can
80 /// store state on `SelectWithStrategy` to which it will receive a `&mut` on every
81 /// invocation. This allows basing the strategy on prior choices.
82 ///
83 /// After one of the two input streams completes, the remaining one will be
84 /// polled exclusively. The returned stream completes when both input
85 /// streams have completed.
86 ///
87 /// Note that this function consumes both streams and returns a wrapped
88 /// version of them.
89 ///
90 /// ## Examples
91 ///
92 /// ### Priority
93 /// This example shows how to always prioritize the left stream.
94 ///
95 /// ```rust
96 /// # futures::executor::block_on(async {
97 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
98 ///
99 /// let left = repeat(1);
100 /// let right = repeat(2);
101 ///
102 /// // We don't need any state, so let's make it an empty tuple.
103 /// // We must provide some type here, as there is no way for the compiler
104 /// // to infer it. As we don't need to capture variables, we can just
105 /// // use a function pointer instead of a closure.
106 /// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
107 ///
108 /// let mut out = select_with_strategy(left, right, prio_left);
109 ///
110 /// for _ in 0..100 {
111 ///     // Whenever we poll out, we will alwas get `1`.
112 ///     assert_eq!(1, out.select_next_some().await);
113 /// }
114 /// # });
115 /// ```
116 ///
117 /// ### Round Robin
118 /// This example shows how to select from both streams round robin.
119 /// Note: this special case is provided by [`futures-util::stream::select`].
120 ///
121 /// ```rust
122 /// # futures::executor::block_on(async {
123 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
124 ///
125 /// let left = repeat(1);
126 /// let right = repeat(2);
127 ///
128 /// let rrobin = |last: &mut PollNext| last.toggle();
129 ///
130 /// let mut out = select_with_strategy(left, right, rrobin);
131 ///
132 /// for _ in 0..100 {
133 ///     // We should be alternating now.
134 ///     assert_eq!(1, out.select_next_some().await);
135 ///     assert_eq!(2, out.select_next_some().await);
136 /// }
137 /// # });
138 /// ```
select_with_strategy<St1, St2, Clos, State>( stream1: St1, stream2: St2, which: Clos, ) -> SelectWithStrategy<St1, St2, Clos, State> where St1: Stream, St2: Stream<Item = St1::Item>, Clos: FnMut(&mut State) -> PollNext, State: Default,139 pub fn select_with_strategy<St1, St2, Clos, State>(
140     stream1: St1,
141     stream2: St2,
142     which: Clos,
143 ) -> SelectWithStrategy<St1, St2, Clos, State>
144 where
145     St1: Stream,
146     St2: Stream<Item = St1::Item>,
147     Clos: FnMut(&mut State) -> PollNext,
148     State: Default,
149 {
150     assert_stream::<St1::Item, _>(SelectWithStrategy {
151         stream1,
152         stream2,
153         state: Default::default(),
154         internal_state: InternalState::Start,
155         clos: which,
156     })
157 }
158 
159 impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
160     /// Acquires a reference to the underlying streams that this combinator is
161     /// pulling from.
get_ref(&self) -> (&St1, &St2)162     pub fn get_ref(&self) -> (&St1, &St2) {
163         (&self.stream1, &self.stream2)
164     }
165 
166     /// Acquires a mutable reference to the underlying streams that this
167     /// combinator is pulling from.
168     ///
169     /// Note that care must be taken to avoid tampering with the state of the
170     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> (&mut St1, &mut St2)171     pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
172         (&mut self.stream1, &mut self.stream2)
173     }
174 
175     /// Acquires a pinned mutable reference to the underlying streams that this
176     /// combinator is pulling from.
177     ///
178     /// Note that care must be taken to avoid tampering with the state of the
179     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)180     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
181         let this = self.project();
182         (this.stream1, this.stream2)
183     }
184 
185     /// Consumes this combinator, returning the underlying streams.
186     ///
187     /// Note that this may discard intermediate state of this combinator, so
188     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> (St1, St2)189     pub fn into_inner(self) -> (St1, St2) {
190         (self.stream1, self.stream2)
191     }
192 }
193 
194 impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
195 where
196     St1: Stream,
197     St2: Stream<Item = St1::Item>,
198     Clos: FnMut(&mut State) -> PollNext,
199 {
is_terminated(&self) -> bool200     fn is_terminated(&self) -> bool {
201         match self.internal_state {
202             InternalState::BothFinished => true,
203             _ => false,
204         }
205     }
206 }
207 
208 #[inline]
poll_side<St1, St2, Clos, State>( select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, side: PollNext, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>,209 fn poll_side<St1, St2, Clos, State>(
210     select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
211     side: PollNext,
212     cx: &mut Context<'_>,
213 ) -> Poll<Option<St1::Item>>
214 where
215     St1: Stream,
216     St2: Stream<Item = St1::Item>,
217 {
218     match side {
219         PollNext::Left => select.stream1.as_mut().poll_next(cx),
220         PollNext::Right => select.stream2.as_mut().poll_next(cx),
221     }
222 }
223 
224 #[inline]
poll_inner<St1, St2, Clos, State>( select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, side: PollNext, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>,225 fn poll_inner<St1, St2, Clos, State>(
226     select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
227     side: PollNext,
228     cx: &mut Context<'_>,
229 ) -> Poll<Option<St1::Item>>
230 where
231     St1: Stream,
232     St2: Stream<Item = St1::Item>,
233 {
234     let first_done = match poll_side(select, side, cx) {
235         Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
236         Poll::Ready(None) => {
237             select.internal_state.finish(side);
238             true
239         }
240         Poll::Pending => false,
241     };
242     let other = side.other();
243     match poll_side(select, other, cx) {
244         Poll::Ready(None) => {
245             select.internal_state.finish(other);
246             if first_done {
247                 Poll::Ready(None)
248             } else {
249                 Poll::Pending
250             }
251         }
252         a => a,
253     }
254 }
255 
256 impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
257 where
258     St1: Stream,
259     St2: Stream<Item = St1::Item>,
260     Clos: FnMut(&mut State) -> PollNext,
261 {
262     type Item = St1::Item;
263 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>>264     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
265         let mut this = self.project();
266 
267         match this.internal_state {
268             InternalState::Start => {
269                 let next_side = (this.clos)(this.state);
270                 poll_inner(&mut this, next_side, cx)
271             }
272             InternalState::LeftFinished => match this.stream2.poll_next(cx) {
273                 Poll::Ready(None) => {
274                     *this.internal_state = InternalState::BothFinished;
275                     Poll::Ready(None)
276                 }
277                 a => a,
278             },
279             InternalState::RightFinished => match this.stream1.poll_next(cx) {
280                 Poll::Ready(None) => {
281                     *this.internal_state = InternalState::BothFinished;
282                     Poll::Ready(None)
283                 }
284                 a => a,
285             },
286             InternalState::BothFinished => Poll::Ready(None),
287         }
288     }
289 }
290 
291 impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
292 where
293     St1: fmt::Debug,
294     St2: fmt::Debug,
295     State: fmt::Debug,
296 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result297     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298         f.debug_struct("SelectWithStrategy")
299             .field("stream1", &self.stream1)
300             .field("stream2", &self.stream2)
301             .field("state", &self.state)
302             .finish()
303     }
304 }
305