1 use std::sync::atomic::{AtomicUsize, Ordering}; 2 use std::sync::{Arc, Condvar, Mutex}; 3 use std::usize; 4 5 use crate::registry::{Registry, WorkerThread}; 6 7 /// We define various kinds of latches, which are all a primitive signaling 8 /// mechanism. A latch starts as false. Eventually someone calls `set()` and 9 /// it becomes true. You can test if it has been set by calling `probe()`. 10 /// 11 /// Some kinds of latches, but not all, support a `wait()` operation 12 /// that will wait until the latch is set, blocking efficiently. That 13 /// is not part of the trait since it is not possibly to do with all 14 /// latches. 15 /// 16 /// The intention is that `set()` is called once, but `probe()` may be 17 /// called any number of times. Once `probe()` returns true, the memory 18 /// effects that occurred before `set()` become visible. 19 /// 20 /// It'd probably be better to refactor the API into two paired types, 21 /// but that's a bit of work, and this is not a public API. 22 /// 23 /// ## Memory ordering 24 /// 25 /// Latches need to guarantee two things: 26 /// 27 /// - Once `probe()` returns true, all memory effects from the `set()` 28 /// are visible (in other words, the set should synchronize-with 29 /// the probe). 30 /// - Once `set()` occurs, the next `probe()` *will* observe it. This 31 /// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep 32 /// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. 33 pub(super) trait Latch { 34 /// Set the latch, signalling others. 35 /// 36 /// # WARNING 37 /// 38 /// Setting a latch triggers other threads to wake up and (in some 39 /// cases) complete. This may, in turn, cause memory to be 40 /// allocated and so forth. One must be very careful about this, 41 /// and it's typically better to read all the fields you will need 42 /// to access *before* a latch is set! set(&self)43 fn set(&self); 44 } 45 46 pub(super) trait AsCoreLatch { as_core_latch(&self) -> &CoreLatch47 fn as_core_latch(&self) -> &CoreLatch; 48 } 49 50 /// Latch is not set, owning thread is awake 51 const UNSET: usize = 0; 52 53 /// Latch is not set, owning thread is going to sleep on this latch 54 /// (but has not yet fallen asleep). 55 const SLEEPY: usize = 1; 56 57 /// Latch is not set, owning thread is asleep on this latch and 58 /// must be awoken. 59 const SLEEPING: usize = 2; 60 61 /// Latch is set. 62 const SET: usize = 3; 63 64 /// Spin latches are the simplest, most efficient kind, but they do 65 /// not support a `wait()` operation. They just have a boolean flag 66 /// that becomes true when `set()` is called. 67 #[derive(Debug)] 68 pub(super) struct CoreLatch { 69 state: AtomicUsize, 70 } 71 72 impl CoreLatch { 73 #[inline] new() -> Self74 fn new() -> Self { 75 Self { 76 state: AtomicUsize::new(0), 77 } 78 } 79 80 /// Returns the address of this core latch as an integer. Used 81 /// for logging. 82 #[inline] addr(&self) -> usize83 pub(super) fn addr(&self) -> usize { 84 self as *const CoreLatch as usize 85 } 86 87 /// Invoked by owning thread as it prepares to sleep. Returns true 88 /// if the owning thread may proceed to fall asleep, false if the 89 /// latch was set in the meantime. 90 #[inline] get_sleepy(&self) -> bool91 pub(super) fn get_sleepy(&self) -> bool { 92 self.state 93 .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) 94 .is_ok() 95 } 96 97 /// Invoked by owning thread as it falls asleep sleep. Returns 98 /// true if the owning thread should block, or false if the latch 99 /// was set in the meantime. 100 #[inline] fall_asleep(&self) -> bool101 pub(super) fn fall_asleep(&self) -> bool { 102 self.state 103 .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) 104 .is_ok() 105 } 106 107 /// Invoked by owning thread as it falls asleep sleep. Returns 108 /// true if the owning thread should block, or false if the latch 109 /// was set in the meantime. 110 #[inline] wake_up(&self)111 pub(super) fn wake_up(&self) { 112 if !self.probe() { 113 let _ = 114 self.state 115 .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); 116 } 117 } 118 119 /// Set the latch. If this returns true, the owning thread was sleeping 120 /// and must be awoken. 121 /// 122 /// This is private because, typically, setting a latch involves 123 /// doing some wakeups; those are encapsulated in the surrounding 124 /// latch code. 125 #[inline] set(&self) -> bool126 fn set(&self) -> bool { 127 let old_state = self.state.swap(SET, Ordering::AcqRel); 128 old_state == SLEEPING 129 } 130 131 /// Test if this latch has been set. 132 #[inline] probe(&self) -> bool133 pub(super) fn probe(&self) -> bool { 134 self.state.load(Ordering::Acquire) == SET 135 } 136 } 137 138 /// Spin latches are the simplest, most efficient kind, but they do 139 /// not support a `wait()` operation. They just have a boolean flag 140 /// that becomes true when `set()` is called. 141 pub(super) struct SpinLatch<'r> { 142 core_latch: CoreLatch, 143 registry: &'r Arc<Registry>, 144 target_worker_index: usize, 145 cross: bool, 146 } 147 148 impl<'r> SpinLatch<'r> { 149 /// Creates a new spin latch that is owned by `thread`. This means 150 /// that `thread` is the only thread that should be blocking on 151 /// this latch -- it also means that when the latch is set, we 152 /// will wake `thread` if it is sleeping. 153 #[inline] new(thread: &'r WorkerThread) -> SpinLatch<'r>154 pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { 155 SpinLatch { 156 core_latch: CoreLatch::new(), 157 registry: thread.registry(), 158 target_worker_index: thread.index(), 159 cross: false, 160 } 161 } 162 163 /// Creates a new spin latch for cross-threadpool blocking. Notably, we 164 /// need to make sure the registry is kept alive after setting, so we can 165 /// safely call the notification. 166 #[inline] cross(thread: &'r WorkerThread) -> SpinLatch<'r>167 pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { 168 SpinLatch { 169 cross: true, 170 ..SpinLatch::new(thread) 171 } 172 } 173 174 #[inline] probe(&self) -> bool175 pub(super) fn probe(&self) -> bool { 176 self.core_latch.probe() 177 } 178 } 179 180 impl<'r> AsCoreLatch for SpinLatch<'r> { 181 #[inline] as_core_latch(&self) -> &CoreLatch182 fn as_core_latch(&self) -> &CoreLatch { 183 &self.core_latch 184 } 185 } 186 187 impl<'r> Latch for SpinLatch<'r> { 188 #[inline] set(&self)189 fn set(&self) { 190 let cross_registry; 191 192 let registry = if self.cross { 193 // Ensure the registry stays alive while we notify it. 194 // Otherwise, it would be possible that we set the spin 195 // latch and the other thread sees it and exits, causing 196 // the registry to be deallocated, all before we get a 197 // chance to invoke `registry.notify_worker_latch_is_set`. 198 cross_registry = Arc::clone(self.registry); 199 &cross_registry 200 } else { 201 // If this is not a "cross-registry" spin-latch, then the 202 // thread which is performing `set` is itself ensuring 203 // that the registry stays alive. 204 self.registry 205 }; 206 let target_worker_index = self.target_worker_index; 207 208 // NOTE: Once we `set`, the target may proceed and invalidate `&self`! 209 if self.core_latch.set() { 210 // Subtle: at this point, we can no longer read from 211 // `self`, because the thread owning this spin latch may 212 // have awoken and deallocated the latch. Therefore, we 213 // only use fields whose values we already read. 214 registry.notify_worker_latch_is_set(target_worker_index); 215 } 216 } 217 } 218 219 /// A Latch starts as false and eventually becomes true. You can block 220 /// until it becomes true. 221 pub(super) struct LockLatch { 222 m: Mutex<bool>, 223 v: Condvar, 224 } 225 226 impl LockLatch { 227 #[inline] new() -> LockLatch228 pub(super) fn new() -> LockLatch { 229 LockLatch { 230 m: Mutex::new(false), 231 v: Condvar::new(), 232 } 233 } 234 235 /// Block until latch is set, then resets this lock latch so it can be reused again. wait_and_reset(&self)236 pub(super) fn wait_and_reset(&self) { 237 let mut guard = self.m.lock().unwrap(); 238 while !*guard { 239 guard = self.v.wait(guard).unwrap(); 240 } 241 *guard = false; 242 } 243 244 /// Block until latch is set. wait(&self)245 pub(super) fn wait(&self) { 246 let mut guard = self.m.lock().unwrap(); 247 while !*guard { 248 guard = self.v.wait(guard).unwrap(); 249 } 250 } 251 } 252 253 impl Latch for LockLatch { 254 #[inline] set(&self)255 fn set(&self) { 256 let mut guard = self.m.lock().unwrap(); 257 *guard = true; 258 self.v.notify_all(); 259 } 260 } 261 262 /// Counting latches are used to implement scopes. They track a 263 /// counter. Unlike other latches, calling `set()` does not 264 /// necessarily make the latch be considered `set()`; instead, it just 265 /// decrements the counter. The latch is only "set" (in the sense that 266 /// `probe()` returns true) once the counter reaches zero. 267 /// 268 /// Note: like a `SpinLatch`, count laches are always associated with 269 /// some registry that is probing them, which must be tickled when 270 /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a 271 /// reference to that registry. This is because in some cases the 272 /// registry owns the count-latch, and that would create a cycle. So a 273 /// `CountLatch` must be given a reference to its owning registry when 274 /// it is set. For this reason, it does not implement the `Latch` 275 /// trait (but it doesn't have to, as it is not used in those generic 276 /// contexts). 277 #[derive(Debug)] 278 pub(super) struct CountLatch { 279 core_latch: CoreLatch, 280 counter: AtomicUsize, 281 } 282 283 impl CountLatch { 284 #[inline] new() -> CountLatch285 pub(super) fn new() -> CountLatch { 286 CountLatch { 287 core_latch: CoreLatch::new(), 288 counter: AtomicUsize::new(1), 289 } 290 } 291 292 #[inline] increment(&self)293 pub(super) fn increment(&self) { 294 debug_assert!(!self.core_latch.probe()); 295 self.counter.fetch_add(1, Ordering::Relaxed); 296 } 297 298 /// Decrements the latch counter by one. If this is the final 299 /// count, then the latch is **set**, and calls to `probe()` will 300 /// return true. Returns whether the latch was set. This is an 301 /// internal operation, as it does not tickle, and to fail to 302 /// tickle would lead to deadlock. 303 #[inline] set(&self) -> bool304 fn set(&self) -> bool { 305 if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { 306 self.core_latch.set(); 307 true 308 } else { 309 false 310 } 311 } 312 313 /// Decrements the latch counter by one and possibly set it. If 314 /// the latch is set, then the specific worker thread is tickled, 315 /// which should be the one that owns this latch. 316 #[inline] set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize)317 pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) { 318 if self.set() { 319 registry.notify_worker_latch_is_set(target_worker_index); 320 } 321 } 322 } 323 324 impl AsCoreLatch for CountLatch { 325 #[inline] as_core_latch(&self) -> &CoreLatch326 fn as_core_latch(&self) -> &CoreLatch { 327 &self.core_latch 328 } 329 } 330 331 impl<'a, L> Latch for &'a L 332 where 333 L: Latch, 334 { 335 #[inline] set(&self)336 fn set(&self) { 337 L::set(self); 338 } 339 } 340