1 // Copyright 2018 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::BTreeMap; 6 use std::mem::drop; 7 use std::sync::Arc; 8 use std::sync::Weak; 9 use std::thread; 10 11 use base::error; 12 use base::warn; 13 use base::AsRawDescriptor; 14 use base::Descriptor; 15 use base::Event; 16 use base::EventType; 17 use base::WaitContext; 18 use sync::Mutex; 19 20 use super::error::Error; 21 use super::error::Result; 22 23 /// A fail handle will do the clean up when we cannot recover from some error. 24 pub trait FailHandle: Send + Sync { 25 /// Fail the code. fail(&self)26 fn fail(&self); 27 /// Returns true if already failed. failed(&self) -> bool28 fn failed(&self) -> bool; 29 } 30 31 impl FailHandle for Option<Arc<dyn FailHandle>> { fail(&self)32 fn fail(&self) { 33 match self { 34 Some(handle) => handle.fail(), 35 None => error!("event loop trying to fail without a fail handle"), 36 } 37 } 38 failed(&self) -> bool39 fn failed(&self) -> bool { 40 match self { 41 Some(handle) => handle.failed(), 42 None => false, 43 } 44 } 45 } 46 47 /// EventLoop is an event loop blocked on a set of fds. When a monitered events is triggered, 48 /// event loop will invoke the mapped handler. 49 pub struct EventLoop { 50 fail_handle: Option<Arc<dyn FailHandle>>, 51 poll_ctx: Arc<WaitContext<Descriptor>>, 52 handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>, 53 stop_evt: Event, 54 } 55 56 /// Interface for event handler. 57 pub trait EventHandler: Send + Sync { on_event(&self) -> anyhow::Result<()>58 fn on_event(&self) -> anyhow::Result<()>; 59 } 60 61 impl EventLoop { 62 /// Start an event loop. An optional fail handle could be passed to the event loop. start( name: String, fail_handle: Option<Arc<dyn FailHandle>>, ) -> Result<(EventLoop, thread::JoinHandle<()>)>63 pub fn start( 64 name: String, 65 fail_handle: Option<Arc<dyn FailHandle>>, 66 ) -> Result<(EventLoop, thread::JoinHandle<()>)> { 67 let (self_stop_evt, stop_evt) = Event::new() 68 .and_then(|e| Ok((e.try_clone()?, e))) 69 .map_err(Error::CreateEvent)?; 70 71 let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> = 72 Arc::new(Mutex::new(BTreeMap::new())); 73 let poll_ctx: WaitContext<Descriptor> = WaitContext::new() 74 .and_then(|pc| { 75 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor())) 76 .and(Ok(pc)) 77 }) 78 .map_err(Error::CreateWaitContext)?; 79 80 let poll_ctx = Arc::new(poll_ctx); 81 let event_loop = EventLoop { 82 fail_handle: fail_handle.clone(), 83 poll_ctx: poll_ctx.clone(), 84 handlers: fd_callbacks.clone(), 85 stop_evt: self_stop_evt, 86 }; 87 88 let handle = thread::Builder::new() 89 .name(name) 90 .spawn(move || { 91 loop { 92 if fail_handle.failed() { 93 error!("xhci controller already failed, stopping event ring"); 94 return; 95 } 96 let events = match poll_ctx.wait() { 97 Ok(events) => events, 98 Err(e) => { 99 error!("cannot wait on events {:?}", e); 100 fail_handle.fail(); 101 return; 102 } 103 }; 104 for event in &events { 105 let fd = event.token.as_raw_descriptor(); 106 if fd == stop_evt.as_raw_descriptor() { 107 return; 108 } 109 110 let mut locked = fd_callbacks.lock(); 111 let weak_handler = match locked.get(&Descriptor(fd)) { 112 Some(cb) => cb.clone(), 113 None => { 114 warn!("callback for fd {} already removed", fd); 115 continue; 116 } 117 }; 118 119 // If the file descriptor is hung up, remove it after calling the handler 120 // one final time. 121 let mut remove = event.is_hungup; 122 123 if let Some(handler) = weak_handler.upgrade() { 124 // Drop lock before triggering the event. 125 drop(locked); 126 if let Err(e) = handler.on_event() { 127 error!("removing event handler due to error: {:#}", e); 128 remove = true; 129 } 130 locked = fd_callbacks.lock(); 131 } else { 132 // If the handler is already gone, we remove the fd. 133 remove = true; 134 } 135 136 if remove { 137 let _ = poll_ctx.delete(&event.token); 138 let _ = locked.remove(&Descriptor(fd)); 139 } 140 } 141 } 142 }) 143 .map_err(Error::StartThread)?; 144 145 Ok((event_loop, handle)) 146 } 147 148 /// Add a new event to event loop. The event handler will be invoked when `event` happens on 149 /// `descriptor`. 150 /// 151 /// If the same `descriptor` is added multiple times, the old handler will be replaced. 152 /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered, 153 /// the event will be removed. add_event( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, handler: Weak<dyn EventHandler>, ) -> Result<()>154 pub fn add_event( 155 &self, 156 descriptor: &dyn AsRawDescriptor, 157 event_type: EventType, 158 handler: Weak<dyn EventHandler>, 159 ) -> Result<()> { 160 if self.fail_handle.failed() { 161 return Err(Error::EventLoopAlreadyFailed); 162 } 163 self.handlers 164 .lock() 165 .insert(Descriptor(descriptor.as_raw_descriptor()), handler); 166 // This might fail due to epoll syscall. Check epoll_ctl(2). 167 self.poll_ctx 168 .add_for_event( 169 descriptor, 170 event_type, 171 Descriptor(descriptor.as_raw_descriptor()), 172 ) 173 .map_err(Error::WaitContextAddDescriptor) 174 } 175 176 /// Removes event for this `descriptor`. This function returns false if it fails. 177 /// 178 /// EventLoop does not guarantee all events for `descriptor` is handled. remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>179 pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> { 180 if self.fail_handle.failed() { 181 return Err(Error::EventLoopAlreadyFailed); 182 } 183 // This might fail due to epoll syscall. Check epoll_ctl(2). 184 self.poll_ctx 185 .delete(descriptor) 186 .map_err(Error::WaitContextDeleteDescriptor)?; 187 self.handlers 188 .lock() 189 .remove(&Descriptor(descriptor.as_raw_descriptor())); 190 Ok(()) 191 } 192 193 /// Stops this event loop asynchronously. Previous events might not be handled. stop(&self)194 pub fn stop(&self) { 195 match self.stop_evt.signal() { 196 Ok(_) => {} 197 Err(_) => { 198 error!("fail to send event loop stop event, it might already stopped"); 199 } 200 } 201 } 202 } 203 204 #[cfg(test)] 205 mod tests { 206 use std::sync::Arc; 207 use std::sync::Condvar; 208 use std::sync::Mutex; 209 210 use base::Event; 211 212 use super::*; 213 214 struct EventLoopTestHandler { 215 val: Mutex<u8>, 216 cvar: Condvar, 217 evt: Event, 218 } 219 220 impl EventHandler for EventLoopTestHandler { on_event(&self) -> anyhow::Result<()>221 fn on_event(&self) -> anyhow::Result<()> { 222 self.evt.wait().unwrap(); 223 *self.val.lock().unwrap() += 1; 224 self.cvar.notify_one(); 225 Ok(()) 226 } 227 } 228 229 #[test] event_loop_test()230 fn event_loop_test() { 231 let (l, j) = EventLoop::start("test".to_string(), None).unwrap(); 232 let (self_evt, evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) { 233 Ok(v) => v, 234 Err(e) => { 235 error!("failed creating Event pair: {:?}", e); 236 return; 237 } 238 }; 239 let h = Arc::new(EventLoopTestHandler { 240 val: Mutex::new(0), 241 cvar: Condvar::new(), 242 evt, 243 }); 244 let t: Arc<dyn EventHandler> = h.clone(); 245 l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t)) 246 .unwrap(); 247 self_evt.signal().unwrap(); 248 { 249 let mut val = h.val.lock().unwrap(); 250 while *val < 1 { 251 val = h.cvar.wait(val).unwrap(); 252 } 253 } 254 l.stop(); 255 j.join().unwrap(); 256 assert_eq!(*(h.val.lock().unwrap()), 1); 257 } 258 } 259