use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

/// The core state machine shared by the latches owned by a worker
/// thread. It tracks both whether the latch is set and whether the
/// owning thread is awake, sleepy, or asleep, so that a setter knows
/// whether the owner must be explicitly woken.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(0),
        }
    }

    /// Returns the address of this core latch as an integer. Used
    /// for logging.
    #[inline]
    pub(super) fn addr(&self) -> usize {
        self as *const CoreLatch as usize
    }

    /// Invoked by the owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread once it wakes up. If the latch was
    /// not set in the meantime, this moves the state from `SLEEPING`
    /// back to `UNSET`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
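// A rough sketch of how these transitions are intended to be driven.
// The owning thread walks UNSET -> SLEEPY -> SLEEPING before blocking,
// while a setter swaps the state to SET and only needs to wake the owner
// if it had already reached SLEEPING. The `block_current_thread()` helper
// below is purely illustrative; in practice the sleep module performs the
// actual blocking:
//
//     // Owning thread, about to idle:
//     if latch.get_sleepy() {              // UNSET -> SLEEPY
//         if latch.fall_asleep() {         // SLEEPY -> SLEEPING
//             block_current_thread();      // until a setter wakes us
//         }
//         latch.wake_up();                 // SLEEPING -> UNSET, unless SET won
//     }
//
//     // Setter, on some other thread (via the owning latch type, since
//     // `CoreLatch::set` is private to this module):
//     if unsafe { CoreLatch::set(&latch) } {
//         // Returned true: the owner was SLEEPING and must be woken.
//     }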
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
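// Illustrative sketch of the intended flow for a `SpinLatch`: the owning
// worker creates it, hands a reference to a job running elsewhere, and then
// keeps busy (or sleeps via the `CoreLatch` protocol above) until the latch
// is set. The busy loop below stands in for the real scheduler logic and is
// not an API defined in this crate; `worker_thread` is the owning
// `WorkerThread`:
//
//     let latch = SpinLatch::new(worker_thread);
//     // ... give `&latch` to a job that will call `Latch::set` on it ...
//     while !latch.probe() {
//         // steal/execute other jobs, or go to sleep; either way the
//         // setter calls `notify_worker_latch_is_set` for this worker
//     }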
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until latch is set, then reset this lock latch so it can be reused again.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
    core_latch: CoreLatch,
    counter: AtomicUsize,
}

impl CountLatch {
    #[inline]
    pub(super) fn new() -> CountLatch {
        Self::with_count(1)
    }

    #[inline]
    pub(super) fn with_count(n: usize) -> CountLatch {
        CountLatch {
            core_latch: CoreLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        debug_assert!(!self.core_latch.probe());
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the latch counter by one. If this is the final
    /// count, then the latch is **set**, and calls to `probe()` will
    /// return true. Returns whether the latch was set.
    #[inline]
    pub(super) unsafe fn set(this: *const Self) -> bool {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            CoreLatch::set(&(*this).core_latch);
            true
        } else {
            false
        }
    }

    /// Decrements the latch counter by one and possibly sets it. If
    /// the latch is set, then the specific worker thread is tickled,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if Self::set(this) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for CountLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
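// A sketch of the scope-style pattern `CountLatch` is designed for. The
// `registry` and `owner_index` names are placeholders for the owning
// worker's registry handle and worker index, not values defined here:
//
//     let latch = CountLatch::new();        // counter starts at 1 for the owner
//     latch.increment();                    // +1 for each job spawned into the scope
//
//     // ... each spawned job, as its final action:
//     unsafe { CountLatch::set_and_tickle_one(&latch, &registry, owner_index) };
//
//     // ... the owner, once it has finished spawning, drops its own count:
//     unsafe { CountLatch::set_and_tickle_one(&latch, &registry, owner_index) };
//
//     // Only when the counter reaches zero does
//     // `latch.as_core_latch().probe()` start returning true.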
#[derive(Debug)]
pub(super) struct CountLockLatch {
    lock_latch: LockLatch,
    counter: AtomicUsize,
}

impl CountLockLatch {
    #[inline]
    pub(super) fn with_count(n: usize) -> CountLockLatch {
        CountLockLatch {
            lock_latch: LockLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self) {
        self.lock_latch.wait();
    }
}

impl Latch for CountLockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            LockLatch::set(&(*this).lock_latch);
        }
    }
}
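// The blocking counterpart of `CountLatch`: `wait()` only returns once the
// counter has been decremented to zero. A minimal sketch, assuming the two
// `set` calls would happen on other threads in practice:
//
//     let latch = CountLockLatch::with_count(2);
//     unsafe { <CountLockLatch as Latch>::set(&latch) };   // 2 -> 1, still waiting
//     unsafe { <CountLockLatch as Latch>::set(&latch) };   // 1 -> 0, sets the LockLatch
//     latch.wait();                                        // returns immediately now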
/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
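// Minimal sanity checks for the latches above that depend only on std.
// They are a sketch of the intended semantics (setting wakes waiters, and
// `LatchRef` forwards `set` to the referenced latch), not a full test suite.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn lock_latch_set_wakes_waiter() {
        let latch = Arc::new(LockLatch::new());
        let waiter = {
            let latch = Arc::clone(&latch);
            thread::spawn(move || latch.wait())
        };
        // SAFETY: the `Arc` keeps the latch alive for the duration of the call.
        unsafe { <LockLatch as Latch>::set(&*latch) };
        waiter.join().unwrap();
    }

    #[test]
    fn latch_ref_forwards_set() {
        let latch = LockLatch::new();
        let latch_ref = LatchRef::new(&latch);
        // SAFETY: `latch` outlives this call.
        unsafe { <LatchRef<'_, LockLatch> as Latch>::set(&latch_ref) };
        latch.wait(); // returns immediately, since the latch is already set
    }
}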