1 use crate::Msg; 2 use crossbeam_channel::{Receiver, RecvError, TryRecvError}; 3 use std::fmt::Debug; 4 use std::io::Write; 5 use std::{io, thread}; 6 7 pub(crate) struct Worker<T: Write + Send + Sync + 'static> { 8 writer: T, 9 receiver: Receiver<Msg>, 10 shutdown: Receiver<()>, 11 } 12 13 #[derive(Debug, Clone, Copy, Eq, PartialEq)] 14 pub(crate) enum WorkerState { 15 Empty, 16 Disconnected, 17 Continue, 18 Shutdown, 19 } 20 21 impl<T: Write + Send + Sync + 'static> Worker<T> { new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T>22 pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> { 23 Self { 24 writer, 25 receiver, 26 shutdown, 27 } 28 } 29 handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState>30 fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> { 31 match result { 32 Ok(Msg::Line(msg)) => { 33 self.writer.write_all(msg)?; 34 Ok(WorkerState::Continue) 35 } 36 Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown), 37 Err(_) => Ok(WorkerState::Disconnected), 38 } 39 } 40 handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState>41 fn handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState> { 42 match result { 43 Ok(Msg::Line(msg)) => { 44 self.writer.write_all(msg)?; 45 Ok(WorkerState::Continue) 46 } 47 Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown), 48 Err(TryRecvError::Empty) => Ok(WorkerState::Empty), 49 Err(TryRecvError::Disconnected) => Ok(WorkerState::Disconnected), 50 } 51 } 52 53 /// Blocks on the first recv of each batch of logs, unless the 54 /// channel is disconnected. Afterwards, grabs as many logs as 55 /// it can off the channel, buffers them and attempts a flush. work(&mut self) -> io::Result<WorkerState>56 pub(crate) fn work(&mut self) -> io::Result<WorkerState> { 57 // Worker thread yields here if receive buffer is empty 58 let mut worker_state = self.handle_recv(&self.receiver.recv())?; 59 60 while worker_state == WorkerState::Continue { 61 let try_recv_result = self.receiver.try_recv(); 62 let handle_result = self.handle_try_recv(&try_recv_result); 63 worker_state = handle_result?; 64 } 65 self.writer.flush()?; 66 Ok(worker_state) 67 } 68 69 /// Creates a worker thread that processes a channel until it's disconnected worker_thread(mut self) -> std::thread::JoinHandle<()>70 pub(crate) fn worker_thread(mut self) -> std::thread::JoinHandle<()> { 71 thread::Builder::new() 72 .name("tracing-appender".to_string()) 73 .spawn(move || { 74 loop { 75 match self.work() { 76 Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {} 77 Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => { 78 let _ = self.shutdown.recv(); 79 break; 80 } 81 Err(_) => { 82 // TODO: Expose a metric for IO Errors, or print to stderr 83 } 84 } 85 } 86 if let Err(e) = self.writer.flush() { 87 eprintln!("Failed to flush. Error: {}", e); 88 } 89 }) 90 .expect("failed to spawn `tracing-appender` non-blocking worker thread") 91 } 92 } 93