use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::util::leak;
use crate::{
    ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
#[allow(deprecated)]
use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::usize;

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
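///
/// A minimal sketch of how a custom handler plugs in (illustrative usage,
/// not code from this module; error handling elided):
///
/// ```ignore
/// let pool = ThreadPoolBuilder::new()
///     .spawn_handler(|thread: ThreadBuilder| {
///         // Hand the builder off to an ordinary std thread; `run()`
///         // enters the worker's main loop and only returns once the
///         // pool has been dropped.
///         std::thread::spawn(move || thread.run());
///         Ok(())
///     })
///     .build()?;
/// ```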
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self.worker, self.registry, self.index) }
    }
}

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}
pub(super) struct Registry {
    logger: Logger,
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));
    THE_REGISTRY_SET.call_once(|| {
        result = registry().map(|registry| {
            let registry = leak(registry);
            unsafe {
                THE_REGISTRY = Some(registry);
            }
            registry
        });
    });
    result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first {
                    Worker::new_fifo()
                } else {
                    Worker::new_lifo()
                };

                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let logger = Logger::new(n_threads);
        let registry = Arc::new(Registry {
            logger: logger.clone(),
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(logger, n_threads),
            injected_jobs: Injector::new(),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: registry.clone(),
                worker,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry.clone())
    }

    pub(super) fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().clone()
            } else {
                (*worker_thread).registry.clone()
            }
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() {
                Some(worker)
            } else {
                None
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId {
            addr: self as *const Self as usize,
        }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.logger.log(event)
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }

    /// Waits for the worker threads to get up and running. This is
    /// meant primarily for benchmarking purposes, so that you can get
    /// more consistent numbers by having everything "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    #[cfg(test)]
    pub(super) fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
            info.stopped.wait();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
        self.log(|| JobsInjected {
            count: injected_jobs.len(),
        });

        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        // `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        for &job_ref in injected_jobs {
            self.injected_jobs.push(job_ref);
        }

        self.sleep
            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
    }

    fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => {
                    self.log(|| JobUninjected {
                        worker: worker_index,
                    });
                    return Some(job);
                }
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
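    ///
    /// An illustrative call site (a sketch; `compute` is a hypothetical
    /// closure, not part of this crate):
    ///
    /// ```ignore
    /// let result = registry.in_worker(|worker_thread, injected| {
    ///     // `injected == true` means we entered the pool from outside
    ///     // (the slower, blocking path); `false` means we were already
    ///     // on a worker thread of this registry.
    ///     compute(worker_thread.index())
    /// });
    /// ```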
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(&*worker_thread, true)
                },
                l,
            );
            self.inject(&[job.as_job_ref()]);
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.

            // flush accumulated logs as we exit the thread
            self.logger.log(|| Flush);

            job.into_result()
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(&*worker_thread, true)
            },
            latch,
        );
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }

    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
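    ///
    /// A sketch of the intended balance (illustrative only, not a literal
    /// call sequence from this crate):
    ///
    /// ```ignore
    /// registry.increment_terminate_count(); // taken when the async job is created
    /// // ... the job runs at some later point ...
    /// registry.terminate(); // released when the job completes
    /// ```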
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(
            previous != std::usize::MAX,
            "overflow in registry ref count"
        );
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                thread_info.terminate.set_and_tickle_one(self, i);
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    ///
    /// NB. We use a `CountLatch` here because it has no lifetimes and is
    /// meant for async use, but the count never gets higher than one.
    terminate: CountLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: CountLatch::new(),
            stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets the `WorkerThread` index for the current thread; returns
    /// NULL if this is not a worker thread. This pointer is valid
    /// anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `self` as the worker thread index for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.registry.logger.log(event)
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        self.log(|| JobPushed { worker: self.index });
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry
            .sleep
            .new_internal_jobs(self.index, 1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        self.push(self.fifo.push(job));
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            self.log(|| JobPopped { worker: self.index });
        }

        popped_job
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }

    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // the code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self
                .take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index))
            {
                self.registry.sleep.work_found(idle_state);
                self.execute(job);
                idle_state = self.registry.sleep.start_looking(self.index, latch);
            } else {
                self.registry
                    .sleep
                    .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(idle_state);

        self.log(|| ThreadSawLatchSet {
            worker: self.index,
            latch_addr: latch.addr(),
        });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        job.execute();
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => {
                            self.log(|| JobStolen {
                                worker: self.index,
                                victim: victim_index,
                            });
                            Some(job)
                        }
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
    let worker_thread = &WorkerThread {
        worker,
        fifo: JobFifo::new(),
        index,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    };
    WorkerThread::set_current(worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    let my_terminate_latch = &registry.thread_infos[index].terminate;
    worker_thread.log(|| ThreadStart {
        worker: index,
        terminate_addr: my_terminate_latch.as_core_latch().addr(),
    });
    worker_thread.wait_until(my_terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    worker_thread.log(|| ThreadTerminate { worker: index });

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
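///
/// For reference, `next()` below applies the standard xorshift* update:
///
/// ```text
/// x ^= x >> 12;
/// x ^= x << 25;
/// x ^= x >> 27;
/// return x * 0x2545_f491_4f6c_dd1d; // wrapping multiply
/// ```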
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            #[allow(deprecated)]
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}