1 use core::marker::PhantomData; 2 use core::pin::Pin; 3 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 10 use pin_project_lite::pin_project; 11 12 use crate::future::Either; 13 use crate::stream::stream::flatten_unordered::{ 14 FlattenUnorderedWithFlowController, FlowController, FlowStep, 15 }; 16 use crate::stream::IntoStream; 17 use crate::TryStreamExt; 18 19 delegate_all!( 20 /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. 21 TryFlattenUnordered<St>( 22 FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> 23 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] 24 + New[ 25 |stream: St, limit: impl Into<Option<usize>>| 26 FlattenUnorderedWithFlowController::new( 27 NestedTryStreamIntoEitherTryStream::new(stream), 28 limit.into() 29 ) 30 ] 31 where 32 St: TryStream, 33 St::Ok: TryStream, 34 St::Ok: Unpin, 35 <St::Ok as TryStream>::Error: From<St::Error> 36 ); 37 38 pin_project! { 39 /// Emits either successful streams or single-item streams containing the underlying errors. 40 /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. 41 #[derive(Debug)] 42 #[must_use = "streams do nothing unless polled"] 43 pub struct NestedTryStreamIntoEitherTryStream<St> 44 where 45 St: TryStream, 46 St::Ok: TryStream, 47 St::Ok: Unpin, 48 <St::Ok as TryStream>::Error: From<St::Error> 49 { 50 #[pin] 51 stream: St 52 } 53 } 54 55 impl<St> NestedTryStreamIntoEitherTryStream<St> 56 where 57 St: TryStream, 58 St::Ok: TryStream + Unpin, 59 <St::Ok as TryStream>::Error: From<St::Error>, 60 { new(stream: St) -> Self61 fn new(stream: St) -> Self { 62 Self { stream } 63 } 64 65 delegate_access_inner!(stream, St, ()); 66 } 67 68 /// Emits a single item immediately, then stream will be terminated. 69 #[derive(Debug, Clone)] 70 pub struct Single<T>(Option<T>); 71 72 impl<T> Single<T> { 73 /// Constructs new `Single` with the given value. new(val: T) -> Self74 fn new(val: T) -> Self { 75 Self(Some(val)) 76 } 77 78 /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. next_immediate(&mut self) -> Option<T>79 fn next_immediate(&mut self) -> Option<T> { 80 self.0.take() 81 } 82 } 83 84 impl<T> Unpin for Single<T> {} 85 86 impl<T> Stream for Single<T> { 87 type Item = T; 88 poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>>89 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { 90 Poll::Ready(self.0.take()) 91 } 92 size_hint(&self) -> (usize, Option<usize>)93 fn size_hint(&self) -> (usize, Option<usize>) { 94 self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) 95 } 96 } 97 98 /// Immediately propagates errors occurred in the base stream. 99 #[derive(Debug, Clone, Copy)] 100 pub struct PropagateBaseStreamError<St>(PhantomData<St>); 101 102 type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; 103 type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; 104 105 impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> 106 where 107 St: TryStream, 108 St::Ok: TryStream + Unpin, 109 <St::Ok as TryStream>::Error: From<St::Error>, 110 { next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>>111 fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { 112 match item { 113 // A new successful inner stream received 114 st @ Either::Left(_) => FlowStep::Continue(st), 115 // An error encountered 116 Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), 117 } 118 } 119 } 120 121 type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; 122 123 impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> 124 where 125 St: TryStream, 126 St::Ok: TryStream + Unpin, 127 <St::Ok as TryStream>::Error: From<St::Error>, 128 { 129 // Item is either an inner stream or a stream containing a single error. 130 // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. 131 type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; 132 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>133 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 134 let item = ready!(self.project().stream.try_poll_next(cx)); 135 136 let out = match item { 137 Some(res) => match res { 138 // Emit successful inner stream as is 139 Ok(stream) => Either::Left(stream.into_stream()), 140 // Wrap an error into a stream containing a single item 141 err @ Err(_) => { 142 let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); 143 144 Either::Right(Single::new(res)) 145 } 146 }, 147 None => return Poll::Ready(None), 148 }; 149 150 Poll::Ready(Some(out)) 151 } 152 } 153 154 impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> 155 where 156 St: TryStream + FusedStream, 157 St::Ok: TryStream + Unpin, 158 <St::Ok as TryStream>::Error: From<St::Error>, 159 { is_terminated(&self) -> bool160 fn is_terminated(&self) -> bool { 161 self.stream.is_terminated() 162 } 163 } 164 165 // Forwarding impl of Sink from the underlying stream 166 #[cfg(feature = "sink")] 167 impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St> 168 where 169 St: TryStream + Sink<Item>, 170 St::Ok: TryStream + Unpin, 171 <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>, 172 { 173 type Error = <St as Sink<Item>>::Error; 174 175 delegate_sink!(stream, Item); 176 } 177