1#!/usr/bin/env python3 2# 3# Copyright 2016- The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from concurrent.futures import ThreadPoolExecutor 18import queue 19import re 20import threading 21import time 22 23from acts import logger 24from acts.controllers.sl4a_lib import rpc_client 25 26 27class EventDispatcherError(Exception): 28 """The base class for all EventDispatcher exceptions.""" 29 30 31class IllegalStateError(EventDispatcherError): 32 """Raise when user tries to put event_dispatcher into an illegal state.""" 33 34 35class DuplicateError(EventDispatcherError): 36 """Raise when two event handlers have been assigned to an event name.""" 37 38 39class EventDispatcher: 40 """A class for managing the events for an SL4A Session. 41 42 Attributes: 43 _serial: The serial of the device. 44 _rpc_client: The rpc client for that session. 45 _started: A bool that holds whether or not the event dispatcher is 46 running. 47 _executor: The thread pool executor for running event handlers and 48 polling. 49 _event_dict: A dictionary of str eventName = Queue<Event> eventQueue 50 _handlers: A dictionary of str eventName => (lambda, args) handler 51 _lock: A lock that prevents multiple reads/writes to the event queues. 52 log: The EventDispatcher's logger. 53 """ 54 55 DEFAULT_TIMEOUT = 60 56 57 def __init__(self, serial, rpc_client): 58 self._serial = serial 59 self._rpc_client = rpc_client 60 self._started = False 61 self._executor = None 62 self._event_dict = {} 63 self._handlers = {} 64 self._lock = threading.RLock() 65 66 def _log_formatter(message): 67 """Defines the formatting used in the logger.""" 68 return '[E Dispatcher|%s|%s] %s' % (self._serial, 69 self._rpc_client.uid, message) 70 71 self.log = logger.create_logger(_log_formatter) 72 73 def poll_events(self): 74 """Continuously polls all types of events from sl4a. 75 76 Events are sorted by name and store in separate queues. 77 If there are registered handlers, the handlers will be called with 78 corresponding event immediately upon event discovery, and the event 79 won't be stored. If exceptions occur, stop the dispatcher and return 80 """ 81 while self._started: 82 try: 83 # 60000 in ms, timeout in second 84 event_obj = self._rpc_client.eventWait(60000, timeout=120) 85 except rpc_client.Sl4aConnectionError as e: 86 if self._rpc_client.is_alive: 87 self.log.warning('Closing due to closed session.') 88 break 89 else: 90 self.log.warning('Closing due to error: %s.' % e) 91 self.close() 92 raise e 93 if not event_obj: 94 continue 95 elif 'name' not in event_obj: 96 self.log.error('Received Malformed event {}'.format(event_obj)) 97 continue 98 else: 99 event_name = event_obj['name'] 100 # if handler registered, process event 101 if event_name == 'EventDispatcherShutdown': 102 self.log.debug('Received shutdown signal.') 103 # closeSl4aSession has been called, which closes the event 104 # dispatcher. Stop execution on this polling thread. 105 return 106 if event_name in self._handlers: 107 self.log.debug( 108 'Using handler %s for event: %r' % 109 (self._handlers[event_name].__name__, event_obj)) 110 self.handle_subscribed_event(event_obj, event_name) 111 else: 112 self.log.debug('Queuing event: %r' % event_obj) 113 self._lock.acquire() 114 if event_name in self._event_dict: # otherwise, cache event 115 self._event_dict[event_name].put(event_obj) 116 else: 117 q = queue.Queue() 118 q.put(event_obj) 119 self._event_dict[event_name] = q 120 self._lock.release() 121 122 def register_handler(self, handler, event_name, args): 123 """Registers an event handler. 124 125 One type of event can only have one event handler associated with it. 126 127 Args: 128 handler: The event handler function to be registered. 129 event_name: Name of the event the handler is for. 130 args: User arguments to be passed to the handler when it's called. 131 132 Raises: 133 IllegalStateError: Raised if attempts to register a handler after 134 the dispatcher starts running. 135 DuplicateError: Raised if attempts to register more than one 136 handler for one type of event. 137 """ 138 if self._started: 139 raise IllegalStateError('Cannot register service after polling is ' 140 'started.') 141 self._lock.acquire() 142 try: 143 if event_name in self._handlers: 144 raise DuplicateError( 145 'A handler for {} already exists'.format(event_name)) 146 self._handlers[event_name] = (handler, args) 147 finally: 148 self._lock.release() 149 150 def start(self): 151 """Starts the event dispatcher. 152 153 Initiates executor and start polling events. 154 155 Raises: 156 IllegalStateError: Can't start a dispatcher again when it's already 157 running. 158 """ 159 if not self._started: 160 self._started = True 161 self._executor = ThreadPoolExecutor(max_workers=32) 162 self._executor.submit(self.poll_events) 163 else: 164 raise IllegalStateError("Dispatcher is already started.") 165 166 def close(self): 167 """Clean up and release resources. 168 169 This function should only be called after a 170 rpc_client.closeSl4aSession() call. 171 """ 172 if not self._started: 173 return 174 self._started = False 175 self._executor.shutdown(wait=True) 176 self.clear_all_events() 177 178 def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT): 179 """Pop an event from its queue. 180 181 Return and remove the oldest entry of an event. 182 Block until an event of specified name is available or 183 times out if timeout is set. 184 185 Args: 186 event_name: Name of the event to be popped. 187 timeout: Number of seconds to wait when event is not present. 188 Never times out if None. 189 190 Returns: 191 event: The oldest entry of the specified event. None if timed out. 192 193 Raises: 194 IllegalStateError: Raised if pop is called before the dispatcher 195 starts polling. 196 """ 197 if not self._started: 198 raise IllegalStateError( 199 'Dispatcher needs to be started before popping.') 200 201 e_queue = self.get_event_q(event_name) 202 203 if not e_queue: 204 raise IllegalStateError( 205 'Failed to get an event queue for {}'.format(event_name)) 206 207 try: 208 # Block for timeout 209 if timeout: 210 return e_queue.get(True, timeout) 211 # Non-blocking poll for event 212 elif timeout == 0: 213 return e_queue.get(False) 214 else: 215 # Block forever on event wait 216 return e_queue.get(True) 217 except queue.Empty: 218 msg = 'Timeout after {}s waiting for event: {}'.format( 219 timeout, event_name) 220 self.log.info(msg) 221 raise queue.Empty(msg) 222 223 def wait_for_event(self, 224 event_name, 225 predicate, 226 timeout=DEFAULT_TIMEOUT, 227 *args, 228 **kwargs): 229 """Wait for an event that satisfies a predicate to appear. 230 231 Continuously pop events of a particular name and check against the 232 predicate until an event that satisfies the predicate is popped or 233 timed out. Note this will remove all the events of the same name that 234 do not satisfy the predicate in the process. 235 236 Args: 237 event_name: Name of the event to be popped. 238 predicate: A function that takes an event and returns True if the 239 predicate is satisfied, False otherwise. 240 timeout: Number of seconds to wait. 241 *args: Optional positional args passed to predicate(). 242 **kwargs: Optional keyword args passed to predicate(). 243 consume_ignored_events: Whether or not to consume events while 244 searching for the desired event. Defaults to True if unset. 245 246 Returns: 247 The event that satisfies the predicate. 248 249 Raises: 250 queue.Empty: Raised if no event that satisfies the predicate was 251 found before time out. 252 """ 253 deadline = time.time() + timeout 254 ignored_events = [] 255 consume_events = kwargs.pop('consume_ignored_events', True) 256 while True: 257 event = None 258 try: 259 event = self.pop_event(event_name, 1) 260 if consume_events: 261 self.log.debug('Consuming event: %r' % event) 262 else: 263 self.log.debug('Peeking at event: %r' % event) 264 ignored_events.append(event) 265 except queue.Empty: 266 pass 267 268 if event and predicate(event, *args, **kwargs): 269 for ignored_event in ignored_events: 270 self.get_event_q(event_name).put(ignored_event) 271 self.log.debug('Matched event: %r with %s' % 272 (event, predicate.__name__)) 273 return event 274 275 if time.time() > deadline: 276 for ignored_event in ignored_events: 277 self.get_event_q(event_name).put(ignored_event) 278 msg = 'Timeout after {}s waiting for event: {}'.format( 279 timeout, event_name) 280 self.log.info(msg) 281 raise queue.Empty(msg) 282 283 def pop_events(self, regex_pattern, timeout, freq=1): 284 """Pop events whose names match a regex pattern. 285 286 If such event(s) exist, pop one event from each event queue that 287 satisfies the condition. Otherwise, wait for an event that satisfies 288 the condition to occur, with timeout. 289 290 Results are sorted by timestamp in ascending order. 291 292 Args: 293 regex_pattern: The regular expression pattern that an event name 294 should match in order to be popped. 295 timeout: Number of seconds to wait for events in case no event 296 matching the condition exits when the function is called. 297 298 Returns: 299 results: Pop events whose names match a regex pattern. 300 Empty if none exist and the wait timed out. 301 302 Raises: 303 IllegalStateError: Raised if pop is called before the dispatcher 304 starts polling. 305 queue.Empty: Raised if no event was found before time out. 306 """ 307 if not self._started: 308 raise IllegalStateError( 309 "Dispatcher needs to be started before popping.") 310 deadline = time.time() + timeout 311 while True: 312 # TODO: fix the sleep loop 313 results = self._match_and_pop(regex_pattern) 314 if len(results) != 0 or time.time() > deadline: 315 break 316 time.sleep(freq) 317 if len(results) == 0: 318 msg = 'Timeout after {}s waiting for event: {}'.format( 319 timeout, regex_pattern) 320 self.log.error(msg) 321 raise queue.Empty(msg) 322 323 return sorted(results, key=lambda event: event['time']) 324 325 def _match_and_pop(self, regex_pattern): 326 """Pop one event from each of the event queues whose names 327 match (in a sense of regular expression) regex_pattern. 328 """ 329 results = [] 330 self._lock.acquire() 331 for name in self._event_dict.keys(): 332 if re.match(regex_pattern, name): 333 q = self._event_dict[name] 334 if q: 335 try: 336 results.append(q.get(False)) 337 except queue.Empty: 338 pass 339 self._lock.release() 340 return results 341 342 def get_event_q(self, event_name): 343 """Obtain the queue storing events of the specified name. 344 345 If no event of this name has been polled, wait for one to. 346 347 Returns: A queue storing all the events of the specified name. 348 """ 349 self._lock.acquire() 350 if (event_name not in self._event_dict 351 or self._event_dict[event_name] is None): 352 self._event_dict[event_name] = queue.Queue() 353 self._lock.release() 354 355 event_queue = self._event_dict[event_name] 356 return event_queue 357 358 def handle_subscribed_event(self, event_obj, event_name): 359 """Execute the registered handler of an event. 360 361 Retrieve the handler and its arguments, and execute the handler in a 362 new thread. 363 364 Args: 365 event_obj: Json object of the event. 366 event_name: Name of the event to call handler for. 367 """ 368 handler, args = self._handlers[event_name] 369 self._executor.submit(handler, event_obj, *args) 370 371 def _handle(self, event_handler, event_name, user_args, event_timeout, 372 cond, cond_timeout): 373 """Pop an event of specified type and calls its handler on it. If 374 condition is not None, block until condition is met or timeout. 375 """ 376 if cond: 377 cond.wait(cond_timeout) 378 event = self.pop_event(event_name, event_timeout) 379 return event_handler(event, *user_args) 380 381 def handle_event(self, 382 event_handler, 383 event_name, 384 user_args, 385 event_timeout=None, 386 cond=None, 387 cond_timeout=None): 388 """Handle events that don't have registered handlers 389 390 In a new thread, poll one event of specified type from its queue and 391 execute its handler. If no such event exists, the thread waits until 392 one appears. 393 394 Args: 395 event_handler: Handler for the event, which should take at least 396 one argument - the event json object. 397 event_name: Name of the event to be handled. 398 user_args: User arguments for the handler; to be passed in after 399 the event json. 400 event_timeout: Number of seconds to wait for the event to come. 401 cond: A condition to wait on before executing the handler. Should 402 be a threading.Event object. 403 cond_timeout: Number of seconds to wait before the condition times 404 out. Never times out if None. 405 406 Returns: 407 worker: A concurrent.Future object associated with the handler. 408 If blocking call worker.result() is triggered, the handler 409 needs to return something to unblock. 410 """ 411 worker = self._executor.submit(self._handle, event_handler, event_name, 412 user_args, event_timeout, cond, 413 cond_timeout) 414 return worker 415 416 def pop_all(self, event_name): 417 """Return and remove all stored events of a specified name. 418 419 Pops all events from their queue. May miss the latest ones. 420 If no event is available, return immediately. 421 422 Args: 423 event_name: Name of the events to be popped. 424 425 Returns: 426 results: List of the desired events. 427 428 Raises: 429 IllegalStateError: Raised if pop is called before the dispatcher 430 starts polling. 431 """ 432 if not self._started: 433 raise IllegalStateError(("Dispatcher needs to be started before " 434 "popping.")) 435 results = [] 436 try: 437 self._lock.acquire() 438 while True: 439 e = self._event_dict[event_name].get(block=False) 440 results.append(e) 441 except (queue.Empty, KeyError): 442 return results 443 finally: 444 self._lock.release() 445 446 def clear_events(self, event_name): 447 """Clear all events of a particular name. 448 449 Args: 450 event_name: Name of the events to be popped. 451 """ 452 self._lock.acquire() 453 try: 454 q = self.get_event_q(event_name) 455 q.queue.clear() 456 except queue.Empty: 457 return 458 finally: 459 self._lock.release() 460 461 def clear_all_events(self): 462 """Clear all event queues and their cached events.""" 463 self._lock.acquire() 464 self._event_dict.clear() 465 self._lock.release() 466 467 def is_event_match(self, event, field, value): 468 return self.is_event_match_for_list(event, field, [value]) 469 470 def is_event_match_for_list(self, event, field, value_list): 471 try: 472 value_in_event = event['data'][field] 473 except KeyError: 474 return False 475 for value in value_list: 476 if value_in_event == value: 477 return True 478 return False