• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::latch::Latch;
2 use crate::unwind;
3 use crossbeam_deque::{Injector, Steal};
4 use std::any::Any;
5 use std::cell::UnsafeCell;
6 use std::mem;
7 use std::sync::Arc;
8 
9 pub(super) enum JobResult<T> {
10     None,
11     Ok(T),
12     Panic(Box<dyn Any + Send>),
13 }
14 
15 /// A `Job` is used to advertise work for other threads that they may
16 /// want to steal. In accordance with time honored tradition, jobs are
17 /// arranged in a deque, so that thieves can take from the top of the
18 /// deque while the main worker manages the bottom of the deque. This
19 /// deque is managed by the `thread_pool` module.
20 pub(super) trait Job {
21     /// Unsafe: this may be called from a different thread than the one
22     /// which scheduled the job, so the implementer must ensure the
23     /// appropriate traits are met, whether `Send`, `Sync`, or both.
execute(this: *const ())24     unsafe fn execute(this: *const ());
25 }
26 
27 /// Effectively a Job trait object. Each JobRef **must** be executed
28 /// exactly once, or else data may leak.
29 ///
30 /// Internally, we store the job's data in a `*const ()` pointer.  The
31 /// true type is something like `*const StackJob<...>`, but we hide
32 /// it. We also carry the "execute fn" from the `Job` trait.
33 pub(super) struct JobRef {
34     pointer: *const (),
35     execute_fn: unsafe fn(*const ()),
36 }
37 
38 unsafe impl Send for JobRef {}
39 unsafe impl Sync for JobRef {}
40 
41 impl JobRef {
42     /// Unsafe: caller asserts that `data` will remain valid until the
43     /// job is executed.
new<T>(data: *const T) -> JobRef where T: Job,44     pub(super) unsafe fn new<T>(data: *const T) -> JobRef
45     where
46         T: Job,
47     {
48         // erase types:
49         JobRef {
50             pointer: data as *const (),
51             execute_fn: <T as Job>::execute,
52         }
53     }
54 
55     /// Returns an opaque handle that can be saved and compared,
56     /// without making `JobRef` itself `Copy + Eq`.
57     #[inline]
id(&self) -> impl Eq58     pub(super) fn id(&self) -> impl Eq {
59         (self.pointer, self.execute_fn)
60     }
61 
62     #[inline]
execute(self)63     pub(super) unsafe fn execute(self) {
64         (self.execute_fn)(self.pointer)
65     }
66 }
67 
68 /// A job that will be owned by a stack slot. This means that when it
69 /// executes it need not free any heap data, the cleanup occurs when
70 /// the stack frame is later popped.  The function parameter indicates
71 /// `true` if the job was stolen -- executed on a different thread.
72 pub(super) struct StackJob<L, F, R>
73 where
74     L: Latch + Sync,
75     F: FnOnce(bool) -> R + Send,
76     R: Send,
77 {
78     pub(super) latch: L,
79     func: UnsafeCell<Option<F>>,
80     result: UnsafeCell<JobResult<R>>,
81 }
82 
83 impl<L, F, R> StackJob<L, F, R>
84 where
85     L: Latch + Sync,
86     F: FnOnce(bool) -> R + Send,
87     R: Send,
88 {
new(func: F, latch: L) -> StackJob<L, F, R>89     pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
90         StackJob {
91             latch,
92             func: UnsafeCell::new(Some(func)),
93             result: UnsafeCell::new(JobResult::None),
94         }
95     }
96 
as_job_ref(&self) -> JobRef97     pub(super) unsafe fn as_job_ref(&self) -> JobRef {
98         JobRef::new(self)
99     }
100 
run_inline(self, stolen: bool) -> R101     pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
102         self.func.into_inner().unwrap()(stolen)
103     }
104 
into_result(self) -> R105     pub(super) unsafe fn into_result(self) -> R {
106         self.result.into_inner().into_return_value()
107     }
108 }
109 
110 impl<L, F, R> Job for StackJob<L, F, R>
111 where
112     L: Latch + Sync,
113     F: FnOnce(bool) -> R + Send,
114     R: Send,
115 {
execute(this: *const ())116     unsafe fn execute(this: *const ()) {
117         let this = &*(this as *const Self);
118         let abort = unwind::AbortIfPanic;
119         let func = (*this.func.get()).take().unwrap();
120         (*this.result.get()) = JobResult::call(func);
121         Latch::set(&this.latch);
122         mem::forget(abort);
123     }
124 }
125 
126 /// Represents a job stored in the heap. Used to implement
127 /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
128 /// invokes a closure, which then triggers the appropriate logic to
129 /// signal that the job executed.
130 ///
131 /// (Probably `StackJob` should be refactored in a similar fashion.)
132 pub(super) struct HeapJob<BODY>
133 where
134     BODY: FnOnce() + Send,
135 {
136     job: BODY,
137 }
138 
139 impl<BODY> HeapJob<BODY>
140 where
141     BODY: FnOnce() + Send,
142 {
new(job: BODY) -> Box<Self>143     pub(super) fn new(job: BODY) -> Box<Self> {
144         Box::new(HeapJob { job })
145     }
146 
147     /// Creates a `JobRef` from this job -- note that this hides all
148     /// lifetimes, so it is up to you to ensure that this JobRef
149     /// doesn't outlive any data that it closes over.
into_job_ref(self: Box<Self>) -> JobRef150     pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
151         JobRef::new(Box::into_raw(self))
152     }
153 
154     /// Creates a static `JobRef` from this job.
into_static_job_ref(self: Box<Self>) -> JobRef where BODY: 'static,155     pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
156     where
157         BODY: 'static,
158     {
159         unsafe { self.into_job_ref() }
160     }
161 }
162 
163 impl<BODY> Job for HeapJob<BODY>
164 where
165     BODY: FnOnce() + Send,
166 {
execute(this: *const ())167     unsafe fn execute(this: *const ()) {
168         let this = Box::from_raw(this as *mut Self);
169         (this.job)();
170     }
171 }
172 
173 /// Represents a job stored in an `Arc` -- like `HeapJob`, but may
174 /// be turned into multiple `JobRef`s and called multiple times.
175 pub(super) struct ArcJob<BODY>
176 where
177     BODY: Fn() + Send + Sync,
178 {
179     job: BODY,
180 }
181 
182 impl<BODY> ArcJob<BODY>
183 where
184     BODY: Fn() + Send + Sync,
185 {
new(job: BODY) -> Arc<Self>186     pub(super) fn new(job: BODY) -> Arc<Self> {
187         Arc::new(ArcJob { job })
188     }
189 
190     /// Creates a `JobRef` from this job -- note that this hides all
191     /// lifetimes, so it is up to you to ensure that this JobRef
192     /// doesn't outlive any data that it closes over.
as_job_ref(this: &Arc<Self>) -> JobRef193     pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
194         JobRef::new(Arc::into_raw(Arc::clone(this)))
195     }
196 
197     /// Creates a static `JobRef` from this job.
as_static_job_ref(this: &Arc<Self>) -> JobRef where BODY: 'static,198     pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
199     where
200         BODY: 'static,
201     {
202         unsafe { Self::as_job_ref(this) }
203     }
204 }
205 
206 impl<BODY> Job for ArcJob<BODY>
207 where
208     BODY: Fn() + Send + Sync,
209 {
execute(this: *const ())210     unsafe fn execute(this: *const ()) {
211         let this = Arc::from_raw(this as *mut Self);
212         (this.job)();
213     }
214 }
215 
216 impl<T> JobResult<T> {
call(func: impl FnOnce(bool) -> T) -> Self217     fn call(func: impl FnOnce(bool) -> T) -> Self {
218         match unwind::halt_unwinding(|| func(true)) {
219             Ok(x) => JobResult::Ok(x),
220             Err(x) => JobResult::Panic(x),
221         }
222     }
223 
224     /// Convert the `JobResult` for a job that has finished (and hence
225     /// its JobResult is populated) into its return value.
226     ///
227     /// NB. This will panic if the job panicked.
into_return_value(self) -> T228     pub(super) fn into_return_value(self) -> T {
229         match self {
230             JobResult::None => unreachable!(),
231             JobResult::Ok(x) => x,
232             JobResult::Panic(x) => unwind::resume_unwinding(x),
233         }
234     }
235 }
236 
237 /// Indirect queue to provide FIFO job priority.
238 pub(super) struct JobFifo {
239     inner: Injector<JobRef>,
240 }
241 
242 impl JobFifo {
new() -> Self243     pub(super) fn new() -> Self {
244         JobFifo {
245             inner: Injector::new(),
246         }
247     }
248 
push(&self, job_ref: JobRef) -> JobRef249     pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
250         // A little indirection ensures that spawns are always prioritized in FIFO order.  The
251         // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
252         // (FIFO), but either way they will end up popping from the front of this queue.
253         self.inner.push(job_ref);
254         JobRef::new(self)
255     }
256 }
257 
258 impl Job for JobFifo {
execute(this: *const ())259     unsafe fn execute(this: *const ()) {
260         // We "execute" a queue by executing its first job, FIFO.
261         let this = &*(this as *const Self);
262         loop {
263             match this.inner.steal() {
264                 Steal::Success(job_ref) => break job_ref.execute(),
265                 Steal::Empty => panic!("FIFO is empty"),
266                 Steal::Retry => {}
267             }
268         }
269     }
270 }
271