#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import bisect import logging import inspect from threading import RLock from acts.event.event_subscription import EventSubscription from acts.event.subscription_handle import SubscriptionHandle class _EventBus(object): """ Attributes: _subscriptions: A dictionary of {EventType: list}. _registration_id_map: A dictionary of {RegistrationID: EventSubscription} _subscription_lock: The lock to prevent concurrent removal or addition to events. """ def __init__(self): self._subscriptions = {} self._registration_id_map = {} self._subscription_lock = RLock() def register(self, event_type, func, filter_fn=None, order=0): """Subscribes the given function to the event type given. Args: event_type: The type of the event to subscribe to. func: The function to call when the event is posted. filter_fn: An option function to be called before calling the subscribed func. If this function returns falsy, then the function will not be invoked. order: The order the the subscription should run in. Lower values run first, with the default value set to 0. In the case of a tie between two subscriptions of the same event type, the subscriber added first executes first. In the case of a tie between two subscribers of a different type, the type of the subscription that is more specific goes first (i.e. BaseEventType will execute after ChildEventType if they share the same order). Returns: A registration ID. """ subscription = EventSubscription(event_type, func, event_filter=filter_fn, order=order) return self.register_subscription(subscription) def register_subscriptions(self, subscriptions): """Registers all subscriptions to the event bus. Args: subscriptions: an iterable that returns EventSubscriptions Returns: The list of registration IDs. """ registration_ids = [] for subscription in subscriptions: registration_ids.append(self.register_subscription(subscription)) return registration_ids def register_subscription(self, subscription): """Registers the given subscription to the event bus. Args: subscription: An EventSubscription object Returns: A registration ID. """ with self._subscription_lock: if subscription.event_type in self._subscriptions.keys(): subscription_list = self._subscriptions[subscription.event_type] subscription_list.append(subscription) subscription_list.sort(key=lambda x: x.order) else: subscription_list = list() bisect.insort(subscription_list, subscription) self._subscriptions[subscription.event_type] = subscription_list registration_id = id(subscription) self._registration_id_map[registration_id] = subscription return registration_id def post(self, event, ignore_errors=False): """Posts an event to its subscribers. Args: event: The event object to send to the subscribers. ignore_errors: Deliver to all subscribers, ignoring any errors. """ listening_subscriptions = [] for current_type in inspect.getmro(type(event)): if current_type not in self._subscriptions.keys(): continue for subscription in self._subscriptions[current_type]: listening_subscriptions.append(subscription) # The subscriptions will be collected in sorted runs of sorted order. # Running timsort here is the optimal way to sort this list. listening_subscriptions.sort(key=lambda x: x.order) for subscription in listening_subscriptions: try: subscription.deliver(event) except Exception: if ignore_errors: logging.exception('An exception occurred while handling ' 'an event.') continue raise def unregister(self, registration_id): """Unregisters an EventSubscription. Args: registration_id: the Subscription or registration_id to unsubscribe. """ if type(registration_id) is SubscriptionHandle: subscription = registration_id.subscription registration_id = id(registration_id.subscription) elif type(registration_id) is EventSubscription: subscription = registration_id registration_id = id(registration_id) elif registration_id in self._registration_id_map.keys(): subscription = self._registration_id_map[registration_id] elif type(registration_id) is not int: raise ValueError( 'Subscription ID "%s" is not a valid ID. This value' 'must be an integer ID returned from subscribe().' % registration_id) else: # The value is a "valid" id, but is not subscribed. It's possible # another thread has unsubscribed this value. logging.warning('Attempted to unsubscribe %s, but the matching ' 'subscription cannot be found.' % registration_id) return False event_type = subscription.event_type with self._subscription_lock: self._registration_id_map.pop(registration_id, None) if (event_type in self._subscriptions and subscription in self._subscriptions[event_type]): self._subscriptions[event_type].remove(subscription) return True def unregister_all(self, from_list=None, from_event=None): """Removes all event subscriptions. Args: from_list: Unregisters all events from a given list. from_event: Unregisters all events of a given event type. """ if from_list is None: from_list = list(self._registration_id_map.values()) for subscription in from_list: if from_event is None or subscription.event_type == from_event: self.unregister(subscription) _event_bus = _EventBus() def register(event_type, func, filter_fn=None, order=0): """Subscribes the given function to the event type given. Args: event_type: The type of the event to subscribe to. func: The function to call when the event is posted. filter_fn: An option function to be called before calling the subscribed func. If this function returns falsy, then the function will not be invoked. order: The order the the subscription should run in. Lower values run first, with the default value set to 0. In the case of a tie between two subscriptions of the same event type, the subscriber added first executes first. In the case of a tie between two subscribers of a different type, the type of the subscription that is more specific goes first (i.e. BaseEventType will execute after ChildEventType if they share the same order). Returns: A registration ID. """ return _event_bus.register(event_type, func, filter_fn=filter_fn, order=order) def register_subscriptions(subscriptions): """Registers all subscriptions to the event bus. Args: subscriptions: an iterable that returns EventSubscriptions Returns: The list of registration IDs. """ return _event_bus.register_subscriptions(subscriptions) def register_subscription(subscription): """Registers the given subscription to the event bus. Args: subscription: An EventSubscription object Returns: A registration ID. """ return _event_bus.register_subscription(subscription) def post(event, ignore_errors=False): """Posts an event to its subscribers. Args: event: The event object to send to the subscribers. ignore_errors: Deliver to all subscribers, ignoring any errors. """ _event_bus.post(event, ignore_errors) def unregister(registration_id): """Unregisters an EventSubscription. Args: registration_id: the Subscription or registration_id to unsubscribe. """ # null check for the corner case where the _event_bus is destroyed before # the subscribers unregister. In such case there is nothing else to # be done. if _event_bus is None: return True return _event_bus.unregister(registration_id) def unregister_all(from_list=None, from_event=None): """Removes all event subscriptions. Args: from_list: Unregisters all events from a given list. from_event: Unregisters all events of a given event type. """ return _event_bus.unregister_all(from_list=from_list, from_event=from_event) class listen_for(object): """A context-manager class (with statement) for listening to an event within a given section of code. Usage: with listen_for(EventType, event_listener): func_that_posts_event() # Will call event_listener func_that_posts_event() # Will not call event_listener """ def __init__(self, event_type, func, filter_fn=None, order=0): self.event_type = event_type self.func = func self.filter_fn = filter_fn self.order = order self.registration_id = None def __enter__(self): self.registration_id = _event_bus.register(self.event_type, self.func, filter_fn=self.filter_fn, order=self.order) def __exit__(self, *unused): _event_bus.unregister(self.registration_id)