• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

11 pin_project! {
12     /// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
13     #[must_use = "streams do nothing unless polled"]
14     pub struct OrElse<St, Fut, F> {
15         #[pin]
16         stream: St,
17         #[pin]
18         future: Option<Fut>,
19         f: F,
20     }
21 }
22 
23 impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
24 where
25     St: fmt::Debug,
26     Fut: fmt::Debug,
27 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         f.debug_struct("OrElse")
30             .field("stream", &self.stream)
31             .field("future", &self.future)
32             .finish()
33     }
34 }
35 
36 impl<St, Fut, F> OrElse<St, Fut, F>
37 where
38     St: TryStream,
39     F: FnMut(St::Error) -> Fut,
40     Fut: TryFuture<Ok = St::Ok>,
41 {
new(stream: St, f: F) -> Self42     pub(super) fn new(stream: St, f: F) -> Self {
43         Self { stream, future: None, f }
44     }
45 
46     delegate_access_inner!(stream, St, ());
47 }
48 
49 impl<St, Fut, F> Stream for OrElse<St, Fut, F>
50 where
51     St: TryStream,
52     F: FnMut(St::Error) -> Fut,
53     Fut: TryFuture<Ok = St::Ok>,
54 {
55     type Item = Result<St::Ok, Fut::Error>;
56 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>57     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58         let mut this = self.project();
59 
60         Poll::Ready(loop {
61             if let Some(fut) = this.future.as_mut().as_pin_mut() {
62                 let item = ready!(fut.try_poll(cx));
63                 this.future.set(None);
64                 break Some(item);
65             } else {
66                 match ready!(this.stream.as_mut().try_poll_next(cx)) {
67                     Some(Ok(item)) => break Some(Ok(item)),
68                     Some(Err(e)) => {
69                         this.future.set(Some((this.f)(e)));
70                     }
71                     None => break None,
72                 }
73             }
74         })
75     }
76 
size_hint(&self) -> (usize, Option<usize>)77     fn size_hint(&self) -> (usize, Option<usize>) {
78         let future_len = usize::from(self.future.is_some());
79         let (lower, upper) = self.stream.size_hint();
80         let lower = lower.saturating_add(future_len);
81         let upper = match upper {
82             Some(x) => x.checked_add(future_len),
83             None => None,
84         };
85         (lower, upper)
86     }
87 }
88 
89 impl<St, Fut, F> FusedStream for OrElse<St, Fut, F>
90 where
91     St: TryStream + FusedStream,
92     F: FnMut(St::Error) -> Fut,
93     Fut: TryFuture<Ok = St::Ok>,
94 {
is_terminated(&self) -> bool95     fn is_terminated(&self) -> bool {
96         self.future.is_none() && self.stream.is_terminated()
97     }
98 }
99 
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>
where
    S: Sink<Item>,
{
    /// Sink errors are those of the wrapped stream's own `Sink` impl.
    type Error = S::Error;

    // NOTE(review): `delegate_sink!` is defined outside this file; presumably
    // it generates the `Sink` methods by forwarding to the `stream` field.
    // Confirm against the macro definition.
    delegate_sink!(stream, Item);
}
