• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! Contains support for user-managed thread pools, represented by
//! the [`ThreadPool`] type (see that struct for details).
//!
//! [`ThreadPool`]: struct.ThreadPool.html
5 
6 use crate::broadcast::{self, BroadcastContext};
7 use crate::join;
8 use crate::registry::{Registry, ThreadSpawn, WorkerThread};
9 use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
10 use crate::spawn;
11 use crate::{scope, Scope};
12 use crate::{scope_fifo, ScopeFifo};
13 use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
14 use std::error::Error;
15 use std::fmt;
16 use std::sync::Arc;
17 
18 mod test;
19 
20 /// Represents a user created [thread-pool].
21 ///
22 /// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
23 /// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
24 /// execute functions explicitly within this [`ThreadPool`] using
25 /// [`ThreadPool::install()`]. By contrast, top level rayon functions
26 /// (like `join()`) will execute implicitly within the current thread-pool.
27 ///
28 ///
29 /// ## Creating a ThreadPool
30 ///
31 /// ```rust
32 /// # use rayon_core as rayon;
33 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
34 /// ```
35 ///
36 /// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
37 /// threads. In addition, any other rayon operations called inside of `install()` will also
38 /// execute in the context of the `ThreadPool`.
39 ///
40 /// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
41 /// they will complete executing any remaining work that you have spawned, and automatically
42 /// terminate.
43 ///
44 ///
45 /// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
46 /// [`ThreadPool`]: struct.ThreadPool.html
47 /// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
48 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
49 /// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
50 /// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
51 pub struct ThreadPool {
52     registry: Arc<Registry>,
53 }
54 
55 impl ThreadPool {
56     #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
57     #[allow(deprecated)]
58     /// Deprecated in favor of `ThreadPoolBuilder::build`.
new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>>59     pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
60         Self::build(configuration.into_builder()).map_err(Box::from)
61     }
62 
build<S>( builder: ThreadPoolBuilder<S>, ) -> Result<ThreadPool, ThreadPoolBuildError> where S: ThreadSpawn,63     pub(super) fn build<S>(
64         builder: ThreadPoolBuilder<S>,
65     ) -> Result<ThreadPool, ThreadPoolBuildError>
66     where
67         S: ThreadSpawn,
68     {
69         let registry = Registry::new(builder)?;
70         Ok(ThreadPool { registry })
71     }
72 
73     /// Executes `op` within the threadpool. Any attempts to use
74     /// `join`, `scope`, or parallel iterators will then operate
75     /// within that threadpool.
76     ///
77     /// # Warning: thread-local data
78     ///
79     /// Because `op` is executing within the Rayon thread-pool,
80     /// thread-local data from the current thread will not be
81     /// accessible.
82     ///
83     /// # Warning: execution order
84     ///
85     /// If the current thread is part of a different thread pool, it will try to
86     /// keep busy while the `op` completes in its target pool, similar to
87     /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
88     /// potentially schedule other tasks to run on the current thread in the
89     /// meantime. For example
90     ///
91     /// ```rust
92     /// # use rayon_core as rayon;
93     /// fn main() {
94     ///     rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
95     ///     let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
96     ///     let do_it = || {
97     ///         print!("one ");
98     ///         pool.install(||{});
99     ///         print!("two ");
100     ///     };
101     ///     rayon::join(|| do_it(), || do_it());
102     /// }
103     /// ```
104     ///
105     /// Since we configured just one thread in the global pool, one might
106     /// expect `do_it()` to run sequentially, producing:
107     ///
108     /// ```ascii
109     /// one two one two
110     /// ```
111     ///
112     /// However each call to `install()` yields implicitly, allowing rayon to
113     /// run multiple instances of `do_it()` concurrently on the single, global
114     /// thread. The following output would be equally valid:
115     ///
116     /// ```ascii
117     /// one one two two
118     /// ```
119     ///
120     /// # Panics
121     ///
122     /// If `op` should panic, that panic will be propagated.
123     ///
124     /// ## Using `install()`
125     ///
126     /// ```rust
127     ///    # use rayon_core as rayon;
128     ///    fn main() {
129     ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
130     ///         let n = pool.install(|| fib(20));
131     ///         println!("{}", n);
132     ///    }
133     ///
134     ///    fn fib(n: usize) -> usize {
135     ///         if n == 0 || n == 1 {
136     ///             return n;
137     ///         }
138     ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
139     ///         return a + b;
140     ///     }
141     /// ```
install<OP, R>(&self, op: OP) -> R where OP: FnOnce() -> R + Send, R: Send,142     pub fn install<OP, R>(&self, op: OP) -> R
143     where
144         OP: FnOnce() -> R + Send,
145         R: Send,
146     {
147         self.registry.in_worker(|_, _| op())
148     }
149 
150     /// Executes `op` within every thread in the threadpool. Any attempts to use
151     /// `join`, `scope`, or parallel iterators will then operate within that
152     /// threadpool.
153     ///
154     /// Broadcasts are executed on each thread after they have exhausted their
155     /// local work queue, before they attempt work-stealing from other threads.
156     /// The goal of that strategy is to run everywhere in a timely manner
157     /// *without* being too disruptive to current work. There may be alternative
158     /// broadcast styles added in the future for more or less aggressive
159     /// injection, if the need arises.
160     ///
161     /// # Warning: thread-local data
162     ///
163     /// Because `op` is executing within the Rayon thread-pool,
164     /// thread-local data from the current thread will not be
165     /// accessible.
166     ///
167     /// # Panics
168     ///
169     /// If `op` should panic on one or more threads, exactly one panic
170     /// will be propagated, only after all threads have completed
171     /// (or panicked) their own `op`.
172     ///
173     /// # Examples
174     ///
175     /// ```
176     ///    # use rayon_core as rayon;
177     ///    use std::sync::atomic::{AtomicUsize, Ordering};
178     ///
179     ///    fn main() {
180     ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
181     ///
182     ///         // The argument gives context, including the index of each thread.
183     ///         let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
184     ///         assert_eq!(v, &[0, 1, 4, 9, 16]);
185     ///
186     ///         // The closure can reference the local stack
187     ///         let count = AtomicUsize::new(0);
188     ///         pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
189     ///         assert_eq!(count.into_inner(), 5);
190     ///    }
191     /// ```
broadcast<OP, R>(&self, op: OP) -> Vec<R> where OP: Fn(BroadcastContext<'_>) -> R + Sync, R: Send,192     pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
193     where
194         OP: Fn(BroadcastContext<'_>) -> R + Sync,
195         R: Send,
196     {
197         // We assert that `self.registry` has not terminated.
198         unsafe { broadcast::broadcast_in(op, &self.registry) }
199     }
200 
201     /// Returns the (current) number of threads in the thread pool.
202     ///
203     /// # Future compatibility note
204     ///
205     /// Note that unless this thread-pool was created with a
206     /// [`ThreadPoolBuilder`] that specifies the number of threads,
207     /// then this number may vary over time in future versions (see [the
208     /// `num_threads()` method for details][snt]).
209     ///
210     /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
211     /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
212     #[inline]
current_num_threads(&self) -> usize213     pub fn current_num_threads(&self) -> usize {
214         self.registry.num_threads()
215     }
216 
217     /// If called from a Rayon worker thread in this thread-pool,
218     /// returns the index of that thread; if not called from a Rayon
219     /// thread, or called from a Rayon thread that belongs to a
220     /// different thread-pool, returns `None`.
221     ///
222     /// The index for a given thread will not change over the thread's
223     /// lifetime. However, multiple threads may share the same index if
224     /// they are in distinct thread-pools.
225     ///
226     /// # Future compatibility note
227     ///
228     /// Currently, every thread-pool (including the global
229     /// thread-pool) has a fixed number of threads, but this may
230     /// change in future Rayon versions (see [the `num_threads()` method
231     /// for details][snt]). In that case, the index for a
232     /// thread would not change during its lifetime, but thread
233     /// indices may wind up being reused if threads are terminated and
234     /// restarted.
235     ///
236     /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
237     #[inline]
current_thread_index(&self) -> Option<usize>238     pub fn current_thread_index(&self) -> Option<usize> {
239         let curr = self.registry.current_thread()?;
240         Some(curr.index())
241     }
242 
243     /// Returns true if the current worker thread currently has "local
244     /// tasks" pending. This can be useful as part of a heuristic for
245     /// deciding whether to spawn a new task or execute code on the
246     /// current thread, particularly in breadth-first
247     /// schedulers. However, keep in mind that this is an inherently
248     /// racy check, as other worker threads may be actively "stealing"
249     /// tasks from our local deque.
250     ///
251     /// **Background:** Rayon's uses a [work-stealing] scheduler. The
252     /// key idea is that each thread has its own [deque] of
253     /// tasks. Whenever a new task is spawned -- whether through
254     /// `join()`, `Scope::spawn()`, or some other means -- that new
255     /// task is pushed onto the thread's *local* deque. Worker threads
256     /// have a preference for executing their own tasks; if however
257     /// they run out of tasks, they will go try to "steal" tasks from
258     /// other threads. This function therefore has an inherent race
259     /// with other active worker threads, which may be removing items
260     /// from the local deque.
261     ///
262     /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
263     /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
264     #[inline]
current_thread_has_pending_tasks(&self) -> Option<bool>265     pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
266         let curr = self.registry.current_thread()?;
267         Some(!curr.local_deque_is_empty())
268     }
269 
270     /// Execute `oper_a` and `oper_b` in the thread-pool and return
271     /// the results. Equivalent to `self.install(|| join(oper_a,
272     /// oper_b))`.
join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) where A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, RA: Send, RB: Send,273     pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
274     where
275         A: FnOnce() -> RA + Send,
276         B: FnOnce() -> RB + Send,
277         RA: Send,
278         RB: Send,
279     {
280         self.install(|| join(oper_a, oper_b))
281     }
282 
283     /// Creates a scope that executes within this thread-pool.
284     /// Equivalent to `self.install(|| scope(...))`.
285     ///
286     /// See also: [the `scope()` function][scope].
287     ///
288     /// [scope]: fn.scope.html
scope<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send,289     pub fn scope<'scope, OP, R>(&self, op: OP) -> R
290     where
291         OP: FnOnce(&Scope<'scope>) -> R + Send,
292         R: Send,
293     {
294         self.install(|| scope(op))
295     }
296 
297     /// Creates a scope that executes within this thread-pool.
298     /// Spawns from the same thread are prioritized in relative FIFO order.
299     /// Equivalent to `self.install(|| scope_fifo(...))`.
300     ///
301     /// See also: [the `scope_fifo()` function][scope_fifo].
302     ///
303     /// [scope_fifo]: fn.scope_fifo.html
scope_fifo<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send,304     pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
305     where
306         OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
307         R: Send,
308     {
309         self.install(|| scope_fifo(op))
310     }
311 
312     /// Creates a scope that spawns work into this thread-pool.
313     ///
314     /// See also: [the `in_place_scope()` function][in_place_scope].
315     ///
316     /// [in_place_scope]: fn.in_place_scope.html
in_place_scope<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R,317     pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
318     where
319         OP: FnOnce(&Scope<'scope>) -> R,
320     {
321         do_in_place_scope(Some(&self.registry), op)
322     }
323 
324     /// Creates a scope that spawns work into this thread-pool in FIFO order.
325     ///
326     /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
327     ///
328     /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R,329     pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
330     where
331         OP: FnOnce(&ScopeFifo<'scope>) -> R,
332     {
333         do_in_place_scope_fifo(Some(&self.registry), op)
334     }
335 
336     /// Spawns an asynchronous task in this thread-pool. This task will
337     /// run in the implicit, global scope, which means that it may outlast
338     /// the current stack frame -- therefore, it cannot capture any references
339     /// onto the stack (you will likely need a `move` closure).
340     ///
341     /// See also: [the `spawn()` function defined on scopes][spawn].
342     ///
343     /// [spawn]: struct.Scope.html#method.spawn
spawn<OP>(&self, op: OP) where OP: FnOnce() + Send + 'static,344     pub fn spawn<OP>(&self, op: OP)
345     where
346         OP: FnOnce() + Send + 'static,
347     {
348         // We assert that `self.registry` has not terminated.
349         unsafe { spawn::spawn_in(op, &self.registry) }
350     }
351 
352     /// Spawns an asynchronous task in this thread-pool. This task will
353     /// run in the implicit, global scope, which means that it may outlast
354     /// the current stack frame -- therefore, it cannot capture any references
355     /// onto the stack (you will likely need a `move` closure).
356     ///
357     /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
358     ///
359     /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
spawn_fifo<OP>(&self, op: OP) where OP: FnOnce() + Send + 'static,360     pub fn spawn_fifo<OP>(&self, op: OP)
361     where
362         OP: FnOnce() + Send + 'static,
363     {
364         // We assert that `self.registry` has not terminated.
365         unsafe { spawn::spawn_fifo_in(op, &self.registry) }
366     }
367 
368     /// Spawns an asynchronous task on every thread in this thread-pool. This task
369     /// will run in the implicit, global scope, which means that it may outlast the
370     /// current stack frame -- therefore, it cannot capture any references onto the
371     /// stack (you will likely need a `move` closure).
spawn_broadcast<OP>(&self, op: OP) where OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,372     pub fn spawn_broadcast<OP>(&self, op: OP)
373     where
374         OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
375     {
376         // We assert that `self.registry` has not terminated.
377         unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
378     }
379 
380     /// Cooperatively yields execution to Rayon.
381     ///
382     /// This is similar to the general [`yield_now()`], but only if the current
383     /// thread is part of *this* thread pool.
384     ///
385     /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
386     /// nothing was available, or `None` if the current thread is not part this pool.
yield_now(&self) -> Option<Yield>387     pub fn yield_now(&self) -> Option<Yield> {
388         let curr = self.registry.current_thread()?;
389         Some(curr.yield_now())
390     }
391 
392     /// Cooperatively yields execution to local Rayon work.
393     ///
394     /// This is similar to the general [`yield_local()`], but only if the current
395     /// thread is part of *this* thread pool.
396     ///
397     /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
398     /// nothing was available, or `None` if the current thread is not part this pool.
yield_local(&self) -> Option<Yield>399     pub fn yield_local(&self) -> Option<Yield> {
400         let curr = self.registry.current_thread()?;
401         Some(curr.yield_local())
402     }
403 }
404 
405 impl Drop for ThreadPool {
drop(&mut self)406     fn drop(&mut self) {
407         self.registry.terminate();
408     }
409 }
410 
411 impl fmt::Debug for ThreadPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result412     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
413         fmt.debug_struct("ThreadPool")
414             .field("num_threads", &self.current_num_threads())
415             .field("id", &self.registry.id())
416             .finish()
417     }
418 }
419 
420 /// If called from a Rayon worker thread, returns the index of that
421 /// thread within its current pool; if not called from a Rayon thread,
422 /// returns `None`.
423 ///
424 /// The index for a given thread will not change over the thread's
425 /// lifetime. However, multiple threads may share the same index if
426 /// they are in distinct thread-pools.
427 ///
428 /// See also: [the `ThreadPool::current_thread_index()` method].
429 ///
430 /// [m]: struct.ThreadPool.html#method.current_thread_index
431 ///
432 /// # Future compatibility note
433 ///
434 /// Currently, every thread-pool (including the global
435 /// thread-pool) has a fixed number of threads, but this may
436 /// change in future Rayon versions (see [the `num_threads()` method
437 /// for details][snt]). In that case, the index for a
438 /// thread would not change during its lifetime, but thread
439 /// indices may wind up being reused if threads are terminated and
440 /// restarted.
441 ///
442 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
443 #[inline]
current_thread_index() -> Option<usize>444 pub fn current_thread_index() -> Option<usize> {
445     unsafe {
446         let curr = WorkerThread::current().as_ref()?;
447         Some(curr.index())
448     }
449 }
450 
451 /// If called from a Rayon worker thread, indicates whether that
452 /// thread's local deque still has pending tasks. Otherwise, returns
453 /// `None`. For more information, see [the
454 /// `ThreadPool::current_thread_has_pending_tasks()` method][m].
455 ///
456 /// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
457 #[inline]
current_thread_has_pending_tasks() -> Option<bool>458 pub fn current_thread_has_pending_tasks() -> Option<bool> {
459     unsafe {
460         let curr = WorkerThread::current().as_ref()?;
461         Some(!curr.local_deque_is_empty())
462     }
463 }
464 
465 /// Cooperatively yields execution to Rayon.
466 ///
467 /// If the current thread is part of a rayon thread pool, this looks for a
468 /// single unit of pending work in the pool, then executes it. Completion of
469 /// that work might include nested work or further work stealing.
470 ///
471 /// This is similar to [`std::thread::yield_now()`], but does not literally make
472 /// that call. If you are implementing a polling loop, you may want to also
473 /// yield to the OS scheduler yourself if no Rayon work was found.
474 ///
475 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
476 /// nothing was available, or `None` if this thread is not part of any pool at all.
yield_now() -> Option<Yield>477 pub fn yield_now() -> Option<Yield> {
478     unsafe {
479         let thread = WorkerThread::current().as_ref()?;
480         Some(thread.yield_now())
481     }
482 }
483 
484 /// Cooperatively yields execution to local Rayon work.
485 ///
486 /// If the current thread is part of a rayon thread pool, this looks for a
487 /// single unit of pending work in this thread's queue, then executes it.
488 /// Completion of that work might include nested work or further work stealing.
489 ///
490 /// This is similar to [`yield_now()`], but does not steal from other threads.
491 ///
492 /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
493 /// nothing was available, or `None` if this thread is not part of any pool at all.
yield_local() -> Option<Yield>494 pub fn yield_local() -> Option<Yield> {
495     unsafe {
496         let thread = WorkerThread::current().as_ref()?;
497         Some(thread.yield_local())
498     }
499 }
500 
/// Outcome reported by [`yield_now()`] or [`yield_local()`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Yield {
    /// Some work was found and it was executed.
    Executed,
    /// There was no available work to run.
    Idle,
}
509