• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18import queue
19import re
20import threading
21import time
22
23from acts import logger
24from acts.controllers.sl4a_lib import rpc_client
25
26
class EventDispatcherError(Exception):
    """Root of the exception hierarchy raised by the EventDispatcher."""
29
30
class IllegalStateError(EventDispatcherError):
    """Raised when a call would put the event dispatcher in an illegal state.

    For example: popping events before the dispatcher has been started, or
    starting a dispatcher that is already running.
    """
33
34
class DuplicateError(EventDispatcherError):
    """Raised when a second handler is registered for the same event name."""
37
38
39class EventDispatcher:
40    """A class for managing the events for an SL4A Session.
41
42    Attributes:
43        _serial: The serial of the device.
44        _rpc_client: The rpc client for that session.
45        _started: A bool that holds whether or not the event dispatcher is
46                  running.
47        _executor: The thread pool executor for running event handlers and
48                   polling.
49        _event_dict: A dictionary of str eventName = Queue<Event> eventQueue
50        _handlers: A dictionary of str eventName => (lambda, args) handler
51        _lock: A lock that prevents multiple reads/writes to the event queues.
52        log: The EventDispatcher's logger.
53    """
54
55    DEFAULT_TIMEOUT = 60
56
57    def __init__(self, serial, rpc_client):
58        self._serial = serial
59        self._rpc_client = rpc_client
60        self._started = False
61        self._executor = None
62        self._event_dict = {}
63        self._handlers = {}
64        self._lock = threading.RLock()
65
66        def _log_formatter(message):
67            """Defines the formatting used in the logger."""
68            return '[E Dispatcher|%s|%s] %s' % (self._serial,
69                                                self._rpc_client.uid, message)
70
71        self.log = logger.create_logger(_log_formatter)
72
73    def poll_events(self):
74        """Continuously polls all types of events from sl4a.
75
76        Events are sorted by name and store in separate queues.
77        If there are registered handlers, the handlers will be called with
78        corresponding event immediately upon event discovery, and the event
79        won't be stored. If exceptions occur, stop the dispatcher and return
80        """
81        while self._started:
82            try:
83                # 60000 in ms, timeout in second
84                event_obj = self._rpc_client.eventWait(60000, timeout=120)
85            except rpc_client.Sl4aConnectionError as e:
86                if self._rpc_client.is_alive:
87                    self.log.warning('Closing due to closed session.')
88                    break
89                else:
90                    self.log.warning('Closing due to error: %s.' % e)
91                    self.close()
92                    raise e
93            if not event_obj:
94                continue
95            elif 'name' not in event_obj:
96                self.log.error('Received Malformed event {}'.format(event_obj))
97                continue
98            else:
99                event_name = event_obj['name']
100            # if handler registered, process event
101            if event_name == 'EventDispatcherShutdown':
102                self.log.debug('Received shutdown signal.')
103                # closeSl4aSession has been called, which closes the event
104                # dispatcher. Stop execution on this polling thread.
105                return
106            if event_name in self._handlers:
107                self.handle_subscribed_event(event_obj, event_name)
108            else:
109                self._lock.acquire()
110                if event_name in self._event_dict:  # otherwise, cache event
111                    self._event_dict[event_name].put(event_obj)
112                else:
113                    q = queue.Queue()
114                    q.put(event_obj)
115                    self._event_dict[event_name] = q
116                self._lock.release()
117
118    def register_handler(self, handler, event_name, args):
119        """Registers an event handler.
120
121        One type of event can only have one event handler associated with it.
122
123        Args:
124            handler: The event handler function to be registered.
125            event_name: Name of the event the handler is for.
126            args: User arguments to be passed to the handler when it's called.
127
128        Raises:
129            IllegalStateError: Raised if attempts to register a handler after
130                the dispatcher starts running.
131            DuplicateError: Raised if attempts to register more than one
132                handler for one type of event.
133        """
134        if self._started:
135            raise IllegalStateError('Cannot register service after polling is '
136                                    'started.')
137        self._lock.acquire()
138        try:
139            if event_name in self._handlers:
140                raise DuplicateError(
141                    'A handler for {} already exists'.format(event_name))
142            self._handlers[event_name] = (handler, args)
143        finally:
144            self._lock.release()
145
146    def start(self):
147        """Starts the event dispatcher.
148
149        Initiates executor and start polling events.
150
151        Raises:
152            IllegalStateError: Can't start a dispatcher again when it's already
153                running.
154        """
155        if not self._started:
156            self._started = True
157            self._executor = ThreadPoolExecutor(max_workers=32)
158            self._executor.submit(self.poll_events)
159        else:
160            raise IllegalStateError("Dispatcher is already started.")
161
162    def close(self):
163        """Clean up and release resources.
164
165        This function should only be called after a
166        rpc_client.closeSl4aSession() call.
167        """
168        if not self._started:
169            return
170        self._started = False
171        self._executor.shutdown(wait=True)
172        self.clear_all_events()
173
174    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
175        """Pop an event from its queue.
176
177        Return and remove the oldest entry of an event.
178        Block until an event of specified name is available or
179        times out if timeout is set.
180
181        Args:
182            event_name: Name of the event to be popped.
183            timeout: Number of seconds to wait when event is not present.
184                Never times out if None.
185
186        Returns:
187            event: The oldest entry of the specified event. None if timed out.
188
189        Raises:
190            IllegalStateError: Raised if pop is called before the dispatcher
191                starts polling.
192        """
193        if not self._started:
194            raise IllegalStateError(
195                'Dispatcher needs to be started before popping.')
196
197        e_queue = self.get_event_q(event_name)
198
199        if not e_queue:
200            raise IllegalStateError(
201                'Failed to get an event queue for {}'.format(event_name))
202
203        try:
204            # Block for timeout
205            if timeout:
206                return e_queue.get(True, timeout)
207            # Non-blocking poll for event
208            elif timeout == 0:
209                return e_queue.get(False)
210            else:
211                # Block forever on event wait
212                return e_queue.get(True)
213        except queue.Empty:
214            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
215                timeout, event_name))
216
217    def wait_for_event(self,
218                       event_name,
219                       predicate,
220                       timeout=DEFAULT_TIMEOUT,
221                       *args,
222                       **kwargs):
223        """Wait for an event that satisfies a predicate to appear.
224
225        Continuously pop events of a particular name and check against the
226        predicate until an event that satisfies the predicate is popped or
227        timed out. Note this will remove all the events of the same name that
228        do not satisfy the predicate in the process.
229
230        Args:
231            event_name: Name of the event to be popped.
232            predicate: A function that takes an event and returns True if the
233                predicate is satisfied, False otherwise.
234            timeout: Number of seconds to wait.
235            *args: Optional positional args passed to predicate().
236            **kwargs: Optional keyword args passed to predicate().
237                consume_ignored_events: Whether or not to consume events while
238                    searching for the desired event. Defaults to True if unset.
239
240        Returns:
241            The event that satisfies the predicate.
242
243        Raises:
244            queue.Empty: Raised if no event that satisfies the predicate was
245                found before time out.
246        """
247        deadline = time.time() + timeout
248        ignored_events = []
249        consume_events = kwargs.pop('consume_ignored_events', True)
250        while True:
251            event = None
252            try:
253                event = self.pop_event(event_name, 1)
254                if not consume_events:
255                    ignored_events.append(event)
256            except queue.Empty:
257                pass
258
259            if event and predicate(event, *args, **kwargs):
260                for ignored_event in ignored_events:
261                    self.get_event_q(event_name).put(ignored_event)
262                return event
263
264            if time.time() > deadline:
265                for ignored_event in ignored_events:
266                    self.get_event_q(event_name).put(ignored_event)
267                raise queue.Empty(
268                    'Timeout after {}s waiting for event: {}'.format(
269                        timeout, event_name))
270
271    def pop_events(self, regex_pattern, timeout, freq=1):
272        """Pop events whose names match a regex pattern.
273
274        If such event(s) exist, pop one event from each event queue that
275        satisfies the condition. Otherwise, wait for an event that satisfies
276        the condition to occur, with timeout.
277
278        Results are sorted by timestamp in ascending order.
279
280        Args:
281            regex_pattern: The regular expression pattern that an event name
282                should match in order to be popped.
283            timeout: Number of seconds to wait for events in case no event
284                matching the condition exits when the function is called.
285
286        Returns:
287            results: Pop events whose names match a regex pattern.
288                Empty if none exist and the wait timed out.
289
290        Raises:
291            IllegalStateError: Raised if pop is called before the dispatcher
292                starts polling.
293            queue.Empty: Raised if no event was found before time out.
294        """
295        if not self._started:
296            raise IllegalStateError(
297                "Dispatcher needs to be started before popping.")
298        deadline = time.time() + timeout
299        while True:
300            # TODO: fix the sleep loop
301            results = self._match_and_pop(regex_pattern)
302            if len(results) != 0 or time.time() > deadline:
303                break
304            time.sleep(freq)
305        if len(results) == 0:
306            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
307                timeout, regex_pattern))
308
309        return sorted(results, key=lambda event: event['time'])
310
311    def _match_and_pop(self, regex_pattern):
312        """Pop one event from each of the event queues whose names
313        match (in a sense of regular expression) regex_pattern.
314        """
315        results = []
316        self._lock.acquire()
317        for name in self._event_dict.keys():
318            if re.match(regex_pattern, name):
319                q = self._event_dict[name]
320                if q:
321                    try:
322                        results.append(q.get(False))
323                    except queue.Empty:
324                        pass
325        self._lock.release()
326        return results
327
328    def get_event_q(self, event_name):
329        """Obtain the queue storing events of the specified name.
330
331        If no event of this name has been polled, wait for one to.
332
333        Returns: A queue storing all the events of the specified name.
334        """
335        self._lock.acquire()
336        if (event_name not in self._event_dict
337                or self._event_dict[event_name] is None):
338            self._event_dict[event_name] = queue.Queue()
339        self._lock.release()
340
341        event_queue = self._event_dict[event_name]
342        return event_queue
343
344    def handle_subscribed_event(self, event_obj, event_name):
345        """Execute the registered handler of an event.
346
347        Retrieve the handler and its arguments, and execute the handler in a
348            new thread.
349
350        Args:
351            event_obj: Json object of the event.
352            event_name: Name of the event to call handler for.
353        """
354        handler, args = self._handlers[event_name]
355        self._executor.submit(handler, event_obj, *args)
356
357    def _handle(self, event_handler, event_name, user_args, event_timeout,
358                cond, cond_timeout):
359        """Pop an event of specified type and calls its handler on it. If
360        condition is not None, block until condition is met or timeout.
361        """
362        if cond:
363            cond.wait(cond_timeout)
364        event = self.pop_event(event_name, event_timeout)
365        return event_handler(event, *user_args)
366
367    def handle_event(self,
368                     event_handler,
369                     event_name,
370                     user_args,
371                     event_timeout=None,
372                     cond=None,
373                     cond_timeout=None):
374        """Handle events that don't have registered handlers
375
376        In a new thread, poll one event of specified type from its queue and
377        execute its handler. If no such event exists, the thread waits until
378        one appears.
379
380        Args:
381            event_handler: Handler for the event, which should take at least
382                one argument - the event json object.
383            event_name: Name of the event to be handled.
384            user_args: User arguments for the handler; to be passed in after
385                the event json.
386            event_timeout: Number of seconds to wait for the event to come.
387            cond: A condition to wait on before executing the handler. Should
388                be a threading.Event object.
389            cond_timeout: Number of seconds to wait before the condition times
390                out. Never times out if None.
391
392        Returns:
393            worker: A concurrent.Future object associated with the handler.
394                If blocking call worker.result() is triggered, the handler
395                needs to return something to unblock.
396        """
397        worker = self._executor.submit(self._handle, event_handler, event_name,
398                                       user_args, event_timeout, cond,
399                                       cond_timeout)
400        return worker
401
402    def pop_all(self, event_name):
403        """Return and remove all stored events of a specified name.
404
405        Pops all events from their queue. May miss the latest ones.
406        If no event is available, return immediately.
407
408        Args:
409            event_name: Name of the events to be popped.
410
411        Returns:
412           results: List of the desired events.
413
414        Raises:
415            IllegalStateError: Raised if pop is called before the dispatcher
416                starts polling.
417        """
418        if not self._started:
419            raise IllegalStateError(("Dispatcher needs to be started before "
420                                     "popping."))
421        results = []
422        try:
423            self._lock.acquire()
424            while True:
425                e = self._event_dict[event_name].get(block=False)
426                results.append(e)
427        except (queue.Empty, KeyError):
428            return results
429        finally:
430            self._lock.release()
431
432    def clear_events(self, event_name):
433        """Clear all events of a particular name.
434
435        Args:
436            event_name: Name of the events to be popped.
437        """
438        self._lock.acquire()
439        try:
440            q = self.get_event_q(event_name)
441            q.queue.clear()
442        except queue.Empty:
443            return
444        finally:
445            self._lock.release()
446
447    def clear_all_events(self):
448        """Clear all event queues and their cached events."""
449        self._lock.acquire()
450        self._event_dict.clear()
451        self._lock.release()
452