// Source-viewer navigation residue (not part of the Rust module):
// Home · Line# · Scopes# · Navigate# · Raw · Download
1 use core::marker::PhantomData;
2 use core::pin::Pin;
3 
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream, TryStream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 
10 use pin_project_lite::pin_project;
11 
12 use crate::future::Either;
13 use crate::stream::stream::flatten_unordered::{
14     FlattenUnorderedWithFlowController, FlowController, FlowStep,
15 };
16 use crate::stream::IntoStream;
17 use crate::TryStreamExt;
18 
19 delegate_all!(
20     /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
21     TryFlattenUnordered<St>(
22         FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
23     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
24         + New[
25             |stream: St, limit: impl Into<Option<usize>>|
26                 FlattenUnorderedWithFlowController::new(
27                     NestedTryStreamIntoEitherTryStream::new(stream),
28                     limit.into()
29                 )
30         ]
31     where
32         St: TryStream,
33         St::Ok: TryStream,
34         St::Ok: Unpin,
35         <St::Ok as TryStream>::Error: From<St::Error>
36 );
37 
38 pin_project! {
39     /// Emits either successful streams or single-item streams containing the underlying errors.
40     /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
41     #[derive(Debug)]
42     #[must_use = "streams do nothing unless polled"]
43     pub struct NestedTryStreamIntoEitherTryStream<St>
44         where
45             St: TryStream,
46             St::Ok: TryStream,
47             St::Ok: Unpin,
48             <St::Ok as TryStream>::Error: From<St::Error>
49         {
50             #[pin]
51             stream: St
52         }
53 }
54 
55 impl<St> NestedTryStreamIntoEitherTryStream<St>
56 where
57     St: TryStream,
58     St::Ok: TryStream + Unpin,
59     <St::Ok as TryStream>::Error: From<St::Error>,
60 {
new(stream: St) -> Self61     fn new(stream: St) -> Self {
62         Self { stream }
63     }
64 
65     delegate_access_inner!(stream, St, ());
66 }
67 
/// Emits a single item immediately, then the stream is terminated.
///
/// The item is stored as `Some(item)` until it is handed out; afterwards the slot is `None`.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);

impl<T> Single<T> {
    /// Constructs a new `Single` holding the given value.
    fn new(val: T) -> Self {
        Self(Some(val))
    }

    /// Attempts to take the inner item immediately, without going through `poll_next`.
    ///
    /// Returns `Some(item)` the first time the item is taken and `None` on every later call,
    /// i.e. it always succeeds as long as the stream has not been terminated yet.
    fn next_immediate(&mut self) -> Option<T> {
        self.0.take()
    }
}

// `Single` only stores an `Option<T>` and never hands out a pinned reference to it, so it is
// declared `Unpin` for every `T`, not just for `T: Unpin`.
impl<T> Unpin for Single<T> {}
85 
86 impl<T> Stream for Single<T> {
87     type Item = T;
88 
poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>89     fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90         Poll::Ready(self.0.take())
91     }
92 
size_hint(&self) -> (usize, Option<usize>)93     fn size_hint(&self) -> (usize, Option<usize>) {
94         self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
95     }
96 }
97 
/// Immediately propagates errors that occurred in the base stream.
///
/// This is a zero-sized marker type: it only carries the base stream type `St` via
/// `PhantomData` and stores no data of its own.
#[derive(Debug, Clone, Copy)]
pub struct PropagateBaseStreamError<St>(PhantomData<St>);
101 
102 type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
103 type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
104 
105 impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
106 where
107     St: TryStream,
108     St::Ok: TryStream + Unpin,
109     <St::Ok as TryStream>::Error: From<St::Error>,
110 {
next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>>111     fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
112         match item {
113             // A new successful inner stream received
114             st @ Either::Left(_) => FlowStep::Continue(st),
115             // An error encountered
116             Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
117         }
118     }
119 }
120 
121 type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
122 
123 impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
124 where
125     St: TryStream,
126     St::Ok: TryStream + Unpin,
127     <St::Ok as TryStream>::Error: From<St::Error>,
128 {
129     // Item is either an inner stream or a stream containing a single error.
130     // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
131     type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
132 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>133     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134         let item = ready!(self.project().stream.try_poll_next(cx));
135 
136         let out = match item {
137             Some(res) => match res {
138                 // Emit successful inner stream as is
139                 Ok(stream) => Either::Left(stream.into_stream()),
140                 // Wrap an error into a stream containing a single item
141                 err @ Err(_) => {
142                     let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
143 
144                     Either::Right(Single::new(res))
145                 }
146             },
147             None => return Poll::Ready(None),
148         };
149 
150         Poll::Ready(Some(out))
151     }
152 }
153 
154 impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
155 where
156     St: TryStream + FusedStream,
157     St::Ok: TryStream + Unpin,
158     <St::Ok as TryStream>::Error: From<St::Error>,
159 {
is_terminated(&self) -> bool160     fn is_terminated(&self) -> bool {
161         self.stream.is_terminated()
162     }
163 }
164 
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream + Sink<Item>,
    St::Ok: TryStream + Unpin,
    // `St` implements both `TryStream` and `Sink<Item>` here, and each has an associated
    // `Error` type, so the `TryStream` one is named with a fully qualified path.
    <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
    // Sink errors are those of the wrapped stream's own `Sink` implementation.
    type Error = <St as Sink<Item>>::Error;

    // The `Sink` methods are generated by the crate-level `delegate_sink!` macro (defined
    // outside this file) and forward to the `stream` field.
    delegate_sink!(stream, Item);
}
177