use futures_core::{ready, Stream}; use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tower_service::Service; pin_project! { /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator. #[derive(Debug)] pub(crate) struct CallAll { service: Option, #[pin] stream: S, queue: Q, eof: bool, } } pub(crate) trait Drive { fn is_empty(&self) -> bool; fn push(&mut self, future: F); fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; } impl CallAll where Svc: Service, Svc::Error: Into, S: Stream, Q: Drive, { pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll { CallAll { service: Some(service), stream, queue, eof: false, } } /// Extract the wrapped [`Service`]. pub(crate) fn into_inner(mut self) -> Svc { self.service.take().expect("Service already taken") } /// Extract the wrapped [`Service`]. pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc { self.project() .service .take() .expect("Service already taken") } pub(crate) fn unordered(mut self) -> super::CallAllUnordered { assert!(self.queue.is_empty() && !self.eof); super::CallAllUnordered::new(self.service.take().unwrap(), self.stream) } } impl Stream for CallAll where Svc: Service, Svc::Error: Into, S: Stream, Q: Drive, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { // First, see if we have any responses to yield if let Poll::Ready(r) = this.queue.poll(cx) { if let Some(rsp) = r.transpose().map_err(Into::into)? { return Poll::Ready(Some(Ok(rsp))); } } // If there are no more requests coming, check if we're done if *this.eof { if this.queue.is_empty() { return Poll::Ready(None); } else { return Poll::Pending; } } // Then, see that the service is ready for another request let svc = this .service .as_mut() .expect("Using CallAll after extracing inner Service"); ready!(svc.poll_ready(cx)).map_err(Into::into)?; // If it is, gather the next request (if there is one), or return `Pending` if the // stream is not ready. // TODO: We probably want to "release" the slot we reserved in Svc if the // stream returns `Pending`. It may be a while until we get around to actually // using it. match ready!(this.stream.as_mut().poll_next(cx)) { Some(req) => { this.queue.push(svc.call(req)); } None => { // We're all done once any outstanding requests have completed *this.eof = true; } } } } }