• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Stream;
2 
3 use core::future::Future;
4 use core::marker::PhantomPinned;
5 use core::pin::Pin;
6 use core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future returned by the [`fold`](super::StreamExt::fold) method.
11     #[derive(Debug)]
12     #[must_use = "futures do nothing unless you `.await` or poll them"]
13     pub struct FoldFuture<St, B, F> {
14         #[pin]
15         stream: St,
16         acc: Option<B>,
17         f: F,
18         // Make this future `!Unpin` for compatibility with async trait methods.
19         #[pin]
20         _pin: PhantomPinned,
21     }
22 }
23 
24 impl<St, B, F> FoldFuture<St, B, F> {
new(stream: St, init: B, f: F) -> Self25     pub(super) fn new(stream: St, init: B, f: F) -> Self {
26         Self {
27             stream,
28             acc: Some(init),
29             f,
30             _pin: PhantomPinned,
31         }
32     }
33 }
34 
35 impl<St, B, F> Future for FoldFuture<St, B, F>
36 where
37     St: Stream,
38     F: FnMut(B, St::Item) -> B,
39 {
40     type Output = B;
41 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>42     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43         let mut me = self.project();
44         loop {
45             let next = ready!(me.stream.as_mut().poll_next(cx));
46 
47             match next {
48                 Some(v) => {
49                     let old = me.acc.take().unwrap();
50                     let new = (me.f)(old, v);
51                     *me.acc = Some(new);
52                 }
53                 None => return Poll::Ready(me.acc.take().unwrap()),
54             }
55         }
56     }
57 }
58