use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::util::leak;
use crate::{
    ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
#[allow(deprecated)]
use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::usize;

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self.worker, self.registry, self.index) }
    }
}

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}
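// A minimal sketch of how the `ThreadSpawn` hook above is driven from the
// public API. Illustrative only: `ThreadPoolBuilder::spawn_handler` is the
// real entry point (it wraps the closure in `CustomSpawn`), but the thread
// name and count here are arbitrary.
#[cfg(test)]
mod custom_spawn_sketch {
    #[test]
    fn spawn_handler_runs_threads() {
        // Each worker is handed to the closure as a `ThreadBuilder`; the
        // closure must arrange for `ThreadBuilder::run()` to be called.
        let pool = crate::ThreadPoolBuilder::new()
            .num_threads(2)
            .spawn_handler(|thread| {
                std::thread::Builder::new()
                    .name(format!("sketch-worker-{}", thread.index()))
                    .spawn(|| thread.run())?;
                Ok(())
            })
            .build()
            .unwrap();
        pool.install(|| ());
    }
}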
pub(super) struct Registry {
    logger: Logger,
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));
    THE_REGISTRY_SET.call_once(|| {
        result = registry().map(|registry| {
            let registry = leak(registry);
            unsafe {
                THE_REGISTRY = Some(registry);
            }
            registry
        });
    });
    result
}
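// Illustrative sketch of how the one-time initialization above is driven:
// the public `ThreadPoolBuilder::build_global` lands in
// `init_global_registry`, and the `Once` guarantees that only the first
// initializer wins:
//
//     // First call configures the global registry...
//     ThreadPoolBuilder::new().num_threads(4).build_global()?;
//     // ...any later call fails with `GlobalPoolAlreadyInitialized`.
//     assert!(ThreadPoolBuilder::new().build_global().is_err());
//
// `global_registry()` itself falls back to the default configuration when
// nothing was configured explicitly.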
struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first {
                    Worker::new_fifo()
                } else {
                    Worker::new_lifo()
                };

                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let logger = Logger::new(n_threads);
        let registry = Arc::new(Registry {
            logger: logger.clone(),
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(logger, n_threads),
            injected_jobs: Injector::new(),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: registry.clone(),
                worker,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry.clone())
    }

    pub(super) fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().clone()
            } else {
                (*worker_thread).registry.clone()
            }
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() {
                Some(worker)
            } else {
                None
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId {
            addr: self as *const Self as usize,
        }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.logger.log(event)
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }

    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    #[cfg(test)]
    pub(super) fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
            info.stopped.wait();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }
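    // For orientation (a sketch, not the actual caller code): asynchronous
    // spawns are the main users of `inject_or_push`. `spawn` does roughly
    //
    //     registry.increment_terminate_count(); // keep the pool alive
    //     registry.inject_or_push(job_ref);     // local push on a worker,
    //                                           // external injection otherwise
    //
    // and the spawned job decrements the terminate count again when it
    // completes.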
    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
        self.log(|| JobsInjected {
            count: injected_jobs.len(),
        });

        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        // `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        for &job_ref in injected_jobs {
            self.injected_jobs.push(job_ref);
        }

        self.sleep
            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
    }

    fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => {
                    self.log(|| JobUninjected {
                        worker: worker_index,
                    });
                    return Some(job);
                }
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(&*worker_thread, true)
                },
                l,
            );
            self.inject(&[job.as_job_ref()]);
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.

            // flush accumulated logs as we exit the thread
            self.logger.log(|| Flush);

            job.into_result()
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(&*worker_thread, true)
            },
            latch,
        );
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }
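    // How the three paths above are reached from the public API
    // (illustrative; `pool_a` and `pool_b` are arbitrary names):
    //
    //     let pool_a = ThreadPoolBuilder::new().build()?;
    //     let pool_b = ThreadPoolBuilder::new().build()?;
    //     pool_a.install(|| {            // non-worker thread: in_worker_cold
    //         pool_b.install(|| {        // pool_a worker:     in_worker_cross
    //             pool_b.install(|| ()); // pool_b worker:     direct call
    //         });
    //     });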
    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(
            previous != std::usize::MAX,
            "overflow in registry ref count"
        );
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                thread_info.terminate.set_and_tickle_one(self, i);
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    ///
    /// NB. We use a `CountLatch` here because it has no lifetimes and is
    /// meant for async use, but the count never gets higher than one.
    terminate: CountLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: CountLatch::new(),
            stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}
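// A small check of the TLS protocol above, assuming the crate's public
// `ThreadPoolBuilder`/`install` APIs: `WorkerThread::current()` is null on
// ordinary threads and non-null while running inside a pool.
#[cfg(test)]
mod worker_thread_tls_sketch {
    use super::WorkerThread;

    #[test]
    fn current_is_null_outside_pool() {
        // The test harness thread is not a worker of any pool.
        assert!(WorkerThread::current().is_null());

        // Inside `install`, the closure runs on a worker thread, whose
        // `WorkerThread` was registered via `set_current` in `main_loop`.
        let pool = crate::ThreadPoolBuilder::new()
            .num_threads(1)
            .build()
            .unwrap();
        let inside = pool.install(|| !WorkerThread::current().is_null());
        assert!(inside);
    }
}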
impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets the `WorkerThread` index for the current thread; returns
    /// NULL if this is not a worker thread. This pointer is valid
    /// anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `self` as the worker thread index for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.registry.logger.log(event)
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        self.log(|| JobPushed { worker: self.index });
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry
            .sleep
            .new_internal_jobs(self.index, 1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        self.push(self.fifo.push(job));
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            self.log(|| JobPopped { worker: self.index });
        }

        popped_job
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }
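    // Outline of the cold path below, for orientation only (the real
    // handshake with the `Sleep` module follows):
    //
    //     let mut idle = sleep.start_looking(index, latch);
    //     while !latch.probe() {
    //         match take_local_job().or_else(steal).or_else(pop_injected_job) {
    //             Some(job) => {
    //                 sleep.work_found(idle); // leave the idle state
    //                 execute(job);
    //                 idle = sleep.start_looking(index, latch);
    //             }
    //             None => sleep.no_work_found(&mut idle, latch, ..),
    //         }
    //     }
    //     sleep.work_found(idle); // the latch is set; stop idling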
    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // the code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self
                .take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index))
            {
                self.registry.sleep.work_found(idle_state);
                self.execute(job);
                idle_state = self.registry.sleep.start_looking(self.index, latch);
            } else {
                self.registry
                    .sleep
                    .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(idle_state);

        self.log(|| ThreadSawLatchSet {
            worker: self.index,
            latch_addr: latch.addr(),
        });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        job.execute();
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => {
                            self.log(|| JobStolen {
                                worker: self.index,
                                victim: victim_index,
                            });
                            Some(job)
                        }
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
    let worker_thread = &WorkerThread {
        worker,
        fifo: JobFifo::new(),
        index,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    };
    WorkerThread::set_current(worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    let my_terminate_latch = &registry.thread_infos[index].terminate;
    worker_thread.log(|| ThreadStart {
        worker: index,
        terminate_addr: my_terminate_latch.as_core_latch().addr(),
    });
    worker_thread.wait_until(my_terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    worker_thread.log(|| ThreadTerminate { worker: index });

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }
}
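// A minimal sketch of the `start_handler`/`exit_handler` hooks that
// `main_loop` above invokes; the counter is illustrative only.
#[cfg(test)]
mod handler_sketch {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[test]
    fn start_handler_runs_before_work() {
        let started = Arc::new(AtomicUsize::new(0));
        let s = Arc::clone(&started);
        let pool = crate::ThreadPoolBuilder::new()
            .num_threads(2)
            .start_handler(move |_index| {
                s.fetch_add(1, Ordering::SeqCst);
            })
            .exit_handler(|_index| { /* runs as each worker shuts down */ })
            .build()
            .unwrap();

        // `main_loop` calls the start handler before entering `wait_until`,
        // so any worker that has executed a job has already counted itself.
        pool.install(|| ());
        assert!(started.load(Ordering::SeqCst) >= 1);
        drop(pool); // termination: exit handlers run on the way out
    }
}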
/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            #[allow(deprecated)]
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}
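// A quick property check for the generator above: `next_usize(n)` must stay
// in `0..n`, and a fresh generator is always seeded non-zero. The bounds and
// iteration counts here are arbitrary.
#[cfg(test)]
mod xorshift_sketch {
    use super::XorShift64Star;

    #[test]
    fn next_usize_stays_in_range() {
        let rng = XorShift64Star::new();
        for n in 1..=17 {
            for _ in 0..1000 {
                assert!(rng.next_usize(n) < n);
            }
        }
    }
}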