//! Code that decides when workers should go to sleep. See README.md
//! for an overview.
3 
use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};
16 
17 /// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
18 /// of workers. It has callbacks that are invoked periodically at significant events,
19 /// such as when workers are looping and looking for work, when latches are set, or when
20 /// jobs are published, and it either blocks threads or wakes them in response to these
21 /// events. See the [`README.md`] in this module for more details.
22 ///
23 /// [`README.md`] README.md
24 pub(super) struct Sleep {
25     logger: Logger,
26 
27     /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
28     /// them block.
29     worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
30 
31     counters: AtomicCounters,
32 }
33 
34 /// An instance of this struct is created when a thread becomes idle.
35 /// It is consumed when the thread finds work, and passed by `&mut`
36 /// reference for operations that preserve the idle state. (In other
37 /// words, producing one of these structs is evidence the thread is
38 /// idle.) It tracks state such as how long the thread has been idle.
39 pub(super) struct IdleState {
40     /// What is worker index of the idle thread?
41     worker_index: usize,
42 
43     /// How many rounds have we been circling without sleeping?
44     rounds: u32,
45 
46     /// Once we become sleepy, what was the sleepy counter value?
47     /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
48     jobs_counter: JobsEventCounter,
49 }
50 
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condvar paired with `is_blocked`; the sleeping worker waits on it
    /// and `wake_specific_thread` notifies it.
    condvar: Condvar,
}
60 
/// Number of rounds an idle worker spins (yielding each round) before it
/// announces itself as "sleepy" and snapshots the jobs event counter.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;

/// One additional round after becoming sleepy before the worker actually
/// attempts to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
63 
64 impl Sleep {
new(logger: Logger, n_threads: usize) -> Sleep65     pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
66         assert!(n_threads <= THREADS_MAX);
67         Sleep {
68             logger,
69             worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
70             counters: AtomicCounters::new(),
71         }
72     }
73 
74     #[inline]
start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState75     pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
76         self.logger.log(|| ThreadIdle {
77             worker: worker_index,
78             latch_addr: latch.addr(),
79         });
80 
81         self.counters.add_inactive_thread();
82 
83         IdleState {
84             worker_index,
85             rounds: 0,
86             jobs_counter: JobsEventCounter::DUMMY,
87         }
88     }
89 
90     #[inline]
work_found(&self, idle_state: IdleState)91     pub(super) fn work_found(&self, idle_state: IdleState) {
92         self.logger.log(|| ThreadFoundWork {
93             worker: idle_state.worker_index,
94             yields: idle_state.rounds,
95         });
96 
97         // If we were the last idle thread and other threads are still sleeping,
98         // then we should wake up another thread.
99         let threads_to_wake = self.counters.sub_inactive_thread();
100         self.wake_any_threads(threads_to_wake as u32);
101     }
102 
103     #[inline]
no_work_found( &self, idle_state: &mut IdleState, latch: &CoreLatch, has_injected_jobs: impl FnOnce() -> bool, )104     pub(super) fn no_work_found(
105         &self,
106         idle_state: &mut IdleState,
107         latch: &CoreLatch,
108         has_injected_jobs: impl FnOnce() -> bool,
109     ) {
110         if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
111             thread::yield_now();
112             idle_state.rounds += 1;
113         } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
114             idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
115             idle_state.rounds += 1;
116             thread::yield_now();
117         } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
118             idle_state.rounds += 1;
119             thread::yield_now();
120         } else {
121             debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
122             self.sleep(idle_state, latch, has_injected_jobs);
123         }
124     }
125 
126     #[cold]
announce_sleepy(&self, worker_index: usize) -> JobsEventCounter127     fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
128         let counters = self
129             .counters
130             .increment_jobs_event_counter_if(JobsEventCounter::is_active);
131         let jobs_counter = counters.jobs_counter();
132         self.logger.log(|| ThreadSleepy {
133             worker: worker_index,
134             jobs_counter: jobs_counter.as_usize(),
135         });
136         jobs_counter
137     }
138 
139     #[cold]
sleep( &self, idle_state: &mut IdleState, latch: &CoreLatch, has_injected_jobs: impl FnOnce() -> bool, )140     fn sleep(
141         &self,
142         idle_state: &mut IdleState,
143         latch: &CoreLatch,
144         has_injected_jobs: impl FnOnce() -> bool,
145     ) {
146         let worker_index = idle_state.worker_index;
147 
148         if !latch.get_sleepy() {
149             self.logger.log(|| ThreadSleepInterruptedByLatch {
150                 worker: worker_index,
151                 latch_addr: latch.addr(),
152             });
153 
154             return;
155         }
156 
157         let sleep_state = &self.worker_sleep_states[worker_index];
158         let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
159         debug_assert!(!*is_blocked);
160 
161         // Our latch was signalled. We should wake back up fully as we
162         // will have some stuff to do.
163         if !latch.fall_asleep() {
164             self.logger.log(|| ThreadSleepInterruptedByLatch {
165                 worker: worker_index,
166                 latch_addr: latch.addr(),
167             });
168 
169             idle_state.wake_fully();
170             return;
171         }
172 
173         loop {
174             let counters = self.counters.load(Ordering::SeqCst);
175 
176             // Check if the JEC has changed since we got sleepy.
177             debug_assert!(idle_state.jobs_counter.is_sleepy());
178             if counters.jobs_counter() != idle_state.jobs_counter {
179                 // JEC has changed, so a new job was posted, but for some reason
180                 // we didn't see it. We should return to just before the SLEEPY
181                 // state so we can do another search and (if we fail to find
182                 // work) go back to sleep.
183                 self.logger.log(|| ThreadSleepInterruptedByJob {
184                     worker: worker_index,
185                 });
186 
187                 idle_state.wake_partly();
188                 latch.wake_up();
189                 return;
190             }
191 
192             // Otherwise, let's move from IDLE to SLEEPING.
193             if self.counters.try_add_sleeping_thread(counters) {
194                 break;
195             }
196         }
197 
198         // Successfully registered as asleep.
199 
200         self.logger.log(|| ThreadSleeping {
201             worker: worker_index,
202             latch_addr: latch.addr(),
203         });
204 
205         // We have one last check for injected jobs to do. This protects against
206         // deadlock in the very unlikely event that
207         //
208         // - an external job is being injected while we are sleepy
209         // - that job triggers the rollover over the JEC such that we don't see it
210         // - we are the last active worker thread
211         std::sync::atomic::fence(Ordering::SeqCst);
212         if has_injected_jobs() {
213             // If we see an externally injected job, then we have to 'wake
214             // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
215             // the one that wakes us.)
216             self.counters.sub_sleeping_thread();
217         } else {
218             // If we don't see an injected job (the normal case), then flag
219             // ourselves as asleep and wait till we are notified.
220             //
221             // (Note that `is_blocked` is held under a mutex and the mutex was
222             // acquired *before* we incremented the "sleepy counter". This means
223             // that whomever is coming to wake us will have to wait until we
224             // release the mutex in the call to `wait`, so they will see this
225             // boolean as true.)
226             *is_blocked = true;
227             while *is_blocked {
228                 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
229             }
230         }
231 
232         // Update other state:
233         idle_state.wake_fully();
234         latch.wake_up();
235 
236         self.logger.log(|| ThreadAwoken {
237             worker: worker_index,
238             latch_addr: latch.addr(),
239         });
240     }
241 
242     /// Notify the given thread that it should wake up (if it is
243     /// sleeping).  When this method is invoked, we typically know the
244     /// thread is asleep, though in rare cases it could have been
245     /// awoken by (e.g.) new work having been posted.
notify_worker_latch_is_set(&self, target_worker_index: usize)246     pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
247         self.wake_specific_thread(target_worker_index);
248     }
249 
250     /// Signals that `num_jobs` new jobs were injected into the thread
251     /// pool from outside. This function will ensure that there are
252     /// threads available to process them, waking threads from sleep
253     /// if necessary.
254     ///
255     /// # Parameters
256     ///
257     /// - `source_worker_index` -- index of the thread that did the
258     ///   push, or `usize::MAX` if this came from outside the thread
259     ///   pool -- it is used only for logging.
260     /// - `num_jobs` -- lower bound on number of jobs available for stealing.
261     ///   We'll try to get at least one thread per job.
262     #[inline]
new_injected_jobs( &self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool, )263     pub(super) fn new_injected_jobs(
264         &self,
265         source_worker_index: usize,
266         num_jobs: u32,
267         queue_was_empty: bool,
268     ) {
269         // This fence is needed to guarantee that threads
270         // as they are about to fall asleep, observe any
271         // new jobs that may have been injected.
272         std::sync::atomic::fence(Ordering::SeqCst);
273 
274         self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
275     }
276 
277     /// Signals that `num_jobs` new jobs were pushed onto a thread's
278     /// local deque. This function will try to ensure that there are
279     /// threads available to process them, waking threads from sleep
280     /// if necessary. However, this is not guaranteed: under certain
281     /// race conditions, the function may fail to wake any new
282     /// threads; in that case the existing thread should eventually
283     /// pop the job.
284     ///
285     /// # Parameters
286     ///
287     /// - `source_worker_index` -- index of the thread that did the
288     ///   push, or `usize::MAX` if this came from outside the thread
289     ///   pool -- it is used only for logging.
290     /// - `num_jobs` -- lower bound on number of jobs available for stealing.
291     ///   We'll try to get at least one thread per job.
292     #[inline]
new_internal_jobs( &self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool, )293     pub(super) fn new_internal_jobs(
294         &self,
295         source_worker_index: usize,
296         num_jobs: u32,
297         queue_was_empty: bool,
298     ) {
299         self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
300     }
301 
302     /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
303     #[inline]
new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool)304     fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
305         // Read the counters and -- if sleepy workers have announced themselves
306         // -- announce that there is now work available. The final value of `counters`
307         // with which we exit the loop thus corresponds to a state when
308         let counters = self
309             .counters
310             .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
311         let num_awake_but_idle = counters.awake_but_idle_threads();
312         let num_sleepers = counters.sleeping_threads();
313 
314         self.logger.log(|| JobThreadCounts {
315             worker: source_worker_index,
316             num_idle: num_awake_but_idle as u16,
317             num_sleepers: num_sleepers as u16,
318         });
319 
320         if num_sleepers == 0 {
321             // nobody to wake
322             return;
323         }
324 
325         // Promote from u16 to u32 so we can interoperate with
326         // num_jobs more easily.
327         let num_awake_but_idle = num_awake_but_idle as u32;
328         let num_sleepers = num_sleepers as u32;
329 
330         // If the queue is non-empty, then we always wake up a worker
331         // -- clearly the existing idle jobs aren't enough. Otherwise,
332         // check to see if we have enough idle workers.
333         if !queue_was_empty {
334             let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
335             self.wake_any_threads(num_to_wake);
336         } else if num_awake_but_idle < num_jobs {
337             let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
338             self.wake_any_threads(num_to_wake);
339         }
340     }
341 
342     #[cold]
wake_any_threads(&self, mut num_to_wake: u32)343     fn wake_any_threads(&self, mut num_to_wake: u32) {
344         if num_to_wake > 0 {
345             for i in 0..self.worker_sleep_states.len() {
346                 if self.wake_specific_thread(i) {
347                     num_to_wake -= 1;
348                     if num_to_wake == 0 {
349                         return;
350                     }
351                 }
352             }
353         }
354     }
355 
wake_specific_thread(&self, index: usize) -> bool356     fn wake_specific_thread(&self, index: usize) -> bool {
357         let sleep_state = &self.worker_sleep_states[index];
358 
359         let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
360         if *is_blocked {
361             *is_blocked = false;
362             sleep_state.condvar.notify_one();
363 
364             // When the thread went to sleep, it will have incremented
365             // this value. When we wake it, its our job to decrement
366             // it. We could have the thread do it, but that would
367             // introduce a delay between when the thread was
368             // *notified* and when this counter was decremented. That
369             // might mislead people with new work into thinking that
370             // there are sleeping threads that they should try to
371             // wake, when in fact there is nothing left for them to
372             // do.
373             self.counters.sub_sleeping_thread();
374 
375             self.logger.log(|| ThreadNotify { worker: index });
376 
377             true
378         } else {
379             false
380         }
381     }
382 }
383 
384 impl IdleState {
wake_fully(&mut self)385     fn wake_fully(&mut self) {
386         self.rounds = 0;
387         self.jobs_counter = JobsEventCounter::DUMMY;
388     }
389 
wake_partly(&mut self)390     fn wake_partly(&mut self) {
391         self.rounds = ROUNDS_UNTIL_SLEEPY;
392         self.jobs_counter = JobsEventCounter::DUMMY;
393     }
394 }
395