1 #![allow(clippy::unit_arg)]
2
3 use crate::signal::os::{OsExtraData, OsStorage};
4 use crate::sync::watch;
5 use crate::util::once_cell::OnceCell;
6
7 use std::ops;
8 use std::sync::atomic::{AtomicBool, Ordering};
9
/// Identifier used to look up an event's `EventInfo` in a `Storage`.
pub(crate) type EventId = usize;
11
/// State for a specific event, whether a notification is pending delivery,
/// and what listeners are registered.
#[derive(Debug)]
pub(crate) struct EventInfo {
    /// Set to `true` by `Registry::record_event` and swapped back to `false`
    /// by `Registry::broadcast`.
    pending: AtomicBool,
    /// Sending half of the watch channel; `Registry::register_listener`
    /// subscribes new receivers to it.
    tx: watch::Sender<()>,
}
19
20 impl Default for EventInfo {
default() -> Self21 fn default() -> Self {
22 let (tx, _rx) = watch::channel(());
23
24 Self {
25 pending: AtomicBool::new(false),
26 tx,
27 }
28 }
29 }
30
/// An interface for retrieving the `EventInfo` for a particular `EventId`.
pub(crate) trait Storage {
    /// Gets the `EventInfo` for `id` if it exists.
    ///
    /// Returns `None` when the storage has no entry for `id`.
    fn event_info(&self, id: EventId) -> Option<&EventInfo>;

    /// Invokes `f` once for each defined `EventInfo` in this storage.
    ///
    /// The references handed to `f` borrow from `self` for the lifetime `'a`.
    fn for_each<'a, F>(&'a self, f: F)
    where
        F: FnMut(&'a EventInfo);
}
41
42 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>43 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
44 self.get(id)
45 }
46
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),47 fn for_each<'a, F>(&'a self, f: F)
48 where
49 F: FnMut(&'a EventInfo),
50 {
51 self.iter().for_each(f)
52 }
53 }
54
/// An interface for initializing a type. Useful for situations where we cannot
/// inject a configured instance in the constructor of another type.
pub(crate) trait Init {
    /// Constructs a value of the implementing type without taking any
    /// arguments.
    fn init() -> Self;
}
60
/// Manages and distributes event notifications to any registered listeners.
///
/// Generic over the underlying storage to allow for domain specific
/// optimizations (e.g. eventIds may or may not be contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Backing store consulted for per-event `EventInfo` when `S: Storage`.
    storage: S,
}
69
70 impl<S> Registry<S> {
new(storage: S) -> Self71 fn new(storage: S) -> Self {
72 Self { storage }
73 }
74 }
75
76 impl<S: Storage> Registry<S> {
77 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>78 fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
79 self.storage
80 .event_info(event_id)
81 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
82 .tx
83 .subscribe()
84 }
85
86 /// Marks `event_id` as having been delivered, without broadcasting it to
87 /// any listeners.
record_event(&self, event_id: EventId)88 fn record_event(&self, event_id: EventId) {
89 if let Some(event_info) = self.storage.event_info(event_id) {
90 event_info.pending.store(true, Ordering::SeqCst)
91 }
92 }
93
94 /// Broadcasts all previously recorded events to their respective listeners.
95 ///
96 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool97 fn broadcast(&self) -> bool {
98 let mut did_notify = false;
99 self.storage.for_each(|event_info| {
100 // Any signal of this kind arrived since we checked last?
101 if !event_info.pending.swap(false, Ordering::SeqCst) {
102 return;
103 }
104
105 // Ignore errors if there are no listeners
106 if event_info.tx.send(()).is_ok() {
107 did_notify = true;
108 }
109 });
110
111 did_notify
112 }
113 }
114
/// Bundles the OS-specific extra data with the event registry built over the
/// OS-specific storage.
pub(crate) struct Globals {
    /// OS-specific data; reachable from a `Globals` through its `Deref` impl.
    extra: OsExtraData,
    /// Event registry backed by `OsStorage`.
    registry: Registry<OsStorage>,
}
119
120 impl ops::Deref for Globals {
121 type Target = OsExtraData;
122
deref(&self) -> &Self::Target123 fn deref(&self) -> &Self::Target {
124 &self.extra
125 }
126 }
127
128 impl Globals {
129 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>130 pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
131 self.registry.register_listener(event_id)
132 }
133
134 /// Marks `event_id` as having been delivered, without broadcasting it to
135 /// any listeners.
record_event(&self, event_id: EventId)136 pub(crate) fn record_event(&self, event_id: EventId) {
137 self.registry.record_event(event_id);
138 }
139
140 /// Broadcasts all previously recorded events to their respective listeners.
141 ///
142 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool143 pub(crate) fn broadcast(&self) -> bool {
144 self.registry.broadcast()
145 }
146
147 #[cfg(unix)]
storage(&self) -> &OsStorage148 pub(crate) fn storage(&self) -> &OsStorage {
149 &self.registry.storage
150 }
151 }
152
globals_init() -> Globals where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,153 fn globals_init() -> Globals
154 where
155 OsExtraData: 'static + Send + Sync + Init,
156 OsStorage: 'static + Send + Sync + Init,
157 {
158 Globals {
159 extra: OsExtraData::init(),
160 registry: Registry::new(OsStorage::init()),
161 }
162 }
163
/// Returns a `'static` reference to the shared `Globals` instance.
///
/// The instance is held in a function-local `static` `OnceCell`, and
/// `globals_init` is handed to the cell as its initializer.
pub(crate) fn globals() -> &'static Globals
where
    OsExtraData: 'static + Send + Sync + Init,
    OsStorage: 'static + Send + Sync + Init,
{
    static GLOBALS: OnceCell<Globals> = OnceCell::new();

    // NOTE(review): presumably `OnceCell::get` runs the initializer only on
    // the first access and returns the stored value afterwards — confirm
    // against `crate::util::once_cell`.
    GLOBALS.get(globals_init)
}
173
174 #[cfg(all(test, not(loom)))]
175 mod tests {
176 use super::*;
177 use crate::runtime::{self, Runtime};
178 use crate::sync::{oneshot, watch};
179
180 use futures::future;
181
/// End-to-end check: events recorded several times before a broadcast are
/// coalesced into one notification, and each listener only sees
/// notifications for the event id it registered for.
#[test]
fn smoke() {
    let rt = rt();
    rt.block_on(async move {
        // Three events with ids 0, 1 and 2.
        let registry = Registry::new(vec![
            EventInfo::default(),
            EventInfo::default(),
            EventInfo::default(),
        ]);

        let first = registry.register_listener(0);
        let second = registry.register_listener(1);
        let third = registry.register_listener(2);

        // Used to hold the spawned task back until `fire.send` below.
        let (fire, wait) = oneshot::channel();

        crate::spawn(async {
            wait.await.expect("wait failed");

            // Record some events which should get coalesced
            registry.record_event(0);
            registry.record_event(0);
            registry.record_event(1);
            registry.record_event(1);
            registry.broadcast();

            // Yield so the previous broadcast can get received
            //
            // This yields many times since the block_on task is only polled every 61
            // ticks.
            for _ in 0..100 {
                crate::task::yield_now().await;
            }

            // Send subsequent signal
            registry.record_event(0);
            registry.broadcast();

            // NOTE(review): dropping the registry drops every sender, which
            // presumably makes `changed()` fail and ends the `collect` loops —
            // confirm against the watch channel semantics.
            drop(registry);
        });

        let _ = fire.send(());
        let all = future::join3(collect(first), collect(second), collect(third));

        // Event 0 was broadcast twice, event 1 once, event 2 never.
        let (first_results, second_results, third_results) = all.await;
        assert_eq!(2, first_results.len());
        assert_eq!(1, second_results.len());
        assert_eq!(0, third_results.len());
    });
}
232
/// Registering a listener for an id the storage does not contain must panic.
#[test]
#[should_panic = "invalid event_id: 1"]
fn register_panics_on_invalid_input() {
    // A single entry means only id 0 exists.
    let storage = vec![EventInfo::default()];

    Registry::new(storage).register_listener(1);
}
240
/// Recording an id the storage does not contain must not panic.
#[test]
fn record_invalid_event_does_nothing() {
    let storage = vec![EventInfo::default()];
    let registry = Registry::new(storage);

    registry.record_event(1302);
}
246
/// `broadcast` reports `true` only when a pending event reached a listener.
#[test]
fn broadcast_returns_if_at_least_one_event_fired() {
    let storage = vec![EventInfo::default(), EventInfo::default()];
    let registry = Registry::new(storage);

    // Pending event, but nobody is listening yet.
    registry.record_event(0);
    assert!(!registry.broadcast());

    let listener_zero = registry.register_listener(0);
    let listener_one = registry.register_listener(1);

    // Pending event with a live listener for it.
    registry.record_event(0);
    assert!(registry.broadcast());

    // Event 0 lost its only listener; the listener for event 1 does not count.
    drop(listener_zero);
    registry.record_event(0);
    assert!(!registry.broadcast());

    drop(listener_one);
}
266
rt() -> Runtime267 fn rt() -> Runtime {
268 runtime::Builder::new_current_thread()
269 .enable_time()
270 .build()
271 .unwrap()
272 }
273
collect(mut rx: watch::Receiver<()>) -> Vec<()>274 async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
275 let mut ret = vec![];
276
277 while let Ok(v) = rx.changed().await {
278 ret.push(v);
279 }
280
281 ret
282 }
283 }
284