1 #![allow(clippy::unit_arg)]
2
3 use crate::signal::os::{OsExtraData, OsStorage};
4
5 use crate::sync::watch;
6
7 use once_cell::sync::Lazy;
8 use std::ops;
9 use std::pin::Pin;
10 use std::sync::atomic::{AtomicBool, Ordering};
11
12 pub(crate) type EventId = usize;
13
14 /// State for a specific event, whether a notification is pending delivery,
15 /// and what listeners are registered.
16 #[derive(Debug)]
17 pub(crate) struct EventInfo {
18 pending: AtomicBool,
19 tx: watch::Sender<()>,
20 }
21
22 impl Default for EventInfo {
default() -> Self23 fn default() -> Self {
24 let (tx, _rx) = watch::channel(());
25
26 Self {
27 pending: AtomicBool::new(false),
28 tx,
29 }
30 }
31 }
32
33 /// An interface for retrieving the `EventInfo` for a particular eventId.
34 pub(crate) trait Storage {
35 /// Gets the `EventInfo` for `id` if it exists.
event_info(&self, id: EventId) -> Option<&EventInfo>36 fn event_info(&self, id: EventId) -> Option<&EventInfo>;
37
38 /// Invokes `f` once for each defined `EventInfo` in this storage.
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo)39 fn for_each<'a, F>(&'a self, f: F)
40 where
41 F: FnMut(&'a EventInfo);
42 }
43
44 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>45 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
46 self.get(id)
47 }
48
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),49 fn for_each<'a, F>(&'a self, f: F)
50 where
51 F: FnMut(&'a EventInfo),
52 {
53 self.iter().for_each(f)
54 }
55 }
56
57 /// An interface for initializing a type. Useful for situations where we cannot
58 /// inject a configured instance in the constructor of another type.
59 pub(crate) trait Init {
init() -> Self60 fn init() -> Self;
61 }
62
63 /// Manages and distributes event notifications to any registered listeners.
64 ///
65 /// Generic over the underlying storage to allow for domain specific
66 /// optimizations (e.g. eventIds may or may not be contiguous).
67 #[derive(Debug)]
68 pub(crate) struct Registry<S> {
69 storage: S,
70 }
71
72 impl<S> Registry<S> {
new(storage: S) -> Self73 fn new(storage: S) -> Self {
74 Self { storage }
75 }
76 }
77
78 impl<S: Storage> Registry<S> {
79 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>80 fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
81 self.storage
82 .event_info(event_id)
83 .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
84 .tx
85 .subscribe()
86 }
87
88 /// Marks `event_id` as having been delivered, without broadcasting it to
89 /// any listeners.
record_event(&self, event_id: EventId)90 fn record_event(&self, event_id: EventId) {
91 if let Some(event_info) = self.storage.event_info(event_id) {
92 event_info.pending.store(true, Ordering::SeqCst)
93 }
94 }
95
96 /// Broadcasts all previously recorded events to their respective listeners.
97 ///
98 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool99 fn broadcast(&self) -> bool {
100 let mut did_notify = false;
101 self.storage.for_each(|event_info| {
102 // Any signal of this kind arrived since we checked last?
103 if !event_info.pending.swap(false, Ordering::SeqCst) {
104 return;
105 }
106
107 // Ignore errors if there are no listeners
108 if event_info.tx.send(()).is_ok() {
109 did_notify = true;
110 }
111 });
112
113 did_notify
114 }
115 }
116
117 pub(crate) struct Globals {
118 extra: OsExtraData,
119 registry: Registry<OsStorage>,
120 }
121
122 impl ops::Deref for Globals {
123 type Target = OsExtraData;
124
deref(&self) -> &Self::Target125 fn deref(&self) -> &Self::Target {
126 &self.extra
127 }
128 }
129
130 impl Globals {
131 /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>132 pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
133 self.registry.register_listener(event_id)
134 }
135
136 /// Marks `event_id` as having been delivered, without broadcasting it to
137 /// any listeners.
record_event(&self, event_id: EventId)138 pub(crate) fn record_event(&self, event_id: EventId) {
139 self.registry.record_event(event_id);
140 }
141
142 /// Broadcasts all previously recorded events to their respective listeners.
143 ///
144 /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool145 pub(crate) fn broadcast(&self) -> bool {
146 self.registry.broadcast()
147 }
148
149 #[cfg(unix)]
storage(&self) -> &OsStorage150 pub(crate) fn storage(&self) -> &OsStorage {
151 &self.registry.storage
152 }
153 }
154
globals() -> Pin<&'static Globals> where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,155 pub(crate) fn globals() -> Pin<&'static Globals>
156 where
157 OsExtraData: 'static + Send + Sync + Init,
158 OsStorage: 'static + Send + Sync + Init,
159 {
160 static GLOBALS: Lazy<Pin<Box<Globals>>> = Lazy::new(|| {
161 Box::pin(Globals {
162 extra: OsExtraData::init(),
163 registry: Registry::new(OsStorage::init()),
164 })
165 });
166
167 GLOBALS.as_ref()
168 }
169
170 #[cfg(all(test, not(loom)))]
171 mod tests {
172 use super::*;
173 use crate::runtime::{self, Runtime};
174 use crate::sync::{oneshot, watch};
175
176 use futures::future;
177
178 #[test]
smoke()179 fn smoke() {
180 let rt = rt();
181 rt.block_on(async move {
182 let registry = Registry::new(vec![
183 EventInfo::default(),
184 EventInfo::default(),
185 EventInfo::default(),
186 ]);
187
188 let first = registry.register_listener(0);
189 let second = registry.register_listener(1);
190 let third = registry.register_listener(2);
191
192 let (fire, wait) = oneshot::channel();
193
194 crate::spawn(async {
195 wait.await.expect("wait failed");
196
197 // Record some events which should get coalesced
198 registry.record_event(0);
199 registry.record_event(0);
200 registry.record_event(1);
201 registry.record_event(1);
202 registry.broadcast();
203
204 // Yield so the previous broadcast can get received
205 crate::time::sleep(std::time::Duration::from_millis(10)).await;
206
207 // Send subsequent signal
208 registry.record_event(0);
209 registry.broadcast();
210
211 drop(registry);
212 });
213
214 let _ = fire.send(());
215 let all = future::join3(collect(first), collect(second), collect(third));
216
217 let (first_results, second_results, third_results) = all.await;
218 assert_eq!(2, first_results.len());
219 assert_eq!(1, second_results.len());
220 assert_eq!(0, third_results.len());
221 });
222 }
223
224 #[test]
225 #[should_panic = "invalid event_id: 1"]
register_panics_on_invalid_input()226 fn register_panics_on_invalid_input() {
227 let registry = Registry::new(vec![EventInfo::default()]);
228
229 registry.register_listener(1);
230 }
231
232 #[test]
record_invalid_event_does_nothing()233 fn record_invalid_event_does_nothing() {
234 let registry = Registry::new(vec![EventInfo::default()]);
235 registry.record_event(42);
236 }
237
238 #[test]
broadcast_returns_if_at_least_one_event_fired()239 fn broadcast_returns_if_at_least_one_event_fired() {
240 let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);
241
242 registry.record_event(0);
243 assert_eq!(false, registry.broadcast());
244
245 let first = registry.register_listener(0);
246 let second = registry.register_listener(1);
247
248 registry.record_event(0);
249 assert_eq!(true, registry.broadcast());
250
251 drop(first);
252 registry.record_event(0);
253 assert_eq!(false, registry.broadcast());
254
255 drop(second);
256 }
257
rt() -> Runtime258 fn rt() -> Runtime {
259 runtime::Builder::new_current_thread()
260 .enable_time()
261 .build()
262 .unwrap()
263 }
264
collect(mut rx: watch::Receiver<()>) -> Vec<()>265 async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
266 let mut ret = vec![];
267
268 while let Ok(v) = rx.changed().await {
269 ret.push(v);
270 }
271
272 ret
273 }
274 }
275