1 // Copyright 2023 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 //! A module with mpmc channels for distributing global events. 16 17 use netsim_proto::common::ChipKind; 18 use std::sync::mpsc::{channel, Receiver, Sender}; 19 20 use crate::devices::chip::ChipIdentifier; 21 use crate::devices::device::DeviceIdentifier; 22 use netsim_proto::stats::{ 23 NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats, 24 }; 25 26 use std::sync::{Arc, Mutex}; 27 28 #[derive(Clone, Debug, Default)] 29 pub struct DeviceAdded { 30 pub id: DeviceIdentifier, 31 pub name: String, 32 pub builtin: bool, 33 pub device_stats: ProtoDeviceStats, 34 } 35 36 #[derive(Clone, Debug, Default)] 37 pub struct DeviceRemoved { 38 pub id: DeviceIdentifier, 39 pub name: String, 40 pub builtin: bool, 41 } 42 43 #[derive(Clone, Debug, Default)] 44 pub struct DevicePatched { 45 pub id: DeviceIdentifier, 46 pub name: String, 47 } 48 49 #[derive(Clone, Debug, Default)] 50 pub struct ChipAdded { 51 pub chip_id: ChipIdentifier, 52 pub chip_kind: ChipKind, 53 pub device_name: String, 54 pub builtin: bool, 55 } 56 57 #[derive(Clone, Debug, Default)] 58 pub struct ChipRemoved { 59 pub chip_id: ChipIdentifier, 60 pub device_id: DeviceIdentifier, 61 pub remaining_nonbuiltin_devices: usize, 62 pub radio_stats: Vec<ProtoRadioStats>, 63 } 64 65 #[derive(Clone, Debug, Default)] 66 pub struct ShutDown { 67 pub reason: String, 68 } 69 70 /// Event messages shared across various components in a loosely 71 /// coupled manner. 72 #[derive(Clone, Debug)] 73 pub enum Event { 74 DeviceAdded(DeviceAdded), 75 DeviceRemoved(DeviceRemoved), 76 DevicePatched(DevicePatched), 77 DeviceReset, 78 ChipAdded(ChipAdded), 79 ChipRemoved(ChipRemoved), 80 ShutDown(ShutDown), 81 } 82 83 /// A multi-producer, multi-consumer broadcast queue based on 84 /// `std::sync::mpsc`. 85 /// 86 /// Each Event message `published` is seen by all subscribers. 87 /// 88 /// Warning: invoke `subscribe()` before `publish()` or else messages 89 /// will be lost. 90 /// 91 pub struct Events { 92 // For each subscriber this module retrain the sender half and the 93 // subscriber reads events from the receiver half. 94 subscribers: Mutex<Vec<Sender<Event>>>, 95 } 96 97 impl Events { 98 // Events held by multiple publishers and subscribers across 99 // threads so return an Arc type. new() -> Arc<Events>100 pub fn new() -> Arc<Events> { 101 Arc::new(Self { subscribers: Mutex::new(Vec::new()) }) 102 } 103 104 // Creates a new asynchronous channel, returning the receiver 105 // half. All `Event` messages sent through `publish` will become 106 // available on the receiver in the same order as it was sent. subscribe(&self) -> Receiver<Event>107 pub fn subscribe(&self) -> Receiver<Event> { 108 let (tx, rx) = channel::<Event>(); 109 self.subscribers.lock().expect("failed to lock subscribers").push(tx); 110 rx 111 } 112 113 // Attempts to send an Event on the events channel. publish(&self, msg: Event)114 pub fn publish(&self, msg: Event) { 115 if self.subscribers.lock().expect("failed to lock subscribers").is_empty() { 116 log::warn!("No Subscribers to the event: {msg:?}"); 117 } else { 118 // Any channel with a disconnected receiver will return an 119 // error and be removed by retain. 120 log::info!("{msg:?}"); 121 self.subscribers 122 .lock() 123 .expect("failed to lock subscribers") 124 .retain(|subscriber| subscriber.send(msg.clone()).is_ok()) 125 } 126 } 127 } 128 129 #[cfg(test)] 130 mod tests { 131 use super::Events; 132 use super::*; 133 use std::sync::Arc; 134 use std::thread; 135 136 impl Events { subscriber_count(&self) -> usize137 pub fn subscriber_count(&self) -> usize { 138 self.subscribers.lock().expect("events subscribers lock").len() 139 } 140 } 141 142 #[test] test_subscribe_and_publish()143 fn test_subscribe_and_publish() { 144 let events = Events::new(); 145 146 let events_clone = Arc::clone(&events); 147 let rx = events_clone.subscribe(); 148 let handle = thread::spawn(move || match rx.recv() { 149 Ok(Event::DeviceAdded(DeviceAdded { id, name, builtin, device_stats })) => { 150 assert_eq!(id.0, 123); 151 assert_eq!(name, "Device1"); 152 assert!(!builtin); 153 assert_eq!(device_stats, ProtoDeviceStats::new()); 154 } 155 _ => panic!("Unexpected event"), 156 }); 157 158 events.publish(Event::DeviceAdded(DeviceAdded { 159 id: DeviceIdentifier(123), 160 name: "Device1".into(), 161 builtin: false, 162 device_stats: ProtoDeviceStats::new(), 163 })); 164 165 // Wait for the other thread to process the message. 166 handle.join().unwrap(); 167 } 168 169 #[test] test_publish_to_multiple_subscribers()170 fn test_publish_to_multiple_subscribers() { 171 let events = Events::new(); 172 173 let num_subscribers = 10; 174 let mut handles = Vec::with_capacity(num_subscribers); 175 for _ in 0..num_subscribers { 176 let events_clone = Arc::clone(&events); 177 let rx = events_clone.subscribe(); 178 let handle = thread::spawn(move || match rx.recv() { 179 Ok(Event::DeviceAdded(DeviceAdded { id, name, builtin, device_stats })) => { 180 assert_eq!(id.0, 123); 181 assert_eq!(name, "Device1"); 182 assert!(!builtin); 183 assert_eq!(device_stats, ProtoDeviceStats::new()); 184 } 185 _ => panic!("Unexpected event"), 186 }); 187 handles.push(handle); 188 } 189 190 events.publish(Event::DeviceAdded(DeviceAdded { 191 id: DeviceIdentifier(123), 192 name: "Device1".into(), 193 builtin: false, 194 device_stats: ProtoDeviceStats::new(), 195 })); 196 197 // Wait for the other threads to process the message. 198 for handle in handles { 199 handle.join().unwrap(); 200 } 201 } 202 203 #[test] 204 // Test the case where the subscriber half of the channel returned 205 // by subscribe() is dropped. We expect the subscriber to be auto 206 // removed when send() notices an error. test_publish_to_dropped_subscriber()207 fn test_publish_to_dropped_subscriber() { 208 let events = Events::new(); 209 let rx = events.subscribe(); 210 assert_eq!(events.subscriber_count(), 1); 211 std::mem::drop(rx); 212 events.publish(Event::DeviceAdded(DeviceAdded { 213 id: DeviceIdentifier(123), 214 name: "Device1".into(), 215 builtin: false, 216 device_stats: ProtoDeviceStats::new(), 217 })); 218 assert_eq!(events.subscriber_count(), 0); 219 } 220 } 221