• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import bisect
17import logging
18import inspect
19from threading import RLock
20
21from acts.event.event_subscription import EventSubscription
22from acts.event.subscription_handle import SubscriptionHandle
23
24
25class _EventBus(object):
26    """
27    Attributes:
28        _subscriptions: A dictionary of {EventType: list<EventSubscription>}.
29        _registration_id_map: A dictionary of
30                             {RegistrationID: EventSubscription}
31        _subscription_lock: The lock to prevent concurrent removal or addition
32                            to events.
33    """
34
35    def __init__(self):
36        self._subscriptions = {}
37        self._registration_id_map = {}
38        self._subscription_lock = RLock()
39
40    def register(self, event_type, func, filter_fn=None, order=0):
41        """Subscribes the given function to the event type given.
42
43        Args:
44            event_type: The type of the event to subscribe to.
45            func: The function to call when the event is posted.
46            filter_fn: An option function to be called before calling the
47                       subscribed func. If this function returns falsy, then the
48                       function will not be invoked.
49            order: The order the the subscription should run in. Lower values
50                   run first, with the default value set to 0. In the case of a
51                   tie between two subscriptions of the same event type, the
52                   subscriber added first executes first. In the case of a tie
53                   between two subscribers of a different type, the type of the
54                   subscription that is more specific goes first (i.e.
55                   BaseEventType will execute after ChildEventType if they share
56                   the same order).
57
58        Returns:
59            A registration ID.
60        """
61        subscription = EventSubscription(event_type, func,
62                                         event_filter=filter_fn,
63                                         order=order)
64        return self.register_subscription(subscription)
65
66    def register_subscriptions(self, subscriptions):
67        """Registers all subscriptions to the event bus.
68
69        Args:
70            subscriptions: an iterable that returns EventSubscriptions
71
72        Returns:
73            The list of registration IDs.
74        """
75        registration_ids = []
76        for subscription in subscriptions:
77            registration_ids.append(self.register_subscription(subscription))
78
79        return registration_ids
80
81    def register_subscription(self, subscription):
82        """Registers the given subscription to the event bus.
83
84        Args:
85            subscription: An EventSubscription object
86
87        Returns:
88            A registration ID.
89        """
90        with self._subscription_lock:
91            if subscription.event_type in self._subscriptions.keys():
92                subscription_list = self._subscriptions[subscription.event_type]
93                subscription_list.append(subscription)
94                subscription_list.sort(key=lambda x: x.order)
95            else:
96                subscription_list = list()
97                bisect.insort(subscription_list, subscription)
98                self._subscriptions[subscription.event_type] = subscription_list
99
100            registration_id = id(subscription)
101            self._registration_id_map[registration_id] = subscription
102
103        return registration_id
104
105    def post(self, event, ignore_errors=False):
106        """Posts an event to its subscribers.
107
108        Args:
109            event: The event object to send to the subscribers.
110            ignore_errors: Deliver to all subscribers, ignoring any errors.
111        """
112        listening_subscriptions = []
113        for current_type in inspect.getmro(type(event)):
114            if current_type not in self._subscriptions.keys():
115                continue
116            for subscription in self._subscriptions[current_type]:
117                listening_subscriptions.append(subscription)
118
119        # The subscriptions will be collected in sorted runs of sorted order.
120        # Running timsort here is the optimal way to sort this list.
121        listening_subscriptions.sort(key=lambda x: x.order)
122        for subscription in listening_subscriptions:
123            try:
124                subscription.deliver(event)
125            except Exception:
126                if ignore_errors:
127                    logging.exception('An exception occurred while handling '
128                                      'an event.')
129                    continue
130                raise
131
132    def unregister(self, registration_id):
133        """Unregisters an EventSubscription.
134
135        Args:
136            registration_id: the Subscription or registration_id to unsubscribe.
137        """
138        if type(registration_id) is SubscriptionHandle:
139            subscription = registration_id.subscription
140            registration_id = id(registration_id.subscription)
141        elif type(registration_id) is EventSubscription:
142            subscription = registration_id
143            registration_id = id(registration_id)
144        elif registration_id in self._registration_id_map.keys():
145            subscription = self._registration_id_map[registration_id]
146        elif type(registration_id) is not int:
147            raise ValueError(
148                'Subscription ID "%s" is not a valid ID. This value'
149                'must be an integer ID returned from subscribe().'
150                % registration_id)
151        else:
152            # The value is a "valid" id, but is not subscribed. It's possible
153            # another thread has unsubscribed this value.
154            logging.warning('Attempted to unsubscribe %s, but the matching '
155                            'subscription cannot be found.' % registration_id)
156            return False
157
158        event_type = subscription.event_type
159        with self._subscription_lock:
160            self._registration_id_map.pop(registration_id, None)
161            if (event_type in self._subscriptions and
162                    subscription in self._subscriptions[event_type]):
163                self._subscriptions[event_type].remove(subscription)
164        return True
165
166    def unregister_all(self, from_list=None, from_event=None):
167        """Removes all event subscriptions.
168
169        Args:
170            from_list: Unregisters all events from a given list.
171            from_event: Unregisters all events of a given event type.
172        """
173        if from_list is None:
174            from_list = list(self._registration_id_map.values())
175
176        for subscription in from_list:
177            if from_event is None or subscription.event_type == from_event:
178                self.unregister(subscription)
179
180
181_event_bus = _EventBus()
182
183
184def register(event_type, func, filter_fn=None, order=0):
185    """Subscribes the given function to the event type given.
186
187    Args:
188        event_type: The type of the event to subscribe to.
189        func: The function to call when the event is posted.
190        filter_fn: An option function to be called before calling the subscribed
191                   func. If this function returns falsy, then the function will
192                   not be invoked.
193        order: The order the the subscription should run in. Lower values run
194               first, with the default value set to 0. In the case of a tie
195               between two subscriptions of the same event type, the
196               subscriber added first executes first. In the case of a tie
197               between two subscribers of a different type, the type of the
198               subscription that is more specific goes first (i.e. BaseEventType
199               will execute after ChildEventType if they share the same order).
200
201    Returns:
202        A registration ID.
203    """
204    return _event_bus.register(event_type, func, filter_fn=filter_fn,
205                               order=order)
206
207
208def register_subscriptions(subscriptions):
209    """Registers all subscriptions to the event bus.
210
211    Args:
212        subscriptions: an iterable that returns EventSubscriptions
213
214    Returns:
215        The list of registration IDs.
216    """
217    return _event_bus.register_subscriptions(subscriptions)
218
219
220def register_subscription(subscription):
221    """Registers the given subscription to the event bus.
222
223    Args:
224        subscription: An EventSubscription object
225
226    Returns:
227        A registration ID.
228    """
229    return _event_bus.register_subscription(subscription)
230
231
232def post(event, ignore_errors=False):
233    """Posts an event to its subscribers.
234
235    Args:
236        event: The event object to send to the subscribers.
237        ignore_errors: Deliver to all subscribers, ignoring any errors.
238    """
239    _event_bus.post(event, ignore_errors)
240
241
242def unregister(registration_id):
243    """Unregisters an EventSubscription.
244
245    Args:
246        registration_id: the Subscription or registration_id to unsubscribe.
247    """
248    # null check for the corner case where the _event_bus is destroyed before
249    # the subscribers unregister. In such case there is nothing else to
250    # be done.
251    if _event_bus is None:
252        return True
253    return _event_bus.unregister(registration_id)
254
255
256def unregister_all(from_list=None, from_event=None):
257    """Removes all event subscriptions.
258
259    Args:
260        from_list: Unregisters all events from a given list.
261        from_event: Unregisters all events of a given event type.
262    """
263    return _event_bus.unregister_all(from_list=from_list, from_event=from_event)
264
265
266class listen_for(object):
267    """A context-manager class (with statement) for listening to an event within
268    a given section of code.
269
270    Usage:
271
272    with listen_for(EventType, event_listener):
273        func_that_posts_event()  # Will call event_listener
274
275    func_that_posts_event()  # Will not call event_listener
276
277    """
278
279    def __init__(self, event_type, func, filter_fn=None, order=0):
280        self.event_type = event_type
281        self.func = func
282        self.filter_fn = filter_fn
283        self.order = order
284        self.registration_id = None
285
286    def __enter__(self):
287        self.registration_id = _event_bus.register(self.event_type, self.func,
288                                                   filter_fn=self.filter_fn,
289                                                   order=self.order)
290
291    def __exit__(self, *unused):
292        _event_bus.unregister(self.registration_id)
293