• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent import futures
16import queue
17import re
18import threading
19import time
20import traceback
21
22
23class EventDispatcherError(Exception):
24  pass
25
26
27class IllegalStateError(EventDispatcherError):
28  """Raise when user tries to put event_dispatcher into an illegal state.
29  """
30
31
32class DuplicateError(EventDispatcherError):
33  """Raise when a duplicate is being created and it shouldn't.
34  """
35
36
37class EventDispatcher:
38  """Class managing events for an sl4a connection.
39  """
40
41  DEFAULT_TIMEOUT = 60
42
43  def __init__(self, sl4a):
44    self._sl4a = sl4a
45    self.started = False
46    self.executor = None
47    self.poller = None
48    self.event_dict = {}
49    self.handlers = {}
50    self.lock = threading.RLock()
51
52  def poll_events(self):
53    """Continuously polls all types of events from sl4a.
54
55    Events are sorted by name and store in separate queues.
56    If there are registered handlers, the handlers will be called with
57    corresponding event immediately upon event discovery, and the event
58    won't be stored. If exceptions occur, stop the dispatcher and return
59    """
60    while self.started:
61      event_obj = None
62      event_name = None
63      try:
64        event_obj = self._sl4a.eventWait(50000)
65      except Exception:
66        if self.started:
67          print("Exception happened during polling.")
68          print(traceback.format_exc())
69          raise
70      if not event_obj:
71        continue
72      elif 'name' not in event_obj:
73        print("Received Malformed event {}".format(event_obj))
74        continue
75      else:
76        event_name = event_obj['name']
77      # if handler registered, process event
78      if event_name in self.handlers:
79        self.handle_subscribed_event(event_obj, event_name)
80      if event_name == "EventDispatcherShutdown":
81        self._sl4a.closeSl4aSession()
82        break
83      else:
84        self.lock.acquire()
85        if event_name in self.event_dict:  # otherwise, cache event
86          self.event_dict[event_name].put(event_obj)
87        else:
88          q = queue.Queue()
89          q.put(event_obj)
90          self.event_dict[event_name] = q
91        self.lock.release()
92
93  def register_handler(self, handler, event_name, args):
94    """Registers an event handler.
95
96    One type of event can only have one event handler associated with it.
97
98    Args:
99      handler: The event handler function to be registered.
100      event_name: Name of the event the handler is for.
101      args: User arguments to be passed to the handler when it's called.
102
103    Raises:
104      IllegalStateError: Raised if attempts to register a handler after
105        the dispatcher starts running.
106      DuplicateError: Raised if attempts to register more than one
107        handler for one type of event.
108    """
109    if self.started:
110      raise IllegalStateError(("Can't register service after polling is"
111                               " started"))
112    self.lock.acquire()
113    try:
114      if event_name in self.handlers:
115        raise DuplicateError(
116            'A handler for {} already exists'.format(event_name))
117      self.handlers[event_name] = (handler, args)
118    finally:
119      self.lock.release()
120
121  def start(self):
122    """Starts the event dispatcher.
123
124    Initiates executor and start polling events.
125
126    Raises:
127      IllegalStateError: Can't start a dispatcher again when it's already
128        running.
129    """
130    if not self.started:
131      self.started = True
132      self.executor = futures.ThreadPoolExecutor(max_workers=32)
133      self.poller = self.executor.submit(self.poll_events)
134    else:
135      raise IllegalStateError("Dispatcher is already started.")
136
137  def clean_up(self):
138    """Clean up and release resources after the event dispatcher polling
139    loop has been broken.
140
141    The following things happen:
142    1. Clear all events and flags.
143    2. Close the sl4a client the event_dispatcher object holds.
144    3. Shut down executor without waiting.
145    """
146    if not self.started:
147      return
148    self.started = False
149    self.clear_all_events()
150    # At this point, the sl4a apk is destroyed and nothing is listening on
151    # the socket. Avoid sending any sl4a commands; just clean up the socket
152    # and return.
153    self._sl4a.disconnect()
154    self.poller.set_result("Done")
155    # The polling thread is guaranteed to finish after a max of 60 seconds,
156    # so we don't wait here.
157    self.executor.shutdown(wait=False)
158
159  def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
160    """Pop an event from its queue.
161
162    Return and remove the oldest entry of an event.
163    Block until an event of specified name is available or
164    times out if timeout is set.
165
166    Args:
167      event_name: Name of the event to be popped.
168      timeout: Number of seconds to wait when event is not present.
169        Never times out if None.
170
171    Returns:
172      The oldest entry of the specified event. None if timed out.
173
174    Raises:
175      IllegalStateError: Raised if pop is called before the dispatcher
176        starts polling.
177    """
178    if not self.started:
179      raise IllegalStateError("Dispatcher needs to be started before popping.")
180
181    e_queue = self.get_event_q(event_name)
182
183    if not e_queue:
184      raise TypeError("Failed to get an event queue for {}".format(event_name))
185
186    try:
187      # Block for timeout
188      if timeout:
189        return e_queue.get(True, timeout)
190      # Non-blocking poll for event
191      elif timeout == 0:
192        return e_queue.get(False)
193      else:
194        # Block forever on event wait
195        return e_queue.get(True)
196    except queue.Empty:
197      raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
198          timeout, event_name))
199
200  def wait_for_event(self,
201                     event_name,
202                     predicate,
203                     timeout=DEFAULT_TIMEOUT,
204                     *args,
205                     **kwargs):
206    """Wait for an event that satisfies a predicate to appear.
207
208    Continuously pop events of a particular name and check against the
209    predicate until an event that satisfies the predicate is popped or
210    timed out. Note this will remove all the events of the same name that
211    do not satisfy the predicate in the process.
212
213    Args:
214      event_name: Name of the event to be popped.
215      predicate: A function that takes an event and returns True if the
216        predicate is satisfied, False otherwise.
217      timeout: Number of seconds to wait.
218      *args: Optional positional args passed to predicate().
219      **kwargs: Optional keyword args passed to predicate().
220
221    Returns:
222      The event that satisfies the predicate.
223
224    Raises:
225      queue.Empty: Raised if no event that satisfies the predicate was
226        found before time out.
227    """
228    deadline = time.perf_counter() + timeout
229
230    while True:
231      event = None
232      try:
233        event = self.pop_event(event_name, 1)
234      except queue.Empty:
235        pass
236
237      if event and predicate(event, *args, **kwargs):
238        return event
239
240      if time.perf_counter() > deadline:
241        raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
242            timeout, event_name))
243
244  def pop_events(self, regex_pattern, timeout):
245    """Pop events whose names match a regex pattern.
246
247    If such event(s) exist, pop one event from each event queue that
248    satisfies the condition. Otherwise, wait for an event that satisfies
249    the condition to occur, with timeout.
250
251    Results are sorted by timestamp in ascending order.
252
253    Args:
254      regex_pattern: The regular expression pattern that an event name
255        should match in order to be popped.
256      timeout: Number of seconds to wait for events in case no event
257        matching the condition exits when the function is called.
258
259    Returns:
260      Events whose names match a regex pattern.
261      Empty if none exist and the wait timed out.
262
263    Raises:
264      IllegalStateError: Raised if pop is called before the dispatcher
265        starts polling.
266      queue.Empty: Raised if no event was found before time out.
267    """
268    if not self.started:
269      raise IllegalStateError("Dispatcher needs to be started before popping.")
270    deadline = time.perf_counter() + timeout
271    while True:
272      # TODO: fix the sleep loop
273      results = self._match_and_pop(regex_pattern)
274      if len(results) != 0 or time.perf_counter() > deadline:
275        break
276      time.sleep(1)
277    if len(results) == 0:
278      raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
279          timeout, regex_pattern))
280
281    return sorted(results, key=lambda event: event['time'])
282
283  def _match_and_pop(self, regex_pattern):
284    """Pop one event from each of the event queues whose names
285    match (in a sense of regular expression) regex_pattern.
286    """
287    results = []
288    self.lock.acquire()
289    for name in self.event_dict.keys():
290      if re.match(regex_pattern, name):
291        q = self.event_dict[name]
292        if q:
293          try:
294            results.append(q.get(False))
295          except Exception:
296            pass
297    self.lock.release()
298    return results
299
300  def get_event_q(self, event_name):
301    """Obtain the queue storing events of the specified name.
302
303    If no event of this name has been polled, wait for one to.
304
305    Returns:
306      A queue storing all the events of the specified name.
307      None if timed out.
308
309    Raises:
310      queue.Empty: Raised if the queue does not exist and timeout has
311        passed.
312    """
313    self.lock.acquire()
314    if event_name not in self.event_dict or self.event_dict[event_name] is None:
315      self.event_dict[event_name] = queue.Queue()
316    self.lock.release()
317
318    event_queue = self.event_dict[event_name]
319    return event_queue
320
321  def handle_subscribed_event(self, event_obj, event_name):
322    """Execute the registered handler of an event.
323
324    Retrieve the handler and its arguments, and execute the handler in a
325      new thread.
326
327    Args:
328      event_obj: Json object of the event.
329      event_name: Name of the event to call handler for.
330    """
331    handler, args = self.handlers[event_name]
332    self.executor.submit(handler, event_obj, *args)
333
334  def _handle(self, event_handler, event_name, user_args, event_timeout, cond,
335              cond_timeout):
336    """Pop an event of specified type and calls its handler on it. If
337    condition is not None, block until condition is met or timeout.
338    """
339    if cond:
340      cond.wait(cond_timeout)
341    event = self.pop_event(event_name, event_timeout)
342    return event_handler(event, *user_args)
343
344  def handle_event(self,
345                   event_handler,
346                   event_name,
347                   user_args,
348                   event_timeout=None,
349                   cond=None,
350                   cond_timeout=None):
351    """Handle events that don't have registered handlers
352
353    In a new thread, poll one event of specified type from its queue and
354    execute its handler. If no such event exists, the thread waits until
355    one appears.
356
357    Args:
358      event_handler: Handler for the event, which should take at least
359        one argument - the event json object.
360      event_name: Name of the event to be handled.
361      user_args: User arguments for the handler; to be passed in after
362        the event json.
363      event_timeout: Number of seconds to wait for the event to come.
364      cond: A condition to wait on before executing the handler. Should
365        be a threading.Event object.
366      cond_timeout: Number of seconds to wait before the condition times
367        out. Never times out if None.
368
369    Returns:
370      A concurrent.Future object associated with the handler.
371      If blocking call worker.result() is triggered, the handler
372      needs to return something to unblock.
373    """
374    worker = self.executor.submit(self._handle, event_handler, event_name,
375                                  user_args, event_timeout, cond, cond_timeout)
376    return worker
377
378  def pop_all(self, event_name):
379    """Return and remove all stored events of a specified name.
380
381    Pops all events from their queue. May miss the latest ones.
382    If no event is available, return immediately.
383
384    Args:
385      event_name: Name of the events to be popped.
386
387    Returns:
388      List of the desired events.
389
390    Raises:
391      IllegalStateError: Raised if pop is called before the dispatcher
392        starts polling.
393    """
394    if not self.started:
395      raise IllegalStateError(("Dispatcher needs to be started before "
396                               "popping."))
397    results = []
398    try:
399      self.lock.acquire()
400      while True:
401        e = self.event_dict[event_name].get(block=False)
402        results.append(e)
403    except (queue.Empty, KeyError):
404      return results
405    finally:
406      self.lock.release()
407
408  def clear_events(self, event_name):
409    """Clear all events of a particular name.
410
411    Args:
412      event_name: Name of the events to be popped.
413    """
414    self.lock.acquire()
415    try:
416      q = self.get_event_q(event_name)
417      q.queue.clear()
418    except queue.Empty:
419      return
420    finally:
421      self.lock.release()
422
423  def clear_all_events(self):
424    """Clear all event queues and their cached events."""
425    self.lock.acquire()
426    self.event_dict.clear()
427    self.lock.release()
428