• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::assert_stream;
2 use crate::stream::{Fuse, StreamExt};
3 use core::{fmt, pin::Pin};
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 use pin_project_lite::pin_project;
7 
/// Tells [`SelectWithStrategy`] which of its two input streams should be
/// polled first on the next poll.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PollNext {
    /// Poll the first stream.
    Left,
    /// Poll the second stream.
    Right,
}

impl PollNext {
    /// Flips `self` to the opposite side and returns the value it held
    /// *before* the flip.
    pub fn toggle(&mut self) -> Self {
        let previous = *self;

        *self = match previous {
            Self::Left => Self::Right,
            Self::Right => Self::Left,
        };

        previous
    }
}

impl Default for PollNext {
    /// The default side is [`PollNext::Left`].
    fn default() -> Self {
        Self::Left
    }
}
36 
37 pin_project! {
38     /// Stream for the [`select_with_strategy()`] function. See function docs for details.
39     #[must_use = "streams do nothing unless polled"]
40     pub struct SelectWithStrategy<St1, St2, Clos, State> {
41         #[pin]
42         stream1: Fuse<St1>,
43         #[pin]
44         stream2: Fuse<St2>,
45         state: State,
46         clos: Clos,
47     }
48 }
49 
50 /// This function will attempt to pull items from both streams. You provide a
51 /// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can
52 /// store state on `SelectWithStrategy` to which it will receive a `&mut` on every
53 /// invocation. This allows basing the strategy on prior choices.
54 ///
55 /// After one of the two input streams completes, the remaining one will be
56 /// polled exclusively. The returned stream completes when both input
57 /// streams have completed.
58 ///
59 /// Note that this function consumes both streams and returns a wrapped
60 /// version of them.
61 ///
62 /// ## Examples
63 ///
64 /// ### Priority
65 /// This example shows how to always prioritize the left stream.
66 ///
67 /// ```rust
68 /// # futures::executor::block_on(async {
69 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
70 ///
71 /// let left = repeat(1);
72 /// let right = repeat(2);
73 ///
74 /// // We don't need any state, so let's make it an empty tuple.
75 /// // We must provide some type here, as there is no way for the compiler
76 /// // to infer it. As we don't need to capture variables, we can just
77 /// // use a function pointer instead of a closure.
78 /// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
79 ///
80 /// let mut out = select_with_strategy(left, right, prio_left);
81 ///
82 /// for _ in 0..100 {
83 ///     // Whenever we poll out, we will alwas get `1`.
84 ///     assert_eq!(1, out.select_next_some().await);
85 /// }
86 /// # });
87 /// ```
88 ///
89 /// ### Round Robin
90 /// This example shows how to select from both streams round robin.
91 /// Note: this special case is provided by [`futures-util::stream::select`].
92 ///
93 /// ```rust
94 /// # futures::executor::block_on(async {
95 /// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
96 ///
97 /// let left = repeat(1);
98 /// let right = repeat(2);
99 ///
100 /// let rrobin = |last: &mut PollNext| last.toggle();
101 ///
102 /// let mut out = select_with_strategy(left, right, rrobin);
103 ///
104 /// for _ in 0..100 {
105 ///     // We should be alternating now.
106 ///     assert_eq!(1, out.select_next_some().await);
107 ///     assert_eq!(2, out.select_next_some().await);
108 /// }
109 /// # });
110 /// ```
select_with_strategy<St1, St2, Clos, State>( stream1: St1, stream2: St2, which: Clos, ) -> SelectWithStrategy<St1, St2, Clos, State> where St1: Stream, St2: Stream<Item = St1::Item>, Clos: FnMut(&mut State) -> PollNext, State: Default,111 pub fn select_with_strategy<St1, St2, Clos, State>(
112     stream1: St1,
113     stream2: St2,
114     which: Clos,
115 ) -> SelectWithStrategy<St1, St2, Clos, State>
116 where
117     St1: Stream,
118     St2: Stream<Item = St1::Item>,
119     Clos: FnMut(&mut State) -> PollNext,
120     State: Default,
121 {
122     assert_stream::<St1::Item, _>(SelectWithStrategy {
123         stream1: stream1.fuse(),
124         stream2: stream2.fuse(),
125         state: Default::default(),
126         clos: which,
127     })
128 }
129 
130 impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
131     /// Acquires a reference to the underlying streams that this combinator is
132     /// pulling from.
get_ref(&self) -> (&St1, &St2)133     pub fn get_ref(&self) -> (&St1, &St2) {
134         (self.stream1.get_ref(), self.stream2.get_ref())
135     }
136 
137     /// Acquires a mutable reference to the underlying streams that this
138     /// combinator is pulling from.
139     ///
140     /// Note that care must be taken to avoid tampering with the state of the
141     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> (&mut St1, &mut St2)142     pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
143         (self.stream1.get_mut(), self.stream2.get_mut())
144     }
145 
146     /// Acquires a pinned mutable reference to the underlying streams that this
147     /// combinator is pulling from.
148     ///
149     /// Note that care must be taken to avoid tampering with the state of the
150     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)151     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
152         let this = self.project();
153         (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
154     }
155 
156     /// Consumes this combinator, returning the underlying streams.
157     ///
158     /// Note that this may discard intermediate state of this combinator, so
159     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> (St1, St2)160     pub fn into_inner(self) -> (St1, St2) {
161         (self.stream1.into_inner(), self.stream2.into_inner())
162     }
163 }
164 
165 impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
166 where
167     St1: Stream,
168     St2: Stream<Item = St1::Item>,
169     Clos: FnMut(&mut State) -> PollNext,
170 {
is_terminated(&self) -> bool171     fn is_terminated(&self) -> bool {
172         self.stream1.is_terminated() && self.stream2.is_terminated()
173     }
174 }
175 
176 impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
177 where
178     St1: Stream,
179     St2: Stream<Item = St1::Item>,
180     Clos: FnMut(&mut State) -> PollNext,
181 {
182     type Item = St1::Item;
183 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>>184     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
185         let this = self.project();
186 
187         match (this.clos)(this.state) {
188             PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
189             PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
190         }
191     }
192 }
193 
poll_inner<St1, St2>( a: Pin<&mut St1>, b: Pin<&mut St2>, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>,194 fn poll_inner<St1, St2>(
195     a: Pin<&mut St1>,
196     b: Pin<&mut St2>,
197     cx: &mut Context<'_>,
198 ) -> Poll<Option<St1::Item>>
199 where
200     St1: Stream,
201     St2: Stream<Item = St1::Item>,
202 {
203     let a_done = match a.poll_next(cx) {
204         Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
205         Poll::Ready(None) => true,
206         Poll::Pending => false,
207     };
208 
209     match b.poll_next(cx) {
210         Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
211         Poll::Ready(None) if a_done => Poll::Ready(None),
212         Poll::Ready(None) | Poll::Pending => Poll::Pending,
213     }
214 }
215 
216 impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
217 where
218     St1: fmt::Debug,
219     St2: fmt::Debug,
220     State: fmt::Debug,
221 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result222     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223         f.debug_struct("SelectWithStrategy")
224             .field("stream1", &self.stream1)
225             .field("stream2", &self.stream2)
226             .field("state", &self.state)
227             .finish()
228     }
229 }
230