# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent import futures
import queue
import re
import threading
import time
import traceback


class EventDispatcherError(Exception):
    """Base class for all errors raised by the event dispatcher."""


class IllegalStateError(EventDispatcherError):
    """Raise when user tries to put event_dispatcher into an illegal state.
    """


class DuplicateError(EventDispatcherError):
    """Raise when a duplicate is being created and it shouldn't.
    """


class EventDispatcher:
    """Class managing events for an sl4a connection.

    Events are polled from the sl4a client on a worker thread and cached in
    per-name queues; callers consume them through the pop_*/wait_* methods.
    """

    # Default number of seconds pop_event/wait_for_event wait for an event.
    DEFAULT_TIMEOUT = 60

    def __init__(self, sl4a):
        """Creates a stopped dispatcher bound to an sl4a client.

        Args:
            sl4a: The sl4a client object. This class calls eventWait(),
                closeSl4aSession() and disconnect() on it.
        """
        self._sl4a = sl4a
        self.started = False
        self.executor = None
        self.poller = None
        # Maps event name -> queue.Queue of cached events of that name.
        self.event_dict = {}
        # Maps event name -> (handler, args) registered before start().
        self.handlers = {}
        # Guards event_dict and handlers.
        self.lock = threading.RLock()

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If a handler is registered for an event's name, the handler is
        submitted to the executor with the event; the event is still cached
        in its queue afterwards.

        Polling stops when the dispatcher is no longer started or when an
        "EventDispatcherShutdown" event is received (the sl4a session is
        closed and that event is not cached).

        Raises:
            Exception: Whatever eventWait() raised, if the dispatcher is
                still marked as started at that point.
        """
        while self.started:
            try:
                event_obj = self._sl4a.eventWait(50000)
            except Exception:
                # NOTE(review): the re-raise is assumed to be conditional on
                # the dispatcher still running; confirm against the original
                # indentation.
                if self.started:
                    print("Exception happened during polling.")
                    print(traceback.format_exc())
                    raise
                # The dispatcher was stopped while waiting; fall through so
                # the loop condition ends the polling.
                event_obj = None
            if not event_obj:
                continue
            if 'name' not in event_obj:
                print("Received Malformed event {}".format(event_obj))
                continue
            event_name = event_obj['name']
            # if handler registered, process event
            if event_name in self.handlers:
                self.handle_subscribed_event(event_obj, event_name)
            if event_name == "EventDispatcherShutdown":
                self._sl4a.closeSl4aSession()
                break
            # Cache the event. The context manager guarantees the lock is
            # released even if queue creation or put() fails.
            with self.lock:
                event_queue = self.event_dict.get(event_name)
                if event_queue is None:
                    event_queue = queue.Queue()
                    self.event_dict[event_name] = event_queue
                event_queue.put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self.started:
            raise IllegalStateError(("Can't register service after polling is"
                                     " started"))
        with self.lock:
            if event_name in self.handlers:
                raise DuplicateError(
                    'A handler for {} already exists'.format(event_name))
            self.handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self.started:
            raise IllegalStateError("Dispatcher is already started.")
        self.started = True
        self.executor = futures.ThreadPoolExecutor(max_workers=32)
        self.poller = self.executor.submit(self.poll_events)

    def clean_up(self):
        """Clean up and release resources after the event dispatcher polling
        loop has been broken.

        The following things happen:
        1. Clear all events and flags.
        2. Close the sl4a client the event_dispatcher object holds.
        3. Shut down executor without waiting.

        Does nothing if the dispatcher is not started.
        """
        if not self.started:
            return
        self.started = False
        self.clear_all_events()
        # At this point, the sl4a apk is destroyed and nothing is listening on
        # the socket. Avoid sending any sl4a commands; just clean up the socket
        # and return.
        self._sl4a.disconnect()
        # Mark the poller future as finished so that anything waiting on it
        # unblocks. The future may already be done (polling loop broke or
        # raised); setting a result on a finished future raises
        # InvalidStateError on Python 3.8+, which must not abort the cleanup.
        if self.poller is not None and not self.poller.done():
            try:
                self.poller.set_result("Done")
            except futures.InvalidStateError:
                # The poller finished between the done() check and here.
                pass
        # The polling thread is expected to finish on its own once its
        # current eventWait() call returns, so we don't wait here.
        self.executor.shutdown(wait=False)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event became available in time.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")

        e_queue = self.get_event_q(event_name)

        if e_queue is None:
            raise TypeError(
                "Failed to get an event queue for {}".format(event_name))

        try:
            # Block for timeout
            if timeout:
                return e_queue.get(True, timeout)
            # Non-blocking poll for event
            if timeout == 0:
                return e_queue.get(False)
            # Block forever on event wait
            return e_queue.get(True)
        except queue.Empty:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, event_name))

    def wait_for_event(self,
                       event_name,
                       predicate,
                       timeout=DEFAULT_TIMEOUT,
                       *args,
                       **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised if called before the dispatcher starts
                polling.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.perf_counter() + timeout

        while True:
            # Wait in slices of at most one second, but never past the
            # deadline, so short timeouts are honoured. At least one pop is
            # always attempted.
            remaining = deadline - time.perf_counter()
            slice_timeout = max(0, min(1, remaining))
            event = None
            try:
                event = self.pop_event(event_name, slice_timeout)
            except queue.Empty:
                pass

            if event and predicate(event, *args, **kwargs):
                return event

            if time.perf_counter() > deadline:
                raise queue.Empty(
                    'Timeout after {}s waiting for event: {}'.format(
                        timeout, event_name))

    def pop_events(self, regex_pattern, timeout):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exists when the function is called.

        Returns:
            Events whose names match a regex pattern, sorted by their 'time'
            entry.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.perf_counter() + timeout
        while True:
            # TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            remaining = deadline - time.perf_counter()
            if results or remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(1, remaining))
        if not results:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Args:
            regex_pattern: Pattern matched against the start of each event
                name (re.match semantics).

        Returns:
            A list with at most one event per matching queue; queues that are
            currently empty contribute nothing.
        """
        pattern = re.compile(regex_pattern)
        results = []
        with self.lock:
            for name, event_queue in self.event_dict.items():
                if event_queue is None or not pattern.match(name):
                    continue
                try:
                    results.append(event_queue.get(False))
                except queue.Empty:
                    # Matching queue exists but holds no event right now.
                    pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet, an empty one is created and
        registered, so this call never blocks.

        Args:
            event_name: Name of the event whose queue is requested.

        Returns:
            A queue storing all the events of the specified name.
        """
        # Look up and create under the same lock so a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self.event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and execute the handler in a
        new thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self.handlers[event_name]
        self.executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout, cond,
                cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self,
                     event_handler,
                     event_name,
                     user_args,
                     event_timeout=None,
                     cond=None,
                     cond_timeout=None):
        """Handle events that don't have registered handlers

        In a new thread, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the thread waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            A concurrent.Future object associated with the handler.
            If blocking call worker.result() is triggered, the handler
            needs to return something to unblock.
        """
        worker = self.executor.submit(self._handle, event_handler, event_name,
                                      user_args, event_timeout, cond,
                                      cond_timeout)
        return worker

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
            List of the desired events; empty if there is no queue for the
            name or the queue is empty.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self.started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                                     "popping."))
        results = []
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                return results
            while True:
                try:
                    results.append(event_queue.get(block=False))
                except queue.Empty:
                    return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        Args:
            event_name: Name of the events to be popped.
        """
        with self.lock:
            # get_event_q creates the queue if it is missing, so there is
            # always a queue to clear.
            self.get_event_q(event_name).queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self.lock:
            self.event_dict.clear()