• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3.4
#
#   Copyright 2016- The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18import queue
19import re
20import socket
21import threading
22import time
23import traceback
24
class EventDispatcherError(Exception):
    """Base class for every error raised by the event dispatcher."""
27
class IllegalStateError(EventDispatcherError):
    """Raised when an operation is not allowed in the dispatcher's current
    state, e.g. popping before it is started or starting it twice.
    """
31
class DuplicateError(EventDispatcherError):
    """Raised when something that must be unique is created a second time,
    e.g. a second handler for the same event name.
    """
35
class EventDispatcher:
    """Class managing events for an sl4a connection.

    A poller task (see poll_events) pulls events from the sl4a client and
    files them into per-name queues stored in ``event_dict``. Consumers
    retrieve them with the pop_* / wait_for_event methods, or register
    handlers that are run on the dispatcher's thread pool.

    ``lock`` guards ``event_dict`` and ``handlers``; it is re-entrant so that
    methods holding it may call get_event_q().
    """

    # Default number of seconds the blocking pop/wait helpers wait.
    DEFAULT_TIMEOUT = 60

    def __init__(self, droid):
        """Create a dispatcher bound to an sl4a client.

        Args:
            droid: The sl4a client to poll. This class calls its eventWait(),
                closeSl4aSession() and close() methods.
        """
        self.droid = droid
        self.started = False
        self.executor = None
        self.poller = None
        self.event_dict = {}
        self.handlers = {}
        self.lock = threading.RLock()

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If a handler is registered for an event's name, the handler is
        scheduled with the event immediately upon discovery; the event is
        also cached in its queue. An "EventDispatcherShutdown" event closes
        the sl4a session and ends the loop without being cached.

        Raises:
            Exception: Whatever eventWait() raised, if it fails while the
                dispatcher is still marked as started. Failures after
                clean_up() are swallowed and simply end the loop.
        """
        while self.started:
            try:
                event_obj = self.droid.eventWait(50000)
            except Exception:
                if self.started:
                    print("Exception happened during polling.")
                    print(traceback.format_exc())
                    raise
                # The dispatcher was stopped while we were waiting; the
                # loop condition terminates us on the next iteration.
                event_obj = None
            if not event_obj:
                continue
            if 'name' not in event_obj:
                print("Received Malformed event {}".format(event_obj))
                continue
            event_name = event_obj['name']
            # if handler registered, process event
            if event_name in self.handlers:
                self.handle_subscribed_event(event_obj, event_name)
            if event_name == "EventDispatcherShutdown":
                self.droid.closeSl4aSession()
                break
            # Cache the event. The lock is held across lookup and put so
            # the queue cannot be dropped by clear_all_events() in between.
            with self.lock:
                self.get_event_q(event_name).put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self.started:
            raise IllegalStateError(("Can't register service after polling is"
                " started"))
        with self.lock:
            if event_name in self.handlers:
                raise DuplicateError(
                    'A handler for {} already exists'.format(event_name))
            self.handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self.started:
            raise IllegalStateError("Dispatcher is already started.")
        self.started = True
        self.executor = ThreadPoolExecutor(max_workers=32)
        self.poller = self.executor.submit(self.poll_events)

    def clean_up(self):
        """Clean up and release resources after the event dispatcher polling
        loop has been broken.

        Does nothing if the dispatcher is not started. Otherwise:
        1. Clear all events and flags.
        2. Close the sl4a client the event_dispatcher object holds.
        3. Mark the poller future as done (unless it already finished).
        4. Shut down executor without waiting.
        """
        if not self.started:
            return
        self.started = False
        self.clear_all_events()
        self.droid.close()
        # The poller may already have finished (shutdown event or polling
        # error); setting a result on a finished future raises
        # InvalidStateError on Python 3.8+, so only do it while pending.
        if self.poller is not None and not self.poller.done():
            self.poller.set_result("Done")
        # The polling loop re-checks `started` as soon as its current
        # eventWait() call returns or fails, so we don't wait for it here.
        if self.executor is not None:
            self.executor.shutdown(wait=False)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking; None never times out.

        Returns:
            event: The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event arrived before the timeout.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")

        e_queue = self.get_event_q(event_name)

        if e_queue is None:
            raise TypeError(
                "Failed to get an event queue for {}".format(event_name))

        try:
            if timeout:
                # Block for timeout
                return e_queue.get(True, timeout)
            if timeout == 0:
                # Non-blocking poll for event
                return e_queue.get(False)
            # Block forever on event wait
            return e_queue.get(True)
        except queue.Empty:
            raise queue.Empty(
                'Timeout after {}s waiting for event: {}'.format(
                    timeout, event_name))

    def wait_for_event(self, event_name, predicate,
                       timeout=DEFAULT_TIMEOUT, *args, **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised (by pop_event) if the dispatcher is not
                started.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.time() + timeout

        while True:
            # Wait in slices of at most one second so a stopped dispatcher
            # is noticed promptly, but never sleep past the deadline.
            wait = max(0, min(1, deadline - time.time()))
            try:
                event = self.pop_event(event_name, wait)
            except queue.Empty:
                event = None

            if event and predicate(event, *args, **kwargs):
                return event

            if time.time() >= deadline:
                raise queue.Empty(
                    'Timeout after {}s waiting for event: {}'.format(
                        timeout, event_name))

    def pop_events(self, regex_pattern, timeout):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exists when the function is called.

        Returns:
            results: Non-empty list of popped events, ordered by their
                'time' field.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self.started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.time() + timeout
        while True:
            #TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            if results or time.time() >= deadline:
                break
            # Re-check once a second, but do not sleep past the deadline.
            time.sleep(max(0, min(1, deadline - time.time())))
        if not results:
            raise queue.Empty(
                'Timeout after {}s waiting for event: {}'.format(
                    timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Returns:
            A list with at most one event per matching queue; empty queues
            contribute nothing.
        """
        results = []
        # `with` guarantees the lock is released even if re.match raises
        # (e.g. on an invalid pattern).
        with self.lock:
            for name, q in self.event_dict.items():
                if re.match(regex_pattern, name) and q is not None:
                    try:
                        results.append(q.get(False))
                    except queue.Empty:
                        pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet, an empty one is created and
        registered, so this call never blocks.

        Args:
            event_name: Name of the event whose queue is wanted.

        Returns:
            queue: A queue storing all the events of the specified name.
        """
        # Lookup and creation happen under the lock so a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self.lock:
            event_queue = self.event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self.event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and execute the handler in a
            new thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self.handlers[event_name]
        self.executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout,
        cond, cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self, event_handler, event_name, user_args,
        event_timeout=None, cond=None, cond_timeout=None):
        """Handle events that don't have registered handlers

        In a new thread, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the thread waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            worker: A concurrent.Future object associated with the handler.
                If blocking call worker.result() is triggered, the handler
                needs to return something to unblock.
        """
        worker = self.executor.submit(self._handle, event_handler, event_name,
            user_args, event_timeout, cond, cond_timeout)
        return worker

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
           results: List of the desired events; empty if there are none.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self.started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                "popping."))
        results = []
        with self.lock:
            q = self.event_dict.get(event_name)
            if q is None:
                return results
            while True:
                try:
                    results.append(q.get(block=False))
                except queue.Empty:
                    return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        The (now empty) queue for the name stays registered; it is created
        if it did not exist.

        Args:
            event_name: Name of the events to be popped.
        """
        with self.lock:
            q = self.get_event_q(event_name)
            # Hold the queue's own mutex while touching its underlying deque
            # so the clear cannot interleave with a concurrent put/get.
            with q.mutex:
                q.queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self.lock:
            self.event_dict.clear()
424