• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Coordinates idling workers
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Mutex;
5 
6 use std::fmt;
7 use std::sync::atomic::Ordering::{self, SeqCst};
8 
9 pub(super) struct Idle {
10     /// Tracks both the number of searching workers and the number of unparked
11     /// workers.
12     ///
13     /// Used as a fast-path to avoid acquiring the lock when needed.
14     state: AtomicUsize,
15 
16     /// Sleeping workers
17     sleepers: Mutex<Vec<usize>>,
18 
19     /// Total number of workers.
20     num_workers: usize,
21 }
22 
23 const UNPARK_SHIFT: usize = 16;
24 const UNPARK_MASK: usize = !SEARCH_MASK;
25 const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
26 
27 #[derive(Copy, Clone)]
28 struct State(usize);
29 
30 impl Idle {
new(num_workers: usize) -> Idle31     pub(super) fn new(num_workers: usize) -> Idle {
32         let init = State::new(num_workers);
33 
34         Idle {
35             state: AtomicUsize::new(init.into()),
36             sleepers: Mutex::new(Vec::with_capacity(num_workers)),
37             num_workers,
38         }
39     }
40 
41     /// If there are no workers actively searching, returns the index of a
42     /// worker currently sleeping.
worker_to_notify(&self) -> Option<usize>43     pub(super) fn worker_to_notify(&self) -> Option<usize> {
44         // If at least one worker is spinning, work being notified will
45         // eventually be found. A searching thread will find **some** work and
46         // notify another worker, eventually leading to our work being found.
47         //
48         // For this to happen, this load must happen before the thread
49         // transitioning `num_searching` to zero. Acquire / Release does not
50         // provide sufficient guarantees, so this load is done with `SeqCst` and
51         // will pair with the `fetch_sub(1)` when transitioning out of
52         // searching.
53         if !self.notify_should_wakeup() {
54             return None;
55         }
56 
57         // Acquire the lock
58         let mut sleepers = self.sleepers.lock();
59 
60         // Check again, now that the lock is acquired
61         if !self.notify_should_wakeup() {
62             return None;
63         }
64 
65         // A worker should be woken up, atomically increment the number of
66         // searching workers as well as the number of unparked workers.
67         State::unpark_one(&self.state, 1);
68 
69         // Get the worker to unpark
70         let ret = sleepers.pop();
71         debug_assert!(ret.is_some());
72 
73         ret
74     }
75 
76     /// Returns `true` if the worker needs to do a final check for submitted
77     /// work.
transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool78     pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
79         // Acquire the lock
80         let mut sleepers = self.sleepers.lock();
81 
82         // Decrement the number of unparked threads
83         let ret = State::dec_num_unparked(&self.state, is_searching);
84 
85         // Track the sleeping worker
86         sleepers.push(worker);
87 
88         ret
89     }
90 
transition_worker_to_searching(&self) -> bool91     pub(super) fn transition_worker_to_searching(&self) -> bool {
92         let state = State::load(&self.state, SeqCst);
93         if 2 * state.num_searching() >= self.num_workers {
94             return false;
95         }
96 
97         // It is possible for this routine to allow more than 50% of the workers
98         // to search. That is OK. Limiting searchers is only an optimization to
99         // prevent too much contention.
100         State::inc_num_searching(&self.state, SeqCst);
101         true
102     }
103 
104     /// A lightweight transition from searching -> running.
105     ///
106     /// Returns `true` if this is the final searching worker. The caller
107     /// **must** notify a new worker.
transition_worker_from_searching(&self) -> bool108     pub(super) fn transition_worker_from_searching(&self) -> bool {
109         State::dec_num_searching(&self.state)
110     }
111 
112     /// Unpark a specific worker. This happens if tasks are submitted from
113     /// within the worker's park routine.
114     ///
115     /// Returns `true` if the worker was parked before calling the method.
unpark_worker_by_id(&self, worker_id: usize) -> bool116     pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
117         let mut sleepers = self.sleepers.lock();
118 
119         for index in 0..sleepers.len() {
120             if sleepers[index] == worker_id {
121                 sleepers.swap_remove(index);
122 
123                 // Update the state accordingly while the lock is held.
124                 State::unpark_one(&self.state, 0);
125 
126                 return true;
127             }
128         }
129 
130         false
131     }
132 
133     /// Returns `true` if `worker_id` is contained in the sleep set.
is_parked(&self, worker_id: usize) -> bool134     pub(super) fn is_parked(&self, worker_id: usize) -> bool {
135         let sleepers = self.sleepers.lock();
136         sleepers.contains(&worker_id)
137     }
138 
notify_should_wakeup(&self) -> bool139     fn notify_should_wakeup(&self) -> bool {
140         let state = State(self.state.fetch_add(0, SeqCst));
141         state.num_searching() == 0 && state.num_unparked() < self.num_workers
142     }
143 }
144 
145 impl State {
new(num_workers: usize) -> State146     fn new(num_workers: usize) -> State {
147         // All workers start in the unparked state
148         let ret = State(num_workers << UNPARK_SHIFT);
149         debug_assert_eq!(num_workers, ret.num_unparked());
150         debug_assert_eq!(0, ret.num_searching());
151         ret
152     }
153 
load(cell: &AtomicUsize, ordering: Ordering) -> State154     fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
155         State(cell.load(ordering))
156     }
157 
unpark_one(cell: &AtomicUsize, num_searching: usize)158     fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
159         cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
160     }
161 
inc_num_searching(cell: &AtomicUsize, ordering: Ordering)162     fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
163         cell.fetch_add(1, ordering);
164     }
165 
166     /// Returns `true` if this is the final searching worker
dec_num_searching(cell: &AtomicUsize) -> bool167     fn dec_num_searching(cell: &AtomicUsize) -> bool {
168         let state = State(cell.fetch_sub(1, SeqCst));
169         state.num_searching() == 1
170     }
171 
172     /// Track a sleeping worker
173     ///
174     /// Returns `true` if this is the final searching worker.
dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool175     fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
176         let mut dec = 1 << UNPARK_SHIFT;
177 
178         if is_searching {
179             dec += 1;
180         }
181 
182         let prev = State(cell.fetch_sub(dec, SeqCst));
183         is_searching && prev.num_searching() == 1
184     }
185 
186     /// Number of workers currently searching
num_searching(self) -> usize187     fn num_searching(self) -> usize {
188         self.0 & SEARCH_MASK
189     }
190 
191     /// Number of workers currently unparked
num_unparked(self) -> usize192     fn num_unparked(self) -> usize {
193         (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
194     }
195 }
196 
197 impl From<usize> for State {
from(src: usize) -> State198     fn from(src: usize) -> State {
199         State(src)
200     }
201 }
202 
203 impl From<State> for usize {
from(src: State) -> usize204     fn from(src: State) -> usize {
205         src.0
206     }
207 }
208 
209 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result210     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
211         fmt.debug_struct("worker::State")
212             .field("num_unparked", &self.num_unparked())
213             .field("num_searching", &self.num_searching())
214             .finish()
215     }
216 }
217 
218 #[test]
test_state()219 fn test_state() {
220     assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
221     assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
222 
223     let state = State::new(10);
224     assert_eq!(10, state.num_unparked());
225     assert_eq!(0, state.num_searching());
226 }
227