• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::BTreeMap;
6 use std::mem::drop;
7 use std::sync::Arc;
8 use std::sync::Weak;
9 use std::thread;
10 
11 use base::error;
12 use base::warn;
13 use base::AsRawDescriptor;
14 use base::Descriptor;
15 use base::Event;
16 use base::EventType;
17 use base::WaitContext;
18 use sync::Mutex;
19 
20 use super::error::Error;
21 use super::error::Result;
22 
23 /// A fail handle will do the clean up when we cannot recover from some error.
24 pub trait FailHandle: Send + Sync {
25     /// Fail the code.
fail(&self)26     fn fail(&self);
27     /// Returns true if already failed.
failed(&self) -> bool28     fn failed(&self) -> bool;
29 }
30 
31 impl FailHandle for Option<Arc<dyn FailHandle>> {
fail(&self)32     fn fail(&self) {
33         match self {
34             Some(handle) => handle.fail(),
35             None => error!("event loop trying to fail without a fail handle"),
36         }
37     }
38 
failed(&self) -> bool39     fn failed(&self) -> bool {
40         match self {
41             Some(handle) => handle.failed(),
42             None => false,
43         }
44     }
45 }
46 
47 /// EventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
48 /// event loop will invoke the mapped handler.
49 pub struct EventLoop {
50     fail_handle: Option<Arc<dyn FailHandle>>,
51     poll_ctx: Arc<WaitContext<Descriptor>>,
52     handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>,
53     stop_evt: Event,
54 }
55 
56 /// Interface for event handler.
57 pub trait EventHandler: Send + Sync {
on_event(&self) -> anyhow::Result<()>58     fn on_event(&self) -> anyhow::Result<()>;
59 }
60 
61 impl EventLoop {
62     /// Start an event loop. An optional fail handle could be passed to the event loop.
start( name: String, fail_handle: Option<Arc<dyn FailHandle>>, ) -> Result<(EventLoop, thread::JoinHandle<()>)>63     pub fn start(
64         name: String,
65         fail_handle: Option<Arc<dyn FailHandle>>,
66     ) -> Result<(EventLoop, thread::JoinHandle<()>)> {
67         let (self_stop_evt, stop_evt) = Event::new()
68             .and_then(|e| Ok((e.try_clone()?, e)))
69             .map_err(Error::CreateEvent)?;
70 
71         let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> =
72             Arc::new(Mutex::new(BTreeMap::new()));
73         let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
74             .and_then(|pc| {
75                 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
76                     .and(Ok(pc))
77             })
78             .map_err(Error::CreateWaitContext)?;
79 
80         let poll_ctx = Arc::new(poll_ctx);
81         let event_loop = EventLoop {
82             fail_handle: fail_handle.clone(),
83             poll_ctx: poll_ctx.clone(),
84             handlers: fd_callbacks.clone(),
85             stop_evt: self_stop_evt,
86         };
87 
88         let handle = thread::Builder::new()
89             .name(name)
90             .spawn(move || {
91                 loop {
92                     if fail_handle.failed() {
93                         error!("xhci controller already failed, stopping event ring");
94                         return;
95                     }
96                     let events = match poll_ctx.wait() {
97                         Ok(events) => events,
98                         Err(e) => {
99                             error!("cannot wait on events {:?}", e);
100                             fail_handle.fail();
101                             return;
102                         }
103                     };
104                     for event in &events {
105                         let fd = event.token.as_raw_descriptor();
106                         if fd == stop_evt.as_raw_descriptor() {
107                             return;
108                         }
109 
110                         let mut locked = fd_callbacks.lock();
111                         let weak_handler = match locked.get(&Descriptor(fd)) {
112                             Some(cb) => cb.clone(),
113                             None => {
114                                 warn!("callback for fd {} already removed", fd);
115                                 continue;
116                             }
117                         };
118 
119                         // If the file descriptor is hung up, remove it after calling the handler
120                         // one final time.
121                         let mut remove = event.is_hungup;
122 
123                         if let Some(handler) = weak_handler.upgrade() {
124                             // Drop lock before triggering the event.
125                             drop(locked);
126                             if let Err(e) = handler.on_event() {
127                                 error!("removing event handler due to error: {:#}", e);
128                                 remove = true;
129                             }
130                             locked = fd_callbacks.lock();
131                         } else {
132                             // If the handler is already gone, we remove the fd.
133                             remove = true;
134                         }
135 
136                         if remove {
137                             let _ = poll_ctx.delete(&event.token);
138                             let _ = locked.remove(&Descriptor(fd));
139                         }
140                     }
141                 }
142             })
143             .map_err(Error::StartThread)?;
144 
145         Ok((event_loop, handle))
146     }
147 
148     /// Add a new event to event loop. The event handler will be invoked when `event` happens on
149     /// `descriptor`.
150     ///
151     /// If the same `descriptor` is added multiple times, the old handler will be replaced.
152     /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered,
153     /// the event will be removed.
add_event( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, handler: Weak<dyn EventHandler>, ) -> Result<()>154     pub fn add_event(
155         &self,
156         descriptor: &dyn AsRawDescriptor,
157         event_type: EventType,
158         handler: Weak<dyn EventHandler>,
159     ) -> Result<()> {
160         if self.fail_handle.failed() {
161             return Err(Error::EventLoopAlreadyFailed);
162         }
163         self.handlers
164             .lock()
165             .insert(Descriptor(descriptor.as_raw_descriptor()), handler);
166         // This might fail due to epoll syscall. Check epoll_ctl(2).
167         self.poll_ctx
168             .add_for_event(
169                 descriptor,
170                 event_type,
171                 Descriptor(descriptor.as_raw_descriptor()),
172             )
173             .map_err(Error::WaitContextAddDescriptor)
174     }
175 
176     /// Removes event for this `descriptor`. This function returns false if it fails.
177     ///
178     /// EventLoop does not guarantee all events for `descriptor` is handled.
remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>179     pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
180         if self.fail_handle.failed() {
181             return Err(Error::EventLoopAlreadyFailed);
182         }
183         // This might fail due to epoll syscall. Check epoll_ctl(2).
184         self.poll_ctx
185             .delete(descriptor)
186             .map_err(Error::WaitContextDeleteDescriptor)?;
187         self.handlers
188             .lock()
189             .remove(&Descriptor(descriptor.as_raw_descriptor()));
190         Ok(())
191     }
192 
193     /// Stops this event loop asynchronously. Previous events might not be handled.
stop(&self)194     pub fn stop(&self) {
195         match self.stop_evt.signal() {
196             Ok(_) => {}
197             Err(_) => {
198                 error!("fail to send event loop stop event, it might already stopped");
199             }
200         }
201     }
202 }
203 
204 #[cfg(test)]
205 mod tests {
206     use std::sync::Arc;
207     use std::sync::Condvar;
208     use std::sync::Mutex;
209 
210     use base::Event;
211 
212     use super::*;
213 
214     struct EventLoopTestHandler {
215         val: Mutex<u8>,
216         cvar: Condvar,
217         evt: Event,
218     }
219 
220     impl EventHandler for EventLoopTestHandler {
on_event(&self) -> anyhow::Result<()>221         fn on_event(&self) -> anyhow::Result<()> {
222             self.evt.wait().unwrap();
223             *self.val.lock().unwrap() += 1;
224             self.cvar.notify_one();
225             Ok(())
226         }
227     }
228 
229     #[test]
event_loop_test()230     fn event_loop_test() {
231         let (l, j) = EventLoop::start("test".to_string(), None).unwrap();
232         let (self_evt, evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
233             Ok(v) => v,
234             Err(e) => {
235                 error!("failed creating Event pair: {:?}", e);
236                 return;
237             }
238         };
239         let h = Arc::new(EventLoopTestHandler {
240             val: Mutex::new(0),
241             cvar: Condvar::new(),
242             evt,
243         });
244         let t: Arc<dyn EventHandler> = h.clone();
245         l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
246             .unwrap();
247         self_evt.signal().unwrap();
248         {
249             let mut val = h.val.lock().unwrap();
250             while *val < 1 {
251                 val = h.cvar.wait(val).unwrap();
252             }
253         }
254         l.stop();
255         j.join().unwrap();
256         assert_eq!(*(h.val.lock().unwrap()), 1);
257     }
258 }
259