• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! A demo showing how filtering on values and dynamic filter reloading can be
//! used together to help make sense of complex or noisy traces.
//!
//! This example runs a simple HTTP server that implements highly advanced,
//! cloud-native "character repetition as a service", on port 3000. The server's
//! `GET /${CHARACTER}` route will respond with a string containing that
//! character repeated up to the requested `Content-Length`. A load generator
//! runs in the background and constantly sends requests for various characters
//! to be repeated.
//!
//! As the load generator's logs indicate, the server will sometimes return
//! errors, including HTTP 500s! Because the logs at high load are so noisy,
//! tracking down the root cause of the errors can be difficult, if not
//! impossible. Since the character-repetition service is absolutely
//! mission-critical to our organization, we have to determine what is causing
//! these errors as soon as possible!
//!
//! Fortunately, an admin service running on port 3001 exposes a `PUT /filter`
//! route that can be used to change the trace filter for the format subscriber.
//! By dynamically changing the filter we can try to track down the cause of the
//! error.
//!
//! As a hint: all spans and events from the load generator have the "gen" target.
24 #![deny(rust_2018_idioms)]
25 
26 use bytes::Bytes;
27 use futures::{
28     future::{self, Ready},
29     Future,
30 };
31 use http::{header, Method, Request, Response, StatusCode};
32 use hyper::{server::conn::AddrStream, Body, Client, Server};
33 use rand::Rng;
34 use std::{
35     error::Error,
36     fmt,
37     net::SocketAddr,
38     pin::Pin,
39     task::{Context, Poll},
40     time::Duration,
41 };
42 use tokio::{time, try_join};
43 use tower::{Service, ServiceBuilder, ServiceExt};
44 use tracing::{
45     self, debug, error, info, info_span, span, trace, warn, Instrument as _, Level, Span,
46 };
47 use tracing_subscriber::{filter::EnvFilter, reload::Handle};
48 use tracing_tower::{request_span, request_span::make};
49 
/// Boxed, thread-safe error type used as the error half of the `Result`s
/// returned by `main`, the services, and the load generator in this file.
type Err = Box<dyn Error + Send + Sync + 'static>;
51 
52 #[tokio::main]
main() -> Result<(), Err>53 async fn main() -> Result<(), Err> {
54     let builder = tracing_subscriber::fmt()
55         .with_env_filter("info,tower_load=debug")
56         .with_filter_reloading();
57     let handle = builder.reload_handle();
58     builder.try_init()?;
59 
60     let addr = "[::1]:3000".parse::<SocketAddr>()?;
61     let admin_addr = "[::1]:3001".parse::<SocketAddr>()?;
62 
63     let admin = ServiceBuilder::new().service(AdminSvc { handle });
64 
65     let svc = ServiceBuilder::new()
66         .layer(make::layer::<_, Svc, _>(req_span))
67         .service(MakeSvc);
68 
69     let svc = Server::bind(&addr).serve(svc);
70     let admin = Server::bind(&admin_addr).serve(admin);
71 
72     let res = try_join!(
73         tokio::spawn(load_gen(addr)),
74         tokio::spawn(load_gen(addr)),
75         tokio::spawn(load_gen(addr)),
76         tokio::spawn(svc),
77         tokio::spawn(admin)
78     );
79 
80     match res {
81         Ok(_) => info!("load generator exited successfully"),
82         Err(e) => {
83             error!(error = ?e, "load generator failed");
84         }
85     }
86     Ok(())
87 }
88 
89 struct Svc;
90 impl Service<Request<Body>> for Svc {
91     type Response = Response<Body>;
92     type Error = Err;
93     type Future = Ready<Result<Self::Response, Self::Error>>;
94 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>95     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96         Poll::Ready(Ok(()))
97     }
98 
call(&mut self, req: Request<Body>) -> Self::Future99     fn call(&mut self, req: Request<Body>) -> Self::Future {
100         let rsp = Self::handle_request(req)
101             .map(|body| {
102                 trace!("sending response");
103                 rsp(StatusCode::OK, body)
104             })
105             .unwrap_or_else(|e| {
106                 trace!(rsp.error = %e);
107                 let status = match e {
108                     HandleError::BadPath => {
109                         warn!(rsp.status = ?StatusCode::NOT_FOUND);
110                         StatusCode::NOT_FOUND
111                     }
112                     HandleError::NoContentLength | HandleError::BadRequest(_) => {
113                         StatusCode::BAD_REQUEST
114                     }
115                     HandleError::Unknown => StatusCode::INTERNAL_SERVER_ERROR,
116                 };
117                 rsp(status, e.to_string())
118             });
119         future::ok(rsp)
120     }
121 }
122 
123 impl Svc {
handle_request(req: Request<Body>) -> Result<String, HandleError>124     fn handle_request(req: Request<Body>) -> Result<String, HandleError> {
125         const BAD_METHOD: WrongMethod = WrongMethod(&[Method::GET]);
126         trace!("handling request...");
127         match (req.method(), req.uri().path()) {
128             (&Method::GET, "/z") => {
129                 trace!(error = %"i don't like this letter.", letter = "z");
130                 Err(HandleError::Unknown)
131             }
132             (&Method::GET, path) => {
133                 let ch = path.get(1..2).ok_or(HandleError::BadPath)?;
134                 let content_length = req
135                     .headers()
136                     .get(header::CONTENT_LENGTH)
137                     .ok_or(HandleError::NoContentLength)?;
138                 trace!(req.content_length = ?content_length);
139                 let content_length = content_length
140                     .to_str()
141                     .map_err(HandleError::bad_request)?
142                     .parse::<usize>()
143                     .map_err(HandleError::bad_request)?;
144                 let mut body = String::new();
145                 let span = span!(
146                     Level::DEBUG,
147                     "build_rsp",
148                     rsp.len = content_length,
149                     rsp.character = ch
150                 );
151                 let _enter = span.enter();
152                 for idx in 0..content_length {
153                     body.push_str(ch);
154                     trace!(rsp.body = ?body, rsp.body.idx = idx);
155                 }
156                 Ok(body)
157             }
158             _ => Err(HandleError::bad_request(BAD_METHOD)),
159         }
160     }
161 }
162 
/// Reasons the character-repetition service can fail to build a response.
#[derive(Debug)]
enum HandleError {
    /// The request path has no character after the leading `/`.
    BadPath,
    /// The request carries no `Content-Length` header.
    NoContentLength,
    /// The request was malformed; the boxed error describes why.
    BadRequest(Box<dyn Error + Send + 'static>),
    /// An unspecified internal failure.
    Unknown,
}
170 
171 #[derive(Debug, Clone)]
172 struct WrongMethod(&'static [Method]);
173 
174 struct MakeSvc;
175 impl<T> Service<T> for MakeSvc {
176     type Response = Svc;
177     type Error = Err;
178     type Future = Ready<Result<Self::Response, Self::Error>>;
179 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>180     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181         Poll::Ready(Ok(()))
182     }
183 
call(&mut self, _: T) -> Self::Future184     fn call(&mut self, _: T) -> Self::Future {
185         future::ok(Svc)
186     }
187 }
188 
189 struct AdminSvc<S> {
190     handle: Handle<EnvFilter, S>,
191 }
192 
193 impl<S> Clone for AdminSvc<S> {
clone(&self) -> Self194     fn clone(&self) -> Self {
195         Self {
196             handle: self.handle.clone(),
197         }
198     }
199 }
200 
201 impl<'a, S> Service<&'a AddrStream> for AdminSvc<S>
202 where
203     S: tracing::Subscriber,
204 {
205     type Response = AdminSvc<S>;
206     type Error = hyper::Error;
207     type Future = Ready<Result<Self::Response, Self::Error>>;
208 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>209     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210         Poll::Ready(Ok(()))
211     }
212 
call(&mut self, _: &'a AddrStream) -> Self::Future213     fn call(&mut self, _: &'a AddrStream) -> Self::Future {
214         future::ok(self.clone())
215     }
216 }
217 
218 impl<S> Service<Request<Body>> for AdminSvc<S>
219 where
220     S: tracing::Subscriber + 'static,
221 {
222     type Response = Response<Body>;
223     type Error = Err;
224     type Future = Pin<Box<dyn Future<Output = Result<Response<Body>, Err>> + std::marker::Send>>;
225 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>226     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
227         Poll::Ready(Ok(()))
228     }
229 
call(&mut self, req: Request<Body>) -> Self::Future230     fn call(&mut self, req: Request<Body>) -> Self::Future {
231         // we need to clone so that the reference to self
232         // isn't outlived by the returned future.
233         let handle = self.clone();
234         let f = async move {
235             let rsp = match (req.method(), req.uri().path()) {
236                 (&Method::PUT, "/filter") => {
237                     trace!("setting filter");
238 
239                     let body = hyper::body::to_bytes(req).await?;
240                     match handle.set_from(body) {
241                         Err(error) => {
242                             error!(%error, "setting filter failed!");
243                             rsp(StatusCode::INTERNAL_SERVER_ERROR, error)
244                         }
245                         Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()),
246                     }
247                 }
248                 _ => rsp(StatusCode::NOT_FOUND, "try `/filter`"),
249             };
250             Ok(rsp)
251         };
252         Box::pin(f)
253     }
254 }
255 
256 impl<S> AdminSvc<S>
257 where
258     S: tracing::Subscriber + 'static,
259 {
set_from(&self, bytes: Bytes) -> Result<(), String>260     fn set_from(&self, bytes: Bytes) -> Result<(), String> {
261         use std::str;
262         let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
263         trace!(request.body = ?body);
264         let new_filter = body
265             .parse::<tracing_subscriber::filter::EnvFilter>()
266             .map_err(|e| format!("{}", e))?;
267         self.handle.reload(new_filter).map_err(|e| format!("{}", e))
268     }
269 }
270 
rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body>271 fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
272     Response::builder()
273         .status(status)
274         .body(body.into())
275         .expect("builder with known status code must not fail")
276 }
277 
278 impl HandleError {
bad_request(e: impl std::error::Error + Send + 'static) -> Self279     fn bad_request(e: impl std::error::Error + Send + 'static) -> Self {
280         HandleError::BadRequest(Box::new(e))
281     }
282 }
283 
284 impl fmt::Display for HandleError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result285     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286         match self {
287             HandleError::BadPath => f.pad("path must be a single ASCII character"),
288             HandleError::NoContentLength => f.pad("request must have Content-Length header"),
289             HandleError::BadRequest(ref e) => write!(f, "bad request: {}", e),
290             HandleError::Unknown => f.pad("unknown internal error"),
291         }
292     }
293 }
294 
295 impl std::error::Error for HandleError {}
296 
297 impl fmt::Display for WrongMethod {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result298     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299         write!(f, "unsupported method: please use one of {:?}", self.0)
300     }
301 }
302 
303 impl std::error::Error for WrongMethod {}
304 
gen_uri(authority: &str) -> (usize, String)305 fn gen_uri(authority: &str) -> (usize, String) {
306     static ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
307     let mut rng = rand::thread_rng();
308     let idx = rng.gen_range(0, ALPHABET.len() + 1);
309     let len = rng.gen_range(0, 26);
310     let letter = ALPHABET.get(idx..=idx).unwrap_or("");
311     (len, format!("http://{}/{}", authority, letter))
312 }
313 
314 #[tracing::instrument(target = "gen", "load_gen")]
load_gen(addr: SocketAddr) -> Result<(), Err>315 async fn load_gen(addr: SocketAddr) -> Result<(), Err> {
316     let svc = ServiceBuilder::new()
317         .buffer(5)
318         .layer(request_span::layer(req_span))
319         .timeout(Duration::from_millis(200))
320         .service(Client::new());
321     let mut interval = tokio::time::interval(Duration::from_millis(50));
322 
323     loop {
324         interval.tick().await;
325         let authority = format!("{}", addr);
326         let mut svc = svc.clone().ready_oneshot().await?;
327 
328         let f = async move {
329             let sleep = rand::thread_rng().gen_range(0, 25);
330             time::sleep(Duration::from_millis(sleep)).await;
331 
332             let (len, uri) = gen_uri(&authority);
333             let req = Request::get(&uri[..])
334                 .header("Content-Length", len)
335                 .body(Body::empty())
336                 .unwrap();
337 
338             let span = tracing::debug_span!(
339                 target: "gen",
340                 "request",
341                 req.method = ?req.method(),
342                 req.path = ?req.uri().path(),
343             );
344             async move {
345                 info!(target: "gen", "sending request");
346                 let rsp = match svc.call(req).await {
347                     Err(e) => {
348                         error!(target: "gen", error = %e, "request error!");
349                         return Err(e);
350                     }
351                     Ok(rsp) => rsp,
352                 };
353 
354                 let status = rsp.status();
355                 if status != StatusCode::OK {
356                     error!(target: "gen", status = ?status, "error received from server!");
357                 }
358 
359                 let body = match hyper::body::to_bytes(rsp).await {
360                     Err(e) => {
361                         error!(target: "gen", error = ?e, "body error!");
362                         return Err(e.into());
363                     }
364                     Ok(body) => body,
365                 };
366                 let body = String::from_utf8(body.to_vec())?;
367                 info!(target: "gen", message = "response complete.", rsp.body = %body);
368                 Ok(())
369             }
370             .instrument(span)
371             .await
372         }
373         .instrument(info_span!(target: "gen", "generated_request", remote.addr=%addr).or_current());
374         tokio::spawn(f);
375     }
376 }
377 
req_span<A>(req: &Request<A>) -> Span378 fn req_span<A>(req: &Request<A>) -> Span {
379     let span = tracing::span!(
380         target: "gen",
381         Level::INFO,
382         "request",
383         req.method = ?req.method(),
384         req.path = ?req.uri().path(),
385     );
386     debug!(
387         parent: &span,
388         message = "received request.",
389         req.headers = ?req.headers(),
390         req.version = ?req.version(),
391     );
392     span
393 }
394