use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
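///
/// A sketch of the intended protocol (hypothetical caller code; the
/// real wait loops live in the `sleep` module):
///
/// ```ignore
/// // Waiting thread: poll until the latch is set.
/// while !latch.probe() {
///     // spin, or fall asleep via the sleep module
/// }
/// // ...memory effects from before `set()` are now visible here...
///
/// // Signalling thread: read everything you need *before* setting,
/// // since the waiter may wake up and free the latch's memory.
/// unsafe { Latch::set(&latch) }; // `latch` may dangle after this call
/// ```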
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

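// An editorial sketch of the state machine encoded by these constants
// (the transitions are the `CoreLatch` methods below; `set()` may fire
// from any not-yet-set state):
//
//     UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//       ^                                                  |
//       +------------- wake_up() (if not set) -------------+
//
//     UNSET | SLEEPY | SLEEPING --set()--> SET (terminal)
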
/// The core latch, which underlies the other latch types in this
/// module. It tracks the four states above (`UNSET`, `SLEEPY`,
/// `SLEEPING`, `SET`) so that the owning thread can coordinate falling
/// asleep with other threads that may concurrently set the latch.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Returns the address of this core latch as an integer. Used
    /// for logging.
    #[inline]
    pub(super) fn addr(&self) -> usize {
        self as *const CoreLatch as usize
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it wakes up. If the latch was not
    /// set in the meantime, this resets it from `SLEEPING` back to
    /// `UNSET`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
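
// A sketch of how an owning thread is expected to use the hooks above
// (hypothetical; the real protocol lives in the `sleep` module):
//
//     if latch.get_sleepy() {
//         if latch.fall_asleep() {
//             // block (e.g. on a condvar) until a setter observes
//             // `SLEEPING` from `CoreLatch::set` and notifies us
//         }
//         latch.wake_up();
//     }
//     // in all cases, re-check `latch.probe()` before idling again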

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}
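
// A sketch of the intended ownership (hypothetical): only the worker
// thread that created the latch blocks on it, while some other thread
// eventually sets it.
//
//     // on worker thread `w`:
//     let latch = SpinLatch::new(w);
//     // ... share `&latch` with work running elsewhere, which calls:
//     //     unsafe { Latch::set(&latch) };
//     // ... meanwhile `w` polls `latch.probe()`, idling via the
//     //     sleep module if it stays false for a while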

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
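///
/// A sketch of typical usage (hypothetical; in practice the latch is
/// shared with another thread that performs the `set`):
///
/// ```ignore
/// let latch = LockLatch::new();
/// // ... hand `&latch` to another thread, which eventually calls:
/// //     unsafe { Latch::set(&latch) };
/// latch.wait(); // blocks on the condvar until `set` runs
/// ```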
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}
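
// A sketch of the reuse pattern `wait_and_reset` enables (hypothetical):
//
//     let latch = LockLatch::new();
//     for round in 0..2 {
//         // ... hand `&latch` to a worker that will `set` it ...
//         latch.wait_and_reset(); // wakes, then flips back to false
//     }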

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
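///
/// A sketch of the counting behavior (hypothetical values):
///
/// ```ignore
/// let latch = CountLatch::with_count(2);
/// latch.increment();                      // counter: 3
/// unsafe { CountLatch::set(&latch) };     // counter: 2, probe() is false
/// unsafe { CountLatch::set(&latch) };     // counter: 1, probe() is false
/// unsafe { CountLatch::set(&latch) };     // counter: 0, latch is now set
/// assert!(latch.as_core_latch().probe());
/// ```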
#[derive(Debug)]
pub(super) struct CountLatch {
    core_latch: CoreLatch,
    counter: AtomicUsize,
}

impl CountLatch {
    #[inline]
    pub(super) fn new() -> CountLatch {
        Self::with_count(1)
    }

    #[inline]
    pub(super) fn with_count(n: usize) -> CountLatch {
        CountLatch {
            core_latch: CoreLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        debug_assert!(!self.core_latch.probe());
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the latch counter by one. If this is the final
    /// count, then the latch is **set**, and calls to `probe()` will
    /// return true. Returns whether the latch was set.
    #[inline]
    pub(super) unsafe fn set(this: *const Self) -> bool {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            CoreLatch::set(&(*this).core_latch);
            true
        } else {
            false
        }
    }

    /// Decrements the latch counter by one and possibly sets it. If
    /// the latch is set, then the specific worker thread is tickled,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if Self::set(this) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for CountLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

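/// Combines a counter with a `LockLatch`: `set()` decrements the
/// counter, and only the final decrement sets the inner `LockLatch`,
/// unblocking callers of `wait()`.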
#[derive(Debug)]
pub(super) struct CountLockLatch {
    lock_latch: LockLatch,
    counter: AtomicUsize,
}

impl CountLockLatch {
    #[inline]
    pub(super) fn with_count(n: usize) -> CountLockLatch {
        CountLockLatch {
            lock_latch: LockLatch::new(),
            counter: AtomicUsize::new(n),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self) {
        self.lock_latch.wait();
    }
}

impl Latch for CountLockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            LockLatch::set(&(*this).lock_latch);
        }
    }
}
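
// A sketch of the counting + blocking behavior (hypothetical):
//
//     let latch = CountLockLatch::with_count(2);
//     // two workers each call `unsafe { Latch::set(&latch) }` when
//     // done; only the second call sets the inner LockLatch, and
//     // only then does this return:
//     latch.wait();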

/// `&L` without any implication of `dereferenceable` for `Latch::set`
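///
/// A sketch of why this exists: `Latch::set` may deallocate the latch
/// before it returns, so passing a plain `&L` (which the compiler may
/// assume stays dereferenceable for the duration of the call) would be
/// unsound. Hypothetical usage:
///
/// ```ignore
/// let latch = LockLatch::new();
/// let latch_ref = LatchRef::new(&latch);
/// // give `latch_ref` away; some thread eventually calls:
/// //     unsafe { Latch::set(&latch_ref) };
/// latch.wait();
/// ```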
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

// SAFETY: a `LatchRef` is just a borrow of an `L`, so it is `Sync`
// exactly when `&L` is, i.e. when `L: Sync`.
unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}