• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Methods for custom fork-join scopes, created by the [`scope()`]
2 //! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3 //!
4 //! [`scope()`]: fn.scope.html
5 //! [`in_place_scope()`]: fn.in_place_scope.html
6 //! [`join()`]: ../join/join.fn.html
7 
8 use crate::broadcast::BroadcastContext;
9 use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
10 use crate::latch::{CountLatch, Latch};
11 use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
12 use crate::unwind;
13 use std::any::Any;
14 use std::fmt;
15 use std::marker::PhantomData;
16 use std::mem::ManuallyDrop;
17 use std::ptr;
18 use std::sync::atomic::{AtomicPtr, Ordering};
19 use std::sync::Arc;
20 
21 #[cfg(test)]
22 mod test;
23 
/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// See [`scope()`] for more information.
///
/// [`scope()`]: fn.scope.html
pub struct Scope<'scope> {
    /// Shared scope state (registry, panic slot and job-count latch); the
    /// methods on `Scope` delegate to it.
    base: ScopeBase<'scope>,
}
31 
/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// Those spawned from the same thread are prioritized in relative FIFO order.
/// See [`scope_fifo()`] for more information.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
pub struct ScopeFifo<'scope> {
    /// Shared scope state (registry, panic slot and job-count latch).
    base: ScopeBase<'scope>,
    /// One `JobFifo` per thread of the registry; `spawn_fifo()` picks the
    /// entry at the spawning worker's index.
    fifos: Vec<JobFifo>,
}
41 
/// State shared by [`Scope`] and [`ScopeFifo`]: the registry jobs are sent
/// to, the slot holding a captured panic, and the latch counting jobs that
/// are still outstanding.
struct ScopeBase<'scope> {
    /// Thread registry where `scope()` was executed or where `in_place_scope()`
    /// should spawn jobs.
    registry: Arc<Registry>,

    /// Null until some job panics; then it points at the boxed panic payload
    /// stored by `job_panicked()`. `maybe_propagate_panic()` takes it back out
    /// and resumes unwinding in whoever created the scope.
    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,

    /// Latch tracking job counts: incremented for every job reference handed
    /// out, set each time a job finishes, and waited on by `complete()`.
    job_completed_latch: CountLatch,

    /// You can think of a scope as containing a list of closures to execute,
    /// all of which outlive `'scope`.  They're not actually required to be
    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
    /// the closures are only *moved* across threads to be executed.
    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
}
60 
61 /// Creates a "fork-join" scope `s` and invokes the closure with a
62 /// reference to `s`. This closure can then spawn asynchronous tasks
63 /// into `s`. Those tasks may run asynchronously with respect to the
64 /// closure; they may themselves spawn additional tasks into `s`. When
65 /// the closure returns, it will block until all tasks that have been
66 /// spawned into `s` complete.
67 ///
68 /// `scope()` is a more flexible building block compared to `join()`,
69 /// since a loop can be used to spawn any number of tasks without
70 /// recursing. However, that flexibility comes at a performance price:
71 /// tasks spawned using `scope()` must be allocated onto the heap,
72 /// whereas `join()` can make exclusive use of the stack. **Prefer
73 /// `join()` (or, even better, parallel iterators) where possible.**
74 ///
75 /// # Example
76 ///
77 /// The Rayon `join()` function launches two closures and waits for them
78 /// to stop. One could implement `join()` using a scope like so, although
79 /// it would be less efficient than the real implementation:
80 ///
81 /// ```rust
82 /// # use rayon_core as rayon;
83 /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
84 ///     where A: FnOnce() -> RA + Send,
85 ///           B: FnOnce() -> RB + Send,
86 ///           RA: Send,
87 ///           RB: Send,
88 /// {
89 ///     let mut result_a: Option<RA> = None;
90 ///     let mut result_b: Option<RB> = None;
91 ///     rayon::scope(|s| {
92 ///         s.spawn(|_| result_a = Some(oper_a()));
93 ///         s.spawn(|_| result_b = Some(oper_b()));
94 ///     });
95 ///     (result_a.unwrap(), result_b.unwrap())
96 /// }
97 /// ```
98 ///
99 /// # A note on threading
100 ///
101 /// The closure given to `scope()` executes in the Rayon thread-pool,
102 /// as do those given to `spawn()`. This means that you can't access
103 /// thread-local variables (well, you can, but they may have
104 /// unexpected values).
105 ///
106 /// # Task execution
107 ///
108 /// Task execution potentially starts as soon as `spawn()` is called.
109 /// The task will end sometime before `scope()` returns. Note that the
110 /// *closure* given to scope may return much earlier. In general
111 /// the lifetime of a scope created like `scope(body)` goes something like this:
112 ///
113 /// - Scope begins when `scope(body)` is called
114 /// - Scope body `body()` is invoked
115 ///     - Scope tasks may be spawned
116 /// - Scope body returns
117 /// - Scope tasks execute, possibly spawning more tasks
118 /// - Once all tasks are done, scope ends and `scope()` returns
119 ///
120 /// To see how and when tasks are joined, consider this example:
121 ///
122 /// ```rust
123 /// # use rayon_core as rayon;
124 /// // point start
125 /// rayon::scope(|s| {
126 ///     s.spawn(|s| { // task s.1
127 ///         s.spawn(|s| { // task s.1.1
128 ///             rayon::scope(|t| {
129 ///                 t.spawn(|_| ()); // task t.1
130 ///                 t.spawn(|_| ()); // task t.2
131 ///             });
132 ///         });
133 ///     });
134 ///     s.spawn(|s| { // task s.2
135 ///     });
136 ///     // point mid
137 /// });
138 /// // point end
139 /// ```
140 ///
141 /// The various tasks that are run will execute roughly like so:
142 ///
143 /// ```notrust
144 /// | (start)
145 /// |
146 /// | (scope `s` created)
147 /// +-----------------------------------------------+ (task s.2)
148 /// +-------+ (task s.1)                            |
149 /// |       |                                       |
150 /// |       +---+ (task s.1.1)                      |
151 /// |       |   |                                   |
152 /// |       |   | (scope `t` created)               |
153 /// |       |   +----------------+ (task t.2)       |
154 /// |       |   +---+ (task t.1) |                  |
155 /// | (mid) |   |   |            |                  |
156 /// :       |   + <-+------------+ (scope `t` ends) |
157 /// :       |   |                                   |
158 /// |<------+---+-----------------------------------+ (scope `s` ends)
159 /// |
160 /// | (end)
161 /// ```
162 ///
163 /// The point here is that everything spawned into scope `s` will
164 /// terminate (at latest) at the same point -- right before the
165 /// original call to `rayon::scope` returns. This includes new
166 /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
167 /// scope is created (such as `t`), the things spawned into that scope
168 /// will be joined before that scope returns, which in turn occurs
169 /// before the creating task (task `s.1.1` in this case) finishes.
170 ///
171 /// There is no guaranteed order of execution for spawns in a scope,
172 /// given that other threads may steal tasks at any time. However, they
173 /// are generally prioritized in a LIFO order on the thread from which
174 /// they were spawned. So in this example, absent any stealing, we can
175 /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
176 /// threads always steal from the other end of the deque, like FIFO
177 /// order.  The idea is that "recent" tasks are most likely to be fresh
178 /// in the local CPU's cache, while other threads can steal older
179 /// "stale" tasks.  For an alternate approach, consider
180 /// [`scope_fifo()`] instead.
181 ///
182 /// [`scope_fifo()`]: fn.scope_fifo.html
183 ///
184 /// # Accessing stack data
185 ///
186 /// In general, spawned tasks may access stack data in place that
187 /// outlives the scope itself. Other data must be fully owned by the
188 /// spawned task.
189 ///
190 /// ```rust
191 /// # use rayon_core as rayon;
192 /// let ok: Vec<i32> = vec![1, 2, 3];
193 /// rayon::scope(|s| {
194 ///     let bad: Vec<i32> = vec![4, 5, 6];
195 ///     s.spawn(|_| {
196 ///         // We can access `ok` because it outlives the scope `s`.
197 ///         println!("ok: {:?}", ok);
198 ///
199 ///         // If we just try to use `bad` here, the closure will borrow `bad`
200 ///         // (because we are just printing it out, and that only requires a
201 ///         // borrow), which will result in a compilation error. Read on
202 ///         // for options.
203 ///         // println!("bad: {:?}", bad);
204 ///    });
205 /// });
206 /// ```
207 ///
208 /// As the comments in the example above suggest, to reference `bad` we must
209 /// take ownership of it. One way to do this is to detach the closure
210 /// from the surrounding stack frame, using the `move` keyword. This
211 /// will cause it to take ownership of *all* the variables it touches,
212 /// in this case including both `ok` *and* `bad`:
213 ///
214 /// ```rust
215 /// # use rayon_core as rayon;
216 /// let ok: Vec<i32> = vec![1, 2, 3];
217 /// rayon::scope(|s| {
218 ///     let bad: Vec<i32> = vec![4, 5, 6];
219 ///     s.spawn(move |_| {
220 ///         println!("ok: {:?}", ok);
221 ///         println!("bad: {:?}", bad);
222 ///     });
223 ///
224 ///     // That closure is fine, but now we can't use `ok` anywhere else,
225 ///     // since it is owned by the previous task:
226 ///     // s.spawn(|_| println!("ok: {:?}", ok));
227 /// });
228 /// ```
229 ///
230 /// While this works, it could be a problem if we want to use `ok` elsewhere.
231 /// There are two choices. We can keep the closure as a `move` closure, but
232 /// instead of referencing the variable `ok`, we create a shadowed variable that
233 /// is a borrow of `ok` and capture *that*:
234 ///
235 /// ```rust
236 /// # use rayon_core as rayon;
237 /// let ok: Vec<i32> = vec![1, 2, 3];
238 /// rayon::scope(|s| {
239 ///     let bad: Vec<i32> = vec![4, 5, 6];
240 ///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
241 ///     s.spawn(move |_| {
242 ///         println!("ok: {:?}", ok); // captures the shadowed version
243 ///         println!("bad: {:?}", bad);
244 ///     });
245 ///
246 ///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
247 ///     // can be shared freely. Note that we need a `move` closure here though,
248 ///     // because otherwise we'd be trying to borrow the shadowed `ok`,
249 ///     // and that doesn't outlive `scope`.
250 ///     s.spawn(move |_| println!("ok: {:?}", ok));
251 /// });
252 /// ```
253 ///
254 /// Another option is not to use the `move` keyword but instead to take ownership
255 /// of individual variables:
256 ///
257 /// ```rust
258 /// # use rayon_core as rayon;
259 /// let ok: Vec<i32> = vec![1, 2, 3];
260 /// rayon::scope(|s| {
261 ///     let bad: Vec<i32> = vec![4, 5, 6];
262 ///     s.spawn(|_| {
263 ///         // Transfer ownership of `bad` into a local variable (also named `bad`).
264 ///         // This will force the closure to take ownership of `bad` from the environment.
265 ///         let bad = bad;
266 ///         println!("ok: {:?}", ok); // `ok` is only borrowed.
267 ///         println!("bad: {:?}", bad); // refers to our local variable, above.
268 ///     });
269 ///
270 ///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
271 /// });
272 /// ```
273 ///
274 /// # Panics
275 ///
276 /// If a panic occurs, either in the closure given to `scope()` or in
277 /// any of the spawned jobs, that panic will be propagated and the
278 /// call to `scope()` will panic. If multiple panics occur, it is
279 /// non-deterministic which of their panic values will propagate.
280 /// Regardless, once a task is spawned using `scope.spawn()`, it will
281 /// execute, even if the spawning task should later panic. `scope()`
282 /// returns once all spawned jobs have completed, and any panics are
283 /// propagated at that point.
scope<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send,284 pub fn scope<'scope, OP, R>(op: OP) -> R
285 where
286     OP: FnOnce(&Scope<'scope>) -> R + Send,
287     R: Send,
288 {
289     in_worker(|owner_thread, _| {
290         let scope = Scope::<'scope>::new(Some(owner_thread), None);
291         scope.base.complete(Some(owner_thread), || op(&scope))
292     })
293 }
294 
295 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
296 /// closure with a reference to `s`. This closure can then spawn
297 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
298 /// respect to the closure; they may themselves spawn additional tasks
299 /// into `s`. When the closure returns, it will block until all tasks
300 /// that have been spawned into `s` complete.
301 ///
302 /// # Task execution
303 ///
304 /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
305 /// difference in the order of execution. Consider a similar example:
306 ///
307 /// [`scope()`]: fn.scope.html
308 ///
309 /// ```rust
310 /// # use rayon_core as rayon;
311 /// // point start
312 /// rayon::scope_fifo(|s| {
313 ///     s.spawn_fifo(|s| { // task s.1
314 ///         s.spawn_fifo(|s| { // task s.1.1
315 ///             rayon::scope_fifo(|t| {
316 ///                 t.spawn_fifo(|_| ()); // task t.1
317 ///                 t.spawn_fifo(|_| ()); // task t.2
318 ///             });
319 ///         });
320 ///     });
321 ///     s.spawn_fifo(|s| { // task s.2
322 ///     });
323 ///     // point mid
324 /// });
325 /// // point end
326 /// ```
327 ///
328 /// The various tasks that are run will execute roughly like so:
329 ///
330 /// ```notrust
331 /// | (start)
332 /// |
333 /// | (FIFO scope `s` created)
334 /// +--------------------+ (task s.1)
335 /// +-------+ (task s.2) |
336 /// |       |            +---+ (task s.1.1)
337 /// |       |            |   |
338 /// |       |            |   | (FIFO scope `t` created)
339 /// |       |            |   +----------------+ (task t.1)
340 /// |       |            |   +---+ (task t.2) |
341 /// | (mid) |            |   |   |            |
342 /// :       |            |   + <-+------------+ (scope `t` ends)
343 /// :       |            |   |
344 /// |<------+------------+---+ (scope `s` ends)
345 /// |
346 /// | (end)
347 /// ```
348 ///
349 /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
350 /// the thread from which they were spawned, as opposed to `scope()`'s
351 /// LIFO.  So in this example, we can expect `s.1` to execute before
352 /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
353 /// FIFO order, as usual. Overall, this has roughly the same order as
354 /// the now-deprecated [`breadth_first`] option, except the effect is
355 /// isolated to a particular scope. If spawns are intermingled from any
356 /// combination of `scope()` and `scope_fifo()`, or from different
357 /// threads, their order is only specified with respect to spawns in the
358 /// same scope and thread.
359 ///
360 /// For more details on this design, see Rayon [RFC #1].
361 ///
362 /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
363 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
364 ///
365 /// # Panics
366 ///
367 /// If a panic occurs, either in the closure given to `scope_fifo()` or
368 /// in any of the spawned jobs, that panic will be propagated and the
369 /// call to `scope_fifo()` will panic. If multiple panics occur, it is
370 /// non-deterministic which of their panic values will propagate.
371 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
372 /// will execute, even if the spawning task should later panic.
373 /// `scope_fifo()` returns once all spawned jobs have completed, and any
374 /// panics are propagated at that point.
scope_fifo<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send,375 pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
376 where
377     OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
378     R: Send,
379 {
380     in_worker(|owner_thread, _| {
381         let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
382         scope.base.complete(Some(owner_thread), || op(&scope))
383     })
384 }
385 
386 /// Creates a "fork-join" scope `s` and invokes the closure with a
387 /// reference to `s`. This closure can then spawn asynchronous tasks
388 /// into `s`. Those tasks may run asynchronously with respect to the
389 /// closure; they may themselves spawn additional tasks into `s`. When
390 /// the closure returns, it will block until all tasks that have been
391 /// spawned into `s` complete.
392 ///
393 /// This is just like `scope()` except the closure runs on the same thread
394 /// that calls `in_place_scope()`. Only work that it spawns runs in the
395 /// thread pool.
396 ///
397 /// # Panics
398 ///
399 /// If a panic occurs, either in the closure given to `in_place_scope()` or in
400 /// any of the spawned jobs, that panic will be propagated and the
401 /// call to `in_place_scope()` will panic. If multiple panics occur, it is
402 /// non-deterministic which of their panic values will propagate.
403 /// Regardless, once a task is spawned using `scope.spawn()`, it will
404 /// execute, even if the spawning task should later panic. `in_place_scope()`
405 /// returns once all spawned jobs have completed, and any panics are
406 /// propagated at that point.
in_place_scope<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R,407 pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
408 where
409     OP: FnOnce(&Scope<'scope>) -> R,
410 {
411     do_in_place_scope(None, op)
412 }
413 
do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R,414 pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
415 where
416     OP: FnOnce(&Scope<'scope>) -> R,
417 {
418     let thread = unsafe { WorkerThread::current().as_ref() };
419     let scope = Scope::<'scope>::new(thread, registry);
420     scope.base.complete(thread, || op(&scope))
421 }
422 
423 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
424 /// closure with a reference to `s`. This closure can then spawn
425 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
426 /// respect to the closure; they may themselves spawn additional tasks
427 /// into `s`. When the closure returns, it will block until all tasks
428 /// that have been spawned into `s` complete.
429 ///
430 /// This is just like `scope_fifo()` except the closure runs on the same thread
431 /// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
432 /// thread pool.
433 ///
434 /// # Panics
435 ///
436 /// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
437 /// any of the spawned jobs, that panic will be propagated and the
438 /// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
439 /// non-deterministic which of their panic values will propagate.
440 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
441 /// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
442 /// returns once all spawned jobs have completed, and any panics are
443 /// propagated at that point.
in_place_scope_fifo<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R,444 pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
445 where
446     OP: FnOnce(&ScopeFifo<'scope>) -> R,
447 {
448     do_in_place_scope_fifo(None, op)
449 }
450 
do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R,451 pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
452 where
453     OP: FnOnce(&ScopeFifo<'scope>) -> R,
454 {
455     let thread = unsafe { WorkerThread::current().as_ref() };
456     let scope = ScopeFifo::<'scope>::new(thread, registry);
457     scope.base.complete(thread, || op(&scope))
458 }
459 
460 impl<'scope> Scope<'scope> {
new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self461     fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
462         let base = ScopeBase::new(owner, registry);
463         Scope { base }
464     }
465 
466     /// Spawns a job into the fork-join scope `self`. This job will
467     /// execute sometime before the fork-join scope completes.  The
468     /// job is specified as a closure, and this closure receives its
469     /// own reference to the scope `self` as argument. This can be
470     /// used to inject new jobs into `self`.
471     ///
472     /// # Returns
473     ///
474     /// Nothing. The spawned closures cannot pass back values to the
475     /// caller directly, though they can write to local variables on
476     /// the stack (if those variables outlive the scope) or
477     /// communicate through shared channels.
478     ///
479     /// (The intention is to eventually integrate with Rust futures to
480     /// support spawns of functions that compute a value.)
481     ///
482     /// # Examples
483     ///
484     /// ```rust
485     /// # use rayon_core as rayon;
486     /// let mut value_a = None;
487     /// let mut value_b = None;
488     /// let mut value_c = None;
489     /// rayon::scope(|s| {
490     ///     s.spawn(|s1| {
491     ///           // ^ this is the same scope as `s`; this handle `s1`
492     ///           //   is intended for use by the spawned task,
493     ///           //   since scope handles cannot cross thread boundaries.
494     ///
495     ///         value_a = Some(22);
496     ///
497     ///         // the scope `s` will not end until all these tasks are done
498     ///         s1.spawn(|_| {
499     ///             value_b = Some(44);
500     ///         });
501     ///     });
502     ///
503     ///     s.spawn(|_| {
504     ///         value_c = Some(66);
505     ///     });
506     /// });
507     /// assert_eq!(value_a, Some(22));
508     /// assert_eq!(value_b, Some(44));
509     /// assert_eq!(value_c, Some(66));
510     /// ```
511     ///
512     /// # See also
513     ///
514     /// The [`scope` function] has more extensive documentation about
515     /// task spawning.
516     ///
517     /// [`scope` function]: fn.scope.html
spawn<BODY>(&self, body: BODY) where BODY: FnOnce(&Scope<'scope>) + Send + 'scope,518     pub fn spawn<BODY>(&self, body: BODY)
519     where
520         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
521     {
522         let scope_ptr = ScopePtr(self);
523         let job = HeapJob::new(move || unsafe {
524             // SAFETY: this job will execute before the scope ends.
525             let scope = scope_ptr.as_ref();
526             ScopeBase::execute_job(&scope.base, move || body(scope))
527         });
528         let job_ref = self.base.heap_job_ref(job);
529 
530         // Since `Scope` implements `Sync`, we can't be sure that we're still in a
531         // thread of this pool, so we can't just push to the local worker thread.
532         // Also, this might be an in-place scope.
533         self.base.registry.inject_or_push(job_ref);
534     }
535 
536     /// Spawns a job into every thread of the fork-join scope `self`. This job will
537     /// execute on each thread sometime before the fork-join scope completes.  The
538     /// job is specified as a closure, and this closure receives its own reference
539     /// to the scope `self` as argument, as well as a `BroadcastContext`.
spawn_broadcast<BODY>(&self, body: BODY) where BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,540     pub fn spawn_broadcast<BODY>(&self, body: BODY)
541     where
542         BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
543     {
544         let scope_ptr = ScopePtr(self);
545         let job = ArcJob::new(move || unsafe {
546             // SAFETY: this job will execute before the scope ends.
547             let scope = scope_ptr.as_ref();
548             let body = &body;
549             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
550             ScopeBase::execute_job(&scope.base, func)
551         });
552         self.base.inject_broadcast(job)
553     }
554 }
555 
556 impl<'scope> ScopeFifo<'scope> {
new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self557     fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
558         let base = ScopeBase::new(owner, registry);
559         let num_threads = base.registry.num_threads();
560         let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
561         ScopeFifo { base, fifos }
562     }
563 
564     /// Spawns a job into the fork-join scope `self`. This job will
565     /// execute sometime before the fork-join scope completes.  The
566     /// job is specified as a closure, and this closure receives its
567     /// own reference to the scope `self` as argument. This can be
568     /// used to inject new jobs into `self`.
569     ///
570     /// # See also
571     ///
572     /// This method is akin to [`Scope::spawn()`], but with a FIFO
573     /// priority.  The [`scope_fifo` function] has more details about
574     /// this distinction.
575     ///
576     /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
577     /// [`scope_fifo` function]: fn.scope_fifo.html
spawn_fifo<BODY>(&self, body: BODY) where BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,578     pub fn spawn_fifo<BODY>(&self, body: BODY)
579     where
580         BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
581     {
582         let scope_ptr = ScopePtr(self);
583         let job = HeapJob::new(move || unsafe {
584             // SAFETY: this job will execute before the scope ends.
585             let scope = scope_ptr.as_ref();
586             ScopeBase::execute_job(&scope.base, move || body(scope))
587         });
588         let job_ref = self.base.heap_job_ref(job);
589 
590         // If we're in the pool, use our scope's private fifo for this thread to execute
591         // in a locally-FIFO order. Otherwise, just use the pool's global injector.
592         match self.base.registry.current_thread() {
593             Some(worker) => {
594                 let fifo = &self.fifos[worker.index()];
595                 // SAFETY: this job will execute before the scope ends.
596                 unsafe { worker.push(fifo.push(job_ref)) };
597             }
598             None => self.base.registry.inject(job_ref),
599         }
600     }
601 
602     /// Spawns a job into every thread of the fork-join scope `self`. This job will
603     /// execute on each thread sometime before the fork-join scope completes.  The
604     /// job is specified as a closure, and this closure receives its own reference
605     /// to the scope `self` as argument, as well as a `BroadcastContext`.
spawn_broadcast<BODY>(&self, body: BODY) where BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,606     pub fn spawn_broadcast<BODY>(&self, body: BODY)
607     where
608         BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
609     {
610         let scope_ptr = ScopePtr(self);
611         let job = ArcJob::new(move || unsafe {
612             // SAFETY: this job will execute before the scope ends.
613             let scope = scope_ptr.as_ref();
614             let body = &body;
615             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
616             ScopeBase::execute_job(&scope.base, func)
617         });
618         self.base.inject_broadcast(job)
619     }
620 }
621 
622 impl<'scope> ScopeBase<'scope> {
623     /// Creates the base of a new scope for the given registry
new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self624     fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
625         let registry = registry.unwrap_or_else(|| match owner {
626             Some(owner) => owner.registry(),
627             None => global_registry(),
628         });
629 
630         ScopeBase {
631             registry: Arc::clone(registry),
632             panic: AtomicPtr::new(ptr::null_mut()),
633             job_completed_latch: CountLatch::new(owner),
634             marker: PhantomData,
635         }
636     }
637 
heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef where FUNC: FnOnce() + Send + 'scope,638     fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
639     where
640         FUNC: FnOnce() + Send + 'scope,
641     {
642         unsafe {
643             self.job_completed_latch.increment();
644             job.into_job_ref()
645         }
646     }
647 
inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>) where FUNC: Fn() + Send + Sync + 'scope,648     fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
649     where
650         FUNC: Fn() + Send + Sync + 'scope,
651     {
652         let n_threads = self.registry.num_threads();
653         let job_refs = (0..n_threads).map(|_| unsafe {
654             self.job_completed_latch.increment();
655             ArcJob::as_job_ref(&job)
656         });
657 
658         self.registry.inject_broadcast(job_refs);
659     }
660 
661     /// Executes `func` as a job, either aborting or executing as
662     /// appropriate.
complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R where FUNC: FnOnce() -> R,663     fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
664     where
665         FUNC: FnOnce() -> R,
666     {
667         let result = unsafe { Self::execute_job_closure(self, func) };
668         self.job_completed_latch.wait(owner);
669         self.maybe_propagate_panic();
670         result.unwrap() // only None if `op` panicked, and that would have been propagated
671     }
672 
673     /// Executes `func` as a job, either aborting or executing as
674     /// appropriate.
execute_job<FUNC>(this: *const Self, func: FUNC) where FUNC: FnOnce(),675     unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
676     where
677         FUNC: FnOnce(),
678     {
679         let _: Option<()> = Self::execute_job_closure(this, func);
680     }
681 
682     /// Executes `func` as a job in scope. Adjusts the "job completed"
683     /// counters and also catches any panic and stores it into
684     /// `scope`.
execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R,685     unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
686     where
687         FUNC: FnOnce() -> R,
688     {
689         let result = match unwind::halt_unwinding(func) {
690             Ok(r) => Some(r),
691             Err(err) => {
692                 (*this).job_panicked(err);
693                 None
694             }
695         };
696         Latch::set(&(*this).job_completed_latch);
697         result
698     }
699 
job_panicked(&self, err: Box<dyn Any + Send + 'static>)700     fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
701         // capture the first error we see, free the rest
702         if self.panic.load(Ordering::Relaxed).is_null() {
703             let nil = ptr::null_mut();
704             let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
705             let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
706             if self
707                 .panic
708                 .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
709                 .is_ok()
710             {
711                 // ownership now transferred into self.panic
712             } else {
713                 // another panic raced in ahead of us, so drop ours
714                 let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
715             }
716         }
717     }
718 
maybe_propagate_panic(&self)719     fn maybe_propagate_panic(&self) {
720         // propagate panic, if any occurred; at this point, all
721         // outstanding jobs have completed, so we can use a relaxed
722         // ordering:
723         let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
724         if !panic.is_null() {
725             let value = unsafe { Box::from_raw(panic) };
726             unwind::resume_unwinding(*value);
727         }
728     }
729 }
730 
731 impl<'scope> fmt::Debug for Scope<'scope> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result732     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
733         fmt.debug_struct("Scope")
734             .field("pool_id", &self.base.registry.id())
735             .field("panic", &self.base.panic)
736             .field("job_completed_latch", &self.base.job_completed_latch)
737             .finish()
738     }
739 }
740 
741 impl<'scope> fmt::Debug for ScopeFifo<'scope> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result742     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
743         fmt.debug_struct("ScopeFifo")
744             .field("num_fifos", &self.fifos.len())
745             .field("pool_id", &self.base.registry.id())
746             .field("panic", &self.base.panic)
747             .field("job_completed_latch", &self.base.job_completed_latch)
748             .finish()
749     }
750 }
751 
/// Raw-pointer handle used to capture a scope `&Self` in jobs without
/// faking a lifetime.
///
/// Dereferencing still takes unsafe code, which is acceptable in scope jobs
/// because they are guaranteed to execute before the scope ends.
struct ScopePtr<T>(*const T);

// SAFETY: !Send for raw pointers is not for safety, just as a lint
unsafe impl<T> Send for ScopePtr<T> where T: Sync {}

// SAFETY: !Sync for raw pointers is not for safety, just as a lint
unsafe impl<T> Sync for ScopePtr<T> where T: Sync {}

impl<T> ScopePtr<T> {
    /// Reborrows the pointee as a shared reference.
    ///
    /// Being a method (rather than a direct `scope_ptr.0` access) makes
    /// closures capture the whole `ScopePtr` instead of just the raw field.
    ///
    /// # Safety
    ///
    /// The wrapped pointer is dereferenced, so it must point to a live `T`
    /// for as long as the returned reference is used.
    unsafe fn as_ref(&self) -> &T {
        let raw: *const T = self.0;
        &*raw
    }
}
770