• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::{Future, TryFuture};
4 use futures_core::ready;
5 use futures_core::stream::TryStream;
6 use futures_core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
11     #[must_use = "futures do nothing unless you `.await` or poll them"]
12     pub struct TryForEach<St, Fut, F> {
13         #[pin]
14         stream: St,
15         f: F,
16         #[pin]
17         future: Option<Fut>,
18     }
19 }
20 
21 impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F>
22 where
23     St: fmt::Debug,
24     Fut: fmt::Debug,
25 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27         f.debug_struct("TryForEach")
28             .field("stream", &self.stream)
29             .field("future", &self.future)
30             .finish()
31     }
32 }
33 
34 impl<St, Fut, F> TryForEach<St, Fut, F>
35 where
36     St: TryStream,
37     F: FnMut(St::Ok) -> Fut,
38     Fut: TryFuture<Ok = (), Error = St::Error>,
39 {
new(stream: St, f: F) -> Self40     pub(super) fn new(stream: St, f: F) -> Self {
41         Self { stream, f, future: None }
42     }
43 }
44 
45 impl<St, Fut, F> Future for TryForEach<St, Fut, F>
46 where
47     St: TryStream,
48     F: FnMut(St::Ok) -> Fut,
49     Fut: TryFuture<Ok = (), Error = St::Error>,
50 {
51     type Output = Result<(), St::Error>;
52 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54         let mut this = self.project();
55         loop {
56             if let Some(fut) = this.future.as_mut().as_pin_mut() {
57                 ready!(fut.try_poll(cx))?;
58                 this.future.set(None);
59             } else {
60                 match ready!(this.stream.as_mut().try_poll_next(cx)?) {
61                     Some(e) => this.future.set(Some((this.f)(e))),
62                     None => break,
63                 }
64             }
65         }
66         Poll::Ready(Ok(()))
67     }
68 }
69