1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while) 13 /// method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct TryTakeWhile<St, Fut, F> 16 where 17 St: TryStream, 18 { 19 #[pin] 20 stream: St, 21 f: F, 22 #[pin] 23 pending_fut: Option<Fut>, 24 pending_item: Option<St::Ok>, 25 done_taking: bool, 26 } 27 } 28 29 impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F> 30 where 31 St: TryStream + fmt::Debug, 32 St::Ok: fmt::Debug, 33 Fut: fmt::Debug, 34 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 36 f.debug_struct("TryTakeWhile") 37 .field("stream", &self.stream) 38 .field("pending_fut", &self.pending_fut) 39 .field("pending_item", &self.pending_item) 40 .field("done_taking", &self.done_taking) 41 .finish() 42 } 43 } 44 45 impl<St, Fut, F> TryTakeWhile<St, Fut, F> 46 where 47 St: TryStream, 48 F: FnMut(&St::Ok) -> Fut, 49 Fut: TryFuture<Ok = bool, Error = St::Error>, 50 { new(stream: St, f: F) -> Self51 pub(super) fn new(stream: St, f: F) -> Self { 52 Self { 53 stream, 54 f, 55 pending_fut: None, 56 pending_item: None, 57 done_taking: false, 58 } 59 } 60 61 delegate_access_inner!(stream, St, ()); 62 } 63 64 impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F> 65 where 66 St: TryStream, 67 F: FnMut(&St::Ok) -> Fut, 68 Fut: TryFuture<Ok = bool, Error = St::Error>, 69 { 70 type Item = Result<St::Ok, St::Error>; 71 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>72 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 73 let mut this = self.project(); 74 75 if *this.done_taking { 76 return Poll::Ready(None); 77 } 78 79 Poll::Ready(loop { 80 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { 81 let res = ready!(fut.try_poll(cx)); 82 this.pending_fut.set(None); 83 let take = res?; 84 let item = this.pending_item.take(); 85 if take { 86 break item.map(Ok); 87 } else { 88 *this.done_taking = true; 89 break None; 90 } 91 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 92 this.pending_fut.set(Some((this.f)(&item))); 93 *this.pending_item = Some(item); 94 } else { 95 break None; 96 } 97 }) 98 } 99 size_hint(&self) -> (usize, Option<usize>)100 fn size_hint(&self) -> (usize, Option<usize>) { 101 if self.done_taking { 102 return (0, Some(0)); 103 } 104 105 let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; 106 let (_, upper) = self.stream.size_hint(); 107 let upper = match upper { 108 Some(x) => x.checked_add(pending_len), 109 None => None, 110 }; 111 (0, upper) // can't know a lower bound, due to the predicate 112 } 113 } 114 115 impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F> 116 where 117 St: TryStream + FusedStream, 118 F: FnMut(&St::Ok) -> Fut, 119 Fut: TryFuture<Ok = bool, Error = St::Error>, 120 { is_terminated(&self) -> bool121 fn is_terminated(&self) -> bool { 122 self.done_taking || self.pending_item.is_none() && self.stream.is_terminated() 123 } 124 } 125 126 // Forwarding impl of Sink from the underlying stream 127 #[cfg(feature = "sink")] 128 impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F> 129 where 130 S: TryStream + Sink<Item, Error = E>, 131 { 132 type Error = E; 133 134 delegate_sink!(stream, Item); 135 } 136