1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future, TryFuture}; 4 use futures_core::ready; 5 use futures_core::stream::TryStream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct TryFold<St, Fut, T, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 accum: Option<T>, 17 #[pin] 18 future: Option<Fut>, 19 } 20 } 21 22 impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> 23 where 24 St: fmt::Debug, 25 Fut: fmt::Debug, 26 T: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("TryFold") 30 .field("stream", &self.stream) 31 .field("accum", &self.accum) 32 .field("future", &self.future) 33 .finish() 34 } 35 } 36 37 impl<St, Fut, T, F> TryFold<St, Fut, T, F> 38 where St: TryStream, 39 F: FnMut(T, St::Ok) -> Fut, 40 Fut: TryFuture<Ok = T, Error = St::Error>, 41 { new(stream: St, f: F, t: T) -> Self42 pub(super) fn new(stream: St, f: F, t: T) -> Self { 43 Self { 44 stream, 45 f, 46 accum: Some(t), 47 future: None, 48 } 49 } 50 } 51 52 impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F> 53 where St: TryStream, 54 F: FnMut(T, St::Ok) -> Fut, 55 Fut: TryFuture<Ok = T, Error = St::Error>, 56 { is_terminated(&self) -> bool57 fn is_terminated(&self) -> bool { 58 self.accum.is_none() && self.future.is_none() 59 } 60 } 61 62 impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> 63 where St: TryStream, 64 F: FnMut(T, St::Ok) -> Fut, 65 Fut: TryFuture<Ok = T, Error = St::Error>, 66 { 67 type Output = Result<T, St::Error>; 68 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>69 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 70 let mut this = self.project(); 71 72 Poll::Ready(loop { 73 if let Some(fut) = this.future.as_mut().as_pin_mut() { 74 // we're currently processing a future to produce a new accum value 75 let res = ready!(fut.try_poll(cx)); 76 this.future.set(None); 77 match res { 78 Ok(a) => *this.accum = Some(a), 79 Err(e) => break Err(e), 80 } 81 } else if this.accum.is_some() { 82 // we're waiting on a new item from the stream 83 let res = ready!(this.stream.as_mut().try_poll_next(cx)); 84 let a = this.accum.take().unwrap(); 85 match res { 86 Some(Ok(item)) => this.future.set(Some((this.f)(a, item))), 87 Some(Err(e)) => break Err(e), 88 None => break Ok(a), 89 } 90 } else { 91 panic!("Fold polled after completion") 92 } 93 }) 94 } 95 } 96