1 //! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. 2 //! 3 //! [`Service<Request>`]: crate::Service 4 //! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html 5 6 use super::common; 7 use futures_core::Stream; 8 use futures_util::stream::FuturesOrdered; 9 use pin_project_lite::pin_project; 10 use std::{ 11 future::Future, 12 pin::Pin, 13 task::{Context, Poll}, 14 }; 15 use tower_service::Service; 16 17 pin_project! { 18 /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each 19 /// request received on the wrapped [`Stream`]. 20 /// 21 /// ```rust 22 /// # use std::task::{Poll, Context}; 23 /// # use std::cell::Cell; 24 /// # use std::error::Error; 25 /// # use std::rc::Rc; 26 /// # 27 /// use futures::future::{ready, Ready}; 28 /// use futures::StreamExt; 29 /// use futures::channel::mpsc; 30 /// use tower_service::Service; 31 /// use tower::util::ServiceExt; 32 /// 33 /// // First, we need to have a Service to process our requests. 34 /// #[derive(Debug, Eq, PartialEq)] 35 /// struct FirstLetter; 36 /// impl Service<&'static str> for FirstLetter { 37 /// type Response = &'static str; 38 /// type Error = Box<dyn Error + Send + Sync>; 39 /// type Future = Ready<Result<Self::Response, Self::Error>>; 40 /// 41 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 42 /// Poll::Ready(Ok(())) 43 /// } 44 /// 45 /// fn call(&mut self, req: &'static str) -> Self::Future { 46 /// ready(Ok(&req[..1])) 47 /// } 48 /// } 49 /// 50 /// #[tokio::main] 51 /// async fn main() { 52 /// // Next, we need a Stream of requests. 53 // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams, 54 // tokio::sync::mpsc again? 55 /// let (mut reqs, rx) = mpsc::unbounded(); 56 /// // Note that we have to help Rust out here by telling it what error type to use. 57 /// // Specifically, it has to be From<Service::Error> + From<Stream::Error>. 58 /// let mut rsps = FirstLetter.call_all(rx); 59 /// 60 /// // Now, let's send a few requests and then check that we get the corresponding responses. 61 /// reqs.unbounded_send("one").unwrap(); 62 /// reqs.unbounded_send("two").unwrap(); 63 /// reqs.unbounded_send("three").unwrap(); 64 /// drop(reqs); 65 /// 66 /// // We then loop over the response Strem that we get back from call_all. 67 /// let mut i = 0usize; 68 /// while let Some(rsp) = rsps.next().await { 69 /// // Each response is a Result (we could also have used TryStream::try_next) 70 /// match (i + 1, rsp.unwrap()) { 71 /// (1, "o") | 72 /// (2, "t") | 73 /// (3, "t") => {} 74 /// (n, i) => { 75 /// unreachable!("{}. response was '{}'", n, i); 76 /// } 77 /// } 78 /// i += 1; 79 /// } 80 /// 81 /// // And at the end, we can get the Service back when there are no more requests. 82 /// assert_eq!(rsps.into_inner(), FirstLetter); 83 /// } 84 /// ``` 85 /// 86 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html 87 #[derive(Debug)] 88 pub struct CallAll<Svc, S> 89 where 90 Svc: Service<S::Item>, 91 S: Stream, 92 { 93 #[pin] 94 inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>, 95 } 96 } 97 98 impl<Svc, S> CallAll<Svc, S> 99 where 100 Svc: Service<S::Item>, 101 Svc::Error: Into<crate::BoxError>, 102 S: Stream, 103 { 104 /// Create new [`CallAll`] combinator. 105 /// 106 /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are 107 /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`]. 108 /// 109 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html new(service: Svc, stream: S) -> CallAll<Svc, S>110 pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> { 111 CallAll { 112 inner: common::CallAll::new(service, stream, FuturesOrdered::new()), 113 } 114 } 115 116 /// Extract the wrapped [`Service`]. 117 /// 118 /// # Panics 119 /// 120 /// Panics if [`take_service`] was already called. 121 /// 122 /// [`take_service`]: crate::util::CallAll::take_service into_inner(self) -> Svc123 pub fn into_inner(self) -> Svc { 124 self.inner.into_inner() 125 } 126 127 /// Extract the wrapped [`Service`]. 128 /// 129 /// This [`CallAll`] can no longer be used after this function has been called. 130 /// 131 /// # Panics 132 /// 133 /// Panics if [`take_service`] was already called. 134 /// 135 /// [`take_service`]: crate::util::CallAll::take_service take_service(self: Pin<&mut Self>) -> Svc136 pub fn take_service(self: Pin<&mut Self>) -> Svc { 137 self.project().inner.take_service() 138 } 139 140 /// Return responses as they are ready, regardless of the initial order. 141 /// 142 /// This function must be called before the stream is polled. 143 /// 144 /// # Panics 145 /// 146 /// Panics if [`poll`] was called. 147 /// 148 /// [`poll`]: std::future::Future::poll unordered(self) -> super::CallAllUnordered<Svc, S>149 pub fn unordered(self) -> super::CallAllUnordered<Svc, S> { 150 self.inner.unordered() 151 } 152 } 153 154 impl<Svc, S> Stream for CallAll<Svc, S> 155 where 156 Svc: Service<S::Item>, 157 Svc::Error: Into<crate::BoxError>, 158 S: Stream, 159 { 160 type Item = Result<Svc::Response, crate::BoxError>; 161 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>162 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 163 self.project().inner.poll_next(cx) 164 } 165 } 166 167 impl<F: Future> common::Drive<F> for FuturesOrdered<F> { is_empty(&self) -> bool168 fn is_empty(&self) -> bool { 169 FuturesOrdered::is_empty(self) 170 } 171 push(&mut self, future: F)172 fn push(&mut self, future: F) { 173 FuturesOrdered::push(self, future) 174 } 175 poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>176 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { 177 Stream::poll_next(Pin::new(self), cx) 178 } 179 } 180