1 //! Rayon-core houses the core stable APIs of Rayon.
2 //!
3 //! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
4 //!
5 //! [`join`] is used to take two closures and potentially run them in parallel.
6 //! - It will run in parallel if task B gets stolen before task A can finish.
7 //! - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
8 //!
9 //! [`scope`] creates a scope in which you can run any number of parallel tasks.
10 //! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
11 //! The scope will exist until all tasks spawned within the scope have been completed.
12 //!
13 //! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
14 //!
15 //! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
16 //! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
17 //! where it becomes available for work stealing from other threads in the local threadpool.
18 //!
19 //! [`join`]: fn.join.html
20 //! [`scope`]: fn.scope.html
21 //! [`scope()`]: fn.scope.html
22 //! [`spawn`]: fn.spawn.html
23 //! [`ThreadPool`]: struct.threadpool.html
24 //! [`install()`]: struct.ThreadPool.html#method.install
25 //! [`spawn()`]: struct.ThreadPool.html#method.spawn
26 //! [`join()`]: struct.ThreadPool.html#method.join
27 //! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
28 //!
29 //! # Global fallback when threading is unsupported
30 //!
31 //! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
32 //! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
33 //! targets are notable examples of this. Rather than panicking on the unsupported error when
34 //! creating the implicit global threadpool, Rayon configures a fallback mode instead.
35 //!
36 //! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
37 //! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
38 //! there is no other thread to share the work. However, since the pool is not running independent
39 //! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
40 //! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41 //! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42 //! can also volunteer execution time.
43 //!
44 //! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
45 //!
46 //! # Restricting multiple versions
47 //!
48 //! In order to ensure proper coordination between threadpools, and especially
49 //! to make sure there's only one global threadpool, `rayon-core` is actively
50 //! restricted from building multiple versions of itself into a single target.
51 //! You may see a build error like this in violation:
52 //!
53 //! ```text
54 //! error: native library `rayon-core` is being linked to by more
55 //! than one package, and can only be linked to by one package
56 //! ```
57 //!
58 //! While we strive to keep `rayon-core` semver-compatible, it's still
59 //! possible to arrive at this situation if different crates have overly
60 //! restrictive tilde or inequality requirements for `rayon-core`. The
61 //! conflicting requirements will need to be resolved before the build will
62 //! succeed.
63
64 #![deny(missing_debug_implementations)]
65 #![deny(missing_docs)]
66 #![deny(unreachable_pub)]
67 #![warn(rust_2018_idioms)]
68
69 use std::any::Any;
70 use std::env;
71 use std::error::Error;
72 use std::fmt;
73 use std::io;
74 use std::marker::PhantomData;
75 use std::str::FromStr;
76
77 #[macro_use]
78 mod log;
79 #[macro_use]
80 mod private;
81
82 mod broadcast;
83 mod job;
84 mod join;
85 mod latch;
86 mod registry;
87 mod scope;
88 mod sleep;
89 mod spawn;
90 mod thread_pool;
91 mod unwind;
92
93 mod compile_fail;
94 mod test;
95
96 pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
97 pub use self::join::{join, join_context};
98 pub use self::registry::ThreadBuilder;
99 pub use self::scope::{in_place_scope, scope, Scope};
100 pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
101 pub use self::spawn::{spawn, spawn_fifo};
102 pub use self::thread_pool::current_thread_has_pending_tasks;
103 pub use self::thread_pool::current_thread_index;
104 pub use self::thread_pool::ThreadPool;
105 pub use self::thread_pool::{yield_local, yield_now, Yield};
106
107 use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
108
109 /// Returns the maximum number of threads that Rayon supports in a single thread-pool.
110 ///
111 /// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
112 /// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
113 ///
114 /// The value may vary between different targets, and is subject to change in new Rayon versions.
max_num_threads() -> usize115 pub fn max_num_threads() -> usize {
116 // We are limited by the bits available in the sleep counter's `AtomicUsize`.
117 crate::sleep::THREADS_MAX
118 }
119
120 /// Returns the number of threads in the current registry. If this
121 /// code is executing within a Rayon thread-pool, then this will be
122 /// the number of threads for the thread-pool of the current
123 /// thread. Otherwise, it will be the number of threads for the global
124 /// thread-pool.
125 ///
126 /// This can be useful when trying to judge how many times to split
127 /// parallel work (the parallel iterator traits use this value
128 /// internally for this purpose).
129 ///
130 /// # Future compatibility note
131 ///
132 /// Note that unless this thread-pool was created with a
133 /// builder that specifies the number of threads, then this
134 /// number may vary over time in future versions (see [the
135 /// `num_threads()` method for details][snt]).
136 ///
137 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
current_num_threads() -> usize138 pub fn current_num_threads() -> usize {
139 crate::registry::Registry::current_num_threads()
140 }
141
142 /// Error when initializing a thread pool.
143 #[derive(Debug)]
144 pub struct ThreadPoolBuildError {
145 kind: ErrorKind,
146 }
147
148 #[derive(Debug)]
149 enum ErrorKind {
150 GlobalPoolAlreadyInitialized,
151 IOError(io::Error),
152 }
153
154 /// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
155 /// ## Creating a ThreadPool
156 /// The following creates a thread pool with 22 threads.
157 ///
158 /// ```rust
159 /// # use rayon_core as rayon;
160 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
161 /// ```
162 ///
163 /// To instead configure the global thread pool, use [`build_global()`]:
164 ///
165 /// ```rust
166 /// # use rayon_core as rayon;
167 /// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
168 /// ```
169 ///
170 /// [`ThreadPool`]: struct.ThreadPool.html
171 /// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
172 pub struct ThreadPoolBuilder<S = DefaultSpawn> {
173 /// The number of threads in the rayon thread pool.
174 /// If zero will use the RAYON_NUM_THREADS environment variable.
175 /// If RAYON_NUM_THREADS is invalid or zero will use the default.
176 num_threads: usize,
177
178 /// Custom closure, if any, to handle a panic that we cannot propagate
179 /// anywhere else.
180 panic_handler: Option<Box<PanicHandler>>,
181
182 /// Closure to compute the name of a thread.
183 get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
184
185 /// The stack size for the created worker threads
186 stack_size: Option<usize>,
187
188 /// Closure invoked on worker thread start.
189 start_handler: Option<Box<StartHandler>>,
190
191 /// Closure invoked on worker thread exit.
192 exit_handler: Option<Box<ExitHandler>>,
193
194 /// Closure invoked to spawn threads.
195 spawn_handler: S,
196
197 /// If false, worker threads will execute spawned jobs in a
198 /// "depth-first" fashion. If true, they will do a "breadth-first"
199 /// fashion. Depth-first is the default.
200 breadth_first: bool,
201 }
202
203 /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
204 ///
205 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
206 #[deprecated(note = "Use `ThreadPoolBuilder`")]
207 #[derive(Default)]
208 pub struct Configuration {
209 builder: ThreadPoolBuilder,
210 }
211
212 /// The type for a panic handling closure. Note that this same closure
213 /// may be invoked multiple times in parallel.
214 type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
215
216 /// The type for a closure that gets invoked when a thread starts. The
217 /// closure is passed the index of the thread on which it is invoked.
218 /// Note that this same closure may be invoked multiple times in parallel.
219 type StartHandler = dyn Fn(usize) + Send + Sync;
220
221 /// The type for a closure that gets invoked when a thread exits. The
222 /// closure is passed the index of the thread on which is is invoked.
223 /// Note that this same closure may be invoked multiple times in parallel.
224 type ExitHandler = dyn Fn(usize) + Send + Sync;
225
226 // NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
227 impl Default for ThreadPoolBuilder {
default() -> Self228 fn default() -> Self {
229 ThreadPoolBuilder {
230 num_threads: 0,
231 panic_handler: None,
232 get_thread_name: None,
233 stack_size: None,
234 start_handler: None,
235 exit_handler: None,
236 spawn_handler: DefaultSpawn,
237 breadth_first: false,
238 }
239 }
240 }
241
242 impl ThreadPoolBuilder {
243 /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
new() -> Self244 pub fn new() -> Self {
245 Self::default()
246 }
247 }
248
249 /// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
250 /// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
251 impl<S> ThreadPoolBuilder<S>
252 where
253 S: ThreadSpawn,
254 {
255 /// Creates a new `ThreadPool` initialized using this configuration.
build(self) -> Result<ThreadPool, ThreadPoolBuildError>256 pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
257 ThreadPool::build(self)
258 }
259
260 /// Initializes the global thread pool. This initialization is
261 /// **optional**. If you do not call this function, the thread pool
262 /// will be automatically initialized with the default
263 /// configuration. Calling `build_global` is not recommended, except
264 /// in two scenarios:
265 ///
266 /// - You wish to change the default configuration.
267 /// - You are running a benchmark, in which case initializing may
268 /// yield slightly more consistent results, since the worker threads
269 /// will already be ready to go even in the first iteration. But
270 /// this cost is minimal.
271 ///
272 /// Initialization of the global thread pool happens exactly
273 /// once. Once started, the configuration cannot be
274 /// changed. Therefore, if you call `build_global` a second time, it
275 /// will return an error. An `Ok` result indicates that this
276 /// is the first initialization of the thread pool.
build_global(self) -> Result<(), ThreadPoolBuildError>277 pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
278 let registry = registry::init_global_registry(self)?;
279 registry.wait_until_primed();
280 Ok(())
281 }
282 }
283
284 impl ThreadPoolBuilder {
285 /// Creates a scoped `ThreadPool` initialized using this configuration.
286 ///
287 /// This is a convenience function for building a pool using [`crossbeam::scope`]
288 /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
289 /// The threads in this pool will start by calling `wrapper`, which should
290 /// do initialization and continue by calling `ThreadBuilder::run()`.
291 ///
292 /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
293 ///
294 /// # Examples
295 ///
296 /// A scoped pool may be useful in combination with scoped thread-local variables.
297 ///
298 /// ```
299 /// # use rayon_core as rayon;
300 ///
301 /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
302 ///
303 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
304 /// let pool_data = vec![1, 2, 3];
305 ///
306 /// // We haven't assigned any TLS data yet.
307 /// assert!(!POOL_DATA.is_set());
308 ///
309 /// rayon::ThreadPoolBuilder::new()
310 /// .build_scoped(
311 /// // Borrow `pool_data` in TLS for each thread.
312 /// |thread| POOL_DATA.set(&pool_data, || thread.run()),
313 /// // Do some work that needs the TLS data.
314 /// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
315 /// )?;
316 ///
317 /// // Once we've returned, `pool_data` is no longer borrowed.
318 /// drop(pool_data);
319 /// Ok(())
320 /// }
321 /// ```
build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> where W: Fn(ThreadBuilder) + Sync, F: FnOnce(&ThreadPool) -> R,322 pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
323 where
324 W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
325 F: FnOnce(&ThreadPool) -> R,
326 {
327 let result = crossbeam_utils::thread::scope(|scope| {
328 let wrapper = &wrapper;
329 let pool = self
330 .spawn_handler(|thread| {
331 let mut builder = scope.builder();
332 if let Some(name) = thread.name() {
333 builder = builder.name(name.to_string());
334 }
335 if let Some(size) = thread.stack_size() {
336 builder = builder.stack_size(size);
337 }
338 builder.spawn(move |_| wrapper(thread))?;
339 Ok(())
340 })
341 .build()?;
342 Ok(with_pool(&pool))
343 });
344
345 match result {
346 Ok(result) => result,
347 Err(err) => unwind::resume_unwinding(err),
348 }
349 }
350 }
351
352 impl<S> ThreadPoolBuilder<S> {
353 /// Sets a custom function for spawning threads.
354 ///
355 /// Note that the threads will not exit until after the pool is dropped. It
356 /// is up to the caller to wait for thread termination if that is important
357 /// for any invariants. For instance, threads created in [`crossbeam::scope`]
358 /// will be joined before that scope returns, and this will block indefinitely
359 /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
360 /// until the entire process exits!
361 ///
362 /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
363 ///
364 /// # Examples
365 ///
366 /// A minimal spawn handler just needs to call `run()` from an independent thread.
367 ///
368 /// ```
369 /// # use rayon_core as rayon;
370 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
371 /// let pool = rayon::ThreadPoolBuilder::new()
372 /// .spawn_handler(|thread| {
373 /// std::thread::spawn(|| thread.run());
374 /// Ok(())
375 /// })
376 /// .build()?;
377 ///
378 /// pool.install(|| println!("Hello from my custom thread!"));
379 /// Ok(())
380 /// }
381 /// ```
382 ///
383 /// The default spawn handler sets the name and stack size if given, and propagates
384 /// any errors from the thread builder.
385 ///
386 /// ```
387 /// # use rayon_core as rayon;
388 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
389 /// let pool = rayon::ThreadPoolBuilder::new()
390 /// .spawn_handler(|thread| {
391 /// let mut b = std::thread::Builder::new();
392 /// if let Some(name) = thread.name() {
393 /// b = b.name(name.to_owned());
394 /// }
395 /// if let Some(stack_size) = thread.stack_size() {
396 /// b = b.stack_size(stack_size);
397 /// }
398 /// b.spawn(|| thread.run())?;
399 /// Ok(())
400 /// })
401 /// .build()?;
402 ///
403 /// pool.install(|| println!("Hello from my fully custom thread!"));
404 /// Ok(())
405 /// }
406 /// ```
407 ///
408 /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
409 /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
410 /// [`build_scoped`](#method.build_scoped).
411 ///
412 /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
413 ///
414 /// ```
415 /// # use rayon_core as rayon;
416 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
417 /// std::thread::scope(|scope| {
418 /// let pool = rayon::ThreadPoolBuilder::new()
419 /// .spawn_handler(|thread| {
420 /// let mut builder = std::thread::Builder::new();
421 /// if let Some(name) = thread.name() {
422 /// builder = builder.name(name.to_string());
423 /// }
424 /// if let Some(size) = thread.stack_size() {
425 /// builder = builder.stack_size(size);
426 /// }
427 /// builder.spawn_scoped(scope, || {
428 /// // Add any scoped initialization here, then run!
429 /// thread.run()
430 /// })?;
431 /// Ok(())
432 /// })
433 /// .build()?;
434 ///
435 /// pool.install(|| println!("Hello from my custom scoped thread!"));
436 /// Ok(())
437 /// })
438 /// }
439 /// ```
spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> where F: FnMut(ThreadBuilder) -> io::Result<()>,440 pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
441 where
442 F: FnMut(ThreadBuilder) -> io::Result<()>,
443 {
444 ThreadPoolBuilder {
445 spawn_handler: CustomSpawn::new(spawn),
446 // ..self
447 num_threads: self.num_threads,
448 panic_handler: self.panic_handler,
449 get_thread_name: self.get_thread_name,
450 stack_size: self.stack_size,
451 start_handler: self.start_handler,
452 exit_handler: self.exit_handler,
453 breadth_first: self.breadth_first,
454 }
455 }
456
457 /// Returns a reference to the current spawn handler.
get_spawn_handler(&mut self) -> &mut S458 fn get_spawn_handler(&mut self) -> &mut S {
459 &mut self.spawn_handler
460 }
461
462 /// Get the number of threads that will be used for the thread
463 /// pool. See `num_threads()` for more information.
get_num_threads(&self) -> usize464 fn get_num_threads(&self) -> usize {
465 if self.num_threads > 0 {
466 self.num_threads
467 } else {
468 match env::var("RAYON_NUM_THREADS")
469 .ok()
470 .and_then(|s| usize::from_str(&s).ok())
471 {
472 Some(x) if x > 0 => return x,
473 Some(x) if x == 0 => return num_cpus::get(),
474 _ => {}
475 }
476
477 // Support for deprecated `RAYON_RS_NUM_CPUS`.
478 match env::var("RAYON_RS_NUM_CPUS")
479 .ok()
480 .and_then(|s| usize::from_str(&s).ok())
481 {
482 Some(x) if x > 0 => x,
483 _ => num_cpus::get(),
484 }
485 }
486 }
487
488 /// Get the thread name for the thread with the given index.
get_thread_name(&mut self, index: usize) -> Option<String>489 fn get_thread_name(&mut self, index: usize) -> Option<String> {
490 let f = self.get_thread_name.as_mut()?;
491 Some(f(index))
492 }
493
494 /// Sets a closure which takes a thread index and returns
495 /// the thread's name.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static,496 pub fn thread_name<F>(mut self, closure: F) -> Self
497 where
498 F: FnMut(usize) -> String + 'static,
499 {
500 self.get_thread_name = Some(Box::new(closure));
501 self
502 }
503
504 /// Sets the number of threads to be used in the rayon threadpool.
505 ///
506 /// If you specify a non-zero number of threads using this
507 /// function, then the resulting thread-pools are guaranteed to
508 /// start at most this number of threads.
509 ///
510 /// If `num_threads` is 0, or you do not call this function, then
511 /// the Rayon runtime will select the number of threads
512 /// automatically. At present, this is based on the
513 /// `RAYON_NUM_THREADS` environment variable (if set),
514 /// or the number of logical CPUs (otherwise).
515 /// In the future, however, the default behavior may
516 /// change to dynamically add or remove threads as needed.
517 ///
518 /// **Future compatibility warning:** Given the default behavior
519 /// may change in the future, if you wish to rely on a fixed
520 /// number of threads, you should use this function to specify
521 /// that number. To reproduce the current default behavior, you
522 /// may wish to use the [`num_cpus`
523 /// crate](https://crates.io/crates/num_cpus) to query the number
524 /// of CPUs dynamically.
525 ///
526 /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
527 /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
528 /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
529 /// be preferred.
num_threads(mut self, num_threads: usize) -> Self530 pub fn num_threads(mut self, num_threads: usize) -> Self {
531 self.num_threads = num_threads;
532 self
533 }
534
535 /// Returns a copy of the current panic handler.
take_panic_handler(&mut self) -> Option<Box<PanicHandler>>536 fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
537 self.panic_handler.take()
538 }
539
540 /// Normally, whenever Rayon catches a panic, it tries to
541 /// propagate it to someplace sensible, to try and reflect the
542 /// semantics of sequential execution. But in some cases,
543 /// particularly with the `spawn()` APIs, there is no
544 /// obvious place where we should propagate the panic to.
545 /// In that case, this panic handler is invoked.
546 ///
547 /// If no panic handler is set, the default is to abort the
548 /// process, under the principle that panics should not go
549 /// unobserved.
550 ///
551 /// If the panic handler itself panics, this will abort the
552 /// process. To prevent this, wrap the body of your panic handler
553 /// in a call to `std::panic::catch_unwind()`.
panic_handler<H>(mut self, panic_handler: H) -> Self where H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,554 pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
555 where
556 H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
557 {
558 self.panic_handler = Some(Box::new(panic_handler));
559 self
560 }
561
562 /// Get the stack size of the worker threads
get_stack_size(&self) -> Option<usize>563 fn get_stack_size(&self) -> Option<usize> {
564 self.stack_size
565 }
566
567 /// Sets the stack size of the worker threads
stack_size(mut self, stack_size: usize) -> Self568 pub fn stack_size(mut self, stack_size: usize) -> Self {
569 self.stack_size = Some(stack_size);
570 self
571 }
572
573 /// **(DEPRECATED)** Suggest to worker threads that they execute
574 /// spawned jobs in a "breadth-first" fashion.
575 ///
576 /// Typically, when a worker thread is idle or blocked, it will
577 /// attempt to execute the job from the *top* of its local deque of
578 /// work (i.e., the job most recently spawned). If this flag is set
579 /// to true, however, workers will prefer to execute in a
580 /// *breadth-first* fashion -- that is, they will search for jobs at
581 /// the *bottom* of their local deque. (At present, workers *always*
582 /// steal from the bottom of other workers' deques, regardless of
583 /// the setting of this flag.)
584 ///
585 /// If you think of the tasks as a tree, where a parent task
586 /// spawns its children in the tree, then this flag loosely
587 /// corresponds to doing a breadth-first traversal of the tree,
588 /// whereas the default would be to do a depth-first traversal.
589 ///
590 /// **Note that this is an "execution hint".** Rayon's task
591 /// execution is highly dynamic and the precise order in which
592 /// independent tasks are executed is not intended to be
593 /// guaranteed.
594 ///
595 /// This `breadth_first()` method is now deprecated per [RFC #1],
596 /// and in the future its effect may be removed. Consider using
597 /// [`scope_fifo()`] for a similar effect.
598 ///
599 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
600 /// [`scope_fifo()`]: fn.scope_fifo.html
601 #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
breadth_first(mut self) -> Self602 pub fn breadth_first(mut self) -> Self {
603 self.breadth_first = true;
604 self
605 }
606
get_breadth_first(&self) -> bool607 fn get_breadth_first(&self) -> bool {
608 self.breadth_first
609 }
610
611 /// Takes the current thread start callback, leaving `None`.
take_start_handler(&mut self) -> Option<Box<StartHandler>>612 fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
613 self.start_handler.take()
614 }
615
616 /// Sets a callback to be invoked on thread start.
617 ///
618 /// The closure is passed the index of the thread on which it is invoked.
619 /// Note that this same closure may be invoked multiple times in parallel.
620 /// If this closure panics, the panic will be passed to the panic handler.
621 /// If that handler returns, then startup will continue normally.
start_handler<H>(mut self, start_handler: H) -> Self where H: Fn(usize) + Send + Sync + 'static,622 pub fn start_handler<H>(mut self, start_handler: H) -> Self
623 where
624 H: Fn(usize) + Send + Sync + 'static,
625 {
626 self.start_handler = Some(Box::new(start_handler));
627 self
628 }
629
630 /// Returns a current thread exit callback, leaving `None`.
take_exit_handler(&mut self) -> Option<Box<ExitHandler>>631 fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
632 self.exit_handler.take()
633 }
634
635 /// Sets a callback to be invoked on thread exit.
636 ///
637 /// The closure is passed the index of the thread on which it is invoked.
638 /// Note that this same closure may be invoked multiple times in parallel.
639 /// If this closure panics, the panic will be passed to the panic handler.
640 /// If that handler returns, then the thread will exit normally.
exit_handler<H>(mut self, exit_handler: H) -> Self where H: Fn(usize) + Send + Sync + 'static,641 pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
642 where
643 H: Fn(usize) + Send + Sync + 'static,
644 {
645 self.exit_handler = Some(Box::new(exit_handler));
646 self
647 }
648 }
649
650 #[allow(deprecated)]
651 impl Configuration {
652 /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
new() -> Configuration653 pub fn new() -> Configuration {
654 Configuration {
655 builder: ThreadPoolBuilder::new(),
656 }
657 }
658
659 /// Deprecated in favor of `ThreadPoolBuilder::build`.
build(self) -> Result<ThreadPool, Box<dyn Error + 'static>>660 pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
661 self.builder.build().map_err(Box::from)
662 }
663
664 /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static,665 pub fn thread_name<F>(mut self, closure: F) -> Self
666 where
667 F: FnMut(usize) -> String + 'static,
668 {
669 self.builder = self.builder.thread_name(closure);
670 self
671 }
672
673 /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
num_threads(mut self, num_threads: usize) -> Configuration674 pub fn num_threads(mut self, num_threads: usize) -> Configuration {
675 self.builder = self.builder.num_threads(num_threads);
676 self
677 }
678
679 /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
panic_handler<H>(mut self, panic_handler: H) -> Configuration where H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,680 pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
681 where
682 H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
683 {
684 self.builder = self.builder.panic_handler(panic_handler);
685 self
686 }
687
688 /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
stack_size(mut self, stack_size: usize) -> Self689 pub fn stack_size(mut self, stack_size: usize) -> Self {
690 self.builder = self.builder.stack_size(stack_size);
691 self
692 }
693
694 /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
breadth_first(mut self) -> Self695 pub fn breadth_first(mut self) -> Self {
696 self.builder = self.builder.breadth_first();
697 self
698 }
699
700 /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
start_handler<H>(mut self, start_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static,701 pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
702 where
703 H: Fn(usize) + Send + Sync + 'static,
704 {
705 self.builder = self.builder.start_handler(start_handler);
706 self
707 }
708
709 /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
exit_handler<H>(mut self, exit_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static,710 pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
711 where
712 H: Fn(usize) + Send + Sync + 'static,
713 {
714 self.builder = self.builder.exit_handler(exit_handler);
715 self
716 }
717
718 /// Returns a ThreadPoolBuilder with identical parameters.
into_builder(self) -> ThreadPoolBuilder719 fn into_builder(self) -> ThreadPoolBuilder {
720 self.builder
721 }
722 }
723
724 impl ThreadPoolBuildError {
new(kind: ErrorKind) -> ThreadPoolBuildError725 fn new(kind: ErrorKind) -> ThreadPoolBuildError {
726 ThreadPoolBuildError { kind }
727 }
728
is_unsupported(&self) -> bool729 fn is_unsupported(&self) -> bool {
730 matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
731 }
732 }
733
734 const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
735 "The global thread pool has already been initialized.";
736
737 impl Error for ThreadPoolBuildError {
738 #[allow(deprecated)]
description(&self) -> &str739 fn description(&self) -> &str {
740 match self.kind {
741 ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
742 ErrorKind::IOError(ref e) => e.description(),
743 }
744 }
745
source(&self) -> Option<&(dyn Error + 'static)>746 fn source(&self) -> Option<&(dyn Error + 'static)> {
747 match &self.kind {
748 ErrorKind::GlobalPoolAlreadyInitialized => None,
749 ErrorKind::IOError(e) => Some(e),
750 }
751 }
752 }
753
754 impl fmt::Display for ThreadPoolBuildError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result755 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756 match &self.kind {
757 ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
758 ErrorKind::IOError(e) => e.fmt(f),
759 }
760 }
761 }
762
763 /// Deprecated in favor of `ThreadPoolBuilder::build_global`.
764 #[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
765 #[allow(deprecated)]
initialize(config: Configuration) -> Result<(), Box<dyn Error>>766 pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
767 config.into_builder().build_global().map_err(Box::from)
768 }
769
770 impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result771 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
772 let ThreadPoolBuilder {
773 ref num_threads,
774 ref get_thread_name,
775 ref panic_handler,
776 ref stack_size,
777 ref start_handler,
778 ref exit_handler,
779 spawn_handler: _,
780 ref breadth_first,
781 } = *self;
782
783 // Just print `Some(<closure>)` or `None` to the debug
784 // output.
785 struct ClosurePlaceholder;
786 impl fmt::Debug for ClosurePlaceholder {
787 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788 f.write_str("<closure>")
789 }
790 }
791 let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
792 let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
793 let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
794 let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
795
796 f.debug_struct("ThreadPoolBuilder")
797 .field("num_threads", num_threads)
798 .field("get_thread_name", &get_thread_name)
799 .field("panic_handler", &panic_handler)
800 .field("stack_size", &stack_size)
801 .field("start_handler", &start_handler)
802 .field("exit_handler", &exit_handler)
803 .field("breadth_first", &breadth_first)
804 .finish()
805 }
806 }
807
808 #[allow(deprecated)]
809 impl fmt::Debug for Configuration {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result810 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
811 self.builder.fmt(f)
812 }
813 }
814
815 /// Provides the calling context to a closure called by `join_context`.
816 #[derive(Debug)]
817 pub struct FnContext {
818 migrated: bool,
819
820 /// disable `Send` and `Sync`, just for a little future-proofing.
821 _marker: PhantomData<*mut ()>,
822 }
823
824 impl FnContext {
825 #[inline]
new(migrated: bool) -> Self826 fn new(migrated: bool) -> Self {
827 FnContext {
828 migrated,
829 _marker: PhantomData,
830 }
831 }
832 }
833
834 impl FnContext {
835 /// Returns `true` if the closure was called from a different thread
836 /// than it was provided from.
837 #[inline]
migrated(&self) -> bool838 pub fn migrated(&self) -> bool {
839 self.migrated
840 }
841 }
842