• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! Contains support for user-managed thread pools, represented by
//! the [`ThreadPool`] type (see that struct for details).
3 //!
4 //! [`ThreadPool`]: struct.ThreadPool.html
5 
6 use crate::join;
7 use crate::registry::{Registry, ThreadSpawn, WorkerThread};
8 use crate::spawn;
9 #[allow(deprecated)]
10 use crate::Configuration;
11 use crate::{scope, Scope};
12 use crate::{scope_fifo, ScopeFifo};
13 use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
14 use std::error::Error;
15 use std::fmt;
16 use std::sync::Arc;
17 
18 mod test;
19 
20 /// Represents a user created [thread-pool].
21 ///
22 /// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
23 /// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
24 /// execute functions explicitly within this [`ThreadPool`] using
25 /// [`ThreadPool::install()`]. By contrast, top level rayon functions
26 /// (like `join()`) will execute implicitly within the current thread-pool.
27 ///
28 ///
29 /// ## Creating a ThreadPool
30 ///
31 /// ```rust
32 /// # use rayon_core as rayon;
33 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
34 /// ```
35 ///
36 /// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
37 /// threads. In addition, any other rayon operations called inside of `install()` will also
38 /// execute in the context of the `ThreadPool`.
39 ///
40 /// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
41 /// they will complete executing any remaining work that you have spawned, and automatically
42 /// terminate.
43 ///
44 ///
45 /// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
46 /// [`ThreadPool`]: struct.ThreadPool.html
47 /// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
48 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
49 /// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
50 /// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
51 pub struct ThreadPool {
52     registry: Arc<Registry>,
53 }
54 
55 impl ThreadPool {
56     #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
57     #[allow(deprecated)]
58     /// Deprecated in favor of `ThreadPoolBuilder::build`.
new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>>59     pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
60         Self::build(configuration.into_builder()).map_err(Box::from)
61     }
62 
build<S>( builder: ThreadPoolBuilder<S>, ) -> Result<ThreadPool, ThreadPoolBuildError> where S: ThreadSpawn,63     pub(super) fn build<S>(
64         builder: ThreadPoolBuilder<S>,
65     ) -> Result<ThreadPool, ThreadPoolBuildError>
66     where
67         S: ThreadSpawn,
68     {
69         let registry = Registry::new(builder)?;
70         Ok(ThreadPool { registry })
71     }
72 
73     /// Executes `op` within the threadpool. Any attempts to use
74     /// `join`, `scope`, or parallel iterators will then operate
75     /// within that threadpool.
76     ///
77     /// # Warning: thread-local data
78     ///
79     /// Because `op` is executing within the Rayon thread-pool,
80     /// thread-local data from the current thread will not be
81     /// accessible.
82     ///
83     /// # Panics
84     ///
85     /// If `op` should panic, that panic will be propagated.
86     ///
87     /// ## Using `install()`
88     ///
89     /// ```rust
90     ///    # use rayon_core as rayon;
91     ///    fn main() {
92     ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
93     ///         let n = pool.install(|| fib(20));
94     ///         println!("{}", n);
95     ///    }
96     ///
97     ///    fn fib(n: usize) -> usize {
98     ///         if n == 0 || n == 1 {
99     ///             return n;
100     ///         }
101     ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
102     ///         return a + b;
103     ///     }
104     /// ```
install<OP, R>(&self, op: OP) -> R where OP: FnOnce() -> R + Send, R: Send,105     pub fn install<OP, R>(&self, op: OP) -> R
106     where
107         OP: FnOnce() -> R + Send,
108         R: Send,
109     {
110         self.registry.in_worker(|_, _| op())
111     }
112 
113     /// Returns the (current) number of threads in the thread pool.
114     ///
115     /// # Future compatibility note
116     ///
117     /// Note that unless this thread-pool was created with a
118     /// [`ThreadPoolBuilder`] that specifies the number of threads,
119     /// then this number may vary over time in future versions (see [the
120     /// `num_threads()` method for details][snt]).
121     ///
122     /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
123     /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
124     #[inline]
current_num_threads(&self) -> usize125     pub fn current_num_threads(&self) -> usize {
126         self.registry.num_threads()
127     }
128 
129     /// If called from a Rayon worker thread in this thread-pool,
130     /// returns the index of that thread; if not called from a Rayon
131     /// thread, or called from a Rayon thread that belongs to a
132     /// different thread-pool, returns `None`.
133     ///
134     /// The index for a given thread will not change over the thread's
135     /// lifetime. However, multiple threads may share the same index if
136     /// they are in distinct thread-pools.
137     ///
138     /// # Future compatibility note
139     ///
140     /// Currently, every thread-pool (including the global
141     /// thread-pool) has a fixed number of threads, but this may
142     /// change in future Rayon versions (see [the `num_threads()` method
143     /// for details][snt]). In that case, the index for a
144     /// thread would not change during its lifetime, but thread
145     /// indices may wind up being reused if threads are terminated and
146     /// restarted.
147     ///
148     /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
149     #[inline]
current_thread_index(&self) -> Option<usize>150     pub fn current_thread_index(&self) -> Option<usize> {
151         let curr = self.registry.current_thread()?;
152         Some(curr.index())
153     }
154 
155     /// Returns true if the current worker thread currently has "local
156     /// tasks" pending. This can be useful as part of a heuristic for
157     /// deciding whether to spawn a new task or execute code on the
158     /// current thread, particularly in breadth-first
159     /// schedulers. However, keep in mind that this is an inherently
160     /// racy check, as other worker threads may be actively "stealing"
161     /// tasks from our local deque.
162     ///
163     /// **Background:** Rayon's uses a [work-stealing] scheduler. The
164     /// key idea is that each thread has its own [deque] of
165     /// tasks. Whenever a new task is spawned -- whether through
166     /// `join()`, `Scope::spawn()`, or some other means -- that new
167     /// task is pushed onto the thread's *local* deque. Worker threads
168     /// have a preference for executing their own tasks; if however
169     /// they run out of tasks, they will go try to "steal" tasks from
170     /// other threads. This function therefore has an inherent race
171     /// with other active worker threads, which may be removing items
172     /// from the local deque.
173     ///
174     /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
175     /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
176     #[inline]
current_thread_has_pending_tasks(&self) -> Option<bool>177     pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
178         let curr = self.registry.current_thread()?;
179         Some(!curr.local_deque_is_empty())
180     }
181 
182     /// Execute `oper_a` and `oper_b` in the thread-pool and return
183     /// the results. Equivalent to `self.install(|| join(oper_a,
184     /// oper_b))`.
join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) where A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, RA: Send, RB: Send,185     pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
186     where
187         A: FnOnce() -> RA + Send,
188         B: FnOnce() -> RB + Send,
189         RA: Send,
190         RB: Send,
191     {
192         self.install(|| join(oper_a, oper_b))
193     }
194 
195     /// Creates a scope that executes within this thread-pool.
196     /// Equivalent to `self.install(|| scope(...))`.
197     ///
198     /// See also: [the `scope()` function][scope].
199     ///
200     /// [scope]: fn.scope.html
scope<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send,201     pub fn scope<'scope, OP, R>(&self, op: OP) -> R
202     where
203         OP: FnOnce(&Scope<'scope>) -> R + Send,
204         R: Send,
205     {
206         self.install(|| scope(op))
207     }
208 
209     /// Creates a scope that executes within this thread-pool.
210     /// Spawns from the same thread are prioritized in relative FIFO order.
211     /// Equivalent to `self.install(|| scope_fifo(...))`.
212     ///
213     /// See also: [the `scope_fifo()` function][scope_fifo].
214     ///
215     /// [scope_fifo]: fn.scope_fifo.html
scope_fifo<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send,216     pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
217     where
218         OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
219         R: Send,
220     {
221         self.install(|| scope_fifo(op))
222     }
223 
224     /// Spawns an asynchronous task in this thread-pool. This task will
225     /// run in the implicit, global scope, which means that it may outlast
226     /// the current stack frame -- therefore, it cannot capture any references
227     /// onto the stack (you will likely need a `move` closure).
228     ///
229     /// See also: [the `spawn()` function defined on scopes][spawn].
230     ///
231     /// [spawn]: struct.Scope.html#method.spawn
spawn<OP>(&self, op: OP) where OP: FnOnce() + Send + 'static,232     pub fn spawn<OP>(&self, op: OP)
233     where
234         OP: FnOnce() + Send + 'static,
235     {
236         // We assert that `self.registry` has not terminated.
237         unsafe { spawn::spawn_in(op, &self.registry) }
238     }
239 
240     /// Spawns an asynchronous task in this thread-pool. This task will
241     /// run in the implicit, global scope, which means that it may outlast
242     /// the current stack frame -- therefore, it cannot capture any references
243     /// onto the stack (you will likely need a `move` closure).
244     ///
245     /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
246     ///
247     /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
spawn_fifo<OP>(&self, op: OP) where OP: FnOnce() + Send + 'static,248     pub fn spawn_fifo<OP>(&self, op: OP)
249     where
250         OP: FnOnce() + Send + 'static,
251     {
252         // We assert that `self.registry` has not terminated.
253         unsafe { spawn::spawn_fifo_in(op, &self.registry) }
254     }
255 }
256 
257 impl Drop for ThreadPool {
drop(&mut self)258     fn drop(&mut self) {
259         self.registry.terminate();
260     }
261 }
262 
263 impl fmt::Debug for ThreadPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result264     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
265         fmt.debug_struct("ThreadPool")
266             .field("num_threads", &self.current_num_threads())
267             .field("id", &self.registry.id())
268             .finish()
269     }
270 }
271 
272 /// If called from a Rayon worker thread, returns the index of that
273 /// thread within its current pool; if not called from a Rayon thread,
274 /// returns `None`.
275 ///
276 /// The index for a given thread will not change over the thread's
277 /// lifetime. However, multiple threads may share the same index if
278 /// they are in distinct thread-pools.
279 ///
280 /// See also: [the `ThreadPool::current_thread_index()` method].
281 ///
282 /// [m]: struct.ThreadPool.html#method.current_thread_index
283 ///
284 /// # Future compatibility note
285 ///
286 /// Currently, every thread-pool (including the global
287 /// thread-pool) has a fixed number of threads, but this may
288 /// change in future Rayon versions (see [the `num_threads()` method
289 /// for details][snt]). In that case, the index for a
290 /// thread would not change during its lifetime, but thread
291 /// indices may wind up being reused if threads are terminated and
292 /// restarted.
293 ///
294 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
295 #[inline]
current_thread_index() -> Option<usize>296 pub fn current_thread_index() -> Option<usize> {
297     unsafe {
298         let curr = WorkerThread::current().as_ref()?;
299         Some(curr.index())
300     }
301 }
302 
303 /// If called from a Rayon worker thread, indicates whether that
304 /// thread's local deque still has pending tasks. Otherwise, returns
305 /// `None`. For more information, see [the
306 /// `ThreadPool::current_thread_has_pending_tasks()` method][m].
307 ///
308 /// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
309 #[inline]
current_thread_has_pending_tasks() -> Option<bool>310 pub fn current_thread_has_pending_tasks() -> Option<bool> {
311     unsafe {
312         let curr = WorkerThread::current().as_ref()?;
313         Some(!curr.local_deque_is_empty())
314     }
315 }
316