• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures_core::{ready, Stream};
2 use pin_project_lite::pin_project;
3 use std::{
4     future::Future,
5     pin::Pin,
6     task::{Context, Poll},
7 };
8 use tower_service::Service;
9 
10 pin_project! {
11     /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator.
12     #[derive(Debug)]
13     pub(crate) struct CallAll<Svc, S, Q> {
14         service: Option<Svc>,
15         #[pin]
16         stream: S,
17         queue: Q,
18         eof: bool,
19     }
20 }
21 
/// A container of in-flight futures of type `F`.
///
/// `CallAll` pushes every response future it creates into a `Drive`
/// implementation and polls it to obtain completed outputs.
pub(crate) trait Drive<F: Future> {
    /// Returns `true` if the container currently holds no futures.
    fn is_empty(&self) -> bool;

    /// Adds `future` to the container.
    fn push(&mut self, future: F);

    /// Polls the contained futures for a completed output.
    ///
    /// `CallAll` yields a `Poll::Ready(Some(output))` result to its caller;
    /// for any other result it moves on to accepting further requests.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}
29 
30 impl<Svc, S, Q> CallAll<Svc, S, Q>
31 where
32     Svc: Service<S::Item>,
33     Svc::Error: Into<crate::BoxError>,
34     S: Stream,
35     Q: Drive<Svc::Future>,
36 {
new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q>37     pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
38         CallAll {
39             service: Some(service),
40             stream,
41             queue,
42             eof: false,
43         }
44     }
45 
46     /// Extract the wrapped [`Service`].
into_inner(mut self) -> Svc47     pub(crate) fn into_inner(mut self) -> Svc {
48         self.service.take().expect("Service already taken")
49     }
50 
51     /// Extract the wrapped [`Service`].
take_service(self: Pin<&mut Self>) -> Svc52     pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
53         self.project()
54             .service
55             .take()
56             .expect("Service already taken")
57     }
58 
unordered(mut self) -> super::CallAllUnordered<Svc, S>59     pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
60         assert!(self.queue.is_empty() && !self.eof);
61 
62         super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
63     }
64 }
65 
66 impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
67 where
68     Svc: Service<S::Item>,
69     Svc::Error: Into<crate::BoxError>,
70     S: Stream,
71     Q: Drive<Svc::Future>,
72 {
73     type Item = Result<Svc::Response, crate::BoxError>;
74 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>75     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76         let mut this = self.project();
77 
78         loop {
79             // First, see if we have any responses to yield
80             if let Poll::Ready(r) = this.queue.poll(cx) {
81                 if let Some(rsp) = r.transpose().map_err(Into::into)? {
82                     return Poll::Ready(Some(Ok(rsp)));
83                 }
84             }
85 
86             // If there are no more requests coming, check if we're done
87             if *this.eof {
88                 if this.queue.is_empty() {
89                     return Poll::Ready(None);
90                 } else {
91                     return Poll::Pending;
92                 }
93             }
94 
95             // Then, see that the service is ready for another request
96             let svc = this
97                 .service
98                 .as_mut()
99                 .expect("Using CallAll after extracing inner Service");
100             ready!(svc.poll_ready(cx)).map_err(Into::into)?;
101 
102             // If it is, gather the next request (if there is one), or return `Pending` if the
103             // stream is not ready.
104             // TODO: We probably want to "release" the slot we reserved in Svc if the
105             // stream returns `Pending`. It may be a while until we get around to actually
106             // using it.
107             match ready!(this.stream.as_mut().poll_next(cx)) {
108                 Some(req) => {
109                     this.queue.push(svc.call(req));
110                 }
111                 None => {
112                     // We're all done once any outstanding requests have completed
113                     *this.eof = true;
114                 }
115             }
116         }
117     }
118 }
119