• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2#   Copyright 2016- The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from concurrent.futures import ThreadPoolExecutor
17import queue
18import re
19import socket
20import threading
21import time
22import traceback
23
24
class EventDispatcherError(Exception):
    """Base class for the errors defined by the event dispatcher module."""
27
28
class IllegalStateError(EventDispatcherError):
    """Raised when an operation is not valid in the dispatcher's current state.

    For example: registering a handler or starting the dispatcher while it is
    already running, or popping events before it has been started.
    """
32
33
class DuplicateError(EventDispatcherError):
    """Raised when something that must be unique would be created twice.

    For example: registering a second handler for an event name that already
    has one.
    """
37
38
class EventDispatcher:
    """Class managing events for an sl4a connection.

    A background task polls events from ``droid`` and caches them in one
    ``queue.Queue`` per event name. Callers retrieve events with the
    ``pop_*``/``wait_for_event`` methods or have them delivered to handlers.

    Attributes:
        droid: The sl4a client events are polled from.
        started: True between start() and clean_up().
        executor: ThreadPoolExecutor running the poller and the handlers;
            None until start() is called.
        poller: Future of the polling task; None until start() is called.
        event_dict: Maps an event name to the queue caching its events.
        handlers: Maps an event name to a ``(handler, args)`` tuple.
        lock: Re-entrant lock guarding event_dict and handlers.
    """

    DEFAULT_TIMEOUT = 60

    def __init__(self, droid):
        self.droid = droid
        self.started = False
        self.executor = None
        self.poller = None
        self.event_dict = {}
        self.handlers = {}
        self.lock = threading.RLock()

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If a handler is registered for an event name, the handler is
        scheduled with the event upon discovery; the event is cached in its
        queue as well. An "EventDispatcherShutdown" event closes the sl4a
        session and ends the loop without being cached.

        If polling raises while the dispatcher is started, the traceback is
        printed and the exception is re-raised (ending the loop). Errors
        raised after the dispatcher has been stopped are ignored.
        """
        while self.started:
            event_obj = None
            try:
                event_obj = self.droid.eventWait(50000)
            except Exception:
                # Failures are expected once clean_up() has closed the client,
                # so they only count as errors while we are still running.
                if self.started:
                    print("Exception happened during polling.")
                    print(traceback.format_exc())
                    raise
            if not event_obj:
                continue
            if 'name' not in event_obj:
                print("Received Malformed event {}".format(event_obj))
                continue
            event_name = event_obj['name']
            # if handler registered, process event
            if event_name in self.handlers:
                self.handle_subscribed_event(event_obj, event_name)
            if event_name == "EventDispatcherShutdown":
                self.droid.closeSl4aSession()
                break
            # Cache the event. The lock is held across lookup and put so the
            # queue cannot be dropped by clear_all_events() in between.
            with self.lock:
                self.get_event_q(event_name).put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self.started:
            raise IllegalStateError(("Can't register service after polling is"
                                     " started"))
        with self.lock:
            if event_name in self.handlers:
                raise DuplicateError('A handler for {} already exists'.format(
                    event_name))
            self.handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self.started:
            raise IllegalStateError("Dispatcher is already started.")
        self.started = True
        self.executor = ThreadPoolExecutor(max_workers=32)
        self.poller = self.executor.submit(self.poll_events)

    def clean_up(self):
        """Clean up and release resources after the event dispatcher polling
        loop has been broken.

        Does nothing if the dispatcher is not started. Otherwise:
        1. Clear all events and flags.
        2. Close the sl4a client the event_dispatcher object holds.
        3. Shut down executor without waiting.

        The executor is shut down even if closing the client raises.
        """
        if not self.started:
            return
        self.started = False
        try:
            self.clear_all_events()
            self.droid.close()
        finally:
            poller = self.poller
            # The poller may already have finished (shutdown event or a
            # polling error); setting a result on a finished future raises,
            # which would skip the executor shutdown below.
            if poller is not None and not poller.done():
                poller.set_result("Done")
            # The polling loop exits on its own once the pending eventWait
            # call returns, so we don't wait for it here.
            if self.executor is not None:
                self.executor.shutdown(wait=False)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            event: The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event is available before the timeout.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")

        e_queue = self.get_event_q(event_name)

        if e_queue is None:
            raise TypeError("Failed to get an event queue for {}".format(
                event_name))

        try:
            if timeout:
                # Block for timeout
                return e_queue.get(True, timeout)
            if timeout == 0:
                # Non-blocking poll for event
                return e_queue.get(False)
            # Block forever on event wait
            return e_queue.get(True)
        except queue.Empty:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, event_name)) from None

    def wait_for_event(self,
                       event_name,
                       predicate,
                       timeout=DEFAULT_TIMEOUT,
                       *args,
                       **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait; must be a number.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised if called before the dispatcher starts
                polling.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.time() + timeout

        while True:
            # Wait in slices of at most one second, but never past the
            # deadline, so short timeouts are honored. At least one pop is
            # always attempted, even when the timeout is 0.
            wait = max(0, min(1, deadline - time.time()))
            try:
                event = self.pop_event(event_name, wait)
            except queue.Empty:
                event = None

            if event and predicate(event, *args, **kwargs):
                return event

            if time.time() >= deadline:
                raise queue.Empty(
                    'Timeout after {}s waiting for event: {}'.format(
                        timeout, event_name))

    def pop_events(self, regex_pattern, timeout):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exists when the function is called.

        Returns:
            results: The popped events, sorted by their 'time' field.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.time() + timeout
        while True:
            # TODO: replace the sleep loop with a proper notification.
            results = self._match_and_pop(regex_pattern)
            if results:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Re-check at most once per second, without sleeping past the
            # deadline.
            time.sleep(min(1, remaining))
        if not results:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Returns:
            A list with at most one event per matching queue; queues that are
            currently empty contribute nothing.
        """
        results = []
        # The context manager releases the lock even if re.match raises
        # (e.g. on an invalid pattern); otherwise the poller would deadlock.
        with self.lock:
            for name, event_queue in self.event_dict.items():
                if not re.match(regex_pattern, name):
                    continue
                if event_queue is None:
                    continue
                try:
                    results.append(event_queue.get(False))
                except queue.Empty:
                    pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet, an empty one is created and
        registered. This call itself never blocks waiting for events.

        Args:
            event_name: Name of the event whose queue is requested.

        Returns:
            queue: The queue storing all the events of the specified name.
        """
        # Look up and create under the lock so a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self.event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and submit the handler to
        the executor.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self.handlers[event_name]
        self.executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout,
                cond, cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self,
                     event_handler,
                     event_name,
                     user_args,
                     event_timeout=None,
                     cond=None,
                     cond_timeout=None):
        """Handle events that don't have registered handlers

        In the executor, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the task waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            worker: A concurrent.Future object associated with the handler.
                If blocking call worker.result() is triggered, the handler
                needs to return something to unblock.
        """
        worker = self.executor.submit(self._handle, event_handler, event_name,
                                      user_args, event_timeout, cond,
                                      cond_timeout)
        return worker

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
           results: List of the desired events; empty if there are none.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self.started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                                     "popping."))
        results = []
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                return results
            try:
                while True:
                    results.append(event_queue.get(block=False))
            except queue.Empty:
                pass
        return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        Args:
            event_name: Name of the events to be cleared.
        """
        with self.lock:
            event_queue = self.get_event_q(event_name)
            # Hold the queue's own mutex while emptying its backing deque so
            # a concurrent get() never observes it mid-clear.
            with event_queue.mutex:
                event_queue.queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self.lock:
            self.event_dict.clear()
435