1 //! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. 2 //! 3 //! [`Service<Request>`]: crate::Service 4 //! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html 5 6 use super::common; 7 use futures_core::Stream; 8 use futures_util::stream::FuturesUnordered; 9 use pin_project_lite::pin_project; 10 use std::{ 11 future::Future, 12 pin::Pin, 13 task::{Context, Poll}, 14 }; 15 use tower_service::Service; 16 17 pin_project! { 18 /// A stream of responses received from the inner service in received order. 19 /// 20 /// Similar to [`CallAll`] except, instead of yielding responses in request order, 21 /// responses are returned as they are available. 22 /// 23 /// [`CallAll`]: crate::util::CallAll 24 #[derive(Debug)] 25 pub struct CallAllUnordered<Svc, S> 26 where 27 Svc: Service<S::Item>, 28 S: Stream, 29 { 30 #[pin] 31 inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>, 32 } 33 } 34 35 impl<Svc, S> CallAllUnordered<Svc, S> 36 where 37 Svc: Service<S::Item>, 38 Svc::Error: Into<crate::BoxError>, 39 S: Stream, 40 { 41 /// Create new [`CallAllUnordered`] combinator. 42 /// 43 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html new(service: Svc, stream: S) -> CallAllUnordered<Svc, S>44 pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> { 45 CallAllUnordered { 46 inner: common::CallAll::new(service, stream, FuturesUnordered::new()), 47 } 48 } 49 50 /// Extract the wrapped [`Service`]. 51 /// 52 /// # Panics 53 /// 54 /// Panics if [`take_service`] was already called. 55 /// 56 /// [`take_service`]: crate::util::CallAllUnordered::take_service into_inner(self) -> Svc57 pub fn into_inner(self) -> Svc { 58 self.inner.into_inner() 59 } 60 61 /// Extract the wrapped `Service`. 62 /// 63 /// This [`CallAllUnordered`] can no longer be used after this function has been called. 64 /// 65 /// # Panics 66 /// 67 /// Panics if [`take_service`] was already called. 68 /// 69 /// [`take_service`]: crate::util::CallAllUnordered::take_service take_service(self: Pin<&mut Self>) -> Svc70 pub fn take_service(self: Pin<&mut Self>) -> Svc { 71 self.project().inner.take_service() 72 } 73 } 74 75 impl<Svc, S> Stream for CallAllUnordered<Svc, S> 76 where 77 Svc: Service<S::Item>, 78 Svc::Error: Into<crate::BoxError>, 79 S: Stream, 80 { 81 type Item = Result<Svc::Response, crate::BoxError>; 82 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>83 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 84 self.project().inner.poll_next(cx) 85 } 86 } 87 88 impl<F: Future> common::Drive<F> for FuturesUnordered<F> { is_empty(&self) -> bool89 fn is_empty(&self) -> bool { 90 FuturesUnordered::is_empty(self) 91 } 92 push(&mut self, future: F)93 fn push(&mut self, future: F) { 94 FuturesUnordered::push(self, future) 95 } 96 poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>97 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { 98 Stream::poll_next(Pin::new(self), cx) 99 } 100 } 101