1 //! A non-blocking, off-thread writer. 2 //! 3 //! This spawns a dedicated worker thread which is responsible for writing log 4 //! lines to the provided writer. When a line is written using the returned 5 //! `NonBlocking` struct's `make_writer` method, it will be enqueued to be 6 //! written by the worker thread. 7 //! 8 //! The queue has a fixed capacity, and if it becomes full, any logs written 9 //! to it will be dropped until capacity is once again available. This may 10 //! occur if logs are consistently produced faster than the worker thread can 11 //! output them. The queue capacity and behavior when full (i.e., whether to 12 //! drop logs or to exert backpressure to slow down senders) can be configured 13 //! using [`NonBlockingBuilder::default()`][builder]. 14 //! This function returns the default configuration. It is equivalent to: 15 //! 16 //! ```rust 17 //! # use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; 18 //! # fn doc() -> (NonBlocking, WorkerGuard) { 19 //! tracing_appender::non_blocking(std::io::stdout()) 20 //! # } 21 //! ``` 22 //! [builder]: NonBlockingBuilder::default 23 //! 24 //! <br/> This function returns a tuple of `NonBlocking` and `WorkerGuard`. 25 //! `NonBlocking` implements [`MakeWriter`] which integrates with `tracing_subscriber`. 26 //! `WorkerGuard` is a drop guard that is responsible for flushing any remaining logs when 27 //! the program terminates. 28 //! 29 //! Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that 30 //! is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately. 31 //! Unintentional drops of `WorkerGuard` remove the guarantee that logs will be flushed 32 //! during a program's termination, in a panic or otherwise. 33 //! 34 //! See [`WorkerGuard`][worker_guard] for examples of using the guard. 35 //! 36 //! [worker_guard]: WorkerGuard 37 //! 38 //! # Examples 39 //! 40 //! ``` rust 41 //! # fn docs() { 42 //! let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); 43 //! let subscriber = tracing_subscriber::fmt().with_writer(non_blocking); 44 //! tracing::subscriber::with_default(subscriber.finish(), || { 45 //! tracing::event!(tracing::Level::INFO, "Hello"); 46 //! }); 47 //! # } 48 //! ``` 49 use crate::worker::Worker; 50 use crate::Msg; 51 use crossbeam_channel::{bounded, SendTimeoutError, Sender}; 52 use std::io; 53 use std::io::Write; 54 use std::sync::atomic::AtomicUsize; 55 use std::sync::atomic::Ordering; 56 use std::sync::Arc; 57 use std::thread::JoinHandle; 58 use std::time::Duration; 59 use tracing_subscriber::fmt::MakeWriter; 60 61 /// The default maximum number of buffered log lines. 62 /// 63 /// If [`NonBlocking`][non-blocking] is lossy, it will drop spans/events at capacity. 64 /// If [`NonBlocking`][non-blocking] is _not_ lossy, 65 /// backpressure will be exerted on senders, causing them to block their 66 /// respective threads until there is available capacity. 67 /// 68 /// [non-blocking]: NonBlocking 69 /// Recommended to be a power of 2. 70 pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000; 71 72 /// A guard that flushes spans/events associated to a [`NonBlocking`] on a drop 73 /// 74 /// Writing to a [`NonBlocking`] writer will **not** immediately write a span or event to the underlying 75 /// output. Instead, the span or event will be written by a dedicated logging thread at some later point. 76 /// To increase throughput, the non-blocking writer will flush to the underlying output on 77 /// a periodic basis rather than every time a span or event is written. This means that if the program 78 /// terminates abruptly (such as through an uncaught `panic` or a `std::process::exit`), some spans 79 /// or events may not be written. 80 /// 81 /// Since spans/events and events recorded near a crash are often necessary for diagnosing the failure, 82 /// `WorkerGuard` provides a mechanism to ensure that _all_ buffered logs are flushed to their output. 83 /// `WorkerGuard` should be assigned in the `main` function or whatever the entrypoint of the program is. 84 /// This will ensure that the guard will be dropped during an unwinding or when `main` exits 85 /// successfully. 86 /// 87 /// # Examples 88 /// 89 /// ``` rust 90 /// # #[clippy::allow(needless_doctest_main)] 91 /// fn main () { 92 /// # fn doc() { 93 /// let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); 94 /// let subscriber = tracing_subscriber::fmt().with_writer(non_blocking); 95 /// tracing::subscriber::with_default(subscriber.finish(), || { 96 /// // Emit some tracing events within context of the non_blocking `_guard` and tracing subscriber 97 /// tracing::event!(tracing::Level::INFO, "Hello"); 98 /// }); 99 /// // Exiting the context of `main` will drop the `_guard` and any remaining logs should get flushed 100 /// # } 101 /// } 102 /// ``` 103 #[must_use] 104 #[derive(Debug)] 105 pub struct WorkerGuard { 106 _guard: Option<JoinHandle<()>>, 107 sender: Sender<Msg>, 108 shutdown: Sender<()>, 109 } 110 111 /// A non-blocking writer. 112 /// 113 /// While the line between "blocking" and "non-blocking" IO is fuzzy, writing to a file is typically 114 /// considered to be a _blocking_ operation. For an application whose `Subscriber` writes spans and events 115 /// as they are emitted, an application might find the latency profile to be unacceptable. 116 /// `NonBlocking` moves the writing out of an application's data path by sending spans and events 117 /// to a dedicated logging thread. 118 /// 119 /// This struct implements [`MakeWriter`][make_writer] from the `tracing-subscriber` 120 /// crate. Therefore, it can be used with the [`tracing_subscriber::fmt`][fmt] module 121 /// or with any other subscriber/layer implementation that uses the `MakeWriter` trait. 122 /// 123 /// [make_writer]: tracing_subscriber::fmt::MakeWriter 124 /// [fmt]: mod@tracing_subscriber::fmt 125 #[derive(Clone, Debug)] 126 pub struct NonBlocking { 127 error_counter: ErrorCounter, 128 channel: Sender<Msg>, 129 is_lossy: bool, 130 } 131 132 /// Tracks the number of times a log line was dropped by the background thread. 133 /// 134 /// If the non-blocking writer is not configured in [lossy mode], the error 135 /// count should always be 0. 136 /// 137 /// [lossy mode]: NonBlockingBuilder::lossy 138 #[derive(Clone, Debug)] 139 pub struct ErrorCounter(Arc<AtomicUsize>); 140 141 impl NonBlocking { 142 /// Returns a new `NonBlocking` writer wrapping the provided `writer`. 143 /// 144 /// The returned `NonBlocking` writer will have the [default configuration][default] values. 145 /// Other configurations can be specified using the [builder] interface. 146 /// 147 /// [default]: NonBlockingBuilder::default 148 /// [builder]: NonBlockingBuilder new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard)149 pub fn new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) { 150 NonBlockingBuilder::default().finish(writer) 151 } 152 create<T: Write + Send + Sync + 'static>( writer: T, buffered_lines_limit: usize, is_lossy: bool, ) -> (NonBlocking, WorkerGuard)153 fn create<T: Write + Send + Sync + 'static>( 154 writer: T, 155 buffered_lines_limit: usize, 156 is_lossy: bool, 157 ) -> (NonBlocking, WorkerGuard) { 158 let (sender, receiver) = bounded(buffered_lines_limit); 159 160 let (shutdown_sender, shutdown_receiver) = bounded(0); 161 162 let worker = Worker::new(receiver, writer, shutdown_receiver); 163 let worker_guard = 164 WorkerGuard::new(worker.worker_thread(), sender.clone(), shutdown_sender); 165 166 ( 167 Self { 168 channel: sender, 169 error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))), 170 is_lossy, 171 }, 172 worker_guard, 173 ) 174 } 175 176 /// Returns a counter for the number of times logs where dropped. This will always return zero if 177 /// `NonBlocking` is not lossy. error_counter(&self) -> ErrorCounter178 pub fn error_counter(&self) -> ErrorCounter { 179 self.error_counter.clone() 180 } 181 } 182 183 /// A builder for [`NonBlocking`][non-blocking]. 184 /// 185 /// [non-blocking]: NonBlocking 186 #[derive(Debug)] 187 pub struct NonBlockingBuilder { 188 buffered_lines_limit: usize, 189 is_lossy: bool, 190 } 191 192 impl NonBlockingBuilder { 193 /// Sets the number of lines to buffer before dropping logs or exerting backpressure on senders buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder194 pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder { 195 self.buffered_lines_limit = buffered_lines_limit; 196 self 197 } 198 199 /// Sets whether `NonBlocking` should be lossy or not. 200 /// 201 /// If set to `true`, logs will be dropped when the buffered limit is reached. If `false`, backpressure 202 /// will be exerted on senders, blocking them until the buffer has capacity again. 203 /// 204 /// By default, the built `NonBlocking` will be lossy. lossy(mut self, is_lossy: bool) -> NonBlockingBuilder205 pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder { 206 self.is_lossy = is_lossy; 207 self 208 } 209 210 /// Completes the builder, returning the configured `NonBlocking`. finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard)211 pub fn finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) { 212 NonBlocking::create(writer, self.buffered_lines_limit, self.is_lossy) 213 } 214 } 215 216 impl Default for NonBlockingBuilder { default() -> Self217 fn default() -> Self { 218 NonBlockingBuilder { 219 buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT, 220 is_lossy: true, 221 } 222 } 223 } 224 225 impl std::io::Write for NonBlocking { write(&mut self, buf: &[u8]) -> io::Result<usize>226 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 227 let buf_size = buf.len(); 228 if self.is_lossy { 229 if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() { 230 self.error_counter.incr_saturating(); 231 } 232 } else { 233 return match self.channel.send(Msg::Line(buf.to_vec())) { 234 Ok(_) => Ok(buf_size), 235 Err(_) => Err(io::Error::from(io::ErrorKind::Other)), 236 }; 237 } 238 Ok(buf_size) 239 } 240 flush(&mut self) -> io::Result<()>241 fn flush(&mut self) -> io::Result<()> { 242 Ok(()) 243 } 244 245 #[inline] write_all(&mut self, buf: &[u8]) -> io::Result<()>246 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { 247 self.write(buf).map(|_| ()) 248 } 249 } 250 251 impl<'a> MakeWriter<'a> for NonBlocking { 252 type Writer = NonBlocking; 253 make_writer(&'a self) -> Self::Writer254 fn make_writer(&'a self) -> Self::Writer { 255 self.clone() 256 } 257 } 258 259 impl WorkerGuard { new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self260 fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self { 261 WorkerGuard { 262 _guard: Some(handle), 263 sender, 264 shutdown, 265 } 266 } 267 } 268 269 impl Drop for WorkerGuard { drop(&mut self)270 fn drop(&mut self) { 271 match self 272 .sender 273 .send_timeout(Msg::Shutdown, Duration::from_millis(100)) 274 { 275 Ok(_) => { 276 // Attempt to wait for `Worker` to flush all messages before dropping. This happens 277 // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout` 278 // so that drop is not blocked indefinitely. 279 // TODO: Make timeout configurable. 280 let _ = self.shutdown.send_timeout((), Duration::from_millis(1000)); 281 } 282 Err(SendTimeoutError::Disconnected(_)) => (), 283 Err(SendTimeoutError::Timeout(e)) => println!( 284 "Failed to send shutdown signal to logging worker. Error: {:?}", 285 e 286 ), 287 } 288 } 289 } 290 291 // === impl ErrorCounter === 292 293 impl ErrorCounter { 294 /// Returns the number of log lines that have been dropped. 295 /// 296 /// If the non-blocking writer is not configured in [lossy mode], the error 297 /// count should always be 0. 298 /// 299 /// [lossy mode]: NonBlockingBuilder::lossy dropped_lines(&self) -> usize300 pub fn dropped_lines(&self) -> usize { 301 self.0.load(Ordering::Acquire) 302 } 303 incr_saturating(&self)304 fn incr_saturating(&self) { 305 let mut curr = self.0.load(Ordering::Acquire); 306 // We don't need to enter the CAS loop if the current value is already 307 // `usize::MAX`. 308 if curr == usize::MAX { 309 return; 310 } 311 312 // This is implemented as a CAS loop rather than as a simple 313 // `fetch_add`, because we don't want to wrap on overflow. Instead, we 314 // need to ensure that saturating addition is performed. 315 loop { 316 let val = curr.saturating_add(1); 317 match self 318 .0 319 .compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire) 320 { 321 Ok(_) => return, 322 Err(actual) => curr = actual, 323 } 324 } 325 } 326 } 327 328 #[cfg(test)] 329 mod test { 330 use super::*; 331 use std::sync::mpsc; 332 use std::thread; 333 use std::time::Duration; 334 335 struct MockWriter { 336 tx: mpsc::SyncSender<String>, 337 } 338 339 impl MockWriter { new(capacity: usize) -> (Self, mpsc::Receiver<String>)340 fn new(capacity: usize) -> (Self, mpsc::Receiver<String>) { 341 let (tx, rx) = mpsc::sync_channel(capacity); 342 (Self { tx }, rx) 343 } 344 } 345 346 impl std::io::Write for MockWriter { write(&mut self, buf: &[u8]) -> std::io::Result<usize>347 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 348 let buf_len = buf.len(); 349 let _ = self.tx.send(String::from_utf8_lossy(buf).to_string()); 350 Ok(buf_len) 351 } 352 flush(&mut self) -> std::io::Result<()>353 fn flush(&mut self) -> std::io::Result<()> { 354 Ok(()) 355 } 356 } 357 358 #[test] backpressure_exerted()359 fn backpressure_exerted() { 360 let (mock_writer, rx) = MockWriter::new(1); 361 362 let (mut non_blocking, _guard) = self::NonBlockingBuilder::default() 363 .lossy(false) 364 .buffered_lines_limit(1) 365 .finish(mock_writer); 366 367 let error_count = non_blocking.error_counter(); 368 369 non_blocking.write_all(b"Hello").expect("Failed to write"); 370 assert_eq!(0, error_count.dropped_lines()); 371 372 let handle = thread::spawn(move || { 373 non_blocking.write_all(b", World").expect("Failed to write"); 374 }); 375 376 // Sleep a little to ensure previously spawned thread gets blocked on write. 377 thread::sleep(Duration::from_millis(100)); 378 // We should not drop logs when blocked. 379 assert_eq!(0, error_count.dropped_lines()); 380 381 // Read the first message to unblock sender. 382 let mut line = rx.recv().unwrap(); 383 assert_eq!(line, "Hello"); 384 385 // Wait for thread to finish. 386 handle.join().expect("thread should not panic"); 387 388 // Thread has joined, we should be able to read the message it sent. 389 line = rx.recv().unwrap(); 390 assert_eq!(line, ", World"); 391 } 392 write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8])393 fn write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8]) { 394 non_blocking.write_all(msg).expect("Failed to write"); 395 396 // Sleep a bit to prevent races. 397 thread::sleep(Duration::from_millis(200)); 398 } 399 400 #[test] 401 #[ignore] // flaky, see https://github.com/tokio-rs/tracing/issues/751 logs_dropped_if_lossy()402 fn logs_dropped_if_lossy() { 403 let (mock_writer, rx) = MockWriter::new(1); 404 405 let (mut non_blocking, _guard) = self::NonBlockingBuilder::default() 406 .lossy(true) 407 .buffered_lines_limit(1) 408 .finish(mock_writer); 409 410 let error_count = non_blocking.error_counter(); 411 412 // First write will not block 413 write_non_blocking(&mut non_blocking, b"Hello"); 414 assert_eq!(0, error_count.dropped_lines()); 415 416 // Second write will not block as Worker will have called `recv` on channel. 417 // "Hello" is not yet consumed. MockWriter call to write_all will block until 418 // "Hello" is consumed. 419 write_non_blocking(&mut non_blocking, b", World"); 420 assert_eq!(0, error_count.dropped_lines()); 421 422 // Will sit in NonBlocking channel's buffer. 423 write_non_blocking(&mut non_blocking, b"Test"); 424 assert_eq!(0, error_count.dropped_lines()); 425 426 // Allow a line to be written. "Hello" message will be consumed. 427 // ", World" will be able to write to MockWriter. 428 // "Test" will block on call to MockWriter's `write_all` 429 let line = rx.recv().unwrap(); 430 assert_eq!(line, "Hello"); 431 432 // This will block as NonBlocking channel is full. 433 write_non_blocking(&mut non_blocking, b"Universe"); 434 assert_eq!(1, error_count.dropped_lines()); 435 436 // Finally the second message sent will be consumed. 437 let line = rx.recv().unwrap(); 438 assert_eq!(line, ", World"); 439 assert_eq!(1, error_count.dropped_lines()); 440 } 441 442 #[test] multi_threaded_writes()443 fn multi_threaded_writes() { 444 let (mock_writer, rx) = MockWriter::new(DEFAULT_BUFFERED_LINES_LIMIT); 445 446 let (non_blocking, _guard) = self::NonBlockingBuilder::default() 447 .lossy(true) 448 .finish(mock_writer); 449 450 let error_count = non_blocking.error_counter(); 451 let mut join_handles: Vec<JoinHandle<()>> = Vec::with_capacity(10); 452 453 for _ in 0..10 { 454 let cloned_non_blocking = non_blocking.clone(); 455 join_handles.push(thread::spawn(move || { 456 let subscriber = tracing_subscriber::fmt().with_writer(cloned_non_blocking); 457 tracing::subscriber::with_default(subscriber.finish(), || { 458 tracing::event!(tracing::Level::INFO, "Hello"); 459 }); 460 })); 461 } 462 463 for handle in join_handles { 464 handle.join().expect("Failed to join thread"); 465 } 466 467 let mut hello_count: u8 = 0; 468 469 while let Ok(event_str) = rx.recv_timeout(Duration::from_secs(5)) { 470 assert!(event_str.contains("Hello")); 471 hello_count += 1; 472 } 473 474 assert_eq!(10, hello_count); 475 assert_eq!(0, error_count.dropped_lines()); 476 } 477 } 478