# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Avatar is a scalable multi-platform Bluetooth testing tool capable of running
any Bluetooth test cases virtually and physically.
"""

__version__ = "0.0.1"

import functools
import grpc
import grpc.aio
import importlib
import logging

from avatar import pandora_server
from avatar.aio import asynchronous
from avatar.pandora_client import BumblePandoraClient as BumblePandoraDevice, PandoraClient as PandoraDevice
from avatar.pandora_server import PandoraServer
from mobly import base_test
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Sized, Tuple, Type, TypeVar

# public symbols
__all__ = [
    'asynchronous',
    'parameterized',
    'rpc_except',
    'PandoraDevices',
    'PandoraDevice',
    'BumblePandoraDevice',
]


# Server classes looked up by controller name when the user config does not
# name an explicit server class for that controller.
PANDORA_COMMON_SERVER_CLASSES: Dict[str, Type[pandora_server.PandoraServer[Any]]] = {
    'PandoraDevice': pandora_server.PandoraServer,
    'AndroidDevice': pandora_server.AndroidPandoraServer,
    'BumbleDevice': pandora_server.BumblePandoraServer,
}

# Prefix of the user_params key (`pandora_server_class_<controller>`) that
# overrides the server class for a controller.
KEY_PANDORA_SERVER_CLASS = 'pandora_server_class'


class PandoraDevices(Sized, Iterable[PandoraDevice]):
    """Utility for abstracting controller registration and Pandora setup."""

    _test: base_test.BaseTestClass
    _clients: List[PandoraDevice]
    _servers: List[PandoraServer[Any]]

    def __init__(self, test: base_test.BaseTestClass) -> None:
        """Creates a PandoraDevices list.

        It performs three steps:
          - Register the underlying controllers to the test.
          - Start the corresponding PandoraServer for each controller.
          - Store a PandoraClient for each server.

        The order in which the clients are returned can be determined by the
        (optional) `order_<controller_class>` params in user_params. Controllers
        without such a param will be set up last (order=100).

        Args:
            test: Instance of the Mobly test class.

        Raises:
            RuntimeError: If no PandoraServer class can be found for a
                controller, either in the config or in
                PANDORA_COMMON_SERVER_CLASSES.
        """
        self._test = test
        self._clients = []
        self._servers = []

        user_params: Dict[str, Any] = test.user_params  # type: ignore
        controller_configs: Dict[str, Any] = test.controller_configs.copy()  # type: ignore
        sorted_controllers = sorted(
            controller_configs.keys(), key=lambda controller: user_params.get(f'order_{controller}', 100)
        )
        for controller in sorted_controllers:
            # Find the corresponding PandoraServer class for the controller.
            if f'{KEY_PANDORA_SERVER_CLASS}_{controller}' in user_params:
                # Try to load the server dynamically if module specified in user_params.
                class_path = user_params[f'{KEY_PANDORA_SERVER_CLASS}_{controller}']
                logging.info('Loading Pandora server class %s from config for %s.', class_path, controller)
                server_cls = _load_pandora_server_class(class_path)
            else:
                # Search in the list of commonly-used controllers.
                try:
                    server_cls = PANDORA_COMMON_SERVER_CLASSES[controller]
                except KeyError as e:
                    raise RuntimeError(
                        f'PandoraServer module for {controller} not found in either the '
                        'config or PANDORA_COMMON_SERVER_CLASSES.'
                    ) from e

            # Register the controller and load its Pandora servers.
            logging.info('Starting %s(s) for %s', server_cls.__name__, controller)
            devices: Optional[List[Any]] = test.register_controller(server_cls.MOBLY_CONTROLLER_MODULE)  # type: ignore
            assert devices, f'No devices registered for controller {controller}.'
            for device in devices:  # type: ignore
                self._servers.append(server_cls(device))

        self.start_all()

    def __len__(self) -> int:
        """Returns the number of currently started Pandora clients."""
        return len(self._clients)

    def __iter__(self) -> Iterator[PandoraDevice]:
        """Iterates over the currently started Pandora clients."""
        return iter(self._clients)

    def start_all(self) -> None:
        """Starts all Pandora servers and stores the client each one returns.

        Does nothing if clients are already stored.
        """
        if self._clients:
            return
        for server in self._servers:
            self._clients.append(server.start())

    def stop_all(self) -> None:
        """Closes all opened Pandora clients and stops all servers.

        Does nothing if no client is currently stored. The stored client list
        is emptied afterwards, so `start_all` can be called again.
        """
        if not self._clients:
            return
        for client in self:
            client.close()
        for server in self._servers:
            server.stop()
        self._clients.clear()


def _load_pandora_server_class(class_path: str) -> Type[pandora_server.PandoraServer[Any]]:
    """Dynamically load a PandoraServer class from a user-specified module+class.

    Args:
        class_path: String in format '<module>.<class>', where the module is fully
            importable using importlib.import_module. e.g.:
            my.pandora.server.module.MyPandoraServer

    Returns:
        The loaded PandoraServer subclass (the class itself, not an instance).

    Raises:
        ValueError: If `class_path` is not of the form '<module>.<class>'.
        TypeError: If the loaded attribute is not a subclass of PandoraServer.
    """
    # Split on the last dot: everything before it is the module path.
    module_name, separator, class_name = class_path.rpartition('.')
    if not separator or not module_name:
        raise ValueError(f"Invalid Pandora server class path {class_path!r}: expected '<module>.<class>'.")

    # Dynamically import the module, and get the class
    module = importlib.import_module(module_name)
    server_class = getattr(module, class_name)

    # Check that the attribute is a class, and a subclass of PandoraServer.
    # The isinstance guard keeps issubclass from raising its own generic
    # TypeError on non-class attributes.
    if not (isinstance(server_class, type) and issubclass(server_class, pandora_server.PandoraServer)):
        raise TypeError(f'The specified class {class_path} is not a subclass of PandoraServer.')
    return server_class  # type: ignore


class Wrapper(object):
    """Holds a function so a subclass can post-process it at class creation."""

    func: Callable[..., Any]

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func


def parameterized(*inputs: Tuple[Any, ...]) -> Type[Wrapper]:
    """Multiply the same function from `inputs` parameters.

    Used as a decorator on a function defined in a class body. For each tuple
    in `inputs`, an attribute named `<name><tuple>` (spaces removed, e.g.
    `test_x(1,2)`) is set on the owning class; calling it invokes the decorated
    function with the tuple's items appended to the positional arguments. The
    original attribute is then deleted from the class.

    Args:
        *inputs: One tuple of extra positional arguments per generated function.

    Returns:
        A `Wrapper` subclass to be applied to the function being parameterized.
    """

    class wrapper(Wrapper):
        def __set_name__(self, owner: Type[Any], name: str) -> None:
            def bind(params: Tuple[Any, ...]) -> Callable[..., Any]:
                # `params` is bound through this helper's argument, otherwise
                # every generated function would see the value from the last
                # iteration of `inputs`.
                @functools.wraps(self.func)
                def bound(*args: Any, **kwargs: Any) -> Any:
                    return self.func(*args, *params, **kwargs)

                return bound

            for params in inputs:
                setattr(owner, f"{name}{params}".replace(' ', ''), bind(params))
            delattr(owner, name)

    return wrapper


_T = TypeVar('_T')


# Decorate a test function with a wrapper that catches gRPC errors
# and calls a callback if the status `code` matches.
def rpc_except(
    excepts: Dict[grpc.StatusCode, Callable[[grpc.aio.AioRpcError], Any]],
) -> Callable[[Callable[..., _T]], Callable[..., _T]]:
    """Build a decorator that routes gRPC errors to per-status-code callbacks.

    The decorated function is called normally. If it raises `grpc.RpcError`
    or `grpc.aio.AioRpcError`, the error's `code()` is looked up in `excepts`:
    when a callback is registered for that code, it is called with the error
    and its result is returned; otherwise the error is re-raised unchanged.

    Args:
        excepts: Mapping from gRPC status code to the callback handling it.

    Returns:
        A decorator to apply to the function to protect.
    """

    def wrap(func: Callable[..., _T]) -> Callable[..., _T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> _T:
            try:
                return func(*args, **kwargs)
            except (grpc.RpcError, grpc.aio.AioRpcError) as e:
                handler = excepts.get(e.code())  # type: ignore
                if handler is None:
                    # No callback for this status code: propagate the original
                    # error with its traceback intact.
                    raise
                return handler(e)  # type: ignore

        return wrapper

    return wrap