• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::assert_stream;
2 use core::fmt;
3 use core::pin::Pin;
4 use futures_core::future::TryFuture;
5 use futures_core::ready;
6 use futures_core::stream::Stream;
7 use futures_core::task::{Context, Poll};
8 use pin_project_lite::pin_project;
9 
10 /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
11 ///
12 /// This function is the dual for the `TryStream::try_fold()` adapter: while
13 /// `TryStream::try_fold()` reduces a `TryStream` to one single value,
14 /// `try_unfold()` creates a `TryStream` from a seed value.
15 ///
16 /// `try_unfold()` will call the provided closure with the provided seed, then
17 /// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
18 /// yield the value `a`, and use `b` as the next internal state.
19 ///
20 /// If the closure returns `None` instead of `Some(TryFuture)`, then the
21 /// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
22 /// future calls to `poll()`.
23 ///
24 /// In case of error generated by the returned `TryFuture`, the error will be
25 /// returned by the `TryStream`. The `TryStream` will then yield
26 /// `Poll::Ready(None)` in future calls to `poll()`.
27 ///
28 /// This function can typically be used when wanting to go from the "world of
29 /// futures" to the "world of streams": the provided closure can build a
30 /// `TryFuture` using other library functions working on futures, and
31 /// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
32 ///
33 /// # Example
34 ///
35 /// ```
36 /// # #[derive(Debug, PartialEq)]
37 /// # struct SomeError;
38 /// # futures::executor::block_on(async {
39 /// use futures::stream::{self, TryStreamExt};
40 ///
41 /// let stream = stream::try_unfold(0, |state| async move {
42 ///     if state < 0 {
43 ///         return Err(SomeError);
44 ///     }
45 ///
46 ///     if state <= 2 {
47 ///         let next_state = state + 1;
48 ///         let yielded = state * 2;
49 ///         Ok(Some((yielded, next_state)))
50 ///     } else {
51 ///         Ok(None)
52 ///     }
53 /// });
54 ///
55 /// let result: Result<Vec<i32>, _> = stream.try_collect().await;
56 /// assert_eq!(result, Ok(vec![0, 2, 4]));
57 /// # });
58 /// ```
try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, Fut: TryFuture<Ok = Option<(Item, T)>>,59 pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
60 where
61     F: FnMut(T) -> Fut,
62     Fut: TryFuture<Ok = Option<(Item, T)>>,
63 {
64     assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None })
65 }
66 
67 pin_project! {
68     /// Stream for the [`try_unfold`] function.
69     #[must_use = "streams do nothing unless polled"]
70     pub struct TryUnfold<T, F, Fut> {
71         f: F,
72         state: Option<T>,
73         #[pin]
74         fut: Option<Fut>,
75     }
76 }
77 
78 impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
79 where
80     T: fmt::Debug,
81     Fut: fmt::Debug,
82 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result83     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84         f.debug_struct("TryUnfold").field("state", &self.state).field("fut", &self.fut).finish()
85     }
86 }
87 
88 impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
89 where
90     F: FnMut(T) -> Fut,
91     Fut: TryFuture<Ok = Option<(Item, T)>>,
92 {
93     type Item = Result<Item, Fut::Error>;
94 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>95     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96         let mut this = self.project();
97 
98         if let Some(state) = this.state.take() {
99             this.fut.set(Some((this.f)(state)));
100         }
101 
102         match this.fut.as_mut().as_pin_mut() {
103             None => {
104                 // The future previously errored
105                 Poll::Ready(None)
106             }
107             Some(future) => {
108                 let step = ready!(future.try_poll(cx));
109                 this.fut.set(None);
110 
111                 match step {
112                     Ok(Some((item, next_state))) => {
113                         *this.state = Some(next_state);
114                         Poll::Ready(Some(Ok(item)))
115                     }
116                     Ok(None) => Poll::Ready(None),
117                     Err(e) => Poll::Ready(Some(Err(e))),
118                 }
119             }
120         }
121     }
122 }
123