//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
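//!
//! As an illustrative example (the program name below is a placeholder,
//! not part of Rayon), a profile could be collected from a debug build
//! with something like:
//!
//! ```text
//! RAYON_LOG=profile:rayon-profile.csv ./my-program
//! ```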

use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do after `yields`
    /// rounds of yielding. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the
    /// internal sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// A job was broadcast to N threads.
    JobBroadcast { count: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    sender: Option<Sender<Event>>,
}

impl Logger {
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the doc comment for the format
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if let Some(filename) = env_log.strip_prefix("tail:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if let Some(filename) = env_log.strip_prefix("profile:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            panic!("RAYON_LOG should be 'tail:<file>', 'profile:<file>', or 'all'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    fn disabled() -> Logger {
        Logger { sender: None }
    }

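    // Records an event. The event is passed as a closure so that, when
    // logging is compiled out or no logging thread was started, no `Event`
    // value is ever constructed. An illustrative (hypothetical) call site
    // might look like:
    //
    //     logger.log(|| Event::JobPushed { worker: index });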
    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

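    // Profile mode: buffer incoming events (up to `capacity` per batch),
    // feed them through `SimulatorState`, and write out only those events
    // that change the simulated thread/queue state.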
    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

        loop {
            while let Ok(event) = receiver.recv_timeout(timeout) {
                if let Event::Flush = event {
                    break;
                }

                events.push(event);
                if events.len() == capacity {
                    break;
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();
        }
    }

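    // Tail mode: keep only the most recent `capacity` events, still feeding
    // discarded ones through the simulator so its state stays consistent,
    // and emit a `...` line to mark where older events were dropped.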
    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            while let Ok(event) = receiver.recv_timeout(timeout) {
                if let Event::Flush = event {
                    // We ignore Flush events in tail mode --
                    // we're really just looking for
                    // deadlocks.
                    continue;
                } else {
                    if events.len() == capacity {
                        let event = events.pop_front().unwrap();
                        state.simulate(&event);
                        skipped = true;
                    }

                    events.push_back(event);
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();
        }
    }

    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}

impl State {
    fn letter(&self) -> char {
        match self {
            State::Working => 'W',
            State::Idle => 'I',
            State::Notified => 'N',
            State::Sleeping => 'S',
            State::Terminated => 'T',
        }
    }
}

struct SimulatorState {
    local_queue_size: Vec<usize>,
    thread_states: Vec<State>,
    injector_size: usize,
}

impl SimulatorState {
    fn new(num_workers: usize) -> Self {
        Self {
            local_queue_size: (0..num_workers).map(|_| 0).collect(),
            thread_states: (0..num_workers).map(|_| State::Working).collect(),
            injector_size: 0,
        }
    }

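    // Applies `event` to the simulated thread states and queue sizes.
    // Returns true if the event changed the simulated state (and is
    // therefore worth dumping in profile mode), false for purely
    // informational events.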
    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            Event::ThreadIdle { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Working);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
                true
            }

            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
                true
            }

            Event::ThreadSleeping { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Idle);
                self.thread_states[worker] = State::Sleeping;
                true
            }

            Event::ThreadAwoken { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Notified);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::JobPushed { worker } => {
                self.local_queue_size[worker] += 1;
                true
            }

            Event::JobPopped { worker } => {
                self.local_queue_size[worker] -= 1;
                true
            }

            Event::JobStolen { victim, .. } => {
                self.local_queue_size[victim] -= 1;
                true
            }

            Event::JobsInjected { count } => {
                self.injector_size += count;
                true
            }

            Event::JobUninjected { .. } => {
                self.injector_size -= 1;
                true
            }

            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                assert_eq!(self.thread_states[worker], State::Sleeping);
                self.thread_states[worker] = State::Notified;
                true
            }

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => false,
        }
    }

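    // Writes one roughly CSV-shaped line per event: counts of idle,
    // sleeping, and notified threads, total jobs pending in local deques,
    // injector size, the event itself (quoted `Debug` form), and then a
    // `T<nn>,<state letter>` column per worker, followed by that worker's
    // local queue size whenever it is non-zero.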
    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let num_idle_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Idle)
            .count();

        let num_sleeping_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Sleeping)
            .count();

        let num_notified_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Notified)
            .count();

        let num_pending_jobs: usize = self.local_queue_size.iter().sum();

        write!(w, "{:2},", num_idle_threads)?;
        write!(w, "{:2},", num_sleeping_threads)?;
        write!(w, "{:2},", num_notified_threads)?;
        write!(w, "{:4},", num_pending_jobs)?;
        write!(w, "{:4},", self.injector_size)?;

        let event_str = format!("{:?}", event);
        write!(w, r#""{:60}","#, event_str)?;

        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
            write!(w, " T{:02},{}", i, state.letter(),)?;

            if *queue_size > 0 {
                write!(w, ",{:03},", queue_size)?;
            } else {
                write!(w, ",   ,")?;
            }
        }

        writeln!(w)?;
        Ok(())
    }
}