//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;

mod counters;
use self::counters::{AtomicCounters, JobsEventCounter};

/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// Debug logger used to record sleep-related events (idle, sleepy,
    /// sleeping, awoken, ...).
    logger: Logger,

    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Each entry is cache-padded to avoid false sharing between
    /// workers touching adjacent states.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking the number of inactive (idle) threads,
    /// the number of sleeping threads, and the jobs event counter (JEC). See
    /// the `counters` submodule for the encoding.
    counters: AtomicCounters,
}

/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}

/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the sleeping worker blocks on; signalled by
    /// `wake_specific_thread` after flipping `is_blocked` to false.
    condvar: Condvar,
}

/// Number of busy-looking rounds an idle worker spins (yielding each round)
/// before it announces itself as "sleepy".
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One extra round after becoming sleepy before the worker actually tries
/// to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;

impl Sleep {
    /// Creates the sleep module for a registry with `n_threads` workers,
    /// with one default (not blocked) sleep state per worker.
    pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
        Sleep {
            logger,
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker runs out of local work and begins searching for
    /// work to steal. Registers the thread as inactive and returns the
    /// `IdleState` token that tracks its idle progress.
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
        self.logger.log(|| ThreadIdle {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when an idle worker finds work; consumes the `IdleState`
    /// (the thread is no longer idle) and deregisters it as inactive.
    #[inline]
    pub(super) fn work_found(&self, idle_state: IdleState) {
        self.logger.log(|| ThreadFoundWork {
            worker: idle_state.worker_index,
            yields: idle_state.rounds,
        });

        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker's steal attempt comes up empty.
    /// Yields for the first `ROUNDS_UNTIL_SLEEPY` rounds, then announces
    /// sleepiness (recording the JEC), and finally calls `sleep` to block.
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Marks this worker as "sleepy": bumps the jobs event counter (JEC) to
    /// an odd ("sleepy") value if it was even ("active"), and returns the
    /// JEC value to be compared against later, before actually sleeping.
    #[cold]
    fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active);
        let jobs_counter = counters.jobs_counter();
        self.logger.log(|| ThreadSleepy {
            worker: worker_index,
            jobs_counter: jobs_counter.as_usize(),
        });
        jobs_counter
    }

    /// Puts the worker to sleep, blocking on its condvar until notified.
    /// Bails out early (without sleeping) if the latch was set, if the JEC
    /// changed since we became sleepy (meaning new work was posted), or if
    /// there are injected jobs we would otherwise miss.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        if !latch.get_sleepy() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            return;
        }

        // NB: the mutex is acquired *before* we register as sleeping below;
        // wakers must take this same mutex, so they cannot miss our update
        // to `is_blocked`.
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                self.logger.log(|| ThreadSleepInterruptedByJob {
                    worker: worker_index,
                });

                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        self.logger.log(|| ThreadSleeping {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whoever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            //
            // The `while` loop (rather than a single `wait`) guards against
            // spurious condvar wakeups: we only proceed once a waker has
            // actually reset `is_blocked` to false.
            *is_blocked = true;
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();

        self.logger.log(|| ThreadAwoken {
            worker: worker_index,
            latch_addr: latch.addr(),
        });
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping). When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of `counters`
        // with which we exit the loop thus corresponds to a state when the JEC
        // was no longer "sleepy" (presumably: any thread that became sleepy
        // before this point will observe the changed JEC and re-search for
        // work rather than sleep -- TODO confirm against counters module).
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        self.logger.log(|| JobThreadCounts {
            worker: source_worker_index,
            num_idle: num_awake_but_idle as u16,
            num_sleepers: num_sleepers as u16,
        });

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers, scanning the sleep
    /// states in index order. May wake fewer if not enough workers are
    /// actually blocked.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if (and only if) it is currently
    /// blocked; returns true if a wake actually happened.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, it's our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            self.logger.log(|| ThreadNotify { worker: index });

            true
        } else {
            false
        }
    }
}

impl IdleState {
    /// Resets the idle state completely: the worker starts its spin/yield
    /// rounds from zero, as if it had just become idle.
    fn wake_fully(&mut self) {
        self.rounds = 0;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }

    /// Resets the idle state to just before the "sleepy" threshold, so the
    /// worker does one more search pass and then (if it still finds nothing)
    /// re-announces sleepiness before sleeping.
    fn wake_partly(&mut self) {
        self.rounds = ROUNDS_UNTIL_SLEEPY;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }
}