1 use crate::latch::Latch; 2 use crate::unwind; 3 use crossbeam_deque::{Injector, Steal}; 4 use std::any::Any; 5 use std::cell::UnsafeCell; 6 use std::mem; 7 use std::sync::Arc; 8 9 pub(super) enum JobResult<T> { 10 None, 11 Ok(T), 12 Panic(Box<dyn Any + Send>), 13 } 14 15 /// A `Job` is used to advertise work for other threads that they may 16 /// want to steal. In accordance with time honored tradition, jobs are 17 /// arranged in a deque, so that thieves can take from the top of the 18 /// deque while the main worker manages the bottom of the deque. This 19 /// deque is managed by the `thread_pool` module. 20 pub(super) trait Job { 21 /// Unsafe: this may be called from a different thread than the one 22 /// which scheduled the job, so the implementer must ensure the 23 /// appropriate traits are met, whether `Send`, `Sync`, or both. execute(this: *const ())24 unsafe fn execute(this: *const ()); 25 } 26 27 /// Effectively a Job trait object. Each JobRef **must** be executed 28 /// exactly once, or else data may leak. 29 /// 30 /// Internally, we store the job's data in a `*const ()` pointer. The 31 /// true type is something like `*const StackJob<...>`, but we hide 32 /// it. We also carry the "execute fn" from the `Job` trait. 33 pub(super) struct JobRef { 34 pointer: *const (), 35 execute_fn: unsafe fn(*const ()), 36 } 37 38 unsafe impl Send for JobRef {} 39 unsafe impl Sync for JobRef {} 40 41 impl JobRef { 42 /// Unsafe: caller asserts that `data` will remain valid until the 43 /// job is executed. new<T>(data: *const T) -> JobRef where T: Job,44 pub(super) unsafe fn new<T>(data: *const T) -> JobRef 45 where 46 T: Job, 47 { 48 // erase types: 49 JobRef { 50 pointer: data as *const (), 51 execute_fn: <T as Job>::execute, 52 } 53 } 54 55 /// Returns an opaque handle that can be saved and compared, 56 /// without making `JobRef` itself `Copy + Eq`. 57 #[inline] id(&self) -> impl Eq58 pub(super) fn id(&self) -> impl Eq { 59 (self.pointer, self.execute_fn) 60 } 61 62 #[inline] execute(self)63 pub(super) unsafe fn execute(self) { 64 (self.execute_fn)(self.pointer) 65 } 66 } 67 68 /// A job that will be owned by a stack slot. This means that when it 69 /// executes it need not free any heap data, the cleanup occurs when 70 /// the stack frame is later popped. The function parameter indicates 71 /// `true` if the job was stolen -- executed on a different thread. 72 pub(super) struct StackJob<L, F, R> 73 where 74 L: Latch + Sync, 75 F: FnOnce(bool) -> R + Send, 76 R: Send, 77 { 78 pub(super) latch: L, 79 func: UnsafeCell<Option<F>>, 80 result: UnsafeCell<JobResult<R>>, 81 } 82 83 impl<L, F, R> StackJob<L, F, R> 84 where 85 L: Latch + Sync, 86 F: FnOnce(bool) -> R + Send, 87 R: Send, 88 { new(func: F, latch: L) -> StackJob<L, F, R>89 pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> { 90 StackJob { 91 latch, 92 func: UnsafeCell::new(Some(func)), 93 result: UnsafeCell::new(JobResult::None), 94 } 95 } 96 as_job_ref(&self) -> JobRef97 pub(super) unsafe fn as_job_ref(&self) -> JobRef { 98 JobRef::new(self) 99 } 100 run_inline(self, stolen: bool) -> R101 pub(super) unsafe fn run_inline(self, stolen: bool) -> R { 102 self.func.into_inner().unwrap()(stolen) 103 } 104 into_result(self) -> R105 pub(super) unsafe fn into_result(self) -> R { 106 self.result.into_inner().into_return_value() 107 } 108 } 109 110 impl<L, F, R> Job for StackJob<L, F, R> 111 where 112 L: Latch + Sync, 113 F: FnOnce(bool) -> R + Send, 114 R: Send, 115 { execute(this: *const ())116 unsafe fn execute(this: *const ()) { 117 let this = &*(this as *const Self); 118 let abort = unwind::AbortIfPanic; 119 let func = (*this.func.get()).take().unwrap(); 120 (*this.result.get()) = JobResult::call(func); 121 Latch::set(&this.latch); 122 mem::forget(abort); 123 } 124 } 125 126 /// Represents a job stored in the heap. Used to implement 127 /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply 128 /// invokes a closure, which then triggers the appropriate logic to 129 /// signal that the job executed. 130 /// 131 /// (Probably `StackJob` should be refactored in a similar fashion.) 132 pub(super) struct HeapJob<BODY> 133 where 134 BODY: FnOnce() + Send, 135 { 136 job: BODY, 137 } 138 139 impl<BODY> HeapJob<BODY> 140 where 141 BODY: FnOnce() + Send, 142 { new(job: BODY) -> Box<Self>143 pub(super) fn new(job: BODY) -> Box<Self> { 144 Box::new(HeapJob { job }) 145 } 146 147 /// Creates a `JobRef` from this job -- note that this hides all 148 /// lifetimes, so it is up to you to ensure that this JobRef 149 /// doesn't outlive any data that it closes over. into_job_ref(self: Box<Self>) -> JobRef150 pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { 151 JobRef::new(Box::into_raw(self)) 152 } 153 154 /// Creates a static `JobRef` from this job. into_static_job_ref(self: Box<Self>) -> JobRef where BODY: 'static,155 pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef 156 where 157 BODY: 'static, 158 { 159 unsafe { self.into_job_ref() } 160 } 161 } 162 163 impl<BODY> Job for HeapJob<BODY> 164 where 165 BODY: FnOnce() + Send, 166 { execute(this: *const ())167 unsafe fn execute(this: *const ()) { 168 let this = Box::from_raw(this as *mut Self); 169 (this.job)(); 170 } 171 } 172 173 /// Represents a job stored in an `Arc` -- like `HeapJob`, but may 174 /// be turned into multiple `JobRef`s and called multiple times. 175 pub(super) struct ArcJob<BODY> 176 where 177 BODY: Fn() + Send + Sync, 178 { 179 job: BODY, 180 } 181 182 impl<BODY> ArcJob<BODY> 183 where 184 BODY: Fn() + Send + Sync, 185 { new(job: BODY) -> Arc<Self>186 pub(super) fn new(job: BODY) -> Arc<Self> { 187 Arc::new(ArcJob { job }) 188 } 189 190 /// Creates a `JobRef` from this job -- note that this hides all 191 /// lifetimes, so it is up to you to ensure that this JobRef 192 /// doesn't outlive any data that it closes over. as_job_ref(this: &Arc<Self>) -> JobRef193 pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { 194 JobRef::new(Arc::into_raw(Arc::clone(this))) 195 } 196 197 /// Creates a static `JobRef` from this job. as_static_job_ref(this: &Arc<Self>) -> JobRef where BODY: 'static,198 pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef 199 where 200 BODY: 'static, 201 { 202 unsafe { Self::as_job_ref(this) } 203 } 204 } 205 206 impl<BODY> Job for ArcJob<BODY> 207 where 208 BODY: Fn() + Send + Sync, 209 { execute(this: *const ())210 unsafe fn execute(this: *const ()) { 211 let this = Arc::from_raw(this as *mut Self); 212 (this.job)(); 213 } 214 } 215 216 impl<T> JobResult<T> { call(func: impl FnOnce(bool) -> T) -> Self217 fn call(func: impl FnOnce(bool) -> T) -> Self { 218 match unwind::halt_unwinding(|| func(true)) { 219 Ok(x) => JobResult::Ok(x), 220 Err(x) => JobResult::Panic(x), 221 } 222 } 223 224 /// Convert the `JobResult` for a job that has finished (and hence 225 /// its JobResult is populated) into its return value. 226 /// 227 /// NB. This will panic if the job panicked. into_return_value(self) -> T228 pub(super) fn into_return_value(self) -> T { 229 match self { 230 JobResult::None => unreachable!(), 231 JobResult::Ok(x) => x, 232 JobResult::Panic(x) => unwind::resume_unwinding(x), 233 } 234 } 235 } 236 237 /// Indirect queue to provide FIFO job priority. 238 pub(super) struct JobFifo { 239 inner: Injector<JobRef>, 240 } 241 242 impl JobFifo { new() -> Self243 pub(super) fn new() -> Self { 244 JobFifo { 245 inner: Injector::new(), 246 } 247 } 248 push(&self, job_ref: JobRef) -> JobRef249 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { 250 // A little indirection ensures that spawns are always prioritized in FIFO order. The 251 // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front 252 // (FIFO), but either way they will end up popping from the front of this queue. 253 self.inner.push(job_ref); 254 JobRef::new(self) 255 } 256 } 257 258 impl Job for JobFifo { execute(this: *const ())259 unsafe fn execute(this: *const ()) { 260 // We "execute" a queue by executing its first job, FIFO. 261 let this = &*(this as *const Self); 262 loop { 263 match this.inner.steal() { 264 Steal::Success(job_ref) => break job_ref.execute(), 265 Steal::Empty => panic!("FIFO is empty"), 266 Steal::Retry => {} 267 } 268 } 269 } 270 } 271