1 use crate::Error; 2 use pin_project::pin_project; 3 use std::fmt; 4 use std::{ 5 future::Future, 6 pin::Pin, 7 task::{Context, Poll}, 8 }; 9 use tower::make::MakeService; 10 use tower_service::Service; 11 use tracing::trace; 12 13 pub(crate) struct Reconnect<M, Target> 14 where 15 M: Service<Target>, 16 M::Error: Into<Error>, 17 { 18 mk_service: M, 19 state: State<M::Future, M::Response>, 20 target: Target, 21 error: Option<crate::Error>, 22 has_been_connected: bool, 23 is_lazy: bool, 24 } 25 26 #[derive(Debug)] 27 enum State<F, S> { 28 Idle, 29 Connecting(F), 30 Connected(S), 31 } 32 33 impl<M, Target> Reconnect<M, Target> 34 where 35 M: Service<Target>, 36 M::Error: Into<Error>, 37 { new(mk_service: M, target: Target, is_lazy: bool) -> Self38 pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self { 39 Reconnect { 40 mk_service, 41 state: State::Idle, 42 target, 43 error: None, 44 has_been_connected: false, 45 is_lazy, 46 } 47 } 48 } 49 50 impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target> 51 where 52 M: Service<Target, Response = S>, 53 S: Service<Request>, 54 M::Future: Unpin, 55 Error: From<M::Error> + From<S::Error>, 56 Target: Clone, 57 <M as tower_service::Service<Target>>::Error: Into<crate::Error>, 58 { 59 type Response = S::Response; 60 type Error = Error; 61 type Future = ResponseFuture<S::Future>; 62 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>63 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 64 let mut state; 65 66 if self.error.is_some() { 67 return Poll::Ready(Ok(())); 68 } 69 70 loop { 71 match self.state { 72 State::Idle => { 73 trace!("poll_ready; idle"); 74 match self.mk_service.poll_ready(cx) { 75 Poll::Ready(r) => r?, 76 Poll::Pending => { 77 trace!("poll_ready; MakeService not ready"); 78 return Poll::Pending; 79 } 80 } 81 82 let fut = self.mk_service.make_service(self.target.clone()); 83 self.state = State::Connecting(fut); 84 continue; 85 } 86 State::Connecting(ref mut f) => { 87 trace!("poll_ready; connecting"); 88 match Pin::new(f).poll(cx) { 89 Poll::Ready(Ok(service)) => { 90 state = State::Connected(service); 91 } 92 Poll::Pending => { 93 trace!("poll_ready; not ready"); 94 return Poll::Pending; 95 } 96 Poll::Ready(Err(e)) => { 97 trace!("poll_ready; error"); 98 99 state = State::Idle; 100 101 if !(self.has_been_connected || self.is_lazy) { 102 return Poll::Ready(Err(e.into())); 103 } else { 104 let error = e.into(); 105 tracing::debug!("reconnect::poll_ready: {:?}", error); 106 self.error = Some(error); 107 break; 108 } 109 } 110 } 111 } 112 State::Connected(ref mut inner) => { 113 trace!("poll_ready; connected"); 114 115 self.has_been_connected = true; 116 117 match inner.poll_ready(cx) { 118 Poll::Ready(Ok(())) => { 119 trace!("poll_ready; ready"); 120 return Poll::Ready(Ok(())); 121 } 122 Poll::Pending => { 123 trace!("poll_ready; not ready"); 124 return Poll::Pending; 125 } 126 Poll::Ready(Err(_)) => { 127 trace!("poll_ready; error"); 128 state = State::Idle; 129 } 130 } 131 } 132 } 133 134 self.state = state; 135 } 136 137 self.state = state; 138 Poll::Ready(Ok(())) 139 } 140 call(&mut self, request: Request) -> Self::Future141 fn call(&mut self, request: Request) -> Self::Future { 142 tracing::trace!("Reconnect::call"); 143 if let Some(error) = self.error.take() { 144 tracing::debug!("error: {}", error); 145 return ResponseFuture::error(error); 146 } 147 148 let service = match self.state { 149 State::Connected(ref mut service) => service, 150 _ => panic!("service not ready; poll_ready must be called first"), 151 }; 152 153 let fut = service.call(request); 154 ResponseFuture::new(fut) 155 } 156 } 157 158 impl<M, Target> fmt::Debug for Reconnect<M, Target> 159 where 160 M: Service<Target> + fmt::Debug, 161 M::Future: fmt::Debug, 162 M::Response: fmt::Debug, 163 Target: fmt::Debug, 164 <M as tower_service::Service<Target>>::Error: Into<Error>, 165 { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result166 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 167 fmt.debug_struct("Reconnect") 168 .field("mk_service", &self.mk_service) 169 .field("state", &self.state) 170 .field("target", &self.target) 171 .finish() 172 } 173 } 174 175 /// Future that resolves to the response or failure to connect. 176 #[pin_project] 177 #[derive(Debug)] 178 pub(crate) struct ResponseFuture<F> { 179 #[pin] 180 inner: Inner<F>, 181 } 182 183 #[pin_project(project = InnerProj)] 184 #[derive(Debug)] 185 enum Inner<F> { 186 Future(#[pin] F), 187 Error(Option<crate::Error>), 188 } 189 190 impl<F> ResponseFuture<F> { new(inner: F) -> Self191 pub(crate) fn new(inner: F) -> Self { 192 ResponseFuture { 193 inner: Inner::Future(inner), 194 } 195 } 196 error(error: crate::Error) -> Self197 pub(crate) fn error(error: crate::Error) -> Self { 198 ResponseFuture { 199 inner: Inner::Error(Some(error)), 200 } 201 } 202 } 203 204 impl<F, T, E> Future for ResponseFuture<F> 205 where 206 F: Future<Output = Result<T, E>>, 207 E: Into<Error>, 208 { 209 type Output = Result<T, Error>; 210 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>211 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 212 //self.project().inner.poll(cx).map_err(Into::into) 213 let me = self.project(); 214 match me.inner.project() { 215 InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into), 216 InnerProj::Error(e) => { 217 let e = e.take().expect("Polled after ready."); 218 Poll::Ready(Err(e)) 219 } 220 } 221 } 222 } 223