1 //! Waking mechanism for threads blocked on channel operations.
2
3 use std::ptr;
4 use std::sync::atomic::{AtomicBool, Ordering};
5 use std::thread::{self, ThreadId};
6
7 use crate::context::Context;
8 use crate::select::{Operation, Selected};
9 use crate::utils::Spinlock;
10
11 /// Represents a thread blocked on a specific channel operation.
12 pub(crate) struct Entry {
13 /// The operation.
14 pub(crate) oper: Operation,
15
16 /// Optional packet.
17 pub(crate) packet: *mut (),
18
19 /// Context associated with the thread owning this operation.
20 pub(crate) cx: Context,
21 }
22
23 /// A queue of threads blocked on channel operations.
24 ///
25 /// This data structure is used by threads to register blocking operations and get woken up once
26 /// an operation becomes ready.
27 pub(crate) struct Waker {
28 /// A list of select operations.
29 selectors: Vec<Entry>,
30
31 /// A list of operations waiting to be ready.
32 observers: Vec<Entry>,
33 }
34
35 impl Waker {
36 /// Creates a new `Waker`.
37 #[inline]
new() -> Self38 pub(crate) fn new() -> Self {
39 Waker {
40 selectors: Vec::new(),
41 observers: Vec::new(),
42 }
43 }
44
45 /// Registers a select operation.
46 #[inline]
register(&mut self, oper: Operation, cx: &Context)47 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48 self.register_with_packet(oper, ptr::null_mut(), cx);
49 }
50
51 /// Registers a select operation and a packet.
52 #[inline]
register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context)53 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54 self.selectors.push(Entry {
55 oper,
56 packet,
57 cx: cx.clone(),
58 });
59 }
60
61 /// Unregisters a select operation.
62 #[inline]
unregister(&mut self, oper: Operation) -> Option<Entry>63 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64 if let Some((i, _)) = self
65 .selectors
66 .iter()
67 .enumerate()
68 .find(|&(_, entry)| entry.oper == oper)
69 {
70 let entry = self.selectors.remove(i);
71 Some(entry)
72 } else {
73 None
74 }
75 }
76
77 /// Attempts to find another thread's entry, select the operation, and wake it up.
78 #[inline]
try_select(&mut self) -> Option<Entry>79 pub(crate) fn try_select(&mut self) -> Option<Entry> {
80 self.selectors
81 .iter()
82 .position(|selector| {
83 // Does the entry belong to a different thread?
84 selector.cx.thread_id() != current_thread_id()
85 && selector // Try selecting this operation.
86 .cx
87 .try_select(Selected::Operation(selector.oper))
88 .is_ok()
89 && {
90 // Provide the packet.
91 selector.cx.store_packet(selector.packet);
92 // Wake the thread up.
93 selector.cx.unpark();
94 true
95 }
96 })
97 // Remove the entry from the queue to keep it clean and improve
98 // performance.
99 .map(|pos| self.selectors.remove(pos))
100 }
101
102 /// Returns `true` if there is an entry which can be selected by the current thread.
103 #[inline]
can_select(&self) -> bool104 pub(crate) fn can_select(&self) -> bool {
105 if self.selectors.is_empty() {
106 false
107 } else {
108 let thread_id = current_thread_id();
109
110 self.selectors.iter().any(|entry| {
111 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
112 })
113 }
114 }
115
116 /// Registers an operation waiting to be ready.
117 #[inline]
watch(&mut self, oper: Operation, cx: &Context)118 pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
119 self.observers.push(Entry {
120 oper,
121 packet: ptr::null_mut(),
122 cx: cx.clone(),
123 });
124 }
125
126 /// Unregisters an operation waiting to be ready.
127 #[inline]
unwatch(&mut self, oper: Operation)128 pub(crate) fn unwatch(&mut self, oper: Operation) {
129 self.observers.retain(|e| e.oper != oper);
130 }
131
132 /// Notifies all operations waiting to be ready.
133 #[inline]
notify(&mut self)134 pub(crate) fn notify(&mut self) {
135 for entry in self.observers.drain(..) {
136 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
137 entry.cx.unpark();
138 }
139 }
140 }
141
142 /// Notifies all registered operations that the channel is disconnected.
143 #[inline]
disconnect(&mut self)144 pub(crate) fn disconnect(&mut self) {
145 for entry in self.selectors.iter() {
146 if entry.cx.try_select(Selected::Disconnected).is_ok() {
147 // Wake the thread up.
148 //
149 // Here we don't remove the entry from the queue. Registered threads must
150 // unregister from the waker by themselves. They might also want to recover the
151 // packet value and destroy it, if necessary.
152 entry.cx.unpark();
153 }
154 }
155
156 self.notify();
157 }
158 }
159
160 impl Drop for Waker {
161 #[inline]
drop(&mut self)162 fn drop(&mut self) {
163 debug_assert_eq!(self.selectors.len(), 0);
164 debug_assert_eq!(self.observers.len(), 0);
165 }
166 }
167
168 /// A waker that can be shared among threads without locking.
169 ///
170 /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
171 pub(crate) struct SyncWaker {
172 /// The inner `Waker`.
173 inner: Spinlock<Waker>,
174
175 /// `true` if the waker is empty.
176 is_empty: AtomicBool,
177 }
178
179 impl SyncWaker {
180 /// Creates a new `SyncWaker`.
181 #[inline]
new() -> Self182 pub(crate) fn new() -> Self {
183 SyncWaker {
184 inner: Spinlock::new(Waker::new()),
185 is_empty: AtomicBool::new(true),
186 }
187 }
188
189 /// Registers the current thread with an operation.
190 #[inline]
register(&self, oper: Operation, cx: &Context)191 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
192 let mut inner = self.inner.lock();
193 inner.register(oper, cx);
194 self.is_empty.store(
195 inner.selectors.is_empty() && inner.observers.is_empty(),
196 Ordering::SeqCst,
197 );
198 }
199
200 /// Unregisters an operation previously registered by the current thread.
201 #[inline]
unregister(&self, oper: Operation) -> Option<Entry>202 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
203 let mut inner = self.inner.lock();
204 let entry = inner.unregister(oper);
205 self.is_empty.store(
206 inner.selectors.is_empty() && inner.observers.is_empty(),
207 Ordering::SeqCst,
208 );
209 entry
210 }
211
212 /// Attempts to find one thread (not the current one), select its operation, and wake it up.
213 #[inline]
notify(&self)214 pub(crate) fn notify(&self) {
215 if !self.is_empty.load(Ordering::SeqCst) {
216 let mut inner = self.inner.lock();
217 if !self.is_empty.load(Ordering::SeqCst) {
218 inner.try_select();
219 inner.notify();
220 self.is_empty.store(
221 inner.selectors.is_empty() && inner.observers.is_empty(),
222 Ordering::SeqCst,
223 );
224 }
225 }
226 }
227
228 /// Registers an operation waiting to be ready.
229 #[inline]
watch(&self, oper: Operation, cx: &Context)230 pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
231 let mut inner = self.inner.lock();
232 inner.watch(oper, cx);
233 self.is_empty.store(
234 inner.selectors.is_empty() && inner.observers.is_empty(),
235 Ordering::SeqCst,
236 );
237 }
238
239 /// Unregisters an operation waiting to be ready.
240 #[inline]
unwatch(&self, oper: Operation)241 pub(crate) fn unwatch(&self, oper: Operation) {
242 let mut inner = self.inner.lock();
243 inner.unwatch(oper);
244 self.is_empty.store(
245 inner.selectors.is_empty() && inner.observers.is_empty(),
246 Ordering::SeqCst,
247 );
248 }
249
250 /// Notifies all threads that the channel is disconnected.
251 #[inline]
disconnect(&self)252 pub(crate) fn disconnect(&self) {
253 let mut inner = self.inner.lock();
254 inner.disconnect();
255 self.is_empty.store(
256 inner.selectors.is_empty() && inner.observers.is_empty(),
257 Ordering::SeqCst,
258 );
259 }
260 }
261
262 impl Drop for SyncWaker {
263 #[inline]
drop(&mut self)264 fn drop(&mut self) {
265 debug_assert!(self.is_empty.load(Ordering::SeqCst));
266 }
267 }
268
269 /// Returns the id of the current thread.
270 #[inline]
current_thread_id() -> ThreadId271 fn current_thread_id() -> ThreadId {
272 thread_local! {
273 /// Cached thread-local id.
274 static THREAD_ID: ThreadId = thread::current().id();
275 }
276
277 THREAD_ID
278 .try_with(|id| *id)
279 .unwrap_or_else(|_| thread::current().id())
280 }
281