• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Coordinates idling workers
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::runtime::scheduler::multi_thread::Shared;
5 
6 use std::fmt;
7 use std::sync::atomic::Ordering::{self, SeqCst};
8 
9 pub(super) struct Idle {
10     /// Tracks both the number of searching workers and the number of unparked
11     /// workers.
12     ///
13     /// Used as a fast-path to avoid acquiring the lock when needed.
14     state: AtomicUsize,
15 
16     /// Total number of workers.
17     num_workers: usize,
18 }
19 
20 /// Data synchronized by the scheduler mutex
21 pub(super) struct Synced {
22     /// Sleeping workers
23     sleepers: Vec<usize>,
24 }
25 
26 const UNPARK_SHIFT: usize = 16;
27 const UNPARK_MASK: usize = !SEARCH_MASK;
28 const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
29 
30 #[derive(Copy, Clone)]
31 struct State(usize);
32 
33 impl Idle {
new(num_workers: usize) -> (Idle, Synced)34     pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
35         let init = State::new(num_workers);
36 
37         let idle = Idle {
38             state: AtomicUsize::new(init.into()),
39             num_workers,
40         };
41 
42         let synced = Synced {
43             sleepers: Vec::with_capacity(num_workers),
44         };
45 
46         (idle, synced)
47     }
48 
49     /// If there are no workers actively searching, returns the index of a
50     /// worker currently sleeping.
worker_to_notify(&self, shared: &Shared) -> Option<usize>51     pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
52         // If at least one worker is spinning, work being notified will
53         // eventually be found. A searching thread will find **some** work and
54         // notify another worker, eventually leading to our work being found.
55         //
56         // For this to happen, this load must happen before the thread
57         // transitioning `num_searching` to zero. Acquire / Release does not
58         // provide sufficient guarantees, so this load is done with `SeqCst` and
59         // will pair with the `fetch_sub(1)` when transitioning out of
60         // searching.
61         if !self.notify_should_wakeup() {
62             return None;
63         }
64 
65         // Acquire the lock
66         let mut lock = shared.synced.lock();
67 
68         // Check again, now that the lock is acquired
69         if !self.notify_should_wakeup() {
70             return None;
71         }
72 
73         // A worker should be woken up, atomically increment the number of
74         // searching workers as well as the number of unparked workers.
75         State::unpark_one(&self.state, 1);
76 
77         // Get the worker to unpark
78         let ret = lock.idle.sleepers.pop();
79         debug_assert!(ret.is_some());
80 
81         ret
82     }
83 
84     /// Returns `true` if the worker needs to do a final check for submitted
85     /// work.
transition_worker_to_parked( &self, shared: &Shared, worker: usize, is_searching: bool, ) -> bool86     pub(super) fn transition_worker_to_parked(
87         &self,
88         shared: &Shared,
89         worker: usize,
90         is_searching: bool,
91     ) -> bool {
92         // Acquire the lock
93         let mut lock = shared.synced.lock();
94 
95         // Decrement the number of unparked threads
96         let ret = State::dec_num_unparked(&self.state, is_searching);
97 
98         // Track the sleeping worker
99         lock.idle.sleepers.push(worker);
100 
101         ret
102     }
103 
transition_worker_to_searching(&self) -> bool104     pub(super) fn transition_worker_to_searching(&self) -> bool {
105         let state = State::load(&self.state, SeqCst);
106         if 2 * state.num_searching() >= self.num_workers {
107             return false;
108         }
109 
110         // It is possible for this routine to allow more than 50% of the workers
111         // to search. That is OK. Limiting searchers is only an optimization to
112         // prevent too much contention.
113         State::inc_num_searching(&self.state, SeqCst);
114         true
115     }
116 
117     /// A lightweight transition from searching -> running.
118     ///
119     /// Returns `true` if this is the final searching worker. The caller
120     /// **must** notify a new worker.
transition_worker_from_searching(&self) -> bool121     pub(super) fn transition_worker_from_searching(&self) -> bool {
122         State::dec_num_searching(&self.state)
123     }
124 
125     /// Unpark a specific worker. This happens if tasks are submitted from
126     /// within the worker's park routine.
127     ///
128     /// Returns `true` if the worker was parked before calling the method.
unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool129     pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
130         let mut lock = shared.synced.lock();
131         let sleepers = &mut lock.idle.sleepers;
132 
133         for index in 0..sleepers.len() {
134             if sleepers[index] == worker_id {
135                 sleepers.swap_remove(index);
136 
137                 // Update the state accordingly while the lock is held.
138                 State::unpark_one(&self.state, 0);
139 
140                 return true;
141             }
142         }
143 
144         false
145     }
146 
147     /// Returns `true` if `worker_id` is contained in the sleep set.
is_parked(&self, shared: &Shared, worker_id: usize) -> bool148     pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
149         let lock = shared.synced.lock();
150         lock.idle.sleepers.contains(&worker_id)
151     }
152 
notify_should_wakeup(&self) -> bool153     fn notify_should_wakeup(&self) -> bool {
154         let state = State(self.state.fetch_add(0, SeqCst));
155         state.num_searching() == 0 && state.num_unparked() < self.num_workers
156     }
157 }
158 
159 impl State {
new(num_workers: usize) -> State160     fn new(num_workers: usize) -> State {
161         // All workers start in the unparked state
162         let ret = State(num_workers << UNPARK_SHIFT);
163         debug_assert_eq!(num_workers, ret.num_unparked());
164         debug_assert_eq!(0, ret.num_searching());
165         ret
166     }
167 
load(cell: &AtomicUsize, ordering: Ordering) -> State168     fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
169         State(cell.load(ordering))
170     }
171 
unpark_one(cell: &AtomicUsize, num_searching: usize)172     fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
173         cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
174     }
175 
inc_num_searching(cell: &AtomicUsize, ordering: Ordering)176     fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
177         cell.fetch_add(1, ordering);
178     }
179 
180     /// Returns `true` if this is the final searching worker
dec_num_searching(cell: &AtomicUsize) -> bool181     fn dec_num_searching(cell: &AtomicUsize) -> bool {
182         let state = State(cell.fetch_sub(1, SeqCst));
183         state.num_searching() == 1
184     }
185 
186     /// Track a sleeping worker
187     ///
188     /// Returns `true` if this is the final searching worker.
dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool189     fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
190         let mut dec = 1 << UNPARK_SHIFT;
191 
192         if is_searching {
193             dec += 1;
194         }
195 
196         let prev = State(cell.fetch_sub(dec, SeqCst));
197         is_searching && prev.num_searching() == 1
198     }
199 
200     /// Number of workers currently searching
num_searching(self) -> usize201     fn num_searching(self) -> usize {
202         self.0 & SEARCH_MASK
203     }
204 
205     /// Number of workers currently unparked
num_unparked(self) -> usize206     fn num_unparked(self) -> usize {
207         (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
208     }
209 }
210 
211 impl From<usize> for State {
from(src: usize) -> State212     fn from(src: usize) -> State {
213         State(src)
214     }
215 }
216 
217 impl From<State> for usize {
from(src: State) -> usize218     fn from(src: State) -> usize {
219         src.0
220     }
221 }
222 
223 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result224     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
225         fmt.debug_struct("worker::State")
226             .field("num_unparked", &self.num_unparked())
227             .field("num_searching", &self.num_searching())
228             .finish()
229     }
230 }
231 
232 #[test]
test_state()233 fn test_state() {
234     assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
235     assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
236 
237     let state = State::new(10);
238     assert_eq!(10, state.num_unparked());
239     assert_eq!(0, state.num_searching());
240 }
241