1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. 13 #[must_use = "streams do nothing unless polled"] 14 pub struct OrElse<St, Fut, F> { 15 #[pin] 16 stream: St, 17 #[pin] 18 future: Option<Fut>, 19 f: F, 20 } 21 } 22 23 impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F> 24 where 25 St: fmt::Debug, 26 Fut: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("OrElse") 30 .field("stream", &self.stream) 31 .field("future", &self.future) 32 .finish() 33 } 34 } 35 36 impl<St, Fut, F> OrElse<St, Fut, F> 37 where 38 St: TryStream, 39 F: FnMut(St::Error) -> Fut, 40 Fut: TryFuture<Ok = St::Ok>, 41 { new(stream: St, f: F) -> Self42 pub(super) fn new(stream: St, f: F) -> Self { 43 Self { stream, future: None, f } 44 } 45 46 delegate_access_inner!(stream, St, ()); 47 } 48 49 impl<St, Fut, F> Stream for OrElse<St, Fut, F> 50 where 51 St: TryStream, 52 F: FnMut(St::Error) -> Fut, 53 Fut: TryFuture<Ok = St::Ok>, 54 { 55 type Item = Result<St::Ok, Fut::Error>; 56 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 58 let mut this = self.project(); 59 60 Poll::Ready(loop { 61 if let Some(fut) = this.future.as_mut().as_pin_mut() { 62 let item = ready!(fut.try_poll(cx)); 63 this.future.set(None); 64 break Some(item); 65 } else { 66 match ready!(this.stream.as_mut().try_poll_next(cx)) { 67 Some(Ok(item)) => break Some(Ok(item)), 68 Some(Err(e)) => { 69 this.future.set(Some((this.f)(e))); 70 } 71 None => break None, 72 } 73 } 74 }) 75 } 76 size_hint(&self) -> (usize, Option<usize>)77 fn size_hint(&self) -> (usize, Option<usize>) { 78 let future_len = usize::from(self.future.is_some()); 79 let (lower, upper) = self.stream.size_hint(); 80 let lower = lower.saturating_add(future_len); 81 let upper = match upper { 82 Some(x) => x.checked_add(future_len), 83 None => None, 84 }; 85 (lower, upper) 86 } 87 } 88 89 impl<St, Fut, F> FusedStream for OrElse<St, Fut, F> 90 where 91 St: TryStream + FusedStream, 92 F: FnMut(St::Error) -> Fut, 93 Fut: TryFuture<Ok = St::Ok>, 94 { is_terminated(&self) -> bool95 fn is_terminated(&self) -> bool { 96 self.future.is_none() && self.stream.is_terminated() 97 } 98 } 99 100 // Forwarding impl of Sink from the underlying stream 101 #[cfg(feature = "sink")] 102 impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F> 103 where 104 S: Sink<Item>, 105 { 106 type Error = S::Error; 107 108 delegate_sink!(stream, Item); 109 } 110