• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Error;
2 use pin_project::pin_project;
3 use std::fmt;
4 use std::{
5     future::Future,
6     pin::Pin,
7     task::{Context, Poll},
8 };
9 use tower::make::MakeService;
10 use tower_service::Service;
11 use tracing::trace;
12 
13 pub(crate) struct Reconnect<M, Target>
14 where
15     M: Service<Target>,
16     M::Error: Into<Error>,
17 {
18     mk_service: M,
19     state: State<M::Future, M::Response>,
20     target: Target,
21     error: Option<crate::Error>,
22     has_been_connected: bool,
23     is_lazy: bool,
24 }
25 
26 #[derive(Debug)]
27 enum State<F, S> {
28     Idle,
29     Connecting(F),
30     Connected(S),
31 }
32 
33 impl<M, Target> Reconnect<M, Target>
34 where
35     M: Service<Target>,
36     M::Error: Into<Error>,
37 {
new(mk_service: M, target: Target, is_lazy: bool) -> Self38     pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
39         Reconnect {
40             mk_service,
41             state: State::Idle,
42             target,
43             error: None,
44             has_been_connected: false,
45             is_lazy,
46         }
47     }
48 }
49 
50 impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
51 where
52     M: Service<Target, Response = S>,
53     S: Service<Request>,
54     M::Future: Unpin,
55     Error: From<M::Error> + From<S::Error>,
56     Target: Clone,
57     <M as tower_service::Service<Target>>::Error: Into<crate::Error>,
58 {
59     type Response = S::Response;
60     type Error = Error;
61     type Future = ResponseFuture<S::Future>;
62 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>63     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64         let mut state;
65 
66         if self.error.is_some() {
67             return Poll::Ready(Ok(()));
68         }
69 
70         loop {
71             match self.state {
72                 State::Idle => {
73                     trace!("poll_ready; idle");
74                     match self.mk_service.poll_ready(cx) {
75                         Poll::Ready(r) => r?,
76                         Poll::Pending => {
77                             trace!("poll_ready; MakeService not ready");
78                             return Poll::Pending;
79                         }
80                     }
81 
82                     let fut = self.mk_service.make_service(self.target.clone());
83                     self.state = State::Connecting(fut);
84                     continue;
85                 }
86                 State::Connecting(ref mut f) => {
87                     trace!("poll_ready; connecting");
88                     match Pin::new(f).poll(cx) {
89                         Poll::Ready(Ok(service)) => {
90                             state = State::Connected(service);
91                         }
92                         Poll::Pending => {
93                             trace!("poll_ready; not ready");
94                             return Poll::Pending;
95                         }
96                         Poll::Ready(Err(e)) => {
97                             trace!("poll_ready; error");
98 
99                             state = State::Idle;
100 
101                             if !(self.has_been_connected || self.is_lazy) {
102                                 return Poll::Ready(Err(e.into()));
103                             } else {
104                                 let error = e.into();
105                                 tracing::debug!("reconnect::poll_ready: {:?}", error);
106                                 self.error = Some(error);
107                                 break;
108                             }
109                         }
110                     }
111                 }
112                 State::Connected(ref mut inner) => {
113                     trace!("poll_ready; connected");
114 
115                     self.has_been_connected = true;
116 
117                     match inner.poll_ready(cx) {
118                         Poll::Ready(Ok(())) => {
119                             trace!("poll_ready; ready");
120                             return Poll::Ready(Ok(()));
121                         }
122                         Poll::Pending => {
123                             trace!("poll_ready; not ready");
124                             return Poll::Pending;
125                         }
126                         Poll::Ready(Err(_)) => {
127                             trace!("poll_ready; error");
128                             state = State::Idle;
129                         }
130                     }
131                 }
132             }
133 
134             self.state = state;
135         }
136 
137         self.state = state;
138         Poll::Ready(Ok(()))
139     }
140 
call(&mut self, request: Request) -> Self::Future141     fn call(&mut self, request: Request) -> Self::Future {
142         tracing::trace!("Reconnect::call");
143         if let Some(error) = self.error.take() {
144             tracing::debug!("error: {}", error);
145             return ResponseFuture::error(error);
146         }
147 
148         let service = match self.state {
149             State::Connected(ref mut service) => service,
150             _ => panic!("service not ready; poll_ready must be called first"),
151         };
152 
153         let fut = service.call(request);
154         ResponseFuture::new(fut)
155     }
156 }
157 
158 impl<M, Target> fmt::Debug for Reconnect<M, Target>
159 where
160     M: Service<Target> + fmt::Debug,
161     M::Future: fmt::Debug,
162     M::Response: fmt::Debug,
163     Target: fmt::Debug,
164     <M as tower_service::Service<Target>>::Error: Into<Error>,
165 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result166     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167         fmt.debug_struct("Reconnect")
168             .field("mk_service", &self.mk_service)
169             .field("state", &self.state)
170             .field("target", &self.target)
171             .finish()
172     }
173 }
174 
175 /// Future that resolves to the response or failure to connect.
176 #[pin_project]
177 #[derive(Debug)]
178 pub(crate) struct ResponseFuture<F> {
179     #[pin]
180     inner: Inner<F>,
181 }
182 
183 #[pin_project(project = InnerProj)]
184 #[derive(Debug)]
185 enum Inner<F> {
186     Future(#[pin] F),
187     Error(Option<crate::Error>),
188 }
189 
190 impl<F> ResponseFuture<F> {
new(inner: F) -> Self191     pub(crate) fn new(inner: F) -> Self {
192         ResponseFuture {
193             inner: Inner::Future(inner),
194         }
195     }
196 
error(error: crate::Error) -> Self197     pub(crate) fn error(error: crate::Error) -> Self {
198         ResponseFuture {
199             inner: Inner::Error(Some(error)),
200         }
201     }
202 }
203 
204 impl<F, T, E> Future for ResponseFuture<F>
205 where
206     F: Future<Output = Result<T, E>>,
207     E: Into<Error>,
208 {
209     type Output = Result<T, Error>;
210 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>211     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212         //self.project().inner.poll(cx).map_err(Into::into)
213         let me = self.project();
214         match me.inner.project() {
215             InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
216             InnerProj::Error(e) => {
217                 let e = e.take().expect("Polled after ready.");
218                 Poll::Ready(Err(e))
219             }
220         }
221     }
222 }
223