• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
//!
//! [`Service<Request>`]: crate::Service
//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
5 
6 use super::common;
7 use futures_core::Stream;
8 use futures_util::stream::FuturesUnordered;
9 use pin_project_lite::pin_project;
10 use std::{
11     future::Future,
12     pin::Pin,
13     task::{Context, Poll},
14 };
15 use tower_service::Service;
16 
17 pin_project! {
18     /// A stream of responses received from the inner service in received order.
19     ///
20     /// Similar to [`CallAll`] except, instead of yielding responses in request order,
21     /// responses are returned as they are available.
22     ///
23     /// [`CallAll`]: crate::util::CallAll
24     #[derive(Debug)]
25     pub struct CallAllUnordered<Svc, S>
26     where
27         Svc: Service<S::Item>,
28         S: Stream,
29     {
30         #[pin]
31         inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
32     }
33 }
34 
35 impl<Svc, S> CallAllUnordered<Svc, S>
36 where
37     Svc: Service<S::Item>,
38     Svc::Error: Into<crate::BoxError>,
39     S: Stream,
40 {
41     /// Create new [`CallAllUnordered`] combinator.
42     ///
43     /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
new(service: Svc, stream: S) -> CallAllUnordered<Svc, S>44     pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
45         CallAllUnordered {
46             inner: common::CallAll::new(service, stream, FuturesUnordered::new()),
47         }
48     }
49 
50     /// Extract the wrapped [`Service`].
51     ///
52     /// # Panics
53     ///
54     /// Panics if [`take_service`] was already called.
55     ///
56     /// [`take_service`]: crate::util::CallAllUnordered::take_service
into_inner(self) -> Svc57     pub fn into_inner(self) -> Svc {
58         self.inner.into_inner()
59     }
60 
61     /// Extract the wrapped `Service`.
62     ///
63     /// This [`CallAllUnordered`] can no longer be used after this function has been called.
64     ///
65     /// # Panics
66     ///
67     /// Panics if [`take_service`] was already called.
68     ///
69     /// [`take_service`]: crate::util::CallAllUnordered::take_service
take_service(self: Pin<&mut Self>) -> Svc70     pub fn take_service(self: Pin<&mut Self>) -> Svc {
71         self.project().inner.take_service()
72     }
73 }
74 
75 impl<Svc, S> Stream for CallAllUnordered<Svc, S>
76 where
77     Svc: Service<S::Item>,
78     Svc::Error: Into<crate::BoxError>,
79     S: Stream,
80 {
81     type Item = Result<Svc::Response, crate::BoxError>;
82 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>83     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84         self.project().inner.poll_next(cx)
85     }
86 }
87 
88 impl<F: Future> common::Drive<F> for FuturesUnordered<F> {
is_empty(&self) -> bool89     fn is_empty(&self) -> bool {
90         FuturesUnordered::is_empty(self)
91     }
92 
push(&mut self, future: F)93     fn push(&mut self, future: F) {
94         FuturesUnordered::push(self, future)
95     }
96 
poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>97     fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
98         Stream::poll_next(Pin::new(self), cx)
99     }
100 }
101