1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::{FusedStream, Stream, TryStream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. 11 #[derive(Debug)] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct TryFlatten<St> 14 where 15 St: TryStream, 16 { 17 #[pin] 18 stream: St, 19 #[pin] 20 next: Option<St::Ok>, 21 } 22 } 23 24 impl<St> TryFlatten<St> 25 where 26 St: TryStream, 27 St::Ok: TryStream, 28 <St::Ok as TryStream>::Error: From<St::Error>, 29 { new(stream: St) -> Self30 pub(super) fn new(stream: St) -> Self { 31 Self { stream, next: None } 32 } 33 34 delegate_access_inner!(stream, St, ()); 35 } 36 37 impl<St> FusedStream for TryFlatten<St> 38 where 39 St: TryStream + FusedStream, 40 St::Ok: TryStream, 41 <St::Ok as TryStream>::Error: From<St::Error>, 42 { is_terminated(&self) -> bool43 fn is_terminated(&self) -> bool { 44 self.next.is_none() && self.stream.is_terminated() 45 } 46 } 47 48 impl<St> Stream for TryFlatten<St> 49 where 50 St: TryStream, 51 St::Ok: TryStream, 52 <St::Ok as TryStream>::Error: From<St::Error>, 53 { 54 type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>; 55 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 57 let mut this = self.project(); 58 59 Poll::Ready(loop { 60 if let Some(s) = this.next.as_mut().as_pin_mut() { 61 if let Some(item) = ready!(s.try_poll_next(cx)?) { 62 break Some(Ok(item)); 63 } else { 64 this.next.set(None); 65 } 66 } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 67 this.next.set(Some(s)); 68 } else { 69 break None; 70 } 71 }) 72 } 73 } 74 75 // Forwarding impl of Sink from the underlying stream 76 #[cfg(feature = "sink")] 77 impl<S, Item> Sink<Item> for TryFlatten<S> 78 where 79 S: TryStream + Sink<Item>, 80 { 81 type Error = <S as Sink<Item>>::Error; 82 83 delegate_sink!(stream, Item); 84 } 85