1 use std::sync::atomic::{AtomicUsize, Ordering};
2 
3 pub(super) struct AtomicCounters {
4     /// Packs together a number of counters. The counters are ordered as
5     /// follows, from least to most significant bits (here, we assuming
6     /// that [`THREADS_BITS`] is equal to 10):
7     ///
8     /// * Bits 0..10: Stores the number of **sleeping threads**
9     /// * Bits 10..20: Stores the number of **inactive threads**
10     /// * Bits 20..: Stores the **job event counter** (JEC)
11     ///
12     /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
13     /// that the total number of bits (and hence the number of bits used for the
14     /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
15     value: AtomicUsize,
16 }
17 
18 #[derive(Copy, Clone)]
19 pub(super) struct Counters {
20     word: usize,
21 }
22 
23 /// A value read from the **Jobs Event Counter**.
24 /// See the [`README.md`](README.md) for more
25 /// coverage of how the jobs event counter works.
26 #[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
27 pub(super) struct JobsEventCounter(usize);
28 
29 impl JobsEventCounter {
30     pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
31 
32     #[inline]
as_usize(self) -> usize33     pub(super) fn as_usize(self) -> usize {
34         self.0
35     }
36 
37     /// The JEC "is sleepy" if the last thread to increment it was in the
38     /// process of becoming sleepy. This is indicated by its value being *even*.
39     /// When new jobs are posted, they check if the JEC is sleepy, and if so
40     /// they incremented it.
41     #[inline]
is_sleepy(self) -> bool42     pub(super) fn is_sleepy(self) -> bool {
43         (self.as_usize() & 1) == 0
44     }
45 
46     /// The JEC "is active" if the last thread to increment it was posting new
47     /// work. This is indicated by its value being *odd*. When threads get
48     /// sleepy, they will check if the JEC is active, and increment it.
49     #[inline]
is_active(self) -> bool50     pub(super) fn is_active(self) -> bool {
51         !self.is_sleepy()
52     }
53 }
54 
/// Number of bits used for each thread counter (64-bit targets).
#[cfg(target_pointer_width = "64")]
const THREADS_BITS: usize = 16;

/// Number of bits used for each thread counter (32-bit targets).
#[cfg(target_pointer_width = "32")]
const THREADS_BITS: usize = 8;

/// Bits to shift to select the sleeping threads
/// (used with `select_thread`).
// Spelled `0 * THREADS_BITS` on purpose so the three shifts read as one
// series; that is what the lint would otherwise complain about.
#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;

/// Bits to shift to select the inactive threads
/// (used with `select_thread`).
// Spelled `1 * THREADS_BITS` on purpose, for the same reason as above.
#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;

/// Bits to shift to select the JEC
/// (used with `select_jec`).
const JEC_SHIFT: usize = 2 * THREADS_BITS;

/// Max value for the thread counters: all [`THREADS_BITS`] bits set.
pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;

/// Constant that can be added to add one sleeping thread.
const ONE_SLEEPING: usize = 1 << SLEEPING_SHIFT;

/// Constant that can be added to add one inactive thread.
/// An inactive thread is either idle, sleepy, or sleeping.
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;

/// Constant that can be added to add one to the JEC.
const ONE_JEC: usize = 1 << JEC_SHIFT;
88 
89 impl AtomicCounters {
90     #[inline]
new() -> AtomicCounters91     pub(super) fn new() -> AtomicCounters {
92         AtomicCounters {
93             value: AtomicUsize::new(0),
94         }
95     }
96 
97     /// Load and return the current value of the various counters.
98     /// This value can then be given to other method which will
99     /// attempt to update the counters via compare-and-swap.
100     #[inline]
load(&self, ordering: Ordering) -> Counters101     pub(super) fn load(&self, ordering: Ordering) -> Counters {
102         Counters::new(self.value.load(ordering))
103     }
104 
105     #[inline]
try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool106     fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
107         self.value
108             .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
109             .is_ok()
110     }
111 
112     /// Adds an inactive thread. This cannot fail.
113     ///
114     /// This should be invoked when a thread enters its idle loop looking
115     /// for work. It is decremented when work is found. Note that it is
116     /// not decremented if the thread transitions from idle to sleepy or sleeping;
117     /// so the number of inactive threads is always greater-than-or-equal
118     /// to the number of sleeping threads.
119     #[inline]
add_inactive_thread(&self)120     pub(super) fn add_inactive_thread(&self) {
121         self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
122     }
123 
124     /// Increments the jobs event counter if `increment_when`, when applied to
125     /// the current value, is true. Used to toggle the JEC from even (sleepy) to
126     /// odd (active) or vice versa. Returns the final value of the counters, for
127     /// which `increment_when` is guaranteed to return false.
increment_jobs_event_counter_if( &self, increment_when: impl Fn(JobsEventCounter) -> bool, ) -> Counters128     pub(super) fn increment_jobs_event_counter_if(
129         &self,
130         increment_when: impl Fn(JobsEventCounter) -> bool,
131     ) -> Counters {
132         loop {
133             let old_value = self.load(Ordering::SeqCst);
134             if increment_when(old_value.jobs_counter()) {
135                 let new_value = old_value.increment_jobs_counter();
136                 if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
137                     return new_value;
138                 }
139             } else {
140                 return old_value;
141             }
142         }
143     }
144 
145     /// Subtracts an inactive thread. This cannot fail. It is invoked
146     /// when a thread finds work and hence becomes active. It returns the
147     /// number of sleeping threads to wake up (if any).
148     ///
149     /// See `add_inactive_thread`.
150     #[inline]
sub_inactive_thread(&self) -> usize151     pub(super) fn sub_inactive_thread(&self) -> usize {
152         let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
153         debug_assert!(
154             old_value.inactive_threads() > 0,
155             "sub_inactive_thread: old_value {:?} has no inactive threads",
156             old_value,
157         );
158         debug_assert!(
159             old_value.sleeping_threads() <= old_value.inactive_threads(),
160             "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
161             old_value,
162             old_value.sleeping_threads(),
163             old_value.inactive_threads(),
164         );
165 
166         // Current heuristic: whenever an inactive thread goes away, if
167         // there are any sleeping threads, wake 'em up.
168         let sleeping_threads = old_value.sleeping_threads();
169         std::cmp::min(sleeping_threads, 2)
170     }
171 
172     /// Subtracts a sleeping thread. This cannot fail, but it is only
173     /// safe to do if you you know the number of sleeping threads is
174     /// non-zero (i.e., because you have just awoken a sleeping
175     /// thread).
176     #[inline]
sub_sleeping_thread(&self)177     pub(super) fn sub_sleeping_thread(&self) {
178         let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
179         debug_assert!(
180             old_value.sleeping_threads() > 0,
181             "sub_sleeping_thread: old_value {:?} had no sleeping threads",
182             old_value,
183         );
184         debug_assert!(
185             old_value.sleeping_threads() <= old_value.inactive_threads(),
186             "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
187             old_value,
188             old_value.sleeping_threads(),
189             old_value.inactive_threads(),
190         );
191     }
192 
193     #[inline]
try_add_sleeping_thread(&self, old_value: Counters) -> bool194     pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
195         debug_assert!(
196             old_value.inactive_threads() > 0,
197             "try_add_sleeping_thread: old_value {:?} has no inactive threads",
198             old_value,
199         );
200         debug_assert!(
201             old_value.sleeping_threads() < THREADS_MAX,
202             "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
203             old_value,
204         );
205 
206         let mut new_value = old_value;
207         new_value.word += ONE_SLEEPING;
208 
209         self.try_exchange(old_value, new_value, Ordering::SeqCst)
210     }
211 }
212 
213 #[inline]
select_thread(word: usize, shift: usize) -> usize214 fn select_thread(word: usize, shift: usize) -> usize {
215     ((word >> shift) as usize) & THREADS_MAX
216 }
217 
218 #[inline]
select_jec(word: usize) -> usize219 fn select_jec(word: usize) -> usize {
220     (word >> JEC_SHIFT) as usize
221 }
222 
223 impl Counters {
224     #[inline]
new(word: usize) -> Counters225     fn new(word: usize) -> Counters {
226         Counters { word }
227     }
228 
229     #[inline]
increment_jobs_counter(self) -> Counters230     fn increment_jobs_counter(self) -> Counters {
231         // We can freely add to JEC because it occupies the most significant bits.
232         // Thus it doesn't overflow into the other counters, just wraps itself.
233         Counters {
234             word: self.word.wrapping_add(ONE_JEC),
235         }
236     }
237 
238     #[inline]
jobs_counter(self) -> JobsEventCounter239     pub(super) fn jobs_counter(self) -> JobsEventCounter {
240         JobsEventCounter(select_jec(self.word))
241     }
242 
243     /// The number of threads that are not actively
244     /// executing work. They may be idle, sleepy, or asleep.
245     #[inline]
inactive_threads(self) -> usize246     pub(super) fn inactive_threads(self) -> usize {
247         select_thread(self.word, INACTIVE_SHIFT)
248     }
249 
250     #[inline]
awake_but_idle_threads(self) -> usize251     pub(super) fn awake_but_idle_threads(self) -> usize {
252         debug_assert!(
253             self.sleeping_threads() <= self.inactive_threads(),
254             "sleeping threads: {} > raw idle threads {}",
255             self.sleeping_threads(),
256             self.inactive_threads()
257         );
258         self.inactive_threads() - self.sleeping_threads()
259     }
260 
261     #[inline]
sleeping_threads(self) -> usize262     pub(super) fn sleeping_threads(self) -> usize {
263         select_thread(self.word, SLEEPING_SHIFT)
264     }
265 }
266 
267 impl std::fmt::Debug for Counters {
fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result268     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269         let word = format!("{:016x}", self.word);
270         fmt.debug_struct("Counters")
271             .field("word", &word)
272             .field("jobs", &self.jobs_counter().0)
273             .field("inactive", &self.inactive_threads())
274             .field("sleeping", &self.sleeping_threads())
275             .finish()
276     }
277 }
278