# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Avatar is a scalable multi-platform Bluetooth testing tool capable of running
any Bluetooth test cases virtually and physically.
"""

__version__ = "0.0.10"

import argparse
import enum
import functools
import grpc
import grpc.aio
import importlib
import logging
import pathlib
import re

from avatar import pandora_server
from avatar.aio import asynchronous
from avatar.metrics import trace
from avatar.pandora_client import BumblePandoraClient as BumblePandoraDevice
from avatar.pandora_client import PandoraClient as PandoraDevice
from avatar.pandora_server import PandoraServer
from avatar.runner import SuiteRunner
from mobly import base_test
from mobly import signals
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Sized, Tuple, Type, TypeVar

# public symbols
__all__ = [
    'asynchronous',
    'enableFlag',
    'parameterized',
    'rpc_except',
    'PandoraDevices',
    'PandoraDevice',
    'BumblePandoraDevice',
]


# Maps a Mobly controller name to the PandoraServer class used to drive it
# when no explicit `pandora_server_class_<controller>` user param is given.
PANDORA_COMMON_SERVER_CLASSES: Dict[str, Type[pandora_server.PandoraServer[Any]]] = {
    'PandoraDevice': pandora_server.PandoraServer,
    'AndroidDevice': pandora_server.AndroidPandoraServer,
    'BumbleDevice': pandora_server.BumblePandoraServer,
    'UsbDevice': pandora_server.UsbBumblePandoraServer,
}

# Prefix of the user param that overrides the server class of a controller.
KEY_PANDORA_SERVER_CLASS = 'pandora_server_class'


class PandoraDevices(Sized, Iterable[PandoraDevice]):
    """Utility for abstracting controller registration and Pandora setup."""

    _test: base_test.BaseTestClass
    _clients: List[PandoraDevice]
    _servers: List[PandoraServer[Any]]

    def __init__(self, test: base_test.BaseTestClass) -> None:
        """Creates a PandoraDevices list.

        It performs three steps:
          - Register the underlying controllers to the test.
          - Start the corresponding PandoraServer for each controller.
          - Store a PandoraClient for each server.

        The order in which the clients are returned can be determined by the
        (optional) `order_<controller_class>` params in user_params. Controllers
        without such a param will be set up last (order=100).

        Args:
            test: Instance of the Mobly test class.

        Raises:
            RuntimeError: when no PandoraServer class is known for a controller.
            signals.TestAbortAll: when a controller fails to register.
        """
        self._test = test
        self._clients = []
        self._servers = []

        trace.hook_test(test, self)
        user_params: Dict[str, Any] = test.user_params  # type: ignore
        controller_configs: Dict[str, Any] = test.controller_configs.copy()  # type: ignore
        sorted_controllers = sorted(
            controller_configs.keys(), key=lambda controller: user_params.get(f'order_{controller}', 100)
        )
        for controller in sorted_controllers:
            # Find the corresponding PandoraServer class for the controller.
            if f'{KEY_PANDORA_SERVER_CLASS}_{controller}' in user_params:
                # Try to load the server dynamically if module specified in user_params.
                class_path = user_params[f'{KEY_PANDORA_SERVER_CLASS}_{controller}']
                logging.info('Loading Pandora server class %s from config for %s.', class_path, controller)
                server_cls = _load_pandora_server_class(class_path)
            else:
                # Search in the list of commonly-used controllers.
                try:
                    server_cls = PANDORA_COMMON_SERVER_CLASSES[controller]
                except KeyError as e:
                    raise RuntimeError(
                        f'PandoraServer module for {controller} not found in either the '
                        'config or PANDORA_COMMON_SERVER_CLASSES.'
                    ) from e

            # Register the controller and load its Pandora servers.
            logging.info('Starting %s(s) for %s', server_cls.__name__, controller)
            try:
                devices: Optional[List[Any]] = test.register_controller(  # type: ignore
                    server_cls.MOBLY_CONTROLLER_MODULE
                )
            except Exception as e:
                logging.exception('abort: failed to register controller')
                # Keep the root cause attached and give the abort a readable reason.
                raise signals.TestAbortAll(f'Failed to register controller {controller}') from e
            assert devices
            for device in devices:  # type: ignore
                self._servers.append(server_cls(device))

        self.start_all()

    def __len__(self) -> int:
        return len(self._clients)

    def __iter__(self) -> Iterator[PandoraDevice]:
        return iter(self._clients)

    def start_all(self) -> None:
        """Starts all Pandora servers and stores their clients.

        Does nothing when clients are already stored.
        """
        if self._clients:
            return
        for server in self._servers:
            self._clients.append(server.start())

    def stop_all(self) -> None:
        """Closes all opened Pandora clients and servers.

        Does nothing when no client is stored.
        """
        if not self._clients:
            return
        for client in self:
            client.close()
        for server in self._servers:
            server.stop()
        self._clients.clear()


def _load_pandora_server_class(class_path: str) -> Type[pandora_server.PandoraServer[Any]]:
    """Dynamically load a PandoraServer class from a user-specified module+class.

    Args:
        class_path: String in format '<module>.<class>', where the module is fully
            importable using importlib.import_module. e.g.:
            my.pandora.server.module.MyPandoraServer

    Returns:
        The loaded PandoraServer class (not an instance).

    Raises:
        ValueError: when `class_path` does not contain a module part.
        TypeError: when the loaded attribute is not a subclass of PandoraServer.
    """
    # Split '<module>.<class>' on the last dot; a path without a dot has no module part.
    module_name, separator, class_name = class_path.rpartition('.')
    if not separator or not module_name or not class_name:
        raise ValueError(f"The specified class path {class_path} is not in the format '<module>.<class>'.")

    # Dynamically import the module, and get the class
    module = importlib.import_module(module_name)
    server_class = getattr(module, class_name)

    # Check that the class is a subclass of PandoraServer. The isinstance guard
    # avoids the opaque TypeError issubclass() raises on non-class objects.
    if not isinstance(server_class, type) or not issubclass(server_class, pandora_server.PandoraServer):
        raise TypeError(f'The specified class {class_path} is not a subclass of PandoraServer.')
    return server_class  # type: ignore


class Wrapper(object):
    """Holds a function so that a subclass can post-process it at class creation."""

    func: Callable[..., Any]

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func


# Multiply the same function from `inputs` parameters
def parameterized(*inputs: Tuple[Any, ...]) -> Type[Wrapper]:
    """Decorator expanding one test method into one method per input tuple.

    For each tuple in `inputs`, a method named `<name>(<values>)` (spaces removed,
    enum members shown by their value) is set on the owning class; it calls the
    decorated function with the tuple appended to the positional arguments. The
    original attribute is then removed from the class.

    Args:
        inputs: Tuples of extra positional arguments, one tuple per generated method.

    Returns:
        A Wrapper subclass to be used as the decorator.
    """

    def normalize(a: Any) -> Any:
        # Enum members are displayed by value in the generated method name.
        if isinstance(a, enum.Enum):
            return a.value
        return a

    class wrapper(Wrapper):
        def __set_name__(self, owner: Type[Any], name: str) -> None:
            def decorate(params: Tuple[Any, ...]) -> Callable[..., Any]:
                # `params` is bound per call here; closing over the loop variable
                # directly would make every method use the last tuple of `inputs`.
                @functools.wraps(self.func)
                def parameterized_func(*args: Any, **kwargs: Any) -> Any:
                    return self.func(*args, *params, **kwargs)

                return parameterized_func

            for params in inputs:
                setattr(
                    owner,
                    f"{name}{tuple([normalize(a) for a in params])}".replace(" ", ""),
                    decorate(params),
                )
            delattr(owner, name)

    return wrapper


def enableFlag(flag: str) -> Callable[..., Any]:
    """Enable aconfig flag for the duration of the decorated test.

    Requires that the test class declares a devices: Optional[PandoraDevices] attribute.

    The flag is enabled on every AndroidPandoraServer of `devices` where it is not
    already enabled, and unset again on those servers once the test returns or
    raises.

    Args:
        flag: aconfig flag name including package, e.g.: 'com.android.bluetooth.flags.<flag_name>'

    Raises:
        AttributeError: when the 'devices' attribute is not found or not set
        TypeError: when the 'devices' attribute is not a PandoraDevices
        signals.TestSkip: when the flag is neither enabled nor read-write on a device
        signals.TestError: when the flag is missing or ambiguous in the aflags list,
            or when it could not be enabled or restored
    """

    def get_flag_value(server: PandoraServer[Any], flag: str) -> str:
        cmd_output = server.device.adb.shell(f'aflags list -c com.android.bt | grep {flag}').decode().split('\n')
        cmd_output = [x for x in cmd_output if x]  # Filter out empty lines from shell result
        if len(cmd_output) == 0:
            raise signals.TestError(f'Flag [{flag}] is not present in the aflags list of the device')
        if len(cmd_output) != 1:
            raise signals.TestError(
                f'Flag [{flag}] has multiple entries in the aflags list of the device. Output was {cmd_output}'
            )
        return cmd_output[0]

    def is_flag_enabled(server: PandoraServer[Any], flag: str) -> bool:
        # Escape the flag: its dots must match literally, not any character.
        return bool(re.search(re.escape(flag) + '.* enabled', get_flag_value(server, flag)))

    # A "valid" flag is either already enabled or writable
    def is_flag_valid_for_test(server: PandoraServer[Any], flag: str) -> bool:
        return bool(re.search(re.escape(flag) + '.* (enabled|read-write)', get_flag_value(server, flag)))

    def restore_flag(server: PandoraServer[Any], flag: str) -> None:
        server.device.adb.shell(f'aflags unset --immediate {flag}')  # type: ignore
        if is_flag_enabled(server, flag):
            raise signals.TestError("Despite writable flag, runner couldn't reset its initial value")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(self: base_test.BaseTestClass, *args: Any, **kwargs: Any) -> Any:
            devices = getattr(self, 'devices', None)

            # NOTE: PandoraDevices defines __len__, so this also triggers when
            # `devices` currently holds no client.
            if not devices:
                raise AttributeError("Attribute 'devices' not found in test class or is None")

            if not isinstance(devices, PandoraDevices):
                raise TypeError("devices attribute must be of a PandoraDevices type")

            servers_to_restore: List[PandoraServer[Any]] = []

            try:
                for server in devices._servers:
                    if not isinstance(server, pandora_server.AndroidPandoraServer):
                        continue
                    if not is_flag_valid_for_test(server, flag):
                        raise signals.TestSkip('Flag cannot be enabled on this device')
                    if is_flag_enabled(server, flag):
                        continue  # Nothing to do flag is already active
                    # Record the server before touching it so that a failure while
                    # enabling still gets the flag unset afterwards.
                    servers_to_restore.append(server)
                    server.device.adb.shell(f'aflags enable --immediate {flag}')  # type: ignore
                    if not is_flag_enabled(server, flag):
                        raise signals.TestError("Despite writable flag, runner couldn't enable it")

                result = func(self, *args, **kwargs)
            except BaseException:
                # The test (or the setup above) failed: still restore the flags,
                # but only log restore problems so the original error is not masked.
                for server in servers_to_restore:
                    try:
                        restore_flag(server, flag)
                    except Exception:
                        logging.exception('Failed to restore flag %s after a failure', flag)
                raise

            for server in servers_to_restore:
                restore_flag(server, flag)

            return result

        return wrapper

    return decorator


_T = TypeVar('_T')


# Decorate a test function with a wrapper that catch gRPC errors
# and call a callback if the status `code` match.
def rpc_except(
    excepts: Dict[grpc.StatusCode, Callable[[grpc.aio.AioRpcError], Any]],
) -> Callable[[Callable[..., _T]], Callable[..., _T]]:
    """Build a decorator that routes gRPC errors to per-status-code callbacks.

    Args:
        excepts: Maps a gRPC status code to the callback invoked with the caught
            error; the callback's return value becomes the function's result.

    Returns:
        A decorator. Errors whose status code has no callback are re-raised.
    """

    def wrap(func: Callable[..., _T]) -> Callable[..., _T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> _T:
            try:
                return func(*args, **kwargs)
            except (grpc.RpcError, grpc.aio.AioRpcError) as e:
                if f := excepts.get(e.code(), None):  # type: ignore
                    return f(e)  # type: ignore
                # Bare raise re-raises the caught error with its traceback untouched.
                raise

        return wrapper

    return wrap


def args_parser() -> argparse.ArgumentParser:
    """Build the command line parser of the Avatar test runner."""
    parser = argparse.ArgumentParser(description='Avatar test runner.')
    parser.add_argument(
        'input',
        type=str,
        nargs='*',
        metavar='<PATH>',
        help='List of folders or test files to run',
        default=[],
    )
    parser.add_argument('-c', '--config', type=str, metavar='<PATH>', help='Path to the test configuration file.')
    parser.add_argument(
        '-l',
        '--list',
        '--list_tests',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        action='store_true',
        help='Print the names of the tests defined in a script without ' 'executing them.',
    )
    parser.add_argument(
        '-o',
        '--log-path',
        '--log_path',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        type=str,
        metavar='<PATH>',
        help='Path to the test logs directory.',
    )
    parser.add_argument(
        '-t',
        '--tests',
        nargs='+',
        type=str,
        metavar='[ClassA[.test_a] ClassB[.test_b] ...]',
        help='A list of test classes and optional tests to execute.',
    )
    parser.add_argument(
        '-b',
        '--test-beds',
        '--test_bed',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        nargs='+',
        type=str,
        metavar='[<TEST BED NAME1> <TEST BED NAME2> ...]',
        help='Specify which test beds to run tests on.',
    )
    parser.add_argument('-v', '--verbose', action='store_true', help='Set console logger level to DEBUG')
    parser.add_argument('-x', '--no-default-cases', action='store_true', help='Do not include default test cases')
    return parser


# Avatar default entry point
def main(args: Optional[argparse.Namespace] = None) -> None:
    """Configure a SuiteRunner from the arguments, then list or run the tests.

    Args:
        args: Already parsed arguments; when None, the command line is parsed
            with `args_parser()`.

    Exits the process with status 0 after listing tests (`--list`), and with
    status 1 when the test run reports failure.
    """
    import sys

    # Create an Avatar suite runner.
    runner = SuiteRunner()

    # Parse arguments.
    argv = args if args is not None else args_parser().parse_args()
    if argv.input:
        for path in argv.input:
            runner.add_path(pathlib.Path(path))
    if argv.config:
        runner.add_config_file(pathlib.Path(argv.config))
    if argv.log_path:
        runner.set_logs_dir(pathlib.Path(argv.log_path))
    if argv.tests:
        runner.add_test_filters(argv.tests)
    if argv.test_beds:
        runner.add_test_beds(argv.test_beds)
    if argv.verbose:
        runner.set_logs_verbose()
    if not argv.no_default_cases:
        # The default cases live in the `cases` directory next to this file.
        runner.add_path(pathlib.Path(__file__).resolve().parent / 'cases')

    # List tests to standard output.
    if argv.list:
        for _, (tag, test_names) in runner.included_tests.items():
            for name in test_names:
                print(f"{tag}.{name}")
        sys.exit(0)

    # Run the test suite.
    logging.basicConfig(level=logging.INFO)
    if not runner.run():
        sys.exit(1)