• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Waking mechanism for threads blocked on channel operations.
2 
3 use std::ptr;
4 use std::sync::atomic::{AtomicBool, Ordering};
5 use std::thread::{self, ThreadId};
6 
7 use crate::context::Context;
8 use crate::select::{Operation, Selected};
9 use crate::utils::Spinlock;
10 
11 /// Represents a thread blocked on a specific channel operation.
12 pub(crate) struct Entry {
13     /// The operation.
14     pub(crate) oper: Operation,
15 
16     /// Optional packet.
17     pub(crate) packet: *mut (),
18 
19     /// Context associated with the thread owning this operation.
20     pub(crate) cx: Context,
21 }
22 
23 /// A queue of threads blocked on channel operations.
24 ///
25 /// This data structure is used by threads to register blocking operations and get woken up once
26 /// an operation becomes ready.
27 pub(crate) struct Waker {
28     /// A list of select operations.
29     selectors: Vec<Entry>,
30 
31     /// A list of operations waiting to be ready.
32     observers: Vec<Entry>,
33 }
34 
35 impl Waker {
36     /// Creates a new `Waker`.
37     #[inline]
new() -> Self38     pub(crate) fn new() -> Self {
39         Waker {
40             selectors: Vec::new(),
41             observers: Vec::new(),
42         }
43     }
44 
45     /// Registers a select operation.
46     #[inline]
register(&mut self, oper: Operation, cx: &Context)47     pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48         self.register_with_packet(oper, ptr::null_mut(), cx);
49     }
50 
51     /// Registers a select operation and a packet.
52     #[inline]
register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context)53     pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54         self.selectors.push(Entry {
55             oper,
56             packet,
57             cx: cx.clone(),
58         });
59     }
60 
61     /// Unregisters a select operation.
62     #[inline]
unregister(&mut self, oper: Operation) -> Option<Entry>63     pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64         if let Some((i, _)) = self
65             .selectors
66             .iter()
67             .enumerate()
68             .find(|&(_, entry)| entry.oper == oper)
69         {
70             let entry = self.selectors.remove(i);
71             Some(entry)
72         } else {
73             None
74         }
75     }
76 
77     /// Attempts to find another thread's entry, select the operation, and wake it up.
78     #[inline]
try_select(&mut self) -> Option<Entry>79     pub(crate) fn try_select(&mut self) -> Option<Entry> {
80         self.selectors
81             .iter()
82             .position(|selector| {
83                 // Does the entry belong to a different thread?
84                 selector.cx.thread_id() != current_thread_id()
85                     && selector // Try selecting this operation.
86                         .cx
87                         .try_select(Selected::Operation(selector.oper))
88                         .is_ok()
89                     && {
90                         // Provide the packet.
91                         selector.cx.store_packet(selector.packet);
92                         // Wake the thread up.
93                         selector.cx.unpark();
94                         true
95                     }
96             })
97             // Remove the entry from the queue to keep it clean and improve
98             // performance.
99             .map(|pos| self.selectors.remove(pos))
100     }
101 
102     /// Returns `true` if there is an entry which can be selected by the current thread.
103     #[inline]
can_select(&self) -> bool104     pub(crate) fn can_select(&self) -> bool {
105         if self.selectors.is_empty() {
106             false
107         } else {
108             let thread_id = current_thread_id();
109 
110             self.selectors.iter().any(|entry| {
111                 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
112             })
113         }
114     }
115 
116     /// Registers an operation waiting to be ready.
117     #[inline]
watch(&mut self, oper: Operation, cx: &Context)118     pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
119         self.observers.push(Entry {
120             oper,
121             packet: ptr::null_mut(),
122             cx: cx.clone(),
123         });
124     }
125 
126     /// Unregisters an operation waiting to be ready.
127     #[inline]
unwatch(&mut self, oper: Operation)128     pub(crate) fn unwatch(&mut self, oper: Operation) {
129         self.observers.retain(|e| e.oper != oper);
130     }
131 
132     /// Notifies all operations waiting to be ready.
133     #[inline]
notify(&mut self)134     pub(crate) fn notify(&mut self) {
135         for entry in self.observers.drain(..) {
136             if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
137                 entry.cx.unpark();
138             }
139         }
140     }
141 
142     /// Notifies all registered operations that the channel is disconnected.
143     #[inline]
disconnect(&mut self)144     pub(crate) fn disconnect(&mut self) {
145         for entry in self.selectors.iter() {
146             if entry.cx.try_select(Selected::Disconnected).is_ok() {
147                 // Wake the thread up.
148                 //
149                 // Here we don't remove the entry from the queue. Registered threads must
150                 // unregister from the waker by themselves. They might also want to recover the
151                 // packet value and destroy it, if necessary.
152                 entry.cx.unpark();
153             }
154         }
155 
156         self.notify();
157     }
158 }
159 
160 impl Drop for Waker {
161     #[inline]
drop(&mut self)162     fn drop(&mut self) {
163         debug_assert_eq!(self.selectors.len(), 0);
164         debug_assert_eq!(self.observers.len(), 0);
165     }
166 }
167 
168 /// A waker that can be shared among threads without locking.
169 ///
170 /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
171 pub(crate) struct SyncWaker {
172     /// The inner `Waker`.
173     inner: Spinlock<Waker>,
174 
175     /// `true` if the waker is empty.
176     is_empty: AtomicBool,
177 }
178 
179 impl SyncWaker {
180     /// Creates a new `SyncWaker`.
181     #[inline]
new() -> Self182     pub(crate) fn new() -> Self {
183         SyncWaker {
184             inner: Spinlock::new(Waker::new()),
185             is_empty: AtomicBool::new(true),
186         }
187     }
188 
189     /// Registers the current thread with an operation.
190     #[inline]
register(&self, oper: Operation, cx: &Context)191     pub(crate) fn register(&self, oper: Operation, cx: &Context) {
192         let mut inner = self.inner.lock();
193         inner.register(oper, cx);
194         self.is_empty.store(
195             inner.selectors.is_empty() && inner.observers.is_empty(),
196             Ordering::SeqCst,
197         );
198     }
199 
200     /// Unregisters an operation previously registered by the current thread.
201     #[inline]
unregister(&self, oper: Operation) -> Option<Entry>202     pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
203         let mut inner = self.inner.lock();
204         let entry = inner.unregister(oper);
205         self.is_empty.store(
206             inner.selectors.is_empty() && inner.observers.is_empty(),
207             Ordering::SeqCst,
208         );
209         entry
210     }
211 
212     /// Attempts to find one thread (not the current one), select its operation, and wake it up.
213     #[inline]
notify(&self)214     pub(crate) fn notify(&self) {
215         if !self.is_empty.load(Ordering::SeqCst) {
216             let mut inner = self.inner.lock();
217             if !self.is_empty.load(Ordering::SeqCst) {
218                 inner.try_select();
219                 inner.notify();
220                 self.is_empty.store(
221                     inner.selectors.is_empty() && inner.observers.is_empty(),
222                     Ordering::SeqCst,
223                 );
224             }
225         }
226     }
227 
228     /// Registers an operation waiting to be ready.
229     #[inline]
watch(&self, oper: Operation, cx: &Context)230     pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
231         let mut inner = self.inner.lock();
232         inner.watch(oper, cx);
233         self.is_empty.store(
234             inner.selectors.is_empty() && inner.observers.is_empty(),
235             Ordering::SeqCst,
236         );
237     }
238 
239     /// Unregisters an operation waiting to be ready.
240     #[inline]
unwatch(&self, oper: Operation)241     pub(crate) fn unwatch(&self, oper: Operation) {
242         let mut inner = self.inner.lock();
243         inner.unwatch(oper);
244         self.is_empty.store(
245             inner.selectors.is_empty() && inner.observers.is_empty(),
246             Ordering::SeqCst,
247         );
248     }
249 
250     /// Notifies all threads that the channel is disconnected.
251     #[inline]
disconnect(&self)252     pub(crate) fn disconnect(&self) {
253         let mut inner = self.inner.lock();
254         inner.disconnect();
255         self.is_empty.store(
256             inner.selectors.is_empty() && inner.observers.is_empty(),
257             Ordering::SeqCst,
258         );
259     }
260 }
261 
262 impl Drop for SyncWaker {
263     #[inline]
drop(&mut self)264     fn drop(&mut self) {
265         debug_assert!(self.is_empty.load(Ordering::SeqCst));
266     }
267 }
268 
/// Returns the id of the current thread.
///
/// The id is cached in a thread-local; if that thread-local cannot be accessed, the id is
/// fetched directly from `thread::current()` instead.
#[inline]
fn current_thread_id() -> ThreadId {
    thread_local! {
        /// Id of this thread, computed on first access.
        static CACHED_ID: ThreadId = thread::current().id();
    }

    match CACHED_ID.try_with(|cached| *cached) {
        Ok(id) => id,
        // The thread-local is unavailable; ask the runtime directly.
        Err(_) => thread::current().id(),
    }
}
281