1 use pin_project_lite::pin_project; 2 use std::{ 3 future::Future, 4 pin::Pin, 5 task::{Context, Poll}, 6 }; 7 use tower_service::Service; 8 9 /// A policy which decides which requests can be cloned and sent to the B 10 /// service. 11 pub trait Policy<Request> { clone_request(&self, req: &Request) -> Option<Request>12 fn clone_request(&self, req: &Request) -> Option<Request>; 13 } 14 15 /// Select is a middleware which attempts to clone the request and sends the 16 /// original request to the A service and, if the request was able to be cloned, 17 /// the cloned request to the B service. Both resulting futures will be polled 18 /// and whichever future completes first will be used as the result. 19 #[derive(Debug)] 20 pub struct Select<P, A, B> { 21 policy: P, 22 a: A, 23 b: B, 24 } 25 26 pin_project! { 27 #[derive(Debug)] 28 pub struct ResponseFuture<AF, BF> { 29 #[pin] 30 a_fut: AF, 31 #[pin] 32 b_fut: Option<BF>, 33 } 34 } 35 36 impl<P, A, B> Select<P, A, B> { new<Request>(policy: P, a: A, b: B) -> Self where P: Policy<Request>, A: Service<Request>, A::Error: Into<crate::BoxError>, B: Service<Request, Response = A::Response>, B::Error: Into<crate::BoxError>,37 pub fn new<Request>(policy: P, a: A, b: B) -> Self 38 where 39 P: Policy<Request>, 40 A: Service<Request>, 41 A::Error: Into<crate::BoxError>, 42 B: Service<Request, Response = A::Response>, 43 B::Error: Into<crate::BoxError>, 44 { 45 Select { policy, a, b } 46 } 47 } 48 49 impl<P, A, B, Request> Service<Request> for Select<P, A, B> 50 where 51 P: Policy<Request>, 52 A: Service<Request>, 53 A::Error: Into<crate::BoxError>, 54 B: Service<Request, Response = A::Response>, 55 B::Error: Into<crate::BoxError>, 56 { 57 type Response = A::Response; 58 type Error = crate::BoxError; 59 type Future = ResponseFuture<A::Future, B::Future>; 60 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>61 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 62 match (self.a.poll_ready(cx), self.b.poll_ready(cx)) { 63 (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), 64 (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())), 65 (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())), 66 _ => Poll::Pending, 67 } 68 } 69 call(&mut self, request: Request) -> Self::Future70 fn call(&mut self, request: Request) -> Self::Future { 71 let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) { 72 Some(self.b.call(cloned_req)) 73 } else { 74 None 75 }; 76 ResponseFuture { 77 a_fut: self.a.call(request), 78 b_fut, 79 } 80 } 81 } 82 83 impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF> 84 where 85 AF: Future<Output = Result<T, AE>>, 86 AE: Into<crate::BoxError>, 87 BF: Future<Output = Result<T, BE>>, 88 BE: Into<crate::BoxError>, 89 { 90 type Output = Result<T, crate::BoxError>; 91 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 93 let this = self.project(); 94 95 if let Poll::Ready(r) = this.a_fut.poll(cx) { 96 return Poll::Ready(Ok(r.map_err(Into::into)?)); 97 } 98 if let Some(b_fut) = this.b_fut.as_pin_mut() { 99 if let Poll::Ready(r) = b_fut.poll(cx) { 100 return Poll::Ready(Ok(r.map_err(Into::into)?)); 101 } 102 } 103 Poll::Pending 104 } 105 } 106