• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::fns::FnOnce1;
2 use crate::stream::{Fuse, StreamExt};
3 use core::fmt;
4 use core::marker::PhantomData;
5 use core::pin::Pin;
6 use futures_core::future::{FusedFuture, Future};
7 use futures_core::ready;
8 use futures_core::stream::{FusedStream, Stream};
9 use futures_core::task::{Context, Poll};
10 #[cfg(feature = "sink")]
11 use futures_sink::Sink;
12 use pin_project_lite::pin_project;
13 
14 pin_project! {
15     /// A `Stream` that implements a `peek` method.
16     ///
17     /// The `peek` method can be used to retrieve a reference
18     /// to the next `Stream::Item` if available. A subsequent
19     /// call to `poll` will return the owned item.
20     #[derive(Debug)]
21     #[must_use = "streams do nothing unless polled"]
22     pub struct Peekable<St: Stream> {
23         #[pin]
24         stream: Fuse<St>,
25         peeked: Option<St::Item>,
26     }
27 }
28 
29 impl<St: Stream> Peekable<St> {
new(stream: St) -> Self30     pub(super) fn new(stream: St) -> Self {
31         Self { stream: stream.fuse(), peeked: None }
32     }
33 
34     delegate_access_inner!(stream, St, (.));
35 
36     /// Produces a future which retrieves a reference to the next item
37     /// in the stream, or `None` if the underlying stream terminates.
peek(self: Pin<&mut Self>) -> Peek<'_, St>38     pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
39         Peek { inner: Some(self) }
40     }
41 
42     /// Peek retrieves a reference to the next item in the stream.
43     ///
44     /// This method polls the underlying stream and return either a reference
45     /// to the next item if the stream is ready or passes through any errors.
poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>>46     pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
47         let mut this = self.project();
48 
49         Poll::Ready(loop {
50             if this.peeked.is_some() {
51                 break this.peeked.as_ref();
52             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
53                 *this.peeked = Some(item);
54             } else {
55                 break None;
56             }
57         })
58     }
59 
60     /// Produces a future which retrieves a mutable reference to the next item
61     /// in the stream, or `None` if the underlying stream terminates.
62     ///
63     /// # Examples
64     ///
65     /// ```
66     /// # futures::executor::block_on(async {
67     /// use futures::stream::{self, StreamExt};
68     /// use futures::pin_mut;
69     ///
70     /// let stream = stream::iter(vec![1, 2, 3]).peekable();
71     /// pin_mut!(stream);
72     ///
73     /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
74     /// assert_eq!(stream.as_mut().next().await, Some(1));
75     ///
76     /// // Peek into the stream and modify the value which will be returned next
77     /// if let Some(p) = stream.as_mut().peek_mut().await {
78     ///     if *p == 2 {
79     ///         *p = 5;
80     ///     }
81     /// }
82     ///
83     /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
84     /// # });
85     /// ```
peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St>86     pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
87         PeekMut { inner: Some(self) }
88     }
89 
90     /// Peek retrieves a mutable reference to the next item in the stream.
poll_peek_mut( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&mut St::Item>>91     pub fn poll_peek_mut(
92         self: Pin<&mut Self>,
93         cx: &mut Context<'_>,
94     ) -> Poll<Option<&mut St::Item>> {
95         let mut this = self.project();
96 
97         Poll::Ready(loop {
98             if this.peeked.is_some() {
99                 break this.peeked.as_mut();
100             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
101                 *this.peeked = Some(item);
102             } else {
103                 break None;
104             }
105         })
106     }
107 
108     /// Creates a future which will consume and return the next value of this
109     /// stream if a condition is true.
110     ///
111     /// If `func` returns `true` for the next value of this stream, consume and
112     /// return it. Otherwise, return `None`.
113     ///
114     /// # Examples
115     ///
116     /// Consume a number if it's equal to 0.
117     ///
118     /// ```
119     /// # futures::executor::block_on(async {
120     /// use futures::stream::{self, StreamExt};
121     /// use futures::pin_mut;
122     ///
123     /// let stream = stream::iter(0..5).peekable();
124     /// pin_mut!(stream);
125     /// // The first item of the stream is 0; consume it.
126     /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0));
127     /// // The next item returned is now 1, so `consume` will return `false`.
128     /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None);
129     /// // `next_if` saves the value of the next item if it was not equal to `expected`.
130     /// assert_eq!(stream.next().await, Some(1));
131     /// # });
132     /// ```
133     ///
134     /// Consume any number less than 10.
135     ///
136     /// ```
137     /// # futures::executor::block_on(async {
138     /// use futures::stream::{self, StreamExt};
139     /// use futures::pin_mut;
140     ///
141     /// let stream = stream::iter(1..20).peekable();
142     /// pin_mut!(stream);
143     /// // Consume all numbers less than 10
144     /// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {}
145     /// // The next value returned will be 10
146     /// assert_eq!(stream.next().await, Some(10));
147     /// # });
148     /// ```
next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F> where F: FnOnce(&St::Item) -> bool,149     pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
150     where
151         F: FnOnce(&St::Item) -> bool,
152     {
153         NextIf { inner: Some((self, func)) }
154     }
155 
156     /// Creates a future which will consume and return the next item if it is
157     /// equal to `expected`.
158     ///
159     /// # Example
160     ///
161     /// Consume a number if it's equal to 0.
162     ///
163     /// ```
164     /// # futures::executor::block_on(async {
165     /// use futures::stream::{self, StreamExt};
166     /// use futures::pin_mut;
167     ///
168     /// let stream = stream::iter(0..5).peekable();
169     /// pin_mut!(stream);
170     /// // The first item of the stream is 0; consume it.
171     /// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0));
172     /// // The next item returned is now 1, so `consume` will return `false`.
173     /// assert_eq!(stream.as_mut().next_if_eq(&0).await, None);
174     /// // `next_if_eq` saves the value of the next item if it was not equal to `expected`.
175     /// assert_eq!(stream.next().await, Some(1));
176     /// # });
177     /// ```
next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T> where T: ?Sized, St::Item: PartialEq<T>,178     pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
179     where
180         T: ?Sized,
181         St::Item: PartialEq<T>,
182     {
183         NextIfEq {
184             inner: NextIf { inner: Some((self, NextIfEqFn { expected, _next: PhantomData })) },
185         }
186     }
187 }
188 
189 impl<St: Stream> FusedStream for Peekable<St> {
is_terminated(&self) -> bool190     fn is_terminated(&self) -> bool {
191         self.peeked.is_none() && self.stream.is_terminated()
192     }
193 }
194 
195 impl<S: Stream> Stream for Peekable<S> {
196     type Item = S::Item;
197 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>198     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
199         let this = self.project();
200         if let Some(item) = this.peeked.take() {
201             return Poll::Ready(Some(item));
202         }
203         this.stream.poll_next(cx)
204     }
205 
size_hint(&self) -> (usize, Option<usize>)206     fn size_hint(&self) -> (usize, Option<usize>) {
207         let peek_len = usize::from(self.peeked.is_some());
208         let (lower, upper) = self.stream.size_hint();
209         let lower = lower.saturating_add(peek_len);
210         let upper = match upper {
211             Some(x) => x.checked_add(peek_len),
212             None => None,
213         };
214         (lower, upper)
215     }
216 }
217 
// `Peekable` is also a `Sink` whenever the wrapped stream is one; the sink
// methods are forwarded to the `stream` field.
#[cfg(feature = "sink")]
impl<S: Sink<Item> + Stream, Item> Sink<Item> for Peekable<S> {
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
228 
229 pin_project! {
230     /// Future for the [`Peekable::peek`](self::Peekable::peek) method.
231     #[must_use = "futures do nothing unless polled"]
232     pub struct Peek<'a, St: Stream> {
233         inner: Option<Pin<&'a mut Peekable<St>>>,
234     }
235 }
236 
237 impl<St> fmt::Debug for Peek<'_, St>
238 where
239     St: Stream + fmt::Debug,
240     St::Item: fmt::Debug,
241 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result242     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243         f.debug_struct("Peek").field("inner", &self.inner).finish()
244     }
245 }
246 
247 impl<St: Stream> FusedFuture for Peek<'_, St> {
is_terminated(&self) -> bool248     fn is_terminated(&self) -> bool {
249         self.inner.is_none()
250     }
251 }
252 
253 impl<'a, St> Future for Peek<'a, St>
254 where
255     St: Stream,
256 {
257     type Output = Option<&'a St::Item>;
258 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>259     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
260         let inner = self.project().inner;
261         if let Some(peekable) = inner {
262             ready!(peekable.as_mut().poll_peek(cx));
263 
264             inner.take().unwrap().poll_peek(cx)
265         } else {
266             panic!("Peek polled after completion")
267         }
268     }
269 }
270 
271 pin_project! {
272     /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
273     #[must_use = "futures do nothing unless polled"]
274     pub struct PeekMut<'a, St: Stream> {
275         inner: Option<Pin<&'a mut Peekable<St>>>,
276     }
277 }
278 
279 impl<St> fmt::Debug for PeekMut<'_, St>
280 where
281     St: Stream + fmt::Debug,
282     St::Item: fmt::Debug,
283 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result284     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285         f.debug_struct("PeekMut").field("inner", &self.inner).finish()
286     }
287 }
288 
289 impl<St: Stream> FusedFuture for PeekMut<'_, St> {
is_terminated(&self) -> bool290     fn is_terminated(&self) -> bool {
291         self.inner.is_none()
292     }
293 }
294 
295 impl<'a, St> Future for PeekMut<'a, St>
296 where
297     St: Stream,
298 {
299     type Output = Option<&'a mut St::Item>;
300 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>301     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
302         let inner = self.project().inner;
303         if let Some(peekable) = inner {
304             ready!(peekable.as_mut().poll_peek_mut(cx));
305 
306             inner.take().unwrap().poll_peek_mut(cx)
307         } else {
308             panic!("PeekMut polled after completion")
309         }
310     }
311 }
312 
313 pin_project! {
314     /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
315     #[must_use = "futures do nothing unless polled"]
316     pub struct NextIf<'a, St: Stream, F> {
317         inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
318     }
319 }
320 
321 impl<St, F> fmt::Debug for NextIf<'_, St, F>
322 where
323     St: Stream + fmt::Debug,
324     St::Item: fmt::Debug,
325 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result326     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327         f.debug_struct("NextIf").field("inner", &self.inner.as_ref().map(|(s, _f)| s)).finish()
328     }
329 }
330 
331 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
332 impl<St, F> FusedFuture for NextIf<'_, St, F>
333 where
334     St: Stream,
335     F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
336 {
is_terminated(&self) -> bool337     fn is_terminated(&self) -> bool {
338         self.inner.is_none()
339     }
340 }
341 
342 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
343 impl<St, F> Future for NextIf<'_, St, F>
344 where
345     St: Stream,
346     F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
347 {
348     type Output = Option<St::Item>;
349 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>350     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351         let inner = self.project().inner;
352         if let Some((peekable, _)) = inner {
353             let res = ready!(peekable.as_mut().poll_next(cx));
354 
355             let (peekable, func) = inner.take().unwrap();
356             match res {
357                 Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
358                 other => {
359                     let peekable = peekable.project();
360                     // Since we called `self.next()`, we consumed `self.peeked`.
361                     assert!(peekable.peeked.is_none());
362                     *peekable.peeked = other;
363                     Poll::Ready(None)
364                 }
365             }
366         } else {
367             panic!("NextIf polled after completion")
368         }
369     }
370 }
371 
372 pin_project! {
373     /// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method.
374     #[must_use = "futures do nothing unless polled"]
375     pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
376         #[pin]
377         inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
378     }
379 }
380 
381 impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
382 where
383     St: Stream + fmt::Debug,
384     St::Item: fmt::Debug,
385     T: ?Sized,
386 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result387     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
388         f.debug_struct("NextIfEq")
389             .field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
390             .finish()
391     }
392 }
393 
394 impl<St, T> FusedFuture for NextIfEq<'_, St, T>
395 where
396     St: Stream,
397     T: ?Sized,
398     St::Item: PartialEq<T>,
399 {
is_terminated(&self) -> bool400     fn is_terminated(&self) -> bool {
401         self.inner.is_terminated()
402     }
403 }
404 
405 impl<St, T> Future for NextIfEq<'_, St, T>
406 where
407     St: Stream,
408     T: ?Sized,
409     St::Item: PartialEq<T>,
410 {
411     type Output = Option<St::Item>;
412 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>413     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
414         self.project().inner.poll(cx)
415     }
416 }
417 
/// Nameable predicate used by `NextIfEq`: tests an item for equality with
/// `expected`.
struct NextIfEqFn<'a, T: ?Sized, Item> {
    /// Value that the next item is compared against.
    expected: &'a T,
    /// Ties the predicate to the item type without storing an item.
    _next: PhantomData<Item>,
}
422 
423 impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
424 where
425     T: ?Sized,
426     Item: PartialEq<T>,
427 {
428     type Output = bool;
429 
call_once(self, next: &Item) -> Self::Output430     fn call_once(self, next: &Item) -> Self::Output {
431         next == self.expected
432     }
433 }
434