• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::TryFuture;
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream, TryStream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 
11 pin_project! {
12     /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
13     /// method.
14     #[must_use = "streams do nothing unless polled"]
15     pub struct TryTakeWhile<St, Fut, F>
16     where
17         St: TryStream,
18     {
19         #[pin]
20         stream: St,
21         f: F,
22         #[pin]
23         pending_fut: Option<Fut>,
24         pending_item: Option<St::Ok>,
25         done_taking: bool,
26     }
27 }
28 
29 impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
30 where
31     St: TryStream + fmt::Debug,
32     St::Ok: fmt::Debug,
33     Fut: fmt::Debug,
34 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result35     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36         f.debug_struct("TryTakeWhile")
37             .field("stream", &self.stream)
38             .field("pending_fut", &self.pending_fut)
39             .field("pending_item", &self.pending_item)
40             .field("done_taking", &self.done_taking)
41             .finish()
42     }
43 }
44 
45 impl<St, Fut, F> TryTakeWhile<St, Fut, F>
46 where
47     St: TryStream,
48     F: FnMut(&St::Ok) -> Fut,
49     Fut: TryFuture<Ok = bool, Error = St::Error>,
50 {
new(stream: St, f: F) -> Self51     pub(super) fn new(stream: St, f: F) -> Self {
52         Self {
53             stream,
54             f,
55             pending_fut: None,
56             pending_item: None,
57             done_taking: false,
58         }
59     }
60 
61     delegate_access_inner!(stream, St, ());
62 }
63 
64 impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
65 where
66     St: TryStream,
67     F: FnMut(&St::Ok) -> Fut,
68     Fut: TryFuture<Ok = bool, Error = St::Error>,
69 {
70     type Item = Result<St::Ok, St::Error>;
71 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>72     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73         let mut this = self.project();
74 
75         if *this.done_taking {
76             return Poll::Ready(None);
77         }
78 
79         Poll::Ready(loop {
80             if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
81                 let res = ready!(fut.try_poll(cx));
82                 this.pending_fut.set(None);
83                 let take = res?;
84                 let item = this.pending_item.take();
85                 if take {
86                     break item.map(Ok);
87                 } else {
88                     *this.done_taking = true;
89                     break None;
90                 }
91             } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
92                 this.pending_fut.set(Some((this.f)(&item)));
93                 *this.pending_item = Some(item);
94             } else {
95                 break None;
96             }
97         })
98     }
99 
size_hint(&self) -> (usize, Option<usize>)100     fn size_hint(&self) -> (usize, Option<usize>) {
101         if self.done_taking {
102             return (0, Some(0));
103         }
104 
105         let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
106         let (_, upper) = self.stream.size_hint();
107         let upper = match upper {
108             Some(x) => x.checked_add(pending_len),
109             None => None,
110         };
111         (0, upper) // can't know a lower bound, due to the predicate
112     }
113 }
114 
115 impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
116 where
117     St: TryStream + FusedStream,
118     F: FnMut(&St::Ok) -> Fut,
119     Fut: TryFuture<Ok = bool, Error = St::Error>,
120 {
is_terminated(&self) -> bool121     fn is_terminated(&self) -> bool {
122         self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
123     }
124 }
125 
// `Sink` is forwarded to the wrapped stream's own `Sink` implementation.
// The separate `E` parameter names the sink's error type unambiguously,
// since `TryStream` also has an associated `Error`.
#[cfg(feature = "sink")]
impl<St, Fut, F, Item, E> Sink<Item> for TryTakeWhile<St, Fut, F>
where
    St: TryStream + Sink<Item, Error = E>,
{
    type Error = E;

    delegate_sink!(stream, Item);
}
136