• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
//!
//! [`Service<Request>`]: crate::Service
//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
5 
6 use super::common;
7 use futures_core::Stream;
8 use futures_util::stream::FuturesOrdered;
9 use pin_project_lite::pin_project;
10 use std::{
11     future::Future,
12     pin::Pin,
13     task::{Context, Poll},
14 };
15 use tower_service::Service;
16 
17 pin_project! {
18     /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
19     /// request received on the wrapped [`Stream`].
20     ///
21     /// ```rust
22     /// # use std::task::{Poll, Context};
23     /// # use std::cell::Cell;
24     /// # use std::error::Error;
25     /// # use std::rc::Rc;
26     /// #
27     /// use futures::future::{ready, Ready};
28     /// use futures::StreamExt;
29     /// use futures::channel::mpsc;
30     /// use tower_service::Service;
31     /// use tower::util::ServiceExt;
32     ///
33     /// // First, we need to have a Service to process our requests.
34     /// #[derive(Debug, Eq, PartialEq)]
35     /// struct FirstLetter;
36     /// impl Service<&'static str> for FirstLetter {
37     ///      type Response = &'static str;
38     ///      type Error = Box<dyn Error + Send + Sync>;
39     ///      type Future = Ready<Result<Self::Response, Self::Error>>;
40     ///
41     ///      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42     ///          Poll::Ready(Ok(()))
43     ///      }
44     ///
45     ///      fn call(&mut self, req: &'static str) -> Self::Future {
46     ///          ready(Ok(&req[..1]))
47     ///      }
48     /// }
49     ///
50     /// #[tokio::main]
51     /// async fn main() {
52     ///     // Next, we need a Stream of requests.
53     // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
54     //              tokio::sync::mpsc again?
55     ///     let (mut reqs, rx) = mpsc::unbounded();
56     ///     // Note that we have to help Rust out here by telling it what error type to use.
57     ///     // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
58     ///     let mut rsps = FirstLetter.call_all(rx);
59     ///
60     ///     // Now, let's send a few requests and then check that we get the corresponding responses.
61     ///     reqs.unbounded_send("one").unwrap();
62     ///     reqs.unbounded_send("two").unwrap();
63     ///     reqs.unbounded_send("three").unwrap();
64     ///     drop(reqs);
65     ///
66     ///     // We then loop over the response Strem that we get back from call_all.
67     ///     let mut i = 0usize;
68     ///     while let Some(rsp) = rsps.next().await {
69     ///         // Each response is a Result (we could also have used TryStream::try_next)
70     ///         match (i + 1, rsp.unwrap()) {
71     ///             (1, "o") |
72     ///             (2, "t") |
73     ///             (3, "t") => {}
74     ///             (n, i) => {
75     ///                 unreachable!("{}. response was '{}'", n, i);
76     ///             }
77     ///         }
78     ///         i += 1;
79     ///     }
80     ///
81     ///     // And at the end, we can get the Service back when there are no more requests.
82     ///     assert_eq!(rsps.into_inner(), FirstLetter);
83     /// }
84     /// ```
85     ///
86     /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
87     #[derive(Debug)]
88     pub struct CallAll<Svc, S>
89     where
90         Svc: Service<S::Item>,
91         S: Stream,
92     {
93         #[pin]
94         inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
95     }
96 }
97 
98 impl<Svc, S> CallAll<Svc, S>
99 where
100     Svc: Service<S::Item>,
101     Svc::Error: Into<crate::BoxError>,
102     S: Stream,
103 {
104     /// Create new [`CallAll`] combinator.
105     ///
106     /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are
107     /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`].
108     ///
109     /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
new(service: Svc, stream: S) -> CallAll<Svc, S>110     pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
111         CallAll {
112             inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
113         }
114     }
115 
116     /// Extract the wrapped [`Service`].
117     ///
118     /// # Panics
119     ///
120     /// Panics if [`take_service`] was already called.
121     ///
122     /// [`take_service`]: crate::util::CallAll::take_service
into_inner(self) -> Svc123     pub fn into_inner(self) -> Svc {
124         self.inner.into_inner()
125     }
126 
127     /// Extract the wrapped [`Service`].
128     ///
129     /// This [`CallAll`] can no longer be used after this function has been called.
130     ///
131     /// # Panics
132     ///
133     /// Panics if [`take_service`] was already called.
134     ///
135     /// [`take_service`]: crate::util::CallAll::take_service
take_service(self: Pin<&mut Self>) -> Svc136     pub fn take_service(self: Pin<&mut Self>) -> Svc {
137         self.project().inner.take_service()
138     }
139 
140     /// Return responses as they are ready, regardless of the initial order.
141     ///
142     /// This function must be called before the stream is polled.
143     ///
144     /// # Panics
145     ///
146     /// Panics if [`poll`] was called.
147     ///
148     /// [`poll`]: std::future::Future::poll
unordered(self) -> super::CallAllUnordered<Svc, S>149     pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
150         self.inner.unordered()
151     }
152 }
153 
154 impl<Svc, S> Stream for CallAll<Svc, S>
155 where
156     Svc: Service<S::Item>,
157     Svc::Error: Into<crate::BoxError>,
158     S: Stream,
159 {
160     type Item = Result<Svc::Response, crate::BoxError>;
161 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>162     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163         self.project().inner.poll_next(cx)
164     }
165 }
166 
167 impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
is_empty(&self) -> bool168     fn is_empty(&self) -> bool {
169         FuturesOrdered::is_empty(self)
170     }
171 
push(&mut self, future: F)172     fn push(&mut self, future: F) {
173         FuturesOrdered::push(self, future)
174     }
175 
poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>176     fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
177         Stream::poll_next(Pin::new(self), cx)
178     }
179 }
180