#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import bisect
import logging
import inspect
from threading import RLock

from acts.event.event_subscription import EventSubscription
from acts.event.subscription_handle import SubscriptionHandle


class _EventBus(object):
    """Dispatches posted events to the subscriptions registered for them.

    Attributes:
        _subscriptions: A dictionary of {EventType: list<EventSubscription>}.
            Each list is kept sorted by subscription order.
        _registration_id_map: A dictionary of
                             {RegistrationID: EventSubscription}
        _subscription_lock: The lock to prevent concurrent removal or addition
                            to events.
    """

    def __init__(self):
        self._subscriptions = {}
        self._registration_id_map = {}
        self._subscription_lock = RLock()

    def register(self, event_type, func, filter_fn=None, order=0):
        """Subscribes the given function to the event type given.

        Args:
            event_type: The type of the event to subscribe to.
            func: The function to call when the event is posted.
            filter_fn: An optional function to be called before calling the
                       subscribed func. If this function returns falsy, then
                       the function will not be invoked.
            order: The order the subscription should run in. Lower values
                   run first, with the default value set to 0. In the case of
                   a tie between two subscriptions of the same event type, the
                   subscriber added first executes first. In the case of a tie
                   between two subscribers of a different type, the type of
                   the subscription that is more specific goes first (i.e.
                   BaseEventType will execute after ChildEventType if they
                   share the same order).

        Returns:
            A registration ID.
        """
        subscription = EventSubscription(event_type, func,
                                         event_filter=filter_fn,
                                         order=order)
        return self.register_subscription(subscription)

    def register_subscriptions(self, subscriptions):
        """Registers all subscriptions to the event bus.

        Args:
            subscriptions: an iterable that returns EventSubscriptions

        Returns:
            The list of registration IDs.
        """
        return [self.register_subscription(subscription)
                for subscription in subscriptions]

    def register_subscription(self, subscription):
        """Registers the given subscription to the event bus.

        Args:
            subscription: An EventSubscription object

        Returns:
            A registration ID.
        """
        with self._subscription_lock:
            subscription_list = self._subscriptions.setdefault(
                subscription.event_type, [])
            # list.sort() is stable, so appending and re-sorting keeps
            # subscriptions that share an order in registration order.
            subscription_list.append(subscription)
            subscription_list.sort(key=lambda x: x.order)

            registration_id = id(subscription)
            self._registration_id_map[registration_id] = subscription

        return registration_id

    def post(self, event, ignore_errors=False):
        """Posts an event to its subscribers.

        Subscriptions registered for any type in the event's method
        resolution order receive the event.

        Args:
            event: The event object to send to the subscribers.
            ignore_errors: Deliver to all subscribers, ignoring any errors.

        Raises:
            Exception: whatever a subscriber raised during delivery, when
                ignore_errors is falsy. Later subscribers are not called.
        """
        listening_subscriptions = []
        # Snapshot under the lock so a concurrent register()/unregister()
        # cannot mutate the lists while they are being copied. Delivery
        # happens outside the lock so handlers never run while it is held.
        with self._subscription_lock:
            for current_type in inspect.getmro(type(event)):
                listening_subscriptions.extend(
                    self._subscriptions.get(current_type, ()))

        # The subscriptions will be collected in sorted runs of sorted order.
        # Running timsort here is the optimal way to sort this list.
        listening_subscriptions.sort(key=lambda x: x.order)
        for subscription in listening_subscriptions:
            try:
                subscription.deliver(event)
            except Exception:
                if ignore_errors:
                    logging.exception('An exception occurred while handling '
                                      'an event.')
                    continue
                raise

    def unregister(self, registration_id):
        """Unregisters an EventSubscription.

        Args:
            registration_id: the Subscription or registration_id to unsubscribe.
                A SubscriptionHandle is accepted as well.

        Returns:
            False if an integer ID was given that matches no registered
            subscription, True otherwise.

        Raises:
            ValueError: if the value is not a subscription, not a handle, not
                a known registration ID and not an integer.
        """
        if isinstance(registration_id, SubscriptionHandle):
            subscription = registration_id.subscription
            registration_id = id(subscription)
        elif isinstance(registration_id, EventSubscription):
            subscription = registration_id
            registration_id = id(subscription)
        else:
            # A single lookup: checking membership and then indexing could
            # fail if another thread unregisters the ID in between.
            subscription = self._registration_id_map.get(registration_id)
            if subscription is None:
                # Exact type check on purpose: bool and other int subclasses
                # are not treated as registration IDs.
                if type(registration_id) is not int:
                    # The 1-tuple keeps %-formatting correct when the
                    # offending value is itself a tuple.
                    raise ValueError(
                        'Subscription ID "%s" is not a valid ID. This value '
                        'must be an integer ID returned from subscribe().'
                        % (registration_id,))
                # The value is a "valid" id, but is not subscribed. It's
                # possible another thread has unsubscribed this value.
                logging.warning('Attempted to unsubscribe %s, but the '
                                'matching subscription cannot be found.',
                                registration_id)
                return False

        event_type = subscription.event_type
        with self._subscription_lock:
            self._registration_id_map.pop(registration_id, None)
            registered = self._subscriptions.get(event_type)
            if registered is not None and subscription in registered:
                registered.remove(subscription)
        return True

    def unregister_all(self, from_list=None, from_event=None):
        """Removes all event subscriptions.

        Args:
            from_list: Unregisters all events from a given list. When None,
                every currently registered subscription is considered.
            from_event: Unregisters all events of a given event type. When
                None, the event type is not used to filter.
        """
        if from_list is None:
            # Copy under the lock; unregister() mutates the map below.
            with self._subscription_lock:
                from_list = list(self._registration_id_map.values())

        for subscription in from_list:
            if from_event is None or subscription.event_type == from_event:
                self.unregister(subscription)


_event_bus = _EventBus()


def register(event_type, func, filter_fn=None, order=0):
    """Subscribes the given function to the event type given.

    Args:
        event_type: The type of the event to subscribe to.
        func: The function to call when the event is posted.
        filter_fn: An optional function to be called before calling the
                   subscribed func. If this function returns falsy, then the
                   function will not be invoked.
        order: The order the subscription should run in. Lower values run
               first, with the default value set to 0. In the case of a tie
               between two subscriptions of the same event type, the
               subscriber added first executes first. In the case of a tie
               between two subscribers of a different type, the type of the
               subscription that is more specific goes first (i.e.
               BaseEventType will execute after ChildEventType if they share
               the same order).

    Returns:
        A registration ID.
    """
    return _event_bus.register(event_type, func, filter_fn=filter_fn,
                               order=order)


def register_subscriptions(subscriptions):
    """Registers all subscriptions to the event bus.

    Args:
        subscriptions: an iterable that returns EventSubscriptions

    Returns:
        The list of registration IDs.
    """
    return _event_bus.register_subscriptions(subscriptions)


def register_subscription(subscription):
    """Registers the given subscription to the event bus.

    Args:
        subscription: An EventSubscription object

    Returns:
        A registration ID.
    """
    return _event_bus.register_subscription(subscription)


def post(event, ignore_errors=False):
    """Posts an event to its subscribers.

    Args:
        event: The event object to send to the subscribers.
        ignore_errors: Deliver to all subscribers, ignoring any errors.
    """
    _event_bus.post(event, ignore_errors)


def unregister(registration_id):
    """Unregisters an EventSubscription.

    Args:
        registration_id: the Subscription or registration_id to unsubscribe.

    Returns:
        True if the event bus no longer exists; otherwise the result of the
        event bus's unregister().
    """
    # null check for the corner case where the _event_bus is destroyed before
    # the subscribers unregister. In such case there is nothing else to
    # be done.
    if _event_bus is None:
        return True
    return _event_bus.unregister(registration_id)


def unregister_all(from_list=None, from_event=None):
    """Removes all event subscriptions.

    Args:
        from_list: Unregisters all events from a given list.
        from_event: Unregisters all events of a given event type.
    """
    return _event_bus.unregister_all(from_list=from_list, from_event=from_event)


class listen_for(object):
    """A context-manager class (with statement) for listening to an event within
    a given section of code.

    Usage:

    with listen_for(EventType, event_listener):
        func_that_posts_event()  # Will call event_listener

    func_that_posts_event()  # Will not call event_listener

    The subscription is registered on entry and unregistered on exit.
    """

    def __init__(self, event_type, func, filter_fn=None, order=0):
        self.event_type = event_type
        self.func = func
        self.filter_fn = filter_fn
        self.order = order
        # Set by __enter__; None until the with-block has been entered.
        self.registration_id = None

    def __enter__(self):
        self.registration_id = _event_bus.register(self.event_type, self.func,
                                                   filter_fn=self.filter_fn,
                                                   order=self.order)
        # Returned so `with listen_for(...) as listener` binds this object.
        return self

    def __exit__(self, *unused):
        # Returns None, so exceptions raised inside the with-block propagate.
        _event_bus.unregister(self.registration_id)