1 use futures_core::{ready, Stream}; 2 use pin_project_lite::pin_project; 3 use std::{ 4 future::Future, 5 pin::Pin, 6 task::{Context, Poll}, 7 }; 8 use tower_service::Service; 9 10 pin_project! { 11 /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator. 12 #[derive(Debug)] 13 pub(crate) struct CallAll<Svc, S, Q> { 14 service: Option<Svc>, 15 #[pin] 16 stream: S, 17 queue: Q, 18 eof: bool, 19 } 20 } 21 22 pub(crate) trait Drive<F: Future> { is_empty(&self) -> bool23 fn is_empty(&self) -> bool; 24 push(&mut self, future: F)25 fn push(&mut self, future: F); 26 poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>27 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>; 28 } 29 30 impl<Svc, S, Q> CallAll<Svc, S, Q> 31 where 32 Svc: Service<S::Item>, 33 Svc::Error: Into<crate::BoxError>, 34 S: Stream, 35 Q: Drive<Svc::Future>, 36 { new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q>37 pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> { 38 CallAll { 39 service: Some(service), 40 stream, 41 queue, 42 eof: false, 43 } 44 } 45 46 /// Extract the wrapped [`Service`]. into_inner(mut self) -> Svc47 pub(crate) fn into_inner(mut self) -> Svc { 48 self.service.take().expect("Service already taken") 49 } 50 51 /// Extract the wrapped [`Service`]. take_service(self: Pin<&mut Self>) -> Svc52 pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc { 53 self.project() 54 .service 55 .take() 56 .expect("Service already taken") 57 } 58 unordered(mut self) -> super::CallAllUnordered<Svc, S>59 pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> { 60 assert!(self.queue.is_empty() && !self.eof); 61 62 super::CallAllUnordered::new(self.service.take().unwrap(), self.stream) 63 } 64 } 65 66 impl<Svc, S, Q> Stream for CallAll<Svc, S, Q> 67 where 68 Svc: Service<S::Item>, 69 Svc::Error: Into<crate::BoxError>, 70 S: Stream, 71 Q: Drive<Svc::Future>, 72 { 73 type Item = Result<Svc::Response, crate::BoxError>; 74 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 76 let mut this = self.project(); 77 78 loop { 79 // First, see if we have any responses to yield 80 if let Poll::Ready(r) = this.queue.poll(cx) { 81 if let Some(rsp) = r.transpose().map_err(Into::into)? { 82 return Poll::Ready(Some(Ok(rsp))); 83 } 84 } 85 86 // If there are no more requests coming, check if we're done 87 if *this.eof { 88 if this.queue.is_empty() { 89 return Poll::Ready(None); 90 } else { 91 return Poll::Pending; 92 } 93 } 94 95 // Then, see that the service is ready for another request 96 let svc = this 97 .service 98 .as_mut() 99 .expect("Using CallAll after extracing inner Service"); 100 ready!(svc.poll_ready(cx)).map_err(Into::into)?; 101 102 // If it is, gather the next request (if there is one), or return `Pending` if the 103 // stream is not ready. 104 // TODO: We probably want to "release" the slot we reserved in Svc if the 105 // stream returns `Pending`. It may be a while until we get around to actually 106 // using it. 107 match ready!(this.stream.as_mut().poll_next(cx)) { 108 Some(req) => { 109 this.queue.push(svc.call(req)); 110 } 111 None => { 112 // We're all done once any outstanding requests have completed 113 *this.eof = true; 114 } 115 } 116 } 117 } 118 } 119