• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![allow(clippy::unit_arg)]
2 
3 use crate::signal::os::{OsExtraData, OsStorage};
4 use crate::sync::watch;
5 use crate::util::once_cell::OnceCell;
6 
7 use std::ops;
8 use std::sync::atomic::{AtomicBool, Ordering};
9 
/// Identifier used to look up an `EventInfo` in a `Storage`.
pub(crate) type EventId = usize;
11 
12 /// State for a specific event, whether a notification is pending delivery,
13 /// and what listeners are registered.
14 #[derive(Debug)]
15 pub(crate) struct EventInfo {
16     pending: AtomicBool,
17     tx: watch::Sender<()>,
18 }
19 
20 impl Default for EventInfo {
default() -> Self21     fn default() -> Self {
22         let (tx, _rx) = watch::channel(());
23 
24         Self {
25             pending: AtomicBool::new(false),
26             tx,
27         }
28     }
29 }
30 
31 /// An interface for retrieving the `EventInfo` for a particular eventId.
32 pub(crate) trait Storage {
33     /// Gets the `EventInfo` for `id` if it exists.
event_info(&self, id: EventId) -> Option<&EventInfo>34     fn event_info(&self, id: EventId) -> Option<&EventInfo>;
35 
36     /// Invokes `f` once for each defined `EventInfo` in this storage.
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo)37     fn for_each<'a, F>(&'a self, f: F)
38     where
39         F: FnMut(&'a EventInfo);
40 }
41 
42 impl Storage for Vec<EventInfo> {
event_info(&self, id: EventId) -> Option<&EventInfo>43     fn event_info(&self, id: EventId) -> Option<&EventInfo> {
44         self.get(id)
45     }
46 
for_each<'a, F>(&'a self, f: F) where F: FnMut(&'a EventInfo),47     fn for_each<'a, F>(&'a self, f: F)
48     where
49         F: FnMut(&'a EventInfo),
50     {
51         self.iter().for_each(f)
52     }
53 }
54 
/// An interface for initializing a type. Useful for situations where we cannot
/// inject a configured instance in the constructor of another type.
pub(crate) trait Init {
    /// Builds a new instance of the implementing type; takes no arguments.
    fn init() -> Self;
}
60 
/// Tracks recorded events and hands their notifications to registered
/// listeners.
///
/// The storage type `S` is left generic so that different domains can choose
/// a layout suited to their event ids (e.g. ids may or may not be
/// contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Backing collection holding the per-event state.
    storage: S,
}

impl<S> Registry<S> {
    /// Wraps `storage` in a new registry.
    fn new(storage: S) -> Self {
        Registry { storage }
    }
}
75 
76 impl<S: Storage> Registry<S> {
77     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>78     fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
79         self.storage
80             .event_info(event_id)
81             .unwrap_or_else(|| panic!("invalid event_id: {}", event_id))
82             .tx
83             .subscribe()
84     }
85 
86     /// Marks `event_id` as having been delivered, without broadcasting it to
87     /// any listeners.
record_event(&self, event_id: EventId)88     fn record_event(&self, event_id: EventId) {
89         if let Some(event_info) = self.storage.event_info(event_id) {
90             event_info.pending.store(true, Ordering::SeqCst)
91         }
92     }
93 
94     /// Broadcasts all previously recorded events to their respective listeners.
95     ///
96     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool97     fn broadcast(&self) -> bool {
98         let mut did_notify = false;
99         self.storage.for_each(|event_info| {
100             // Any signal of this kind arrived since we checked last?
101             if !event_info.pending.swap(false, Ordering::SeqCst) {
102                 return;
103             }
104 
105             // Ignore errors if there are no listeners
106             if event_info.tx.send(()).is_ok() {
107                 did_notify = true;
108             }
109         });
110 
111         did_notify
112     }
113 }
114 
115 pub(crate) struct Globals {
116     extra: OsExtraData,
117     registry: Registry<OsStorage>,
118 }
119 
120 impl ops::Deref for Globals {
121     type Target = OsExtraData;
122 
deref(&self) -> &Self::Target123     fn deref(&self) -> &Self::Target {
124         &self.extra
125     }
126 }
127 
128 impl Globals {
129     /// Registers a new listener for `event_id`.
register_listener(&self, event_id: EventId) -> watch::Receiver<()>130     pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
131         self.registry.register_listener(event_id)
132     }
133 
134     /// Marks `event_id` as having been delivered, without broadcasting it to
135     /// any listeners.
record_event(&self, event_id: EventId)136     pub(crate) fn record_event(&self, event_id: EventId) {
137         self.registry.record_event(event_id);
138     }
139 
140     /// Broadcasts all previously recorded events to their respective listeners.
141     ///
142     /// Returns `true` if an event was delivered to at least one listener.
broadcast(&self) -> bool143     pub(crate) fn broadcast(&self) -> bool {
144         self.registry.broadcast()
145     }
146 
147     #[cfg(unix)]
storage(&self) -> &OsStorage148     pub(crate) fn storage(&self) -> &OsStorage {
149         &self.registry.storage
150     }
151 }
152 
globals_init() -> Globals where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,153 fn globals_init() -> Globals
154 where
155     OsExtraData: 'static + Send + Sync + Init,
156     OsStorage: 'static + Send + Sync + Init,
157 {
158     Globals {
159         extra: OsExtraData::init(),
160         registry: Registry::new(OsStorage::init()),
161     }
162 }
163 
globals() -> &'static Globals where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init,164 pub(crate) fn globals() -> &'static Globals
165 where
166     OsExtraData: 'static + Send + Sync + Init,
167     OsStorage: 'static + Send + Sync + Init,
168 {
169     static GLOBALS: OnceCell<Globals> = OnceCell::new();
170 
171     GLOBALS.get(globals_init)
172 }
173 
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::{self, Runtime};
    use crate::sync::{oneshot, watch};

    use futures::future;

    /// End-to-end check: repeated records of one event are coalesced into a
    /// single notification per broadcast, and a listener whose event is never
    /// recorded sees nothing.
    #[test]
    fn smoke() {
        let runtime = rt();
        runtime.block_on(async move {
            let registry = Registry::new(vec![
                EventInfo::default(),
                EventInfo::default(),
                EventInfo::default(),
            ]);

            let rx_zero = registry.register_listener(0);
            let rx_one = registry.register_listener(1);
            let rx_two = registry.register_listener(2);

            let (fire, wait) = oneshot::channel();

            crate::spawn(async {
                wait.await.expect("wait failed");

                // Duplicate records of the same event should collapse into one
                // notification.
                registry.record_event(0);
                registry.record_event(0);
                registry.record_event(1);
                registry.record_event(1);
                registry.broadcast();

                // Give the listeners a chance to observe the first broadcast.
                //
                // A single yield is not enough: the block_on task is only
                // polled every 61 ticks, so yield many times.
                for _ in 0..100 {
                    crate::task::yield_now().await;
                }

                // Second round: only event 0 fires again.
                registry.record_event(0);
                registry.broadcast();

                drop(registry);
            });

            let _ = fire.send(());

            let (zero_seen, one_seen, two_seen) =
                future::join3(collect(rx_zero), collect(rx_one), collect(rx_two)).await;

            assert_eq!(2, zero_seen.len());
            assert_eq!(1, one_seen.len());
            assert_eq!(0, two_seen.len());
        });
    }

    /// Registering for an id that has no entry must panic with the id in the
    /// message.
    #[test]
    #[should_panic = "invalid event_id: 1"]
    fn register_panics_on_invalid_input() {
        let single_entry = vec![EventInfo::default()];
        let registry = Registry::new(single_entry);

        registry.register_listener(1);
    }

    /// Recording an id that has no entry is a no-op rather than a panic.
    #[test]
    fn record_invalid_event_does_nothing() {
        let single_entry = vec![EventInfo::default()];
        let registry = Registry::new(single_entry);

        registry.record_event(1302);
    }

    /// `broadcast` reports `true` only when a recorded event actually reached
    /// a listener.
    #[test]
    fn broadcast_returns_if_at_least_one_event_fired() {
        let entries = vec![EventInfo::default(), EventInfo::default()];
        let registry = Registry::new(entries);

        // Nobody is listening yet, so nothing can be delivered.
        registry.record_event(0);
        assert!(!registry.broadcast());

        let listener_zero = registry.register_listener(0);
        let listener_one = registry.register_listener(1);

        // Event 0 now has a listener.
        registry.record_event(0);
        assert!(registry.broadcast());

        // After its only listener goes away, event 0 is undeliverable again.
        drop(listener_zero);
        registry.record_event(0);
        assert!(!registry.broadcast());

        drop(listener_one);
    }

    /// Builds a current-thread runtime with the time driver enabled.
    fn rt() -> Runtime {
        let mut builder = runtime::Builder::new_current_thread();
        builder.enable_time();
        builder.build().unwrap()
    }

    /// Gathers one `()` per successful `changed()` on `rx`, stopping at the
    /// first error.
    async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
        let mut seen = Vec::new();

        while rx.changed().await.is_ok() {
            seen.push(());
        }

        seen
    }
}
284