• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! A non-blocking, off-thread writer.
2 //!
3 //! This spawns a dedicated worker thread which is responsible for writing log
4 //! lines to the provided writer. When a line is written using the returned
5 //! `NonBlocking` struct's `make_writer` method, it will be enqueued to be
6 //! written by the worker thread.
7 //!
8 //! The queue has a fixed capacity, and if it becomes full, any logs written
9 //! to it will be dropped until capacity is once again available. This may
10 //! occur if logs are consistently produced faster than the worker thread can
11 //! output them. The queue capacity and behavior when full (i.e., whether to
12 //! drop logs or to exert backpressure to slow down senders) can be configured
13 //! using [`NonBlockingBuilder::default()`][builder].
14 //! `NonBlockingBuilder::default()` returns the default configuration. Using it is equivalent to:
15 //!
16 //! ```rust
17 //! # use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
18 //! # fn doc() -> (NonBlocking, WorkerGuard) {
19 //! tracing_appender::non_blocking(std::io::stdout())
20 //! # }
21 //! ```
22 //! [builder]: NonBlockingBuilder::default
23 //!
24 //! <br/> The `non_blocking` function returns a tuple of `NonBlocking` and `WorkerGuard`.
25 //! `NonBlocking` implements [`MakeWriter`] which integrates with `tracing_subscriber`.
26 //! `WorkerGuard` is a drop guard that is responsible for flushing any remaining logs when
27 //! the program terminates.
28 //!
29 //! Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that
30 //! is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately.
31 //! Unintentional drops of `WorkerGuard` remove the guarantee that logs will be flushed
32 //! during a program's termination, in a panic or otherwise.
33 //!
34 //! See [`WorkerGuard`][worker_guard] for examples of using the guard.
35 //!
36 //! [worker_guard]: WorkerGuard
37 //!
38 //! # Examples
39 //!
40 //! ``` rust
41 //! # fn docs() {
42 //! let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
43 //! let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
44 //! tracing::subscriber::with_default(subscriber.finish(), || {
45 //!    tracing::event!(tracing::Level::INFO, "Hello");
46 //! });
47 //! # }
48 //! ```
49 use crate::worker::Worker;
50 use crate::Msg;
51 use crossbeam_channel::{bounded, SendTimeoutError, Sender};
52 use std::io;
53 use std::io::Write;
54 use std::sync::atomic::AtomicUsize;
55 use std::sync::atomic::Ordering;
56 use std::sync::Arc;
57 use std::thread::JoinHandle;
58 use std::time::Duration;
59 use tracing_subscriber::fmt::MakeWriter;
60 
/// The default maximum number of buffered log lines.
///
/// Once this many lines are queued, a lossy [`NonBlocking`][non-blocking]
/// drops further spans/events, whereas a [`NonBlocking`][non-blocking] that is
/// _not_ lossy exerts backpressure on senders, blocking their respective
/// threads until capacity is available again.
///
/// Recommended to be a power of 2.
///
/// [non-blocking]: NonBlocking
pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
71 
72 /// A guard that flushes spans/events associated to a [`NonBlocking`] on a drop
73 ///
74 /// Writing to a [`NonBlocking`] writer will **not** immediately write a span or event to the underlying
75 /// output. Instead, the span or event will be written by a dedicated logging thread at some later point.
76 /// To increase throughput, the non-blocking writer will flush to the underlying output on
77 /// a periodic basis rather than every time a span or event is written. This means that if the program
78 /// terminates abruptly (such as through an uncaught `panic` or a `std::process::exit`), some spans
79 /// or events may not be written.
80 ///
81 /// Since spans/events and events recorded near a crash are often necessary for diagnosing the failure,
82 /// `WorkerGuard` provides a mechanism to ensure that _all_ buffered logs are flushed to their output.
83 /// `WorkerGuard` should be assigned in the `main` function or whatever the entrypoint of the program is.
84 /// This will ensure that the guard will be dropped during an unwinding or when `main` exits
85 /// successfully.
86 ///
87 /// # Examples
88 ///
89 /// ``` rust
90 /// # #[clippy::allow(needless_doctest_main)]
91 /// fn main () {
92 /// # fn doc() {
93 ///     let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
94 ///     let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
95 ///     tracing::subscriber::with_default(subscriber.finish(), || {
96 ///         // Emit some tracing events within context of the non_blocking `_guard` and tracing subscriber
97 ///         tracing::event!(tracing::Level::INFO, "Hello");
98 ///     });
99 ///     // Exiting the context of `main` will drop the `_guard` and any remaining logs should get flushed
100 /// # }
101 /// }
102 /// ```
103 #[must_use]
104 #[derive(Debug)]
105 pub struct WorkerGuard {
106     _guard: Option<JoinHandle<()>>,
107     sender: Sender<Msg>,
108     shutdown: Sender<()>,
109 }
110 
111 /// A non-blocking writer.
112 ///
113 /// While the line between "blocking" and "non-blocking" IO is fuzzy, writing to a file is typically
114 /// considered to be a _blocking_ operation. For an application whose `Subscriber` writes spans and events
115 /// as they are emitted, an application might find the latency profile to be unacceptable.
116 /// `NonBlocking` moves the writing out of an application's data path by sending spans and events
117 /// to a dedicated logging thread.
118 ///
119 /// This struct implements [`MakeWriter`][make_writer] from the `tracing-subscriber`
120 /// crate. Therefore, it can be used with the [`tracing_subscriber::fmt`][fmt] module
121 /// or with any other subscriber/layer implementation that uses the `MakeWriter` trait.
122 ///
123 /// [make_writer]: tracing_subscriber::fmt::MakeWriter
124 /// [fmt]: mod@tracing_subscriber::fmt
125 #[derive(Clone, Debug)]
126 pub struct NonBlocking {
127     error_counter: ErrorCounter,
128     channel: Sender<Msg>,
129     is_lossy: bool,
130 }
131 
/// Counts the log lines that were dropped because they could not be enqueued
/// for the background thread.
///
/// Clones share the same underlying counter. If the non-blocking writer is
/// not configured in [lossy mode], the count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
#[derive(Clone, Debug)]
pub struct ErrorCounter(Arc<AtomicUsize>);
140 
141 impl NonBlocking {
142     /// Returns a new `NonBlocking` writer wrapping the provided `writer`.
143     ///
144     /// The returned `NonBlocking` writer will have the [default configuration][default] values.
145     /// Other configurations can be specified using the [builder] interface.
146     ///
147     /// [default]: NonBlockingBuilder::default
148     /// [builder]: NonBlockingBuilder
new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard)149     pub fn new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
150         NonBlockingBuilder::default().finish(writer)
151     }
152 
create<T: Write + Send + Sync + 'static>( writer: T, buffered_lines_limit: usize, is_lossy: bool, ) -> (NonBlocking, WorkerGuard)153     fn create<T: Write + Send + Sync + 'static>(
154         writer: T,
155         buffered_lines_limit: usize,
156         is_lossy: bool,
157     ) -> (NonBlocking, WorkerGuard) {
158         let (sender, receiver) = bounded(buffered_lines_limit);
159 
160         let (shutdown_sender, shutdown_receiver) = bounded(0);
161 
162         let worker = Worker::new(receiver, writer, shutdown_receiver);
163         let worker_guard =
164             WorkerGuard::new(worker.worker_thread(), sender.clone(), shutdown_sender);
165 
166         (
167             Self {
168                 channel: sender,
169                 error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
170                 is_lossy,
171             },
172             worker_guard,
173         )
174     }
175 
176     /// Returns a counter for the number of times logs where dropped. This will always return zero if
177     /// `NonBlocking` is not lossy.
error_counter(&self) -> ErrorCounter178     pub fn error_counter(&self) -> ErrorCounter {
179         self.error_counter.clone()
180     }
181 }
182 
183 /// A builder for [`NonBlocking`][non-blocking].
184 ///
185 /// [non-blocking]: NonBlocking
186 #[derive(Debug)]
187 pub struct NonBlockingBuilder {
188     buffered_lines_limit: usize,
189     is_lossy: bool,
190 }
191 
192 impl NonBlockingBuilder {
193     /// Sets the number of lines to buffer before dropping logs or exerting backpressure on senders
buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder194     pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
195         self.buffered_lines_limit = buffered_lines_limit;
196         self
197     }
198 
199     /// Sets whether `NonBlocking` should be lossy or not.
200     ///
201     /// If set to `true`, logs will be dropped when the buffered limit is reached. If `false`, backpressure
202     /// will be exerted on senders, blocking them until the buffer has capacity again.
203     ///
204     /// By default, the built `NonBlocking` will be lossy.
lossy(mut self, is_lossy: bool) -> NonBlockingBuilder205     pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder {
206         self.is_lossy = is_lossy;
207         self
208     }
209 
210     /// Completes the builder, returning the configured `NonBlocking`.
finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard)211     pub fn finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
212         NonBlocking::create(writer, self.buffered_lines_limit, self.is_lossy)
213     }
214 }
215 
216 impl Default for NonBlockingBuilder {
default() -> Self217     fn default() -> Self {
218         NonBlockingBuilder {
219             buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
220             is_lossy: true,
221         }
222     }
223 }
224 
225 impl std::io::Write for NonBlocking {
write(&mut self, buf: &[u8]) -> io::Result<usize>226     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
227         let buf_size = buf.len();
228         if self.is_lossy {
229             if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() {
230                 self.error_counter.incr_saturating();
231             }
232         } else {
233             return match self.channel.send(Msg::Line(buf.to_vec())) {
234                 Ok(_) => Ok(buf_size),
235                 Err(_) => Err(io::Error::from(io::ErrorKind::Other)),
236             };
237         }
238         Ok(buf_size)
239     }
240 
flush(&mut self) -> io::Result<()>241     fn flush(&mut self) -> io::Result<()> {
242         Ok(())
243     }
244 
245     #[inline]
write_all(&mut self, buf: &[u8]) -> io::Result<()>246     fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
247         self.write(buf).map(|_| ())
248     }
249 }
250 
251 impl<'a> MakeWriter<'a> for NonBlocking {
252     type Writer = NonBlocking;
253 
make_writer(&'a self) -> Self::Writer254     fn make_writer(&'a self) -> Self::Writer {
255         self.clone()
256     }
257 }
258 
259 impl WorkerGuard {
new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self260     fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
261         WorkerGuard {
262             _guard: Some(handle),
263             sender,
264             shutdown,
265         }
266     }
267 }
268 
269 impl Drop for WorkerGuard {
drop(&mut self)270     fn drop(&mut self) {
271         match self
272             .sender
273             .send_timeout(Msg::Shutdown, Duration::from_millis(100))
274         {
275             Ok(_) => {
276                 // Attempt to wait for `Worker` to flush all messages before dropping. This happens
277                 // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
278                 // so that drop is not blocked indefinitely.
279                 // TODO: Make timeout configurable.
280                 let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
281             }
282             Err(SendTimeoutError::Disconnected(_)) => (),
283             Err(SendTimeoutError::Timeout(e)) => println!(
284                 "Failed to send shutdown signal to logging worker. Error: {:?}",
285                 e
286             ),
287         }
288     }
289 }
290 
291 // === impl ErrorCounter ===
292 
293 impl ErrorCounter {
294     /// Returns the number of log lines that have been dropped.
295     ///
296     /// If the non-blocking writer is not configured in [lossy mode], the error
297     /// count should always be 0.
298     ///
299     /// [lossy mode]: NonBlockingBuilder::lossy
dropped_lines(&self) -> usize300     pub fn dropped_lines(&self) -> usize {
301         self.0.load(Ordering::Acquire)
302     }
303 
incr_saturating(&self)304     fn incr_saturating(&self) {
305         let mut curr = self.0.load(Ordering::Acquire);
306         // We don't need to enter the CAS loop if the current value is already
307         // `usize::MAX`.
308         if curr == usize::MAX {
309             return;
310         }
311 
312         // This is implemented as a CAS loop rather than as a simple
313         // `fetch_add`, because we don't want to wrap on overflow. Instead, we
314         // need to ensure that saturating addition is performed.
315         loop {
316             let val = curr.saturating_add(1);
317             match self
318                 .0
319                 .compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire)
320             {
321                 Ok(_) => return,
322                 Err(actual) => curr = actual,
323             }
324         }
325     }
326 }
327 
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    /// Writer that forwards each written buffer, lossily decoded as UTF-8, into a
    /// bounded std channel so that tests can observe (and stall) the worker's output.
    struct MockWriter {
        tx: mpsc::SyncSender<String>,
    }

    impl MockWriter {
        /// Creates the writer and the receiver for a channel holding `capacity` strings.
        fn new(capacity: usize) -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::sync_channel(capacity);
            let writer = Self { tx };
            (writer, rx)
        }
    }

    impl std::io::Write for MockWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let text = String::from_utf8_lossy(buf).into_owned();
            // A failed send (receiver gone) is deliberately ignored.
            let _ = self.tx.send(text);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn backpressure_exerted() {
        let (writer, lines) = MockWriter::new(1);

        let builder = NonBlockingBuilder::default()
            .lossy(false)
            .buffered_lines_limit(1);
        let (mut non_blocking, _guard) = builder.finish(writer);

        let dropped = non_blocking.error_counter();

        non_blocking.write_all(b"Hello").expect("Failed to write");
        assert_eq!(0, dropped.dropped_lines());

        let blocked_writer = thread::spawn(move || {
            non_blocking.write_all(b", World").expect("Failed to write");
        });

        // Give the spawned thread time to get blocked on its write.
        thread::sleep(Duration::from_millis(100));
        // Being blocked must not count as dropping logs.
        assert_eq!(0, dropped.dropped_lines());

        // Consuming the first message unblocks the sender.
        let first = lines.recv().unwrap();
        assert_eq!(first, "Hello");

        // Wait for the thread to finish.
        blocked_writer.join().expect("thread should not panic");

        // The thread has joined, so the message it sent must be readable.
        let second = lines.recv().unwrap();
        assert_eq!(second, ", World");
    }

    /// Writes `msg` and then pauses, giving the worker time to react before the
    /// caller inspects any state.
    fn write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8]) {
        let outcome = non_blocking.write_all(msg);
        outcome.expect("Failed to write");

        // Sleep a bit to prevent races.
        thread::sleep(Duration::from_millis(200));
    }

    #[test]
    #[ignore] // flaky, see https://github.com/tokio-rs/tracing/issues/751
    fn logs_dropped_if_lossy() {
        let (writer, lines) = MockWriter::new(1);

        let builder = NonBlockingBuilder::default()
            .lossy(true)
            .buffered_lines_limit(1);
        let (mut non_blocking, _guard) = builder.finish(writer);

        let dropped = non_blocking.error_counter();

        // The first write does not block.
        write_non_blocking(&mut non_blocking, b"Hello");
        assert_eq!(0, dropped.dropped_lines());

        // The second write does not block either, as the Worker will have called
        // `recv` on the channel. "Hello" is not yet consumed, so the MockWriter's
        // write stalls until it is.
        write_non_blocking(&mut non_blocking, b", World");
        assert_eq!(0, dropped.dropped_lines());

        // This one sits in the NonBlocking channel's buffer.
        write_non_blocking(&mut non_blocking, b"Test");
        assert_eq!(0, dropped.dropped_lines());

        // Let one line through: "Hello" is consumed, ", World" can be written to the
        // MockWriter, and "Test" stalls on the MockWriter's write.
        let first = lines.recv().unwrap();
        assert_eq!(first, "Hello");

        // The NonBlocking channel is full, so this line is dropped and counted.
        write_non_blocking(&mut non_blocking, b"Universe");
        assert_eq!(1, dropped.dropped_lines());

        // Finally the second message sent is consumed.
        let second = lines.recv().unwrap();
        assert_eq!(second, ", World");
        assert_eq!(1, dropped.dropped_lines());
    }

    #[test]
    fn multi_threaded_writes() {
        let (writer, lines) = MockWriter::new(DEFAULT_BUFFERED_LINES_LIMIT);

        let (non_blocking, _guard) = NonBlockingBuilder::default().lossy(true).finish(writer);

        let dropped = non_blocking.error_counter();

        // Spawn ten threads, each emitting one event through its own clone of the writer.
        let join_handles: Vec<JoinHandle<()>> = (0..10)
            .map(|_| {
                let cloned_non_blocking = non_blocking.clone();
                thread::spawn(move || {
                    let subscriber = tracing_subscriber::fmt().with_writer(cloned_non_blocking);
                    tracing::subscriber::with_default(subscriber.finish(), || {
                        tracing::event!(tracing::Level::INFO, "Hello");
                    });
                })
            })
            .collect();

        for handle in join_handles {
            handle.join().expect("Failed to join thread");
        }

        // Drain the output until nothing has arrived for five seconds.
        let mut hello_count: u8 = 0;
        loop {
            match lines.recv_timeout(Duration::from_secs(5)) {
                Ok(event_str) => {
                    assert!(event_str.contains("Hello"));
                    hello_count += 1;
                }
                Err(_) => break,
            }
        }

        assert_eq!(10, hello_count);
        assert_eq!(0, dropped.dropped_lines());
    }
}
478