• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Waking mechanism for threads blocked on channel operations.
2 
3 use std::ptr;
4 use std::sync::atomic::{AtomicBool, Ordering};
5 use std::sync::Mutex;
6 use std::thread::{self, ThreadId};
7 
8 use crate::context::Context;
9 use crate::select::{Operation, Selected};
10 
11 /// Represents a thread blocked on a specific channel operation.
12 pub(crate) struct Entry {
13     /// The operation.
14     pub(crate) oper: Operation,
15 
16     /// Optional packet.
17     pub(crate) packet: *mut (),
18 
19     /// Context associated with the thread owning this operation.
20     pub(crate) cx: Context,
21 }
22 
23 /// A queue of threads blocked on channel operations.
24 ///
25 /// This data structure is used by threads to register blocking operations and get woken up once
26 /// an operation becomes ready.
27 pub(crate) struct Waker {
28     /// A list of select operations.
29     selectors: Vec<Entry>,
30 
31     /// A list of operations waiting to be ready.
32     observers: Vec<Entry>,
33 }
34 
35 impl Waker {
36     /// Creates a new `Waker`.
37     #[inline]
new() -> Self38     pub(crate) fn new() -> Self {
39         Waker {
40             selectors: Vec::new(),
41             observers: Vec::new(),
42         }
43     }
44 
45     /// Registers a select operation.
46     #[inline]
register(&mut self, oper: Operation, cx: &Context)47     pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48         self.register_with_packet(oper, ptr::null_mut(), cx);
49     }
50 
51     /// Registers a select operation and a packet.
52     #[inline]
register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context)53     pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54         self.selectors.push(Entry {
55             oper,
56             packet,
57             cx: cx.clone(),
58         });
59     }
60 
61     /// Unregisters a select operation.
62     #[inline]
unregister(&mut self, oper: Operation) -> Option<Entry>63     pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64         if let Some((i, _)) = self
65             .selectors
66             .iter()
67             .enumerate()
68             .find(|&(_, entry)| entry.oper == oper)
69         {
70             let entry = self.selectors.remove(i);
71             Some(entry)
72         } else {
73             None
74         }
75     }
76 
77     /// Attempts to find another thread's entry, select the operation, and wake it up.
78     #[inline]
try_select(&mut self) -> Option<Entry>79     pub(crate) fn try_select(&mut self) -> Option<Entry> {
80         if self.selectors.is_empty() {
81             None
82         } else {
83             let thread_id = current_thread_id();
84 
85             self.selectors
86                 .iter()
87                 .position(|selector| {
88                     // Does the entry belong to a different thread?
89                     selector.cx.thread_id() != thread_id
90                         && selector // Try selecting this operation.
91                             .cx
92                             .try_select(Selected::Operation(selector.oper))
93                             .is_ok()
94                         && {
95                             // Provide the packet.
96                             selector.cx.store_packet(selector.packet);
97                             // Wake the thread up.
98                             selector.cx.unpark();
99                             true
100                         }
101                 })
102                 // Remove the entry from the queue to keep it clean and improve
103                 // performance.
104                 .map(|pos| self.selectors.remove(pos))
105         }
106     }
107 
108     /// Returns `true` if there is an entry which can be selected by the current thread.
109     #[inline]
can_select(&self) -> bool110     pub(crate) fn can_select(&self) -> bool {
111         if self.selectors.is_empty() {
112             false
113         } else {
114             let thread_id = current_thread_id();
115 
116             self.selectors.iter().any(|entry| {
117                 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
118             })
119         }
120     }
121 
122     /// Registers an operation waiting to be ready.
123     #[inline]
watch(&mut self, oper: Operation, cx: &Context)124     pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
125         self.observers.push(Entry {
126             oper,
127             packet: ptr::null_mut(),
128             cx: cx.clone(),
129         });
130     }
131 
132     /// Unregisters an operation waiting to be ready.
133     #[inline]
unwatch(&mut self, oper: Operation)134     pub(crate) fn unwatch(&mut self, oper: Operation) {
135         self.observers.retain(|e| e.oper != oper);
136     }
137 
138     /// Notifies all operations waiting to be ready.
139     #[inline]
notify(&mut self)140     pub(crate) fn notify(&mut self) {
141         for entry in self.observers.drain(..) {
142             if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
143                 entry.cx.unpark();
144             }
145         }
146     }
147 
148     /// Notifies all registered operations that the channel is disconnected.
149     #[inline]
disconnect(&mut self)150     pub(crate) fn disconnect(&mut self) {
151         for entry in self.selectors.iter() {
152             if entry.cx.try_select(Selected::Disconnected).is_ok() {
153                 // Wake the thread up.
154                 //
155                 // Here we don't remove the entry from the queue. Registered threads must
156                 // unregister from the waker by themselves. They might also want to recover the
157                 // packet value and destroy it, if necessary.
158                 entry.cx.unpark();
159             }
160         }
161 
162         self.notify();
163     }
164 }
165 
166 impl Drop for Waker {
167     #[inline]
drop(&mut self)168     fn drop(&mut self) {
169         debug_assert_eq!(self.selectors.len(), 0);
170         debug_assert_eq!(self.observers.len(), 0);
171     }
172 }
173 
174 /// A waker that can be shared among threads without locking.
175 ///
176 /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
177 pub(crate) struct SyncWaker {
178     /// The inner `Waker`.
179     inner: Mutex<Waker>,
180 
181     /// `true` if the waker is empty.
182     is_empty: AtomicBool,
183 }
184 
185 impl SyncWaker {
186     /// Creates a new `SyncWaker`.
187     #[inline]
new() -> Self188     pub(crate) fn new() -> Self {
189         SyncWaker {
190             inner: Mutex::new(Waker::new()),
191             is_empty: AtomicBool::new(true),
192         }
193     }
194 
195     /// Registers the current thread with an operation.
196     #[inline]
register(&self, oper: Operation, cx: &Context)197     pub(crate) fn register(&self, oper: Operation, cx: &Context) {
198         let mut inner = self.inner.lock().unwrap();
199         inner.register(oper, cx);
200         self.is_empty.store(
201             inner.selectors.is_empty() && inner.observers.is_empty(),
202             Ordering::SeqCst,
203         );
204     }
205 
206     /// Unregisters an operation previously registered by the current thread.
207     #[inline]
unregister(&self, oper: Operation) -> Option<Entry>208     pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
209         let mut inner = self.inner.lock().unwrap();
210         let entry = inner.unregister(oper);
211         self.is_empty.store(
212             inner.selectors.is_empty() && inner.observers.is_empty(),
213             Ordering::SeqCst,
214         );
215         entry
216     }
217 
218     /// Attempts to find one thread (not the current one), select its operation, and wake it up.
219     #[inline]
notify(&self)220     pub(crate) fn notify(&self) {
221         if !self.is_empty.load(Ordering::SeqCst) {
222             let mut inner = self.inner.lock().unwrap();
223             if !self.is_empty.load(Ordering::SeqCst) {
224                 inner.try_select();
225                 inner.notify();
226                 self.is_empty.store(
227                     inner.selectors.is_empty() && inner.observers.is_empty(),
228                     Ordering::SeqCst,
229                 );
230             }
231         }
232     }
233 
234     /// Registers an operation waiting to be ready.
235     #[inline]
watch(&self, oper: Operation, cx: &Context)236     pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
237         let mut inner = self.inner.lock().unwrap();
238         inner.watch(oper, cx);
239         self.is_empty.store(
240             inner.selectors.is_empty() && inner.observers.is_empty(),
241             Ordering::SeqCst,
242         );
243     }
244 
245     /// Unregisters an operation waiting to be ready.
246     #[inline]
unwatch(&self, oper: Operation)247     pub(crate) fn unwatch(&self, oper: Operation) {
248         let mut inner = self.inner.lock().unwrap();
249         inner.unwatch(oper);
250         self.is_empty.store(
251             inner.selectors.is_empty() && inner.observers.is_empty(),
252             Ordering::SeqCst,
253         );
254     }
255 
256     /// Notifies all threads that the channel is disconnected.
257     #[inline]
disconnect(&self)258     pub(crate) fn disconnect(&self) {
259         let mut inner = self.inner.lock().unwrap();
260         inner.disconnect();
261         self.is_empty.store(
262             inner.selectors.is_empty() && inner.observers.is_empty(),
263             Ordering::SeqCst,
264         );
265     }
266 }
267 
268 impl Drop for SyncWaker {
269     #[inline]
drop(&mut self)270     fn drop(&mut self) {
271         debug_assert!(self.is_empty.load(Ordering::SeqCst));
272     }
273 }
274 
275 /// Returns the id of the current thread.
276 #[inline]
current_thread_id() -> ThreadId277 fn current_thread_id() -> ThreadId {
278     thread_local! {
279         /// Cached thread-local id.
280         static THREAD_ID: ThreadId = thread::current().id();
281     }
282 
283     THREAD_ID
284         .try_with(|id| *id)
285         .unwrap_or_else(|_| thread::current().id())
286 }
287