1 //! A demo showing how filtering on values and dynamic filter reloading can be
2 //! used together to help make sense of complex or noisy traces.
3 //!
4 //! This example runs a simple HTTP server that implements highly advanced,
5 //! cloud-native "character repetition as a service", on port 3000. The server's
6 //! `GET /${CHARACTER}` route will respond with a string containing that
7 //! character repeated up to the requested `Content-Length`. A load generator
8 //! runs in the background and constantly sends requests for various characters
9 //! to be repeated.
10 //!
11 //! As the load generator's logs indicate, the server will sometimes return
12 //! errors, including HTTP 500s! Because the logs at high load are so noisy,
13 //! tracking down the root cause of the errors can be difficult, if not
14 //! impossible. Since the character-repetition service is absolutely
15 //! mission-critical to our organization, we have to determine what is causing
16 //! these errors as soon as possible!
17 //!
18 //! Fortunately, an admin service running on port 3001 exposes a `PUT /filter`
19 //! route that can be used to change the trace filter for the format subscriber.
20 //! By dynamically changing the filter we can try to track down the cause of the
21 //! error.
22 //!
23 //! As a hint: all spans and events from the load generator have the "gen" target
24 #![deny(rust_2018_idioms)]
25
26 use bytes::Bytes;
27 use futures::{
28 future::{self, Ready},
29 Future,
30 };
31 use http::{header, Method, Request, Response, StatusCode};
32 use hyper::{server::conn::AddrStream, Body, Client, Server};
33 use rand::Rng;
34 use std::{
35 error::Error,
36 fmt,
37 net::SocketAddr,
38 pin::Pin,
39 task::{Context, Poll},
40 time::Duration,
41 };
42 use tokio::{time, try_join};
43 use tower::{Service, ServiceBuilder, ServiceExt};
44 use tracing::{
45 self, debug, error, info, info_span, span, trace, warn, Instrument as _, Level, Span,
46 };
47 use tracing_subscriber::{filter::EnvFilter, reload::Handle};
48 use tracing_tower::{request_span, request_span::make};
49
50 type Err = Box<dyn Error + Send + Sync + 'static>;
51
52 #[tokio::main]
main() -> Result<(), Err>53 async fn main() -> Result<(), Err> {
54 let builder = tracing_subscriber::fmt()
55 .with_env_filter("info,tower_load=debug")
56 .with_filter_reloading();
57 let handle = builder.reload_handle();
58 builder.try_init()?;
59
60 let addr = "[::1]:3000".parse::<SocketAddr>()?;
61 let admin_addr = "[::1]:3001".parse::<SocketAddr>()?;
62
63 let admin = ServiceBuilder::new().service(AdminSvc { handle });
64
65 let svc = ServiceBuilder::new()
66 .layer(make::layer::<_, Svc, _>(req_span))
67 .service(MakeSvc);
68
69 let svc = Server::bind(&addr).serve(svc);
70 let admin = Server::bind(&admin_addr).serve(admin);
71
72 let res = try_join!(
73 tokio::spawn(load_gen(addr)),
74 tokio::spawn(load_gen(addr)),
75 tokio::spawn(load_gen(addr)),
76 tokio::spawn(svc),
77 tokio::spawn(admin)
78 );
79
80 match res {
81 Ok(_) => info!("load generator exited successfully"),
82 Err(e) => {
83 error!(error = ?e, "load generator failed");
84 }
85 }
86 Ok(())
87 }
88
89 struct Svc;
90 impl Service<Request<Body>> for Svc {
91 type Response = Response<Body>;
92 type Error = Err;
93 type Future = Ready<Result<Self::Response, Self::Error>>;
94
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>95 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96 Poll::Ready(Ok(()))
97 }
98
call(&mut self, req: Request<Body>) -> Self::Future99 fn call(&mut self, req: Request<Body>) -> Self::Future {
100 let rsp = Self::handle_request(req)
101 .map(|body| {
102 trace!("sending response");
103 rsp(StatusCode::OK, body)
104 })
105 .unwrap_or_else(|e| {
106 trace!(rsp.error = %e);
107 let status = match e {
108 HandleError::BadPath => {
109 warn!(rsp.status = ?StatusCode::NOT_FOUND);
110 StatusCode::NOT_FOUND
111 }
112 HandleError::NoContentLength | HandleError::BadRequest(_) => {
113 StatusCode::BAD_REQUEST
114 }
115 HandleError::Unknown => StatusCode::INTERNAL_SERVER_ERROR,
116 };
117 rsp(status, e.to_string())
118 });
119 future::ok(rsp)
120 }
121 }
122
123 impl Svc {
handle_request(req: Request<Body>) -> Result<String, HandleError>124 fn handle_request(req: Request<Body>) -> Result<String, HandleError> {
125 const BAD_METHOD: WrongMethod = WrongMethod(&[Method::GET]);
126 trace!("handling request...");
127 match (req.method(), req.uri().path()) {
128 (&Method::GET, "/z") => {
129 trace!(error = %"i don't like this letter.", letter = "z");
130 Err(HandleError::Unknown)
131 }
132 (&Method::GET, path) => {
133 let ch = path.get(1..2).ok_or(HandleError::BadPath)?;
134 let content_length = req
135 .headers()
136 .get(header::CONTENT_LENGTH)
137 .ok_or(HandleError::NoContentLength)?;
138 trace!(req.content_length = ?content_length);
139 let content_length = content_length
140 .to_str()
141 .map_err(HandleError::bad_request)?
142 .parse::<usize>()
143 .map_err(HandleError::bad_request)?;
144 let mut body = String::new();
145 let span = span!(
146 Level::DEBUG,
147 "build_rsp",
148 rsp.len = content_length,
149 rsp.character = ch
150 );
151 let _enter = span.enter();
152 for idx in 0..content_length {
153 body.push_str(ch);
154 trace!(rsp.body = ?body, rsp.body.idx = idx);
155 }
156 Ok(body)
157 }
158 _ => Err(HandleError::bad_request(BAD_METHOD)),
159 }
160 }
161 }
162
163 #[derive(Debug)]
164 enum HandleError {
165 BadPath,
166 NoContentLength,
167 BadRequest(Box<dyn Error + Send + 'static>),
168 Unknown,
169 }
170
171 #[derive(Debug, Clone)]
172 struct WrongMethod(&'static [Method]);
173
174 struct MakeSvc;
175 impl<T> Service<T> for MakeSvc {
176 type Response = Svc;
177 type Error = Err;
178 type Future = Ready<Result<Self::Response, Self::Error>>;
179
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>180 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 Poll::Ready(Ok(()))
182 }
183
call(&mut self, _: T) -> Self::Future184 fn call(&mut self, _: T) -> Self::Future {
185 future::ok(Svc)
186 }
187 }
188
189 struct AdminSvc<S> {
190 handle: Handle<EnvFilter, S>,
191 }
192
193 impl<S> Clone for AdminSvc<S> {
clone(&self) -> Self194 fn clone(&self) -> Self {
195 Self {
196 handle: self.handle.clone(),
197 }
198 }
199 }
200
201 impl<'a, S> Service<&'a AddrStream> for AdminSvc<S>
202 where
203 S: tracing::Subscriber,
204 {
205 type Response = AdminSvc<S>;
206 type Error = hyper::Error;
207 type Future = Ready<Result<Self::Response, Self::Error>>;
208
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>209 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210 Poll::Ready(Ok(()))
211 }
212
call(&mut self, _: &'a AddrStream) -> Self::Future213 fn call(&mut self, _: &'a AddrStream) -> Self::Future {
214 future::ok(self.clone())
215 }
216 }
217
218 impl<S> Service<Request<Body>> for AdminSvc<S>
219 where
220 S: tracing::Subscriber + 'static,
221 {
222 type Response = Response<Body>;
223 type Error = Err;
224 type Future = Pin<Box<dyn Future<Output = Result<Response<Body>, Err>> + std::marker::Send>>;
225
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>226 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
227 Poll::Ready(Ok(()))
228 }
229
call(&mut self, req: Request<Body>) -> Self::Future230 fn call(&mut self, req: Request<Body>) -> Self::Future {
231 // we need to clone so that the reference to self
232 // isn't outlived by the returned future.
233 let handle = self.clone();
234 let f = async move {
235 let rsp = match (req.method(), req.uri().path()) {
236 (&Method::PUT, "/filter") => {
237 trace!("setting filter");
238
239 let body = hyper::body::to_bytes(req).await?;
240 match handle.set_from(body) {
241 Err(error) => {
242 error!(%error, "setting filter failed!");
243 rsp(StatusCode::INTERNAL_SERVER_ERROR, error)
244 }
245 Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()),
246 }
247 }
248 _ => rsp(StatusCode::NOT_FOUND, "try `/filter`"),
249 };
250 Ok(rsp)
251 };
252 Box::pin(f)
253 }
254 }
255
256 impl<S> AdminSvc<S>
257 where
258 S: tracing::Subscriber + 'static,
259 {
set_from(&self, bytes: Bytes) -> Result<(), String>260 fn set_from(&self, bytes: Bytes) -> Result<(), String> {
261 use std::str;
262 let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
263 trace!(request.body = ?body);
264 let new_filter = body
265 .parse::<tracing_subscriber::filter::EnvFilter>()
266 .map_err(|e| format!("{}", e))?;
267 self.handle.reload(new_filter).map_err(|e| format!("{}", e))
268 }
269 }
270
rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body>271 fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
272 Response::builder()
273 .status(status)
274 .body(body.into())
275 .expect("builder with known status code must not fail")
276 }
277
278 impl HandleError {
bad_request(e: impl std::error::Error + Send + 'static) -> Self279 fn bad_request(e: impl std::error::Error + Send + 'static) -> Self {
280 HandleError::BadRequest(Box::new(e))
281 }
282 }
283
284 impl fmt::Display for HandleError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 match self {
287 HandleError::BadPath => f.pad("path must be a single ASCII character"),
288 HandleError::NoContentLength => f.pad("request must have Content-Length header"),
289 HandleError::BadRequest(ref e) => write!(f, "bad request: {}", e),
290 HandleError::Unknown => f.pad("unknown internal error"),
291 }
292 }
293 }
294
295 impl std::error::Error for HandleError {}
296
297 impl fmt::Display for WrongMethod {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result298 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299 write!(f, "unsupported method: please use one of {:?}", self.0)
300 }
301 }
302
303 impl std::error::Error for WrongMethod {}
304
gen_uri(authority: &str) -> (usize, String)305 fn gen_uri(authority: &str) -> (usize, String) {
306 static ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
307 let mut rng = rand::thread_rng();
308 let idx = rng.gen_range(0, ALPHABET.len() + 1);
309 let len = rng.gen_range(0, 26);
310 let letter = ALPHABET.get(idx..=idx).unwrap_or("");
311 (len, format!("http://{}/{}", authority, letter))
312 }
313
314 #[tracing::instrument(target = "gen", "load_gen")]
load_gen(addr: SocketAddr) -> Result<(), Err>315 async fn load_gen(addr: SocketAddr) -> Result<(), Err> {
316 let svc = ServiceBuilder::new()
317 .buffer(5)
318 .layer(request_span::layer(req_span))
319 .timeout(Duration::from_millis(200))
320 .service(Client::new());
321 let mut interval = tokio::time::interval(Duration::from_millis(50));
322
323 loop {
324 interval.tick().await;
325 let authority = format!("{}", addr);
326 let mut svc = svc.clone().ready_oneshot().await?;
327
328 let f = async move {
329 let sleep = rand::thread_rng().gen_range(0, 25);
330 time::sleep(Duration::from_millis(sleep)).await;
331
332 let (len, uri) = gen_uri(&authority);
333 let req = Request::get(&uri[..])
334 .header("Content-Length", len)
335 .body(Body::empty())
336 .unwrap();
337
338 let span = tracing::debug_span!(
339 target: "gen",
340 "request",
341 req.method = ?req.method(),
342 req.path = ?req.uri().path(),
343 );
344 async move {
345 info!(target: "gen", "sending request");
346 let rsp = match svc.call(req).await {
347 Err(e) => {
348 error!(target: "gen", error = %e, "request error!");
349 return Err(e);
350 }
351 Ok(rsp) => rsp,
352 };
353
354 let status = rsp.status();
355 if status != StatusCode::OK {
356 error!(target: "gen", status = ?status, "error received from server!");
357 }
358
359 let body = match hyper::body::to_bytes(rsp).await {
360 Err(e) => {
361 error!(target: "gen", error = ?e, "body error!");
362 return Err(e.into());
363 }
364 Ok(body) => body,
365 };
366 let body = String::from_utf8(body.to_vec())?;
367 info!(target: "gen", message = "response complete.", rsp.body = %body);
368 Ok(())
369 }
370 .instrument(span)
371 .await
372 }
373 .instrument(info_span!(target: "gen", "generated_request", remote.addr=%addr).or_current());
374 tokio::spawn(f);
375 }
376 }
377
req_span<A>(req: &Request<A>) -> Span378 fn req_span<A>(req: &Request<A>) -> Span {
379 let span = tracing::span!(
380 target: "gen",
381 Level::INFO,
382 "request",
383 req.method = ?req.method(),
384 req.path = ?req.uri().path(),
385 );
386 debug!(
387 parent: &span,
388 message = "received request.",
389 req.headers = ?req.headers(),
390 req.version = ?req.version(),
391 );
392 span
393 }
394