//! Pre-emptively retry requests which have been outstanding for longer
//! than a given latency percentile.

#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]

use crate::filter::AsyncFilter;
use futures_util::future;
use pin_project_lite::pin_project;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tracing::error;

mod delay;
mod latency;
mod rotating_histogram;
mod select;

use delay::Delay;
use latency::Latency;
use rotating_histogram::RotatingHistogram;
use select::Select;

type Histo = Arc<Mutex<RotatingHistogram>>;
type Service<S, P> = select::Select<
    SelectPolicy<P>,
    Latency<Histo, S>,
    Delay<DelayPolicy, AsyncFilter<Latency<Histo, S>, PolicyPredicate<P>>>,
>;

/// A middleware that pre-emptively retries requests which have been outstanding
/// for longer than a given latency percentile.  If either the original future
/// or the retry future completes, that value is used.
#[derive(Debug)]
pub struct Hedge<S, P>(Service<S, P>);

pin_project! {
    /// The [`Future`] returned by the [`Hedge`] service.
    ///
    /// [`Future`]: std::future::Future
    #[derive(Debug)]
    pub struct Future<S, Request>
    where
        S: tower_service::Service<Request>,
    {
        #[pin]
        inner: S::Future,
    }
}

/// A policy which describes which requests can be cloned and then whether those
/// requests should be retried.
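///
/// # Example
///
/// A minimal sketch of a policy that only hedges idempotent requests; the
/// `MyRequest` type and its `idempotent` field are illustrative and not part
/// of this crate:
///
/// ```ignore
/// use tower::hedge::Policy;
///
/// // Any cheaply cloneable request type works here.
/// #[derive(Clone)]
/// struct MyRequest {
///     idempotent: bool,
/// }
///
/// #[derive(Clone)]
/// struct OnlyIdempotent;
///
/// impl Policy<MyRequest> for OnlyIdempotent {
///     fn clone_request(&self, req: &MyRequest) -> Option<MyRequest> {
///         // Only idempotent requests are safe to issue twice.
///         req.idempotent.then(|| req.clone())
///     }
///
///     fn can_retry(&self, _req: &MyRequest) -> bool {
///         // Always issue the hedge once the delay has elapsed.
///         true
///     }
/// }
/// ```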
pub trait Policy<Request> {
    /// Called when the request is first received to determine if the request is retryable.
    fn clone_request(&self, req: &Request) -> Option<Request>;

    /// Called after the hedge timeout to determine if the hedge retry should be issued.
    fn can_retry(&self, req: &Request) -> bool;
}

// NOTE: these are pub only because they appear inside a Future<F>

#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct PolicyPredicate<P>(P);

#[doc(hidden)]
#[derive(Debug)]
pub struct DelayPolicy {
    histo: Histo,
    latency_percentile: f32,
}

#[doc(hidden)]
#[derive(Debug)]
pub struct SelectPolicy<P> {
    policy: P,
    histo: Histo,
    min_data_points: u64,
}

impl<S, P> Hedge<S, P> {
    /// Create a new hedge middleware.
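    ///
    /// A sketch of intended usage; `service` and `policy` are assumed to exist
    /// (see [`Policy`] for an example policy):
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use tower::hedge::Hedge;
    ///
    /// // Hedge a request once it has been outstanding for longer than the
    /// // p95 latency observed over the last 10 seconds, but only after at
    /// // least 20 latencies have been recorded.
    /// let hedged = Hedge::new(service, policy, 20, 0.95, Duration::from_secs(10));
    /// ```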
    pub fn new<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    /// Create a hedge middleware with a prepopulated latency histogram.  This is
    /// useful for integration tests.
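    ///
    /// A sketch, reusing the illustrative `service` and `policy` from
    /// [`Hedge::new`]:
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use tower::hedge::Hedge;
    ///
    /// // Pretend these latencies (in milliseconds) were already observed, so
    /// // the hedge delay is meaningful from the first request onwards.
    /// let latencies_ms: [u64; 5] = [10, 12, 15, 20, 500];
    /// let hedged = Hedge::new_with_mock_latencies(
    ///     service,
    ///     policy,
    ///     5,    // min_data_points
    ///     0.95, // latency_percentile
    ///     Duration::from_secs(10),
    ///     &latencies_ms,
    /// );
    /// ```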
    pub fn new_with_mock_latencies<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        period: Duration,
        latencies_ms: &[u64],
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
        {
            let mut locked = histo.lock().unwrap();
            for latency in latencies_ms.iter() {
                locked.read().record(*latency).unwrap();
            }
        }
        Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
    }

    fn new_with_histo<Request>(
        service: S,
        policy: P,
        min_data_points: u64,
        latency_percentile: f32,
        histo: Histo,
    ) -> Hedge<S, P>
    where
        S: tower_service::Service<Request> + Clone,
        S::Error: Into<crate::BoxError>,
        P: Policy<Request> + Clone,
    {
        // Clone the underlying service and wrap both copies in a middleware that
        // records the latencies in a rotating histogram.
        let recorded_a = Latency::new(histo.clone(), service.clone());
        let recorded_b = Latency::new(histo.clone(), service);

        // Check policy to see if the hedge request should be issued.
        let filtered = AsyncFilter::new(recorded_b, PolicyPredicate(policy.clone()));

        // Delay the second request by a percentile of the recorded request latency
        // histogram.
        let delay_policy = DelayPolicy {
            histo: histo.clone(),
            latency_percentile,
        };
        let delayed = Delay::new(delay_policy, filtered);

        // If the request is retryable, issue two requests -- the second one delayed
        // by a latency percentile.  Use the first result to complete.
        let select_policy = SelectPolicy {
            policy,
            histo,
            min_data_points,
        };
        Hedge(Select::new(select_policy, recorded_a, delayed))
    }
}

impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
where
    S: tower_service::Service<Request> + Clone,
    S::Error: Into<crate::BoxError>,
    P: Policy<Request> + Clone,
{
    type Response = S::Response;
    type Error = crate::BoxError;
    type Future = Future<Service<S, P>, Request>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        Future {
            inner: self.0.call(request),
        }
    }
}

impl<S, Request> std::future::Future for Future<S, Request>
where
    S: tower_service::Service<Request>,
    S::Error: Into<crate::BoxError>,
{
    type Output = Result<S::Response, crate::BoxError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().inner.poll(cx).map_err(Into::into)
    }
}

// TODO: Remove when Duration::as_millis() becomes stable.
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;
fn millis(duration: Duration) -> u64 {
    // Round up.
    let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
    duration
        .as_secs()
        .saturating_mul(MILLIS_PER_SEC)
        .saturating_add(u64::from(millis))
}

impl latency::Record for Histo {
    fn record(&mut self, latency: Duration) {
        let mut locked = self.lock().unwrap();
        locked.write().record(millis(latency)).unwrap_or_else(|e| {
            error!("Failed to write to hedge histogram: {:?}", e);
        })
    }
}

impl<P, Request> crate::filter::AsyncPredicate<Request> for PolicyPredicate<P>
where
    P: Policy<Request>,
{
    type Future = future::Either<
        future::Ready<Result<Request, crate::BoxError>>,
        future::Pending<Result<Request, crate::BoxError>>,
    >;
    type Request = Request;

    fn check(&mut self, request: Request) -> Self::Future {
        if self.0.can_retry(&request) {
            future::Either::Left(future::ready(Ok(request)))
        } else {
            // If the hedge retry should not be issued, we simply want to wait
            // for the result of the original request.  Therefore we don't want
            // to return an error here.  Instead, we use future::pending to ensure
            // that the original request wins the select.
            future::Either::Right(future::pending())
        }
    }
}

impl<Request> delay::Policy<Request> for DelayPolicy {
    fn delay(&self, _req: &Request) -> Duration {
        let mut locked = self.histo.lock().unwrap();
        let millis = locked
            .read()
            .value_at_quantile(self.latency_percentile.into());
        Duration::from_millis(millis)
    }
}

impl<P, Request> select::Policy<Request> for SelectPolicy<P>
where
    P: Policy<Request>,
{
    fn clone_request(&self, req: &Request) -> Option<Request> {
        self.policy.clone_request(req).filter(|_| {
            let mut locked = self.histo.lock().unwrap();
            // Do not attempt a retry if there are too few data points in the
            // histogram.
            locked.read().len() >= self.min_data_points
        })
    }
}