1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::Stream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`fold`](super::StreamExt::fold) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct Fold<St, Fut, T, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 accum: Option<T>, 17 #[pin] 18 future: Option<Fut>, 19 } 20 } 21 22 impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F> 23 where 24 St: fmt::Debug, 25 Fut: fmt::Debug, 26 T: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("Fold") 30 .field("stream", &self.stream) 31 .field("accum", &self.accum) 32 .field("future", &self.future) 33 .finish() 34 } 35 } 36 37 impl<St, Fut, T, F> Fold<St, Fut, T, F> 38 where 39 St: Stream, 40 F: FnMut(T, St::Item) -> Fut, 41 Fut: Future<Output = T>, 42 { new(stream: St, f: F, t: T) -> Self43 pub(super) fn new(stream: St, f: F, t: T) -> Self { 44 Self { stream, f, accum: Some(t), future: None } 45 } 46 } 47 48 impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> 49 where 50 St: Stream, 51 F: FnMut(T, St::Item) -> Fut, 52 Fut: Future<Output = T>, 53 { is_terminated(&self) -> bool54 fn is_terminated(&self) -> bool { 55 self.accum.is_none() && self.future.is_none() 56 } 57 } 58 59 impl<St, Fut, T, F> Future for Fold<St, Fut, T, F> 60 where 61 St: Stream, 62 F: FnMut(T, St::Item) -> Fut, 63 Fut: Future<Output = T>, 64 { 65 type Output = T; 66 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T>67 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 68 let mut this = self.project(); 69 Poll::Ready(loop { 70 if let Some(fut) = this.future.as_mut().as_pin_mut() { 71 // we're currently processing a future to produce a new accum value 72 *this.accum = Some(ready!(fut.poll(cx))); 73 this.future.set(None); 74 } else if this.accum.is_some() { 75 // we're waiting on a new item from the stream 76 let res = ready!(this.stream.as_mut().poll_next(cx)); 77 let a = this.accum.take().unwrap(); 78 if let Some(item) = res { 79 this.future.set(Some((this.f)(a, item))); 80 } else { 81 break a; 82 } 83 } else { 84 panic!("Fold polled after completion") 85 } 86 }) 87 } 88 } 89