• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::pin::Pin;
2 use futures_core::ready;
3 use futures_core::stream::{FusedStream, Stream, TryStream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
11     #[derive(Debug)]
12     #[must_use = "streams do nothing unless polled"]
13     pub struct TryFlatten<St>
14     where
15         St: TryStream,
16     {
17         #[pin]
18         stream: St,
19         #[pin]
20         next: Option<St::Ok>,
21     }
22 }
23 
24 impl<St> TryFlatten<St>
25 where
26     St: TryStream,
27     St::Ok: TryStream,
28     <St::Ok as TryStream>::Error: From<St::Error>,
29 {
new(stream: St) -> Self30     pub(super) fn new(stream: St) -> Self {
31         Self { stream, next: None }
32     }
33 
34     delegate_access_inner!(stream, St, ());
35 }
36 
37 impl<St> FusedStream for TryFlatten<St>
38 where
39     St: TryStream + FusedStream,
40     St::Ok: TryStream,
41     <St::Ok as TryStream>::Error: From<St::Error>,
42 {
is_terminated(&self) -> bool43     fn is_terminated(&self) -> bool {
44         self.next.is_none() && self.stream.is_terminated()
45     }
46 }
47 
48 impl<St> Stream for TryFlatten<St>
49 where
50     St: TryStream,
51     St::Ok: TryStream,
52     <St::Ok as TryStream>::Error: From<St::Error>,
53 {
54     type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
55 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>56     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57         let mut this = self.project();
58 
59         Poll::Ready(loop {
60             if let Some(s) = this.next.as_mut().as_pin_mut() {
61                 if let Some(item) = ready!(s.try_poll_next(cx)?) {
62                     break Some(Ok(item));
63                 } else {
64                     this.next.set(None);
65                 }
66             } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
67                 this.next.set(Some(s));
68             } else {
69                 break None;
70             }
71         })
72     }
73 }
74 
75 // Forwarding impl of Sink from the underlying stream
76 #[cfg(feature = "sink")]
77 impl<S, Item> Sink<Item> for TryFlatten<S>
78 where
79     S: TryStream + Sink<Item>,
80 {
81     type Error = <S as Sink<Item>>::Error;
82 
83     delegate_sink!(stream, Item);
84 }
85