//! [`Stream`][stream] + [`Service`] => [`Stream`][stream]. //! //! [`Service`]: crate::Service //! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html use super::common; use futures_core::Stream; use futures_util::stream::FuturesOrdered; use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tower_service::Service; pin_project! { /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each /// request received on the wrapped [`Stream`]. /// /// ```rust /// # use std::task::{Poll, Context}; /// # use std::cell::Cell; /// # use std::error::Error; /// # use std::rc::Rc; /// # /// use futures::future::{ready, Ready}; /// use futures::StreamExt; /// use futures::channel::mpsc; /// use tower_service::Service; /// use tower::util::ServiceExt; /// /// // First, we need to have a Service to process our requests. /// #[derive(Debug, Eq, PartialEq)] /// struct FirstLetter; /// impl Service<&'static str> for FirstLetter { /// type Response = &'static str; /// type Error = Box; /// type Future = Ready>; /// /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// /// fn call(&mut self, req: &'static str) -> Self::Future { /// ready(Ok(&req[..1])) /// } /// } /// /// #[tokio::main] /// async fn main() { /// // Next, we need a Stream of requests. // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams, // tokio::sync::mpsc again? /// let (mut reqs, rx) = mpsc::unbounded(); /// // Note that we have to help Rust out here by telling it what error type to use. /// // Specifically, it has to be From + From. /// let mut rsps = FirstLetter.call_all(rx); /// /// // Now, let's send a few requests and then check that we get the corresponding responses. /// reqs.unbounded_send("one").unwrap(); /// reqs.unbounded_send("two").unwrap(); /// reqs.unbounded_send("three").unwrap(); /// drop(reqs); /// /// // We then loop over the response Strem that we get back from call_all. /// let mut i = 0usize; /// while let Some(rsp) = rsps.next().await { /// // Each response is a Result (we could also have used TryStream::try_next) /// match (i + 1, rsp.unwrap()) { /// (1, "o") | /// (2, "t") | /// (3, "t") => {} /// (n, i) => { /// unreachable!("{}. response was '{}'", n, i); /// } /// } /// i += 1; /// } /// /// // And at the end, we can get the Service back when there are no more requests. /// assert_eq!(rsps.into_inner(), FirstLetter); /// } /// ``` /// /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html #[derive(Debug)] pub struct CallAll where Svc: Service, S: Stream, { #[pin] inner: common::CallAll>, } } impl CallAll where Svc: Service, Svc::Error: Into, S: Stream, { /// Create new [`CallAll`] combinator. /// /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`]. /// /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html pub fn new(service: Svc, stream: S) -> CallAll { CallAll { inner: common::CallAll::new(service, stream, FuturesOrdered::new()), } } /// Extract the wrapped [`Service`]. /// /// # Panics /// /// Panics if [`take_service`] was already called. /// /// [`take_service`]: crate::util::CallAll::take_service pub fn into_inner(self) -> Svc { self.inner.into_inner() } /// Extract the wrapped [`Service`]. /// /// This [`CallAll`] can no longer be used after this function has been called. /// /// # Panics /// /// Panics if [`take_service`] was already called. /// /// [`take_service`]: crate::util::CallAll::take_service pub fn take_service(self: Pin<&mut Self>) -> Svc { self.project().inner.take_service() } /// Return responses as they are ready, regardless of the initial order. /// /// This function must be called before the stream is polled. /// /// # Panics /// /// Panics if [`poll`] was called. /// /// [`poll`]: std::future::Future::poll pub fn unordered(self) -> super::CallAllUnordered { self.inner.unordered() } } impl Stream for CallAll where Svc: Service, Svc::Error: Into, S: Stream, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_next(cx) } } impl common::Drive for FuturesOrdered { fn is_empty(&self) -> bool { FuturesOrdered::is_empty(self) } fn push(&mut self, future: F) { FuturesOrdered::push(self, future) } fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { Stream::poll_next(Pin::new(self), cx) } }