• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Avatar is a scalable multi-platform Bluetooth testing tool capable of running
17any Bluetooth test cases virtually and physically.
18"""
19
# Version string of this package.
__version__ = "0.0.10"
21
22import argparse
23import enum
24import functools
25import grpc
26import grpc.aio
27import importlib
28import logging
29import pathlib
30import re
31
32from avatar import pandora_server
33from avatar.aio import asynchronous
34from avatar.metrics import trace
35from avatar.pandora_client import BumblePandoraClient as BumblePandoraDevice
36from avatar.pandora_client import PandoraClient as PandoraDevice
37from avatar.pandora_server import PandoraServer
38from avatar.runner import SuiteRunner
39from mobly import base_test
40from mobly import signals
41from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Sized, Tuple, Type, TypeVar
42
# Public symbols: the names exported by a star-import of this module.
__all__ = [
    'asynchronous',
    'enableFlag',
    'parameterized',
    'rpc_except',
    'PandoraDevices',
    'PandoraDevice',
    'BumblePandoraDevice',
]
53
54
# Built-in mapping from a controller name, as it appears in the keys of the
# test's controller configs, to the PandoraServer class used for it.
# PandoraDevices falls back to this table when user_params does not name a
# server class for the controller.
PANDORA_COMMON_SERVER_CLASSES: Dict[str, Type[pandora_server.PandoraServer[Any]]] = {
    'PandoraDevice': pandora_server.PandoraServer,
    'AndroidDevice': pandora_server.AndroidPandoraServer,
    'BumbleDevice': pandora_server.BumblePandoraServer,
    'UsbDevice': pandora_server.UsbBumblePandoraServer,
}

# Prefix of the user_params key (`pandora_server_class_<controller>`) whose
# value is the '<module>.<class>' path of a PandoraServer class to load
# dynamically for that controller.
KEY_PANDORA_SERVER_CLASS = 'pandora_server_class'
63
64
class PandoraDevices(Sized, Iterable[PandoraDevice]):
    """Utility for abstracting controller registration and Pandora setup.

    Iterating over an instance yields the currently started Pandora clients,
    and `len()` is their count.
    """

    _test: base_test.BaseTestClass
    _clients: List[PandoraDevice]
    _servers: List[PandoraServer[Any]]

    def __init__(self, test: base_test.BaseTestClass) -> None:
        """Creates a PandoraDevices list.

        It performs three steps:
          - Register the underlying controllers to the test.
          - Start the corresponding PandoraServer for each controller.
          - Store a PandoraClient for each server.

        The order in which the clients are returned can be determined by the
        (optional) `order_<controller_class>` params in user_params. Controllers
        without such a param will be set up last (order=100).

        Args:
            test: Instance of the Mobly test class.

        Raises:
            RuntimeError: No PandoraServer class is configured or known for a
                controller.
            signals.TestAbortAll: Registering a controller failed.
            AssertionError: A controller registration yielded no device.
        """
        self._test = test
        self._clients = []
        self._servers = []

        trace.hook_test(test, self)
        user_params: Dict[str, Any] = test.user_params  # type: ignore
        controller_configs: Dict[str, Any] = test.controller_configs.copy()  # type: ignore
        sorted_controllers = sorted(
            controller_configs, key=lambda controller: user_params.get(f'order_{controller}', 100)
        )
        for controller in sorted_controllers:
            # Find the corresponding PandoraServer class for the controller.
            if f'{KEY_PANDORA_SERVER_CLASS}_{controller}' in user_params:
                # Try to load the server dynamically if module specified in user_params.
                class_path = user_params[f'{KEY_PANDORA_SERVER_CLASS}_{controller}']
                logging.info('Loading Pandora server class %s from config for %s.', class_path, controller)
                server_cls = _load_pandora_server_class(class_path)
            else:
                # Search in the list of commonly-used controllers.
                try:
                    server_cls = PANDORA_COMMON_SERVER_CLASSES[controller]
                except KeyError as e:
                    raise RuntimeError(
                        f'PandoraServer module for {controller} not found in either the '
                        'config or PANDORA_COMMON_SERVER_CLASSES.'
                    ) from e

            # Register the controller and load its Pandora servers.
            logging.info('Starting %s(s) for %s', server_cls.__name__, controller)
            try:
                devices: Optional[List[Any]] = test.register_controller(  # type: ignore
                    server_cls.MOBLY_CONTROLLER_MODULE
                )
            except Exception as e:
                logging.exception('abort: failed to register controller')
                # Give the abort a reason and keep the root cause attached.
                raise signals.TestAbortAll(f'failed to register controller {controller}') from e
            # Explicit raise rather than `assert`, which is stripped under `python -O`.
            if not devices:
                raise AssertionError(f'no device registered for controller {controller}')
            for device in devices:  # type: ignore
                self._servers.append(server_cls(device))

        self.start_all()

    def __len__(self) -> int:
        """Returns the number of started clients."""
        return len(self._clients)

    def __iter__(self) -> Iterator[PandoraDevice]:
        """Iterates over the started clients, in server order."""
        return iter(self._clients)

    def start_all(self) -> None:
        """Starts all Pandora servers and stores the client each one returns.

        Does nothing when clients are already stored.
        """
        if self._clients:
            return
        for server in self._servers:
            self._clients.append(server.start())

    def stop_all(self) -> None:
        """Closes all opened Pandora clients and stops all servers.

        Does nothing when no client is stored. The servers are stopped and the
        client list is emptied even if closing a client raises; the error is
        then propagated to the caller.
        """
        if not self._clients:
            return
        try:
            for client in self:
                client.close()
        finally:
            # Never leave servers running behind a failed client close.
            for server in self._servers:
                server.stop()
            self._clients.clear()
151
152
def _load_pandora_server_class(class_path: str) -> Type[pandora_server.PandoraServer[Any]]:
    """Dynamically load a PandoraServer class from a user-specified module+class.

    Args:
      class_path: String in format '<module>.<class>', where the module is fully
        importable using importlib.import_module. e.g.:
        my.pandora.server.module.MyPandoraServer

    Returns:
      The loaded PandoraServer subclass (the class itself, not an instance).

    Raises:
      ValueError: `class_path` contains no '.' separating module and class.
      ImportError: The module cannot be imported.
      AttributeError: The module has no attribute with the given class name.
      TypeError: The attribute is not a subclass of PandoraServer.
    """
    if '.' not in class_path:
        # Without this check the unpacking below fails with an opaque
        # "not enough values to unpack" ValueError.
        raise ValueError(f"The specified class path {class_path} is not in '<module>.<class>' format.")
    # Dynamically import the module, and get the class
    module_name, class_name = class_path.rsplit('.', 1)
    module = importlib.import_module(module_name)
    server_class = getattr(module, class_name)
    # Check that the attribute is a class, and a subclass of PandoraServer.
    # issubclass() itself raises an unrelated TypeError on non-class objects.
    if not (isinstance(server_class, type) and issubclass(server_class, pandora_server.PandoraServer)):
        raise TypeError(f'The specified class {class_path} is not a subclass of PandoraServer.')
    return server_class  # type: ignore
172
173
class Wrapper:
    """Minimal holder that stores a callable in its `func` attribute.

    `parameterized` returns a subclass of this type.
    """

    func: Callable[..., Any]

    def __init__(self, func: Callable[..., Any]) -> None:
        """Stores `func` on the instance."""
        self.func = func
179
180
# Generate one copy of the decorated function per tuple of parameters in `inputs`.
def parameterized(*inputs: Tuple[Any, ...]) -> Type[Wrapper]:
    """Expand one function of a class into one function per parameter tuple.

    Meant to decorate a function defined in a class body. When the class is
    created, the decorated attribute is deleted and, for each tuple in
    `inputs`, a new attribute is set on the class. Calling it calls the
    original function with the tuple's elements appended to the positional
    arguments. Each new attribute is named `<name><tuple>`, where `<tuple>` is
    the string form of the parameter tuple with spaces removed and enum
    members replaced by their values.

    Args:
        inputs: Parameter tuples; each one produces one generated function.

    Returns:
        A `Wrapper` subclass, to be instantiated with the function to expand.
    """

    def normalize(value: Any) -> Any:
        # Enum members are represented by their value in generated names.
        if isinstance(value, enum.Enum):
            return value.value
        return value

    class wrapper(Wrapper):
        def __set_name__(self, owner: Type[Any], name: str) -> None:
            def bind(params: Tuple[Any, ...]) -> Callable[..., Any]:
                # A factory is needed to bind `params` per generated function;
                # a closure over the loop variable would only ever see the
                # last tuple of `inputs`.
                @functools.wraps(self.func)
                def call(*args: Any, **kwargs: Any) -> Any:
                    return self.func(*args, *params, **kwargs)

                return call

            for params in inputs:
                suffix = tuple([normalize(a) for a in params])
                setattr(owner, f"{name}{suffix}".replace(" ", ""), bind(params))
            delattr(owner, name)

    return wrapper
209
210
def enableFlag(flag: str) -> Callable[..., Any]:
    """Enable aconfig flag for the duration of the decorated test.

    Requires that the test class declares a devices: Optional[PandoraDevices] attribute.

    For each Android Pandora server of `devices` on which the flag is not
    already enabled, the flag is enabled before the test runs and unset once
    it is over, whether the test returned, raised, or was never run because a
    later device could not be set up.

    Args:
        flag: aconfig flag name including package, e.g.: 'com.android.bluetooth.flags.<flag_name>'

    Returns:
        A decorator for test methods.

    Raises:
        The following are raised when the decorated test is called:
        AttributeError: when the 'devices' attribute is not found, is None, or is empty
        TypeError: when the 'devices' attribute is not a PandoraDevices
        signals.TestSkip: when the flag is neither enabled nor read-write on a device
        signals.TestError: when the flag has zero or several entries in the
            device flag list, or could not be enabled or restored
    """
    # The flag name contains dots; escape it so it is matched literally.
    flag_pattern = re.escape(flag)

    def get_flag_value(server: PandoraServer[Any], flag: str) -> str:
        # Returns the single line of the device flag list that mentions `flag`.
        cmd_output = server.device.adb.shell(f'aflags list -c com.android.bt | grep {flag}').decode().split('\n')
        cmd_output = [x for x in cmd_output if x]  # Filter out empty lines from shell result
        if len(cmd_output) == 0:
            raise signals.TestError(f'Flag [{flag}] is not present in the aflags list of the device')
        if len(cmd_output) != 1:
            raise signals.TestError(
                f'Flag [{flag}] has multiple entries in the aflags list of the device. Output was {cmd_output}'
            )
        return cmd_output[0]

    def is_flag_enabled(server: PandoraServer[Any], flag: str) -> bool:
        return bool(re.search(flag_pattern + '.* enabled', get_flag_value(server, flag)))

    # A "valid" flag is either already enabled or writable
    def is_flag_valid_for_test(server: PandoraServer[Any], flag: str) -> bool:
        return bool(re.search(flag_pattern + '.* (enabled|read-write)', get_flag_value(server, flag)))

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(self: base_test.BaseTestClass, *args: Any, **kwargs: Any) -> Any:
            devices = getattr(self, 'devices', None)

            # NOTE: PandoraDevices defines __len__, so an instance without
            # started clients is rejected here as well.
            if not devices:
                raise AttributeError("Attribute 'devices' not found in test class or is None")

            if not isinstance(devices, PandoraDevices):
                raise TypeError("devices attribute must be of a PandoraDevices type")

            servers_to_restore: List[PandoraServer[Any]] = []

            try:
                for server in devices._servers:
                    if not isinstance(server, pandora_server.AndroidPandoraServer):
                        continue
                    if not is_flag_valid_for_test(server, flag):
                        raise signals.TestSkip('Flag cannot be enabled on this device')
                    if is_flag_enabled(server, flag):
                        continue  # Nothing to do flag is already active
                    server.device.adb.shell(f'aflags enable --immediate {flag}')  # type: ignore
                    if not is_flag_enabled(server, flag):
                        raise signals.TestError('Despite writable flag, runner couldn\'t enable it')
                    servers_to_restore.append(server)

                return func(self, *args, **kwargs)
            finally:
                # Restore in `finally` so a failing test, or a skip/error on a
                # later device, does not leave the flag enabled for the tests
                # that follow.
                for server in servers_to_restore:
                    server.device.adb.shell(f'aflags unset --immediate {flag}')  # type: ignore
                    if is_flag_enabled(server, flag):
                        raise signals.TestError('Despite writable flag, runner couldn\'t reset its initial value')

        return wrapper

    return decorator
276
277
# Return type of the function decorated by `rpc_except`; the wrapper it
# produces is annotated with the same return type.
_T = TypeVar('_T')
279
280
# Decorate a test function with a wrapper that catches gRPC errors
# and calls a callback if the status `code` matches.
def rpc_except(
    excepts: Dict[grpc.StatusCode, Callable[[grpc.aio.AioRpcError], Any]],
) -> Callable[[Callable[..., _T]], Callable[..., _T]]:
    """Build a decorator that routes gRPC errors to per-status-code callbacks.

    Args:
        excepts: Maps a gRPC status code to the callback invoked with the
            caught error when the decorated function raises with that code.

    Returns:
        A decorator. The decorated function returns its own result, or the
        callback's result when a gRPC error with a mapped code was caught.
        An error whose code has no truthy entry in `excepts` is re-raised.
    """

    def decorator(func: Callable[..., _T]) -> Callable[..., _T]:
        @functools.wraps(func)
        def guarded(*args: Any, **kwargs: Any) -> _T:
            try:
                outcome = func(*args, **kwargs)
            except (grpc.RpcError, grpc.aio.AioRpcError) as e:
                handler = excepts.get(e.code())  # type: ignore
                if not handler:
                    raise e
                return handler(e)  # type: ignore
            return outcome

        return guarded

    return decorator
299
300
def args_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser of the Avatar test runner.

    Returns:
        A parser producing a namespace with the attributes `input`, `config`,
        `list`, `log_path`, `tests`, `test_beds`, `verbose` and
        `no_default_cases`.
    """
    parser = argparse.ArgumentParser(description='Avatar test runner.')
    parser.add_argument(
        'input',
        type=str,
        nargs='*',
        metavar='<PATH>',
        help='List of folders or test files to run.',
        default=[],
    )
    parser.add_argument('-c', '--config', type=str, metavar='<PATH>', help='Path to the test configuration file.')
    parser.add_argument(
        '-l',
        '--list',
        '--list_tests',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        action='store_true',
        help='Print the names of the tests defined in a script without executing them.',
    )
    parser.add_argument(
        '-o',
        '--log-path',
        '--log_path',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        type=str,
        metavar='<PATH>',
        help='Path to the logs directory.',
    )
    parser.add_argument(
        '-t',
        '--tests',
        nargs='+',
        type=str,
        metavar='[ClassA[.test_a] ClassB[.test_b] ...]',
        help='A list of test classes and optional tests to execute.',
    )
    parser.add_argument(
        '-b',
        '--test-beds',
        '--test_bed',  # For backward compatibility with tradefed `MoblyBinaryHostTest`
        nargs='+',
        type=str,
        metavar='[<TEST BED NAME1> <TEST BED NAME2> ...]',
        help='Specify which test beds to run tests on.',
    )
    parser.add_argument('-v', '--verbose', action='store_true', help='Set console logger level to DEBUG')
    parser.add_argument('-x', '--no-default-cases', action='store_true', help='Do not include default test cases')
    return parser
347
348
349# Avatar default entry point
def main(args: Optional[argparse.Namespace] = None) -> None:
    """Run the Avatar test suite as configured by the given options.

    Args:
        args: Already-parsed options. When falsy, the options are parsed from
            the process command line with `args_parser()`.

    Exits the process with status 0 after printing the test list when the
    `list` option is set, and with status 1 when the suite run reports failure.
    """
    import sys

    suite = SuiteRunner()

    # Resolve the options, then feed them to the runner one by one.
    options = args if args else args_parser().parse_args()
    for path in options.input or []:
        suite.add_path(pathlib.Path(path))
    if options.config:
        suite.add_config_file(pathlib.Path(options.config))
    if options.log_path:
        suite.set_logs_dir(pathlib.Path(options.log_path))
    if options.tests:
        suite.add_test_filters(options.tests)
    if options.test_beds:
        suite.add_test_beds(options.test_beds)
    if options.verbose:
        suite.set_logs_verbose()
    if not options.no_default_cases:
        default_cases = pathlib.Path(__file__).resolve().parent / 'cases'
        suite.add_path(default_cases)

    # Listing mode: print every included test as `<tag>.<name>` and stop.
    if options.list:
        listing = (
            f"{tag}.{name}"
            for _, (tag, test_names) in suite.included_tests.items()
            for name in test_names
        )
        for entry in listing:
            print(entry)
        sys.exit(0)

    # Run the test suite; a falsy result becomes a non-zero exit status.
    logging.basicConfig(level=logging.INFO)
    succeeded = suite.run()
    if not succeeded:
        sys.exit(1)
385