• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use pin_project_lite::pin_project;
2 use std::{
3     future::Future,
4     pin::Pin,
5     task::{Context, Poll},
6 };
7 use tower_service::Service;
8 
/// A policy which decides which requests can be cloned and sent to the B
/// service.
pub trait Policy<Request> {
    /// Attempts to produce a second copy of `req`.
    ///
    /// Returns `Some(copy)` when the request may be duplicated, or `None`
    /// when it must not (or cannot) be duplicated. The original request is
    /// only borrowed, so it remains available to the caller either way.
    fn clone_request(&self, req: &Request) -> Option<Request>;
}
14 
/// Select is a middleware which attempts to clone the request and sends the
/// original request to the A service and, if the request was able to be cloned,
/// the cloned request to the B service.  Both resulting futures will be polled
/// and whichever future completes first will be used as the result.
#[derive(Debug)]
pub struct Select<P, A, B> {
    /// Decides, per request, whether a clone is produced for the B service.
    policy: P,
    /// Service that always receives the original request.
    a: A,
    /// Service that receives the cloned request when the policy yields one.
    b: B,
}
25 
26 pin_project! {
27     #[derive(Debug)]
28     pub struct ResponseFuture<AF, BF> {
29         #[pin]
30         a_fut: AF,
31         #[pin]
32         b_fut: Option<BF>,
33     }
34 }
35 
36 impl<P, A, B> Select<P, A, B> {
new<Request>(policy: P, a: A, b: B) -> Self where P: Policy<Request>, A: Service<Request>, A::Error: Into<crate::BoxError>, B: Service<Request, Response = A::Response>, B::Error: Into<crate::BoxError>,37     pub fn new<Request>(policy: P, a: A, b: B) -> Self
38     where
39         P: Policy<Request>,
40         A: Service<Request>,
41         A::Error: Into<crate::BoxError>,
42         B: Service<Request, Response = A::Response>,
43         B::Error: Into<crate::BoxError>,
44     {
45         Select { policy, a, b }
46     }
47 }
48 
49 impl<P, A, B, Request> Service<Request> for Select<P, A, B>
50 where
51     P: Policy<Request>,
52     A: Service<Request>,
53     A::Error: Into<crate::BoxError>,
54     B: Service<Request, Response = A::Response>,
55     B::Error: Into<crate::BoxError>,
56 {
57     type Response = A::Response;
58     type Error = crate::BoxError;
59     type Future = ResponseFuture<A::Future, B::Future>;
60 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>61     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62         match (self.a.poll_ready(cx), self.b.poll_ready(cx)) {
63             (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
64             (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())),
65             (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())),
66             _ => Poll::Pending,
67         }
68     }
69 
call(&mut self, request: Request) -> Self::Future70     fn call(&mut self, request: Request) -> Self::Future {
71         let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) {
72             Some(self.b.call(cloned_req))
73         } else {
74             None
75         };
76         ResponseFuture {
77             a_fut: self.a.call(request),
78             b_fut,
79         }
80     }
81 }
82 
83 impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF>
84 where
85     AF: Future<Output = Result<T, AE>>,
86     AE: Into<crate::BoxError>,
87     BF: Future<Output = Result<T, BE>>,
88     BE: Into<crate::BoxError>,
89 {
90     type Output = Result<T, crate::BoxError>;
91 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>92     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93         let this = self.project();
94 
95         if let Poll::Ready(r) = this.a_fut.poll(cx) {
96             return Poll::Ready(Ok(r.map_err(Into::into)?));
97         }
98         if let Some(b_fut) = this.b_fut.as_pin_mut() {
99             if let Poll::Ready(r) = b_fut.poll(cx) {
100                 return Poll::Ready(Ok(r.map_err(Into::into)?));
101             }
102         }
103         Poll::Pending
104     }
105 }
106