• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Msg;
2 use crossbeam_channel::{Receiver, RecvError, TryRecvError};
3 use std::fmt::Debug;
4 use std::io::Write;
5 use std::{io, thread};
6 
7 pub(crate) struct Worker<T: Write + Send + Sync + 'static> {
8     writer: T,
9     receiver: Receiver<Msg>,
10     shutdown: Receiver<()>,
11 }
12 
13 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
14 pub(crate) enum WorkerState {
15     Empty,
16     Disconnected,
17     Continue,
18     Shutdown,
19 }
20 
21 impl<T: Write + Send + Sync + 'static> Worker<T> {
new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T>22     pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> {
23         Self {
24             writer,
25             receiver,
26             shutdown,
27         }
28     }
29 
handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState>30     fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> {
31         match result {
32             Ok(Msg::Line(msg)) => {
33                 self.writer.write_all(msg)?;
34                 Ok(WorkerState::Continue)
35             }
36             Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
37             Err(_) => Ok(WorkerState::Disconnected),
38         }
39     }
40 
handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState>41     fn handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState> {
42         match result {
43             Ok(Msg::Line(msg)) => {
44                 self.writer.write_all(msg)?;
45                 Ok(WorkerState::Continue)
46             }
47             Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
48             Err(TryRecvError::Empty) => Ok(WorkerState::Empty),
49             Err(TryRecvError::Disconnected) => Ok(WorkerState::Disconnected),
50         }
51     }
52 
53     /// Blocks on the first recv of each batch of logs, unless the
54     /// channel is disconnected. Afterwards, grabs as many logs as
55     /// it can off the channel, buffers them and attempts a flush.
work(&mut self) -> io::Result<WorkerState>56     pub(crate) fn work(&mut self) -> io::Result<WorkerState> {
57         // Worker thread yields here if receive buffer is empty
58         let mut worker_state = self.handle_recv(&self.receiver.recv())?;
59 
60         while worker_state == WorkerState::Continue {
61             let try_recv_result = self.receiver.try_recv();
62             let handle_result = self.handle_try_recv(&try_recv_result);
63             worker_state = handle_result?;
64         }
65         self.writer.flush()?;
66         Ok(worker_state)
67     }
68 
69     /// Creates a worker thread that processes a channel until it's disconnected
worker_thread(mut self) -> std::thread::JoinHandle<()>70     pub(crate) fn worker_thread(mut self) -> std::thread::JoinHandle<()> {
71         thread::Builder::new()
72             .name("tracing-appender".to_string())
73             .spawn(move || {
74                 loop {
75                     match self.work() {
76                         Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
77                         Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
78                             let _ = self.shutdown.recv();
79                             break;
80                         }
81                         Err(_) => {
82                             // TODO: Expose a metric for IO Errors, or print to stderr
83                         }
84                     }
85                 }
86                 if let Err(e) = self.writer.flush() {
87                     eprintln!("Failed to flush. Error: {}", e);
88                 }
89             })
90             .expect("failed to spawn `tracing-appender` non-blocking worker thread")
91     }
92 }
93