//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};

/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    logger: Logger,

    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Each entry is cache-padded to avoid false sharing between workers.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking the number of idle threads, the number of
    /// sleeping threads, and the jobs event counter (JEC). See the `counters` module.
    counters: AtomicCounters,
}

/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}

/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the sleeping worker blocks on, paired with the
    /// `is_blocked` mutex above.
    condvar: Condvar,
}

/// How many rounds an idle worker spins (yielding each round) before it
/// announces itself as "sleepy".
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One extra round after becoming sleepy before the worker actually
/// attempts to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;

impl Sleep {
    /// Creates the sleep state for a registry with `n_threads` workers.
    ///
    /// # Panics
    ///
    /// Panics if `n_threads` exceeds `THREADS_MAX`, the maximum number of
    /// workers the packed atomic counters can represent.
    pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            logger,
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker begins searching for work. Registers the
    /// worker as inactive and returns the `IdleState` that tracks its
    /// search progress from here on.
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
        self.logger.log(|| ThreadIdle {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when an idle worker finds work; consumes the `IdleState`
    /// (the worker is no longer idle).
    #[inline]
    pub(super) fn work_found(&self, idle_state: IdleState) {
        self.logger.log(|| ThreadFoundWork {
            worker: idle_state.worker_index,
            yields: idle_state.rounds,
        });

        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker fails to find work. Spins/yields
    /// for the first `ROUNDS_UNTIL_SLEEPY` rounds, announces itself sleepy
    /// on the next round, and finally attempts to sleep once
    /// `ROUNDS_UNTIL_SLEEPING` rounds have elapsed.
    ///
    /// `has_injected_jobs` is a last-chance check for externally injected
    /// work, evaluated lazily only if we actually get as far as sleeping.
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Marks this worker as "sleepy" by bumping the jobs event counter (JEC)
    /// to an odd ("sleepy") value if it is currently even ("active"), and
    /// returns the resulting JEC. The worker will later compare this value
    /// against the current JEC to detect jobs posted while it was sleepy.
    #[cold]
    fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active);
        let jobs_counter = counters.jobs_counter();
        self.logger.log(|| ThreadSleepy {
            worker: worker_index,
            jobs_counter: jobs_counter.as_usize(),
        });
        jobs_counter
    }

    /// Attempts to put this worker to sleep. May return early (without
    /// sleeping) if the latch was signalled or if the JEC indicates that
    /// new work was posted since we became sleepy. Otherwise blocks on the
    /// worker's condvar until notified by `wake_specific_thread`.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        if !latch.get_sleepy() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            return;
        }

        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                self.logger.log(|| ThreadSleepInterruptedByJob {
                    worker: worker_index,
                });

                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
            // CAS failed: some counter changed concurrently; reload and retry.
        }

        // Successfully registered as asleep.

        self.logger.log(|| ThreadSleeping {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop guards against spurious condvar wakeups: only a genuine
            // notification (which sets `is_blocked` to false) lets us exit.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();

        self.logger.log(|| ThreadAwoken {
            worker: worker_index,
            latch_addr: latch.addr(),
        });
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping). When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of `counters`
        // with which we exit the loop thus corresponds to a state in which, if the
        // JEC was sleepy, it has since been incremented (so sleepy workers will
        // notice the new work when they re-check the JEC before sleeping).
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        self.logger.log(|| JobThreadCounts {
            worker: source_worker_index,
            num_idle: num_awake_but_idle as u16,
            num_sleepers: num_sleepers as u16,
        });

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers by scanning the sleep
    /// states in index order. May wake fewer if not enough workers are
    /// actually blocked.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if (and only if) it is currently
    /// blocked. Returns true if a sleeping worker was actually woken.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, its our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            self.logger.log(|| ThreadNotify { worker: index });

            true
        } else {
            false
        }
    }
}

impl IdleState {
    /// Resets to the fully-awake state: round counter back to zero and
    /// the recorded JEC cleared to the dummy value.
    fn wake_fully(&mut self) {
        self.rounds = 0;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }

    /// Resets to just before the "sleepy" announcement, so the worker will
    /// re-announce sleepiness (and re-read the JEC) if it again fails to
    /// find work.
    fn wake_partly(&mut self) {
        self.rounds = ROUNDS_UNTIL_SLEEPY;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }
}