1#!/usr/bin/env python3 2# 3# Copyright 2016- The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from concurrent.futures import ThreadPoolExecutor 18import queue 19import re 20import threading 21import time 22 23from acts import logger 24from acts.controllers.sl4a_lib import rpc_client 25 26 27class EventDispatcherError(Exception): 28 """The base class for all EventDispatcher exceptions.""" 29 30 31class IllegalStateError(EventDispatcherError): 32 """Raise when user tries to put event_dispatcher into an illegal state.""" 33 34 35class DuplicateError(EventDispatcherError): 36 """Raise when two event handlers have been assigned to an event name.""" 37 38 39class EventDispatcher: 40 """A class for managing the events for an SL4A Session. 41 42 Attributes: 43 _serial: The serial of the device. 44 _rpc_client: The rpc client for that session. 45 _started: A bool that holds whether or not the event dispatcher is 46 running. 47 _executor: The thread pool executor for running event handlers and 48 polling. 49 _event_dict: A dictionary of str eventName = Queue<Event> eventQueue 50 _handlers: A dictionary of str eventName => (lambda, args) handler 51 _lock: A lock that prevents multiple reads/writes to the event queues. 52 log: The EventDispatcher's logger. 53 """ 54 55 DEFAULT_TIMEOUT = 60 56 57 def __init__(self, serial, rpc_client): 58 self._serial = serial 59 self._rpc_client = rpc_client 60 self._started = False 61 self._executor = None 62 self._event_dict = {} 63 self._handlers = {} 64 self._lock = threading.RLock() 65 66 def _log_formatter(message): 67 """Defines the formatting used in the logger.""" 68 return '[E Dispatcher|%s|%s] %s' % (self._serial, 69 self._rpc_client.uid, message) 70 71 self.log = logger.create_logger(_log_formatter) 72 73 def poll_events(self): 74 """Continuously polls all types of events from sl4a. 75 76 Events are sorted by name and store in separate queues. 77 If there are registered handlers, the handlers will be called with 78 corresponding event immediately upon event discovery, and the event 79 won't be stored. If exceptions occur, stop the dispatcher and return 80 """ 81 while self._started: 82 try: 83 # 60000 in ms, timeout in second 84 event_obj = self._rpc_client.eventWait(60000, timeout=120) 85 except rpc_client.Sl4aConnectionError as e: 86 if self._rpc_client.is_alive: 87 self.log.warning('Closing due to closed session.') 88 break 89 else: 90 self.log.warning('Closing due to error: %s.' % e) 91 self.close() 92 raise e 93 if not event_obj: 94 continue 95 elif 'name' not in event_obj: 96 self.log.error('Received Malformed event {}'.format(event_obj)) 97 continue 98 else: 99 event_name = event_obj['name'] 100 # if handler registered, process event 101 if event_name == 'EventDispatcherShutdown': 102 self.log.debug('Received shutdown signal.') 103 # closeSl4aSession has been called, which closes the event 104 # dispatcher. Stop execution on this polling thread. 105 return 106 if event_name in self._handlers: 107 self.handle_subscribed_event(event_obj, event_name) 108 else: 109 self._lock.acquire() 110 if event_name in self._event_dict: # otherwise, cache event 111 self._event_dict[event_name].put(event_obj) 112 else: 113 q = queue.Queue() 114 q.put(event_obj) 115 self._event_dict[event_name] = q 116 self._lock.release() 117 118 def register_handler(self, handler, event_name, args): 119 """Registers an event handler. 120 121 One type of event can only have one event handler associated with it. 122 123 Args: 124 handler: The event handler function to be registered. 125 event_name: Name of the event the handler is for. 126 args: User arguments to be passed to the handler when it's called. 127 128 Raises: 129 IllegalStateError: Raised if attempts to register a handler after 130 the dispatcher starts running. 131 DuplicateError: Raised if attempts to register more than one 132 handler for one type of event. 133 """ 134 if self._started: 135 raise IllegalStateError('Cannot register service after polling is ' 136 'started.') 137 self._lock.acquire() 138 try: 139 if event_name in self._handlers: 140 raise DuplicateError( 141 'A handler for {} already exists'.format(event_name)) 142 self._handlers[event_name] = (handler, args) 143 finally: 144 self._lock.release() 145 146 def start(self): 147 """Starts the event dispatcher. 148 149 Initiates executor and start polling events. 150 151 Raises: 152 IllegalStateError: Can't start a dispatcher again when it's already 153 running. 154 """ 155 if not self._started: 156 self._started = True 157 self._executor = ThreadPoolExecutor(max_workers=32) 158 self._executor.submit(self.poll_events) 159 else: 160 raise IllegalStateError("Dispatcher is already started.") 161 162 def close(self): 163 """Clean up and release resources. 164 165 This function should only be called after a 166 rpc_client.closeSl4aSession() call. 167 """ 168 if not self._started: 169 return 170 self._started = False 171 self._executor.shutdown(wait=True) 172 self.clear_all_events() 173 174 def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT): 175 """Pop an event from its queue. 176 177 Return and remove the oldest entry of an event. 178 Block until an event of specified name is available or 179 times out if timeout is set. 180 181 Args: 182 event_name: Name of the event to be popped. 183 timeout: Number of seconds to wait when event is not present. 184 Never times out if None. 185 186 Returns: 187 event: The oldest entry of the specified event. None if timed out. 188 189 Raises: 190 IllegalStateError: Raised if pop is called before the dispatcher 191 starts polling. 192 """ 193 if not self._started: 194 raise IllegalStateError( 195 'Dispatcher needs to be started before popping.') 196 197 e_queue = self.get_event_q(event_name) 198 199 if not e_queue: 200 raise IllegalStateError( 201 'Failed to get an event queue for {}'.format(event_name)) 202 203 try: 204 # Block for timeout 205 if timeout: 206 return e_queue.get(True, timeout) 207 # Non-blocking poll for event 208 elif timeout == 0: 209 return e_queue.get(False) 210 else: 211 # Block forever on event wait 212 return e_queue.get(True) 213 except queue.Empty: 214 raise queue.Empty('Timeout after {}s waiting for event: {}'.format( 215 timeout, event_name)) 216 217 def wait_for_event(self, 218 event_name, 219 predicate, 220 timeout=DEFAULT_TIMEOUT, 221 *args, 222 **kwargs): 223 """Wait for an event that satisfies a predicate to appear. 224 225 Continuously pop events of a particular name and check against the 226 predicate until an event that satisfies the predicate is popped or 227 timed out. Note this will remove all the events of the same name that 228 do not satisfy the predicate in the process. 229 230 Args: 231 event_name: Name of the event to be popped. 232 predicate: A function that takes an event and returns True if the 233 predicate is satisfied, False otherwise. 234 timeout: Number of seconds to wait. 235 *args: Optional positional args passed to predicate(). 236 **kwargs: Optional keyword args passed to predicate(). 237 consume_ignored_events: Whether or not to consume events while 238 searching for the desired event. Defaults to True if unset. 239 240 Returns: 241 The event that satisfies the predicate. 242 243 Raises: 244 queue.Empty: Raised if no event that satisfies the predicate was 245 found before time out. 246 """ 247 deadline = time.time() + timeout 248 ignored_events = [] 249 consume_events = kwargs.pop('consume_ignored_events', True) 250 while True: 251 event = None 252 try: 253 event = self.pop_event(event_name, 1) 254 if not consume_events: 255 ignored_events.append(event) 256 except queue.Empty: 257 pass 258 259 if event and predicate(event, *args, **kwargs): 260 for ignored_event in ignored_events: 261 self.get_event_q(event_name).put(ignored_event) 262 return event 263 264 if time.time() > deadline: 265 for ignored_event in ignored_events: 266 self.get_event_q(event_name).put(ignored_event) 267 raise queue.Empty( 268 'Timeout after {}s waiting for event: {}'.format( 269 timeout, event_name)) 270 271 def pop_events(self, regex_pattern, timeout, freq=1): 272 """Pop events whose names match a regex pattern. 273 274 If such event(s) exist, pop one event from each event queue that 275 satisfies the condition. Otherwise, wait for an event that satisfies 276 the condition to occur, with timeout. 277 278 Results are sorted by timestamp in ascending order. 279 280 Args: 281 regex_pattern: The regular expression pattern that an event name 282 should match in order to be popped. 283 timeout: Number of seconds to wait for events in case no event 284 matching the condition exits when the function is called. 285 286 Returns: 287 results: Pop events whose names match a regex pattern. 288 Empty if none exist and the wait timed out. 289 290 Raises: 291 IllegalStateError: Raised if pop is called before the dispatcher 292 starts polling. 293 queue.Empty: Raised if no event was found before time out. 294 """ 295 if not self._started: 296 raise IllegalStateError( 297 "Dispatcher needs to be started before popping.") 298 deadline = time.time() + timeout 299 while True: 300 # TODO: fix the sleep loop 301 results = self._match_and_pop(regex_pattern) 302 if len(results) != 0 or time.time() > deadline: 303 break 304 time.sleep(freq) 305 if len(results) == 0: 306 raise queue.Empty('Timeout after {}s waiting for event: {}'.format( 307 timeout, regex_pattern)) 308 309 return sorted(results, key=lambda event: event['time']) 310 311 def _match_and_pop(self, regex_pattern): 312 """Pop one event from each of the event queues whose names 313 match (in a sense of regular expression) regex_pattern. 314 """ 315 results = [] 316 self._lock.acquire() 317 for name in self._event_dict.keys(): 318 if re.match(regex_pattern, name): 319 q = self._event_dict[name] 320 if q: 321 try: 322 results.append(q.get(False)) 323 except queue.Empty: 324 pass 325 self._lock.release() 326 return results 327 328 def get_event_q(self, event_name): 329 """Obtain the queue storing events of the specified name. 330 331 If no event of this name has been polled, wait for one to. 332 333 Returns: A queue storing all the events of the specified name. 334 """ 335 self._lock.acquire() 336 if (event_name not in self._event_dict 337 or self._event_dict[event_name] is None): 338 self._event_dict[event_name] = queue.Queue() 339 self._lock.release() 340 341 event_queue = self._event_dict[event_name] 342 return event_queue 343 344 def handle_subscribed_event(self, event_obj, event_name): 345 """Execute the registered handler of an event. 346 347 Retrieve the handler and its arguments, and execute the handler in a 348 new thread. 349 350 Args: 351 event_obj: Json object of the event. 352 event_name: Name of the event to call handler for. 353 """ 354 handler, args = self._handlers[event_name] 355 self._executor.submit(handler, event_obj, *args) 356 357 def _handle(self, event_handler, event_name, user_args, event_timeout, 358 cond, cond_timeout): 359 """Pop an event of specified type and calls its handler on it. If 360 condition is not None, block until condition is met or timeout. 361 """ 362 if cond: 363 cond.wait(cond_timeout) 364 event = self.pop_event(event_name, event_timeout) 365 return event_handler(event, *user_args) 366 367 def handle_event(self, 368 event_handler, 369 event_name, 370 user_args, 371 event_timeout=None, 372 cond=None, 373 cond_timeout=None): 374 """Handle events that don't have registered handlers 375 376 In a new thread, poll one event of specified type from its queue and 377 execute its handler. If no such event exists, the thread waits until 378 one appears. 379 380 Args: 381 event_handler: Handler for the event, which should take at least 382 one argument - the event json object. 383 event_name: Name of the event to be handled. 384 user_args: User arguments for the handler; to be passed in after 385 the event json. 386 event_timeout: Number of seconds to wait for the event to come. 387 cond: A condition to wait on before executing the handler. Should 388 be a threading.Event object. 389 cond_timeout: Number of seconds to wait before the condition times 390 out. Never times out if None. 391 392 Returns: 393 worker: A concurrent.Future object associated with the handler. 394 If blocking call worker.result() is triggered, the handler 395 needs to return something to unblock. 396 """ 397 worker = self._executor.submit(self._handle, event_handler, event_name, 398 user_args, event_timeout, cond, 399 cond_timeout) 400 return worker 401 402 def pop_all(self, event_name): 403 """Return and remove all stored events of a specified name. 404 405 Pops all events from their queue. May miss the latest ones. 406 If no event is available, return immediately. 407 408 Args: 409 event_name: Name of the events to be popped. 410 411 Returns: 412 results: List of the desired events. 413 414 Raises: 415 IllegalStateError: Raised if pop is called before the dispatcher 416 starts polling. 417 """ 418 if not self._started: 419 raise IllegalStateError(("Dispatcher needs to be started before " 420 "popping.")) 421 results = [] 422 try: 423 self._lock.acquire() 424 while True: 425 e = self._event_dict[event_name].get(block=False) 426 results.append(e) 427 except (queue.Empty, KeyError): 428 return results 429 finally: 430 self._lock.release() 431 432 def clear_events(self, event_name): 433 """Clear all events of a particular name. 434 435 Args: 436 event_name: Name of the events to be popped. 437 """ 438 self._lock.acquire() 439 try: 440 q = self.get_event_q(event_name) 441 q.queue.clear() 442 except queue.Empty: 443 return 444 finally: 445 self._lock.release() 446 447 def clear_all_events(self): 448 """Clear all event queues and their cached events.""" 449 self._lock.acquire() 450 self._event_dict.clear() 451 self._lock.release() 452