#
# Copyright 2016- The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import queue
import re
import socket
import threading
import time
import traceback


class EventDispatcherError(Exception):
    """Base class for all errors raised by the event dispatcher."""


class IllegalStateError(EventDispatcherError):
    """Raise when user tries to put event_dispatcher into an illegal state.
    """


class DuplicateError(EventDispatcherError):
    """Raise when a duplicate is being created and it shouldn't.
    """


class EventDispatcher:
    """Class managing events for an sl4a connection.

    A background task (see `poll_events`) pulls events from `droid` and
    caches them in one `queue.Queue` per event name. Callers consume the
    cached events through the `pop_*` / `wait_for_event` methods, or
    register handlers that are run on the dispatcher's thread pool.
    """

    DEFAULT_TIMEOUT = 60

    def __init__(self, droid):
        self.droid = droid
        self.started = False
        self.executor = None
        self.poller = None
        # Maps event name -> queue.Queue of cached event objects.
        self.event_dict = {}
        # Maps event name -> (handler, args) registered via register_handler.
        self.handlers = {}
        # Guards event_dict and handlers; reentrant because some methods
        # call each other while holding it (e.g. clear_events).
        self.lock = threading.RLock()

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are grouped by name and stored in separate queues.
        If a handler is registered for the event's name, the handler is
        submitted to the executor with the event as soon as the event is
        discovered. The event is cached in its queue as well, except for
        the "EventDispatcherShutdown" event, which closes the sl4a session
        and ends the loop.

        If an exception occurs while the dispatcher is still started, it is
        printed and re-raised, which stops the polling.
        """
        while self.started:
            event_obj = None
            try:
                # 50000 is presumably a timeout in milliseconds -- confirm
                # against the sl4a eventWait API.
                event_obj = self.droid.eventWait(50000)
            except Exception:
                # Errors after clean_up() flipped `started` are expected
                # (the client is being closed) and are dropped; the loop
                # condition then ends the polling.
                if self.started:
                    print("Exception happened during polling.")
                    print(traceback.format_exc())
                    raise
            if not event_obj:
                continue
            if 'name' not in event_obj:
                print("Received Malformed event {}".format(event_obj))
                continue
            event_name = event_obj['name']
            # if handler registered, process event
            if event_name in self.handlers:
                self.handle_subscribed_event(event_obj, event_name)
            if event_name == "EventDispatcherShutdown":
                self.droid.closeSl4aSession()
                break
            # cache the event in the queue for its name
            with self.lock:
                event_queue = self.event_dict.get(event_name)
                if event_queue is None:
                    event_queue = queue.Queue()
                    self.event_dict[event_name] = event_queue
                event_queue.put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self.started:
            raise IllegalStateError(("Can't register service after polling is"
                                     " started"))
        with self.lock:
            if event_name in self.handlers:
                raise DuplicateError('A handler for {} already exists'.format(
                    event_name))
            self.handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self.started:
            raise IllegalStateError("Dispatcher is already started.")
        self.started = True
        self.executor = ThreadPoolExecutor(max_workers=32)
        self.poller = self.executor.submit(self.poll_events)

    def clean_up(self):
        """Clean up and release resources after the event dispatcher polling
        loop has been broken.

        Does nothing if the dispatcher is not started. Otherwise the
        following things happen:
        1. Clear all events and flags.
        2. Close the sl4a client the event_dispatcher object holds.
        3. Mark the poller future as done, if it is not already.
        4. Shut down executor without waiting.

        The executor is shut down even if closing the client raises; the
        exception from closing is still propagated.
        """
        if not self.started:
            return
        self.started = False
        self.clear_all_events()
        try:
            self.droid.close()
        finally:
            poller = self.poller
            # The poll loop may already have ended by itself (shutdown
            # event or a polling error); completing a finished future a
            # second time is an error on newer Python versions.
            if poller is not None and not poller.done():
                try:
                    poller.set_result("Done")
                except Exception:
                    # Best effort: the poller finished between the done()
                    # check and set_result(); nothing is left to signal.
                    pass
            # The poll loop re-checks `started` whenever eventWait returns,
            # so we don't wait for the polling thread here.
            self.executor.shutdown(wait=False)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            event: The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was available before the
                timeout.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")

        e_queue = self.get_event_q(event_name)

        if not e_queue:
            raise TypeError("Failed to get an event queue for {}".format(
                event_name))

        try:
            # Block for timeout
            if timeout:
                return e_queue.get(True, timeout)
            # Non-blocking poll for event
            elif timeout == 0:
                return e_queue.get(False)
            else:
                # Block forever on event wait
                return e_queue.get(True)
        except queue.Empty:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, event_name))

    def wait_for_event(self,
                       event_name,
                       predicate,
                       timeout=DEFAULT_TIMEOUT,
                       *args,
                       **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised if called before the dispatcher starts
                polling.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.time() + timeout

        while True:
            # Wait in slices of at most one second, but never past the
            # deadline, so short timeouts are honoured.
            remaining = deadline - time.time()
            event = None
            try:
                event = self.pop_event(event_name, max(0, min(1, remaining)))
            except queue.Empty:
                pass

            if event and predicate(event, *args, **kwargs):
                return event

            if time.time() > deadline:
                raise queue.Empty(
                    'Timeout after {}s waiting for event: {}'.format(
                        timeout, event_name))

    def pop_events(self, regex_pattern, timeout):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exits when the function is called.

        Returns:
            results: Events whose names match the regex pattern, sorted by
                their 'time' field.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.time() + timeout
        while True:
            #TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            if results:
                break
            remaining = deadline - time.time()
            if remaining < 0:
                break
            # Poll once per second, but do not sleep past the deadline.
            time.sleep(min(1, remaining))
        if not results:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Queues that match but are currently empty contribute nothing.

        Args:
            regex_pattern: Pattern matched against the start of each event
                name with re.match().

        Returns:
            A list with at most one event per matching queue.
        """
        results = []
        # `with` guarantees the lock is released even if re.match raises
        # (e.g. on an invalid pattern).
        with self.lock:
            for name, event_queue in self.event_dict.items():
                if not event_queue or not re.match(regex_pattern, name):
                    continue
                try:
                    results.append(event_queue.get(False))
                except queue.Empty:
                    pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet (or the stored entry is None),
        a new empty queue is created, stored and returned. This method does
        not wait for events.

        Args:
            event_name: Name of the event whose queue is requested.

        Returns:
            queue: A queue storing all the events of the specified name.
        """
        # Look up and create under the same lock so a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self.event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and execute the handler in a
        new thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self.handlers[event_name]
        self.executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout,
                cond, cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self,
                     event_handler,
                     event_name,
                     user_args,
                     event_timeout=None,
                     cond=None,
                     cond_timeout=None):
        """Handle events that don't have registered handlers

        In a new thread, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the thread waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            worker: A concurrent.Future object associated with the handler.
                If blocking call worker.result() is triggered, the handler
                needs to return something to unblock.
        """
        worker = self.executor.submit(self._handle, event_handler, event_name,
                                      user_args, event_timeout, cond,
                                      cond_timeout)
        return worker

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
            results: List of the desired events; empty if there is no queue
                for this name or the queue is empty.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self.started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                                     "popping."))
        results = []
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                return results
            # Drain without blocking until the queue reports empty.
            while True:
                try:
                    results.append(event_queue.get(block=False))
                except queue.Empty:
                    return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        An empty queue is created for the name if none exists yet.

        Args:
            event_name: Name of the events to be popped.
        """
        with self.lock:
            event_queue = self.get_event_q(event_name)
            # Clear the underlying deque under the queue's own mutex so it
            # cannot interleave with a put()/get() on the same queue.
            with event_queue.mutex:
                event_queue.queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self.lock:
            self.event_dict.clear()