1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{Stream, TryStream, FusedStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) 13 /// method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct TrySkipWhile<St, Fut, F> where St: TryStream { 16 #[pin] 17 stream: St, 18 f: F, 19 #[pin] 20 pending_fut: Option<Fut>, 21 pending_item: Option<St::Ok>, 22 done_skipping: bool, 23 } 24 } 25 26 impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F> 27 where 28 St: TryStream + fmt::Debug, 29 St::Ok: fmt::Debug, 30 Fut: fmt::Debug, 31 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 33 f.debug_struct("TrySkipWhile") 34 .field("stream", &self.stream) 35 .field("pending_fut", &self.pending_fut) 36 .field("pending_item", &self.pending_item) 37 .field("done_skipping", &self.done_skipping) 38 .finish() 39 } 40 } 41 42 impl<St, Fut, F> TrySkipWhile<St, Fut, F> 43 where St: TryStream, 44 F: FnMut(&St::Ok) -> Fut, 45 Fut: TryFuture<Ok = bool, Error = St::Error>, 46 { new(stream: St, f: F) -> Self47 pub(super) fn new(stream: St, f: F) -> Self { 48 Self { 49 stream, 50 f, 51 pending_fut: None, 52 pending_item: None, 53 done_skipping: false, 54 } 55 } 56 57 delegate_access_inner!(stream, St, ()); 58 } 59 60 impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> 61 where St: TryStream, 62 F: FnMut(&St::Ok) -> Fut, 63 Fut: TryFuture<Ok = bool, Error = St::Error>, 64 { 65 type Item = Result<St::Ok, St::Error>; 66 poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>67 fn poll_next( 68 self: Pin<&mut Self>, 69 cx: &mut Context<'_>, 70 ) -> Poll<Option<Self::Item>> { 71 let mut this = self.project(); 72 73 if *this.done_skipping { 74 return this.stream.try_poll_next(cx); 75 } 76 77 Poll::Ready(loop { 78 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { 79 let res = ready!(fut.try_poll(cx)); 80 this.pending_fut.set(None); 81 let skipped = res?; 82 let item = this.pending_item.take(); 83 if !skipped { 84 *this.done_skipping = true; 85 break item.map(Ok); 86 } 87 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 88 this.pending_fut.set(Some((this.f)(&item))); 89 *this.pending_item = Some(item); 90 } else { 91 break None; 92 } 93 }) 94 } 95 size_hint(&self) -> (usize, Option<usize>)96 fn size_hint(&self) -> (usize, Option<usize>) { 97 let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; 98 let (_, upper) = self.stream.size_hint(); 99 let upper = match upper { 100 Some(x) => x.checked_add(pending_len), 101 None => None, 102 }; 103 (0, upper) // can't know a lower bound, due to the predicate 104 } 105 } 106 107 impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F> 108 where St: TryStream + FusedStream, 109 F: FnMut(&St::Ok) -> Fut, 110 Fut: TryFuture<Ok = bool, Error = St::Error>, 111 { is_terminated(&self) -> bool112 fn is_terminated(&self) -> bool { 113 self.pending_item.is_none() && self.stream.is_terminated() 114 } 115 } 116 117 // Forwarding impl of Sink from the underlying stream 118 #[cfg(feature = "sink")] 119 impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F> 120 where S: TryStream + Sink<Item, Error = E>, 121 { 122 type Error = E; 123 124 delegate_sink!(stream, Item); 125 } 126