• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! A module with mpmc channels for distributing global events.
16 
17 use netsim_proto::common::ChipKind;
18 use std::sync::mpsc::{channel, Receiver, Sender};
19 
20 use crate::devices::chip::ChipIdentifier;
21 use crate::devices::device::DeviceIdentifier;
22 use netsim_proto::stats::{
23     NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
24 };
25 
26 use std::sync::{Arc, Mutex};
27 
28 #[derive(Clone, Debug, Default)]
29 pub struct DeviceAdded {
30     pub id: DeviceIdentifier,
31     pub name: String,
32     pub builtin: bool,
33     pub device_stats: ProtoDeviceStats,
34 }
35 
36 #[derive(Clone, Debug, Default)]
37 pub struct DeviceRemoved {
38     pub id: DeviceIdentifier,
39     pub name: String,
40     pub builtin: bool,
41 }
42 
43 #[derive(Clone, Debug, Default)]
44 pub struct DevicePatched {
45     pub id: DeviceIdentifier,
46     pub name: String,
47 }
48 
49 #[derive(Clone, Debug, Default)]
50 pub struct ChipAdded {
51     pub chip_id: ChipIdentifier,
52     pub chip_kind: ChipKind,
53     pub device_name: String,
54     pub builtin: bool,
55 }
56 
57 #[derive(Clone, Debug, Default)]
58 pub struct ChipRemoved {
59     pub chip_id: ChipIdentifier,
60     pub device_id: DeviceIdentifier,
61     pub remaining_nonbuiltin_devices: usize,
62     pub radio_stats: Vec<ProtoRadioStats>,
63 }
64 
/// Payload carried by [`Event::ShutDown`].
#[derive(Debug, Clone, Default)]
pub struct ShutDown {
    /// Free-form text describing why the shutdown was requested.
    pub reason: String,
}
69 
70 /// Event messages shared across various components in a loosely
71 /// coupled manner.
72 #[derive(Clone, Debug)]
73 pub enum Event {
74     DeviceAdded(DeviceAdded),
75     DeviceRemoved(DeviceRemoved),
76     DevicePatched(DevicePatched),
77     DeviceReset,
78     ChipAdded(ChipAdded),
79     ChipRemoved(ChipRemoved),
80     ShutDown(ShutDown),
81 }
82 
83 /// A multi-producer, multi-consumer broadcast queue based on
84 /// `std::sync::mpsc`.
85 ///
86 /// Each Event message `published` is seen by all subscribers.
87 ///
88 /// Warning: invoke `subscribe()` before `publish()` or else messages
89 /// will be lost.
90 ///
91 pub struct Events {
92     // For each subscriber this module retrain the sender half and the
93     // subscriber reads events from the receiver half.
94     subscribers: Mutex<Vec<Sender<Event>>>,
95 }
96 
97 impl Events {
98     // Events held by multiple publishers and subscribers across
99     // threads so return an Arc type.
new() -> Arc<Events>100     pub fn new() -> Arc<Events> {
101         Arc::new(Self { subscribers: Mutex::new(Vec::new()) })
102     }
103 
104     // Creates a new asynchronous channel, returning the receiver
105     // half. All `Event` messages sent through `publish` will become
106     // available on the receiver in the same order as it was sent.
subscribe(&self) -> Receiver<Event>107     pub fn subscribe(&self) -> Receiver<Event> {
108         let (tx, rx) = channel::<Event>();
109         self.subscribers.lock().expect("failed to lock subscribers").push(tx);
110         rx
111     }
112 
113     // Attempts to send an Event on the events channel.
publish(&self, msg: Event)114     pub fn publish(&self, msg: Event) {
115         if self.subscribers.lock().expect("failed to lock subscribers").is_empty() {
116             log::warn!("No Subscribers to the event: {msg:?}");
117         } else {
118             // Any channel with a disconnected receiver will return an
119             // error and be removed by retain.
120             log::info!("{msg:?}");
121             self.subscribers
122                 .lock()
123                 .expect("failed to lock subscribers")
124                 .retain(|subscriber| subscriber.send(msg.clone()).is_ok())
125         }
126     }
127 }
128 
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    impl Events {
        /// Test-only helper: number of sender halves currently registered.
        pub fn subscriber_count(&self) -> usize {
            let subscribers = self.subscribers.lock().expect("events subscribers lock");
            subscribers.len()
        }
    }

    /// Builds the `DeviceAdded` event that every test publishes.
    fn sample_event() -> Event {
        Event::DeviceAdded(DeviceAdded {
            id: DeviceIdentifier(123),
            name: "Device1".into(),
            builtin: false,
            device_stats: ProtoDeviceStats::new(),
        })
    }

    /// Blocks on `rx` and asserts that the first message received is the
    /// event produced by `sample_event`.
    fn expect_sample_event(rx: Receiver<Event>) {
        match rx.recv() {
            Ok(Event::DeviceAdded(DeviceAdded { id, name, builtin, device_stats })) => {
                assert_eq!(id.0, 123);
                assert_eq!(name, "Device1");
                assert!(!builtin);
                assert_eq!(device_stats, ProtoDeviceStats::new());
            }
            _ => panic!("Unexpected event"),
        }
    }

    #[test]
    fn test_subscribe_and_publish() {
        let events = Events::new();

        // Subscribe through a second handle to the same queue, then
        // receive on a separate thread.
        let rx = Arc::clone(&events).subscribe();
        let worker = thread::spawn(move || expect_sample_event(rx));

        events.publish(sample_event());

        // Wait for the other thread to process the message.
        worker.join().unwrap();
    }

    #[test]
    fn test_publish_to_multiple_subscribers() {
        let events = Events::new();

        // Register every subscriber before publishing so none misses the event.
        let num_subscribers = 10;
        let workers: Vec<_> = (0..num_subscribers)
            .map(|_| {
                let rx = Arc::clone(&events).subscribe();
                thread::spawn(move || expect_sample_event(rx))
            })
            .collect();

        events.publish(sample_event());

        // Wait for the other threads to process the message.
        for worker in workers {
            worker.join().unwrap();
        }
    }

    // Test the case where the receiver half of the channel returned
    // by subscribe() is dropped. We expect the subscriber to be
    // removed automatically when send() reports an error.
    #[test]
    fn test_publish_to_dropped_subscriber() {
        let events = Events::new();
        let rx = events.subscribe();
        assert_eq!(events.subscriber_count(), 1);

        drop(rx);
        events.publish(sample_event());

        assert_eq!(events.subscriber_count(), 0);
    }
}
221