1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{Future, TryFuture}; 4 use futures_core::ready; 5 use futures_core::stream::TryStream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct TryForEach<St, Fut, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 #[pin] 17 future: Option<Fut>, 18 } 19 } 20 21 impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F> 22 where 23 St: fmt::Debug, 24 Fut: fmt::Debug, 25 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 27 f.debug_struct("TryForEach") 28 .field("stream", &self.stream) 29 .field("future", &self.future) 30 .finish() 31 } 32 } 33 34 impl<St, Fut, F> TryForEach<St, Fut, F> 35 where 36 St: TryStream, 37 F: FnMut(St::Ok) -> Fut, 38 Fut: TryFuture<Ok = (), Error = St::Error>, 39 { new(stream: St, f: F) -> Self40 pub(super) fn new(stream: St, f: F) -> Self { 41 Self { stream, f, future: None } 42 } 43 } 44 45 impl<St, Fut, F> Future for TryForEach<St, Fut, F> 46 where 47 St: TryStream, 48 F: FnMut(St::Ok) -> Fut, 49 Fut: TryFuture<Ok = (), Error = St::Error>, 50 { 51 type Output = Result<(), St::Error>; 52 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 54 let mut this = self.project(); 55 loop { 56 if let Some(fut) = this.future.as_mut().as_pin_mut() { 57 ready!(fut.try_poll(cx))?; 58 this.future.set(None); 59 } else { 60 match ready!(this.stream.as_mut().try_poll_next(cx)?) { 61 Some(e) => this.future.set(Some((this.f)(e))), 62 None => break, 63 } 64 } 65 } 66 Poll::Ready(Ok(())) 67 } 68 } 69