• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Threads that can borrow variables from the stack.
2 //!
3 //! Create a scope when spawned threads need to access variables on the stack:
4 //!
5 //! ```
6 //! use crossbeam_utils::thread;
7 //!
8 //! let people = vec![
9 //!     "Alice".to_string(),
10 //!     "Bob".to_string(),
11 //!     "Carol".to_string(),
12 //! ];
13 //!
14 //! thread::scope(|s| {
15 //!     for person in &people {
16 //!         s.spawn(move |_| {
17 //!             println!("Hello, {}!", person);
18 //!         });
19 //!     }
20 //! }).unwrap();
21 //! ```
22 //!
23 //! # Why scoped threads?
24 //!
25 //! Suppose we wanted to re-write the previous example using plain threads:
26 //!
27 //! ```compile_fail,E0597
28 //! use std::thread;
29 //!
30 //! let people = vec![
31 //!     "Alice".to_string(),
32 //!     "Bob".to_string(),
33 //!     "Carol".to_string(),
34 //! ];
35 //!
36 //! let mut threads = Vec::new();
37 //!
38 //! for person in &people {
39 //!     threads.push(thread::spawn(move || {
40 //!         println!("Hello, {}!", person);
41 //!     }));
42 //! }
43 //!
44 //! for thread in threads {
45 //!     thread.join().unwrap();
46 //! }
47 //! ```
48 //!
49 //! This doesn't work because the borrow checker complains about `people` not living long enough:
50 //!
51 //! ```text
52 //! error[E0597]: `people` does not live long enough
53 //!   --> src/main.rs:12:20
54 //!    |
55 //! 12 |     for person in &people {
56 //!    |                    ^^^^^^ borrowed value does not live long enough
57 //! ...
58 //! 21 | }
59 //!    | - borrowed value only lives until here
60 //!    |
61 //!    = note: borrowed value must be valid for the static lifetime...
62 //! ```
63 //!
64 //! The problem here is that spawned threads are not allowed to borrow variables on stack because
65 //! the compiler cannot prove they will be joined before `people` is destroyed.
66 //!
67 //! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68 //! before the scope ends.
69 //!
70 //! # How scoped threads work
71 //!
72 //! If a variable is borrowed by a thread, the thread must complete before the variable is
73 //! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74 //! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75 //!
76 //! A scope creates a clear boundary between variables outside the scope and threads inside the
77 //! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78 //! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79 //! can safely access variables outside it.
80 //!
81 //! # Nesting scoped threads
82 //!
83 //! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
84 //! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
85 //! cannot be borrowed by scoped threads:
86 //!
87 //! ```compile_fail,E0521
88 //! use crossbeam_utils::thread;
89 //!
90 //! thread::scope(|s| {
91 //!     s.spawn(|_| {
92 //!         // Not going to compile because we're trying to borrow `s`,
93 //!         // which lives *inside* the scope! :(
94 //!         s.spawn(|_| println!("nested thread"));
95 //!     });
96 //! });
97 //! ```
98 //!
99 //! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100 //! argument, which can be used for spawning nested threads:
101 //!
102 //! ```
103 //! use crossbeam_utils::thread;
104 //!
105 //! thread::scope(|s| {
106 //!     // Note the `|s|` here.
107 //!     s.spawn(|s| {
108 //!         // Yay, this works because we're using a fresh argument `s`! :)
109 //!         s.spawn(|_| println!("nested thread"));
110 //!     });
111 //! }).unwrap();
112 //! ```
113 
114 use std::fmt;
115 use std::io;
116 use std::marker::PhantomData;
117 use std::mem;
118 use std::panic;
119 use std::sync::{Arc, Mutex};
120 use std::thread;
121 
122 use crate::sync::WaitGroup;
123 
124 type SharedVec<T> = Arc<Mutex<Vec<T>>>;
125 type SharedOption<T> = Arc<Mutex<Option<T>>>;
126 
127 /// Creates a new scope for spawning threads.
128 ///
129 /// All child threads that haven't been manually joined will be automatically joined just before
130 /// this function invocation ends. If all joined threads have successfully completed, `Ok` is
131 /// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
132 /// returned containing errors from panicked threads. Note that if panics are implemented by
133 /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
134 ///
135 /// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`].
136 ///
137 /// # Examples
138 ///
139 /// ```
140 /// use crossbeam_utils::thread;
141 ///
142 /// let var = vec![1, 2, 3];
143 ///
144 /// thread::scope(|s| {
145 ///     s.spawn(|_| {
146 ///         println!("A child thread borrowing `var`: {:?}", var);
147 ///     });
148 /// }).unwrap();
149 /// ```
scope<'env, F, R>(f: F) -> thread::Result<R> where F: FnOnce(&Scope<'env>) -> R,150 pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
151 where
152     F: FnOnce(&Scope<'env>) -> R,
153 {
154     struct AbortOnPanic;
155     impl Drop for AbortOnPanic {
156         fn drop(&mut self) {
157             if thread::panicking() {
158                 std::process::abort();
159             }
160         }
161     }
162 
163     let wg = WaitGroup::new();
164     let scope = Scope::<'env> {
165         handles: SharedVec::default(),
166         wait_group: wg.clone(),
167         _marker: PhantomData,
168     };
169 
170     // Execute the scoped function, but catch any panics.
171     let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
172 
173     // If an unwinding panic occurs before all threads are joined
174     // promote it to an aborting panic to prevent any threads from escaping the scope.
175     let guard = AbortOnPanic;
176 
177     // Wait until all nested scopes are dropped.
178     drop(scope.wait_group);
179     wg.wait();
180 
181     // Join all remaining spawned threads.
182     let panics: Vec<_> = scope
183         .handles
184         .lock()
185         .unwrap()
186         // Filter handles that haven't been joined, join them, and collect errors.
187         .drain(..)
188         .filter_map(|handle| handle.lock().unwrap().take())
189         .filter_map(|handle| handle.join().err())
190         .collect();
191 
192     mem::forget(guard);
193 
194     // If `f` has panicked, resume unwinding.
195     // If any of the child threads have panicked, return the panic errors.
196     // Otherwise, everything is OK and return the result of `f`.
197     match result {
198         Err(err) => panic::resume_unwind(err),
199         Ok(res) => {
200             if panics.is_empty() {
201                 Ok(res)
202             } else {
203                 Err(Box::new(panics))
204             }
205         }
206     }
207 }
208 
209 /// A scope for spawning threads.
210 pub struct Scope<'env> {
211     /// The list of the thread join handles.
212     handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
213 
214     /// Used to wait until all subscopes all dropped.
215     wait_group: WaitGroup,
216 
217     /// Borrows data with invariant lifetime `'env`.
218     _marker: PhantomData<&'env mut &'env ()>,
219 }
220 
221 unsafe impl Sync for Scope<'_> {}
222 
223 impl<'env> Scope<'env> {
224     /// Spawns a scoped thread.
225     ///
226     /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
227     /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
228     /// allowing it to reference variables outside the scope.
229     ///
230     /// The scoped thread is passed a reference to this scope as an argument, which can be used for
231     /// spawning nested threads.
232     ///
233     /// The returned [handle](ScopedJoinHandle) can be used to manually
234     /// [join](ScopedJoinHandle::join) the thread before the scope exits.
235     ///
236     /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
237     /// stack size or the name of the thread, use this API instead.
238     ///
239     /// [`spawn`]: std::thread::spawn
240     ///
241     /// # Panics
242     ///
243     /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
244     /// to recover from such errors.
245     ///
246     /// # Examples
247     ///
248     /// ```
249     /// use crossbeam_utils::thread;
250     ///
251     /// thread::scope(|s| {
252     ///     let handle = s.spawn(|_| {
253     ///         println!("A child thread is running");
254     ///         42
255     ///     });
256     ///
257     ///     // Join the thread and retrieve its result.
258     ///     let res = handle.join().unwrap();
259     ///     assert_eq!(res, 42);
260     /// }).unwrap();
261     /// ```
spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where F: FnOnce(&Scope<'env>) -> T, F: Send + 'env, T: Send + 'env,262     pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
263     where
264         F: FnOnce(&Scope<'env>) -> T,
265         F: Send + 'env,
266         T: Send + 'env,
267     {
268         self.builder()
269             .spawn(f)
270             .expect("failed to spawn scoped thread")
271     }
272 
273     /// Creates a builder that can configure a thread before spawning.
274     ///
275     /// # Examples
276     ///
277     /// ```
278     /// use crossbeam_utils::thread;
279     ///
280     /// thread::scope(|s| {
281     ///     s.builder()
282     ///         .spawn(|_| println!("A child thread is running"))
283     ///         .unwrap();
284     /// }).unwrap();
285     /// ```
builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env>286     pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
287         ScopedThreadBuilder {
288             scope: self,
289             builder: thread::Builder::new(),
290         }
291     }
292 }
293 
294 impl fmt::Debug for Scope<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result295     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296         f.pad("Scope { .. }")
297     }
298 }
299 
300 /// Configures the properties of a new thread.
301 ///
302 /// The two configurable properties are:
303 ///
304 /// - [`name`]: Specifies an [associated name for the thread][naming-threads].
305 /// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
306 ///
307 /// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
308 /// thread handle with the given configuration.
309 ///
310 /// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
311 /// value. You may want to use this builder when you want to recover from a failure to launch a
312 /// thread.
313 ///
314 /// # Examples
315 ///
316 /// ```
317 /// use crossbeam_utils::thread;
318 ///
319 /// thread::scope(|s| {
320 ///     s.builder()
321 ///         .spawn(|_| println!("Running a child thread"))
322 ///         .unwrap();
323 /// }).unwrap();
324 /// ```
325 ///
326 /// [`name`]: ScopedThreadBuilder::name
327 /// [`stack_size`]: ScopedThreadBuilder::stack_size
328 /// [`spawn`]: ScopedThreadBuilder::spawn
329 /// [`io::Result`]: std::io::Result
330 /// [naming-threads]: std::thread#naming-threads
331 /// [stack-size]: std::thread#stack-size
332 #[derive(Debug)]
333 pub struct ScopedThreadBuilder<'scope, 'env> {
334     scope: &'scope Scope<'env>,
335     builder: thread::Builder,
336 }
337 
338 impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
339     /// Sets the name for the new thread.
340     ///
341     /// The name must not contain null bytes (`\0`).
342     ///
343     /// For more information about named threads, see [here][naming-threads].
344     ///
345     /// # Examples
346     ///
347     /// ```
348     /// use crossbeam_utils::thread;
349     /// use std::thread::current;
350     ///
351     /// thread::scope(|s| {
352     ///     s.builder()
353     ///         .name("my thread".to_string())
354     ///         .spawn(|_| assert_eq!(current().name(), Some("my thread")))
355     ///         .unwrap();
356     /// }).unwrap();
357     /// ```
358     ///
359     /// [naming-threads]: std::thread#naming-threads
name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env>360     pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
361         self.builder = self.builder.name(name);
362         self
363     }
364 
365     /// Sets the size of the stack for the new thread.
366     ///
367     /// The stack size is measured in bytes.
368     ///
369     /// For more information about the stack size for threads, see [here][stack-size].
370     ///
371     /// # Examples
372     ///
373     /// ```
374     /// use crossbeam_utils::thread;
375     ///
376     /// thread::scope(|s| {
377     ///     s.builder()
378     ///         .stack_size(32 * 1024)
379     ///         .spawn(|_| println!("Running a child thread"))
380     ///         .unwrap();
381     /// }).unwrap();
382     /// ```
383     ///
384     /// [stack-size]: std::thread#stack-size
stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env>385     pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
386         self.builder = self.builder.stack_size(size);
387         self
388     }
389 
390     /// Spawns a scoped thread with this configuration.
391     ///
392     /// The scoped thread is passed a reference to this scope as an argument, which can be used for
393     /// spawning nested threads.
394     ///
395     /// The returned handle can be used to manually join the thread before the scope exits.
396     ///
397     /// # Errors
398     ///
399     /// Unlike the [`Scope::spawn`] method, this method yields an
400     /// [`io::Result`] to capture any failure to create the thread at
401     /// the OS level.
402     ///
403     /// [`io::Result`]: std::io::Result
404     ///
405     /// # Panics
406     ///
407     /// Panics if a thread name was set and it contained null bytes.
408     ///
409     /// # Examples
410     ///
411     /// ```
412     /// use crossbeam_utils::thread;
413     ///
414     /// thread::scope(|s| {
415     ///     let handle = s.builder()
416     ///         .spawn(|_| {
417     ///             println!("A child thread is running");
418     ///             42
419     ///         })
420     ///         .unwrap();
421     ///
422     ///     // Join the thread and retrieve its result.
423     ///     let res = handle.join().unwrap();
424     ///     assert_eq!(res, 42);
425     /// }).unwrap();
426     /// ```
spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> where F: FnOnce(&Scope<'env>) -> T, F: Send + 'env, T: Send + 'env,427     pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
428     where
429         F: FnOnce(&Scope<'env>) -> T,
430         F: Send + 'env,
431         T: Send + 'env,
432     {
433         // The result of `f` will be stored here.
434         let result = SharedOption::default();
435 
436         // Spawn the thread and grab its join handle and thread handle.
437         let (handle, thread) = {
438             let result = Arc::clone(&result);
439 
440             // A clone of the scope that will be moved into the new thread.
441             let scope = Scope::<'env> {
442                 handles: Arc::clone(&self.scope.handles),
443                 wait_group: self.scope.wait_group.clone(),
444                 _marker: PhantomData,
445             };
446 
447             // Spawn the thread.
448             let handle = {
449                 let closure = move || {
450                     // Make sure the scope is inside the closure with the proper `'env` lifetime.
451                     let scope: Scope<'env> = scope;
452 
453                     // Run the closure.
454                     let res = f(&scope);
455 
456                     // Store the result if the closure didn't panic.
457                     *result.lock().unwrap() = Some(res);
458                 };
459 
460                 // Allocate `closure` on the heap and erase the `'env` bound.
461                 let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
462                 let closure: Box<dyn FnOnce() + Send + 'static> =
463                     unsafe { mem::transmute(closure) };
464 
465                 // Finally, spawn the closure.
466                 self.builder.spawn(closure)?
467             };
468 
469             let thread = handle.thread().clone();
470             let handle = Arc::new(Mutex::new(Some(handle)));
471             (handle, thread)
472         };
473 
474         // Add the handle to the shared list of join handles.
475         self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
476 
477         Ok(ScopedJoinHandle {
478             handle,
479             result,
480             thread,
481             _marker: PhantomData,
482         })
483     }
484 }
485 
486 unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
487 unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
488 
489 /// A handle that can be used to join its scoped thread.
490 ///
491 /// This struct is created by the [`Scope::spawn`] method and the
492 /// [`ScopedThreadBuilder::spawn`] method.
493 pub struct ScopedJoinHandle<'scope, T> {
494     /// A join handle to the spawned thread.
495     handle: SharedOption<thread::JoinHandle<()>>,
496 
497     /// Holds the result of the inner closure.
498     result: SharedOption<T>,
499 
500     /// A handle to the the spawned thread.
501     thread: thread::Thread,
502 
503     /// Borrows the parent scope with lifetime `'scope`.
504     _marker: PhantomData<&'scope ()>,
505 }
506 
507 impl<T> ScopedJoinHandle<'_, T> {
508     /// Waits for the thread to finish and returns its result.
509     ///
510     /// If the child thread panics, an error is returned. Note that if panics are implemented by
511     /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
512     ///
513     /// # Panics
514     ///
515     /// This function may panic on some platforms if a thread attempts to join itself or otherwise
516     /// may create a deadlock with joining threads.
517     ///
518     /// # Examples
519     ///
520     /// ```
521     /// use crossbeam_utils::thread;
522     ///
523     /// thread::scope(|s| {
524     ///     let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
525     ///     let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
526     ///
527     ///     // Join the first thread and verify that it succeeded.
528     ///     let res = handle1.join();
529     ///     assert!(res.is_ok());
530     ///
531     ///     // Join the second thread and verify that it panicked.
532     ///     let res = handle2.join();
533     ///     assert!(res.is_err());
534     /// }).unwrap();
535     /// ```
join(self) -> thread::Result<T>536     pub fn join(self) -> thread::Result<T> {
537         // Take out the handle. The handle will surely be available because the root scope waits
538         // for nested scopes before joining remaining threads.
539         let handle = self.handle.lock().unwrap().take().unwrap();
540 
541         // Join the thread and then take the result out of its inner closure.
542         handle
543             .join()
544             .map(|()| self.result.lock().unwrap().take().unwrap())
545     }
546 
547     /// Returns a handle to the underlying thread.
548     ///
549     /// # Examples
550     ///
551     /// ```
552     /// use crossbeam_utils::thread;
553     ///
554     /// thread::scope(|s| {
555     ///     let handle = s.spawn(|_| println!("A child thread is running"));
556     ///     println!("The child thread ID: {:?}", handle.thread().id());
557     /// }).unwrap();
558     /// ```
thread(&self) -> &thread::Thread559     pub fn thread(&self) -> &thread::Thread {
560         &self.thread
561     }
562 }
563 
564 /// Unix-specific extensions.
565 #[cfg(unix)]
566 mod unix {
567     use super::ScopedJoinHandle;
568     use std::os::unix::thread::{JoinHandleExt, RawPthread};
569 
570     impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
as_pthread_t(&self) -> RawPthread571         fn as_pthread_t(&self) -> RawPthread {
572             // Borrow the handle. The handle will surely be available because the root scope waits
573             // for nested scopes before joining remaining threads.
574             let handle = self.handle.lock().unwrap();
575             handle.as_ref().unwrap().as_pthread_t()
576         }
into_pthread_t(self) -> RawPthread577         fn into_pthread_t(self) -> RawPthread {
578             self.as_pthread_t()
579         }
580     }
581 }
582 /// Windows-specific extensions.
583 #[cfg(windows)]
584 mod windows {
585     use super::ScopedJoinHandle;
586     use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
587 
588     impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
as_raw_handle(&self) -> RawHandle589         fn as_raw_handle(&self) -> RawHandle {
590             // Borrow the handle. The handle will surely be available because the root scope waits
591             // for nested scopes before joining remaining threads.
592             let handle = self.handle.lock().unwrap();
593             handle.as_ref().unwrap().as_raw_handle()
594         }
595     }
596 
597     impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
into_raw_handle(self) -> RawHandle598         fn into_raw_handle(self) -> RawHandle {
599             self.as_raw_handle()
600         }
601     }
602 }
603 
604 impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result605     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606         f.pad("ScopedJoinHandle { .. }")
607     }
608 }
609