//! [`Stream`][stream] + [`Service`] => [`Stream`][stream]. //! //! [`Service`]: crate::Service //! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html use super::common; use futures_core::Stream; use futures_util::stream::FuturesUnordered; use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tower_service::Service; pin_project! { /// A stream of responses received from the inner service in received order. /// /// Similar to [`CallAll`] except, instead of yielding responses in request order, /// responses are returned as they are available. /// /// [`CallAll`]: crate::util::CallAll #[derive(Debug)] pub struct CallAllUnordered where Svc: Service, S: Stream, { #[pin] inner: common::CallAll>, } } impl CallAllUnordered where Svc: Service, Svc::Error: Into, S: Stream, { /// Create new [`CallAllUnordered`] combinator. /// /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html pub fn new(service: Svc, stream: S) -> CallAllUnordered { CallAllUnordered { inner: common::CallAll::new(service, stream, FuturesUnordered::new()), } } /// Extract the wrapped [`Service`]. /// /// # Panics /// /// Panics if [`take_service`] was already called. /// /// [`take_service`]: crate::util::CallAllUnordered::take_service pub fn into_inner(self) -> Svc { self.inner.into_inner() } /// Extract the wrapped `Service`. /// /// This [`CallAllUnordered`] can no longer be used after this function has been called. /// /// # Panics /// /// Panics if [`take_service`] was already called. /// /// [`take_service`]: crate::util::CallAllUnordered::take_service pub fn take_service(self: Pin<&mut Self>) -> Svc { self.project().inner.take_service() } } impl Stream for CallAllUnordered where Svc: Service, Svc::Error: Into, S: Stream, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_next(cx) } } impl common::Drive for FuturesUnordered { fn is_empty(&self) -> bool { FuturesUnordered::is_empty(self) } fn push(&mut self, future: F) { FuturesUnordered::push(self, future) } fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { Stream::poll_next(Pin::new(self), cx) } }