• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Pandora client interface for Avatar tests."""
17
18import asyncio
19import avatar.aio
20import bumble
21import bumble.device
22import grpc
23import grpc.aio
24import logging
25
26from avatar.bumble_device import BumbleDevice
27from bumble.hci import Address as BumbleAddress
28from dataclasses import dataclass
29from pandora import host_grpc, host_grpc_aio, security_grpc, security_grpc_aio
30from typing import Any, Dict, MutableMapping, Optional, Tuple, Union
31
32
class Address(bytes):
    """Bluetooth device address held as exactly six raw bytes.

    Instances are plain `bytes` objects whose `str()` form is the usual
    colon-separated, upper-case hexadecimal notation.
    """

    def __new__(cls, address: 'Union[bytes, str, BumbleAddress]') -> 'Address':
        """Builds an Address from one of the supported representations.

        Args:
          address: either raw bytes (including any `bytes` subclass such as
            another `Address`), a hexadecimal string with optional ':'
            separators, or a Bumble address object.

        Raises:
          ValueError: if `address` is of an unsupported type, is not valid
            hexadecimal text, or does not decode to exactly six bytes.
        """
        # `isinstance` (rather than an exact `type(...) is` check) lets
        # subclasses through, so an existing `Address` can be passed back in,
        # e.g. when copying the address of one client to another.
        if isinstance(address, bytes):
            address_bytes = address
        elif isinstance(address, str):
            address_bytes = bytes.fromhex(address.replace(':', ''))
        elif isinstance(address, BumbleAddress):
            # The byte order of the Bumble representation is reversed here;
            # presumably Bumble stores addresses least-significant byte first
            # -- TODO confirm against the Bumble documentation.
            address_bytes = bytes(reversed(bytes(address)))
        else:
            raise ValueError('Invalid address format')

        if len(address_bytes) != 6:
            raise ValueError('Invalid address length')

        return bytes.__new__(cls, address_bytes)

    def __str__(self) -> str:
        """Returns the address as colon-separated upper-case hex octets."""
        return ':'.join(f'{x:02X}' for x in self)
51
52
class PandoraClient:
    """Provides Pandora interface access to a device via gRPC."""

    # public fields
    grpc_target: str  # Server address for the gRPC channel.
    log: 'PandoraClientLoggerAdapter'  # Logger adapter.

    # private fields
    _channel: grpc.Channel  # Synchronous gRPC channel.
    _address: Address  # Bluetooth device address
    _aio: Optional['PandoraClient.Aio']  # Asynchronous gRPC channel.

    def __init__(self, grpc_target: str, name: str = '..') -> None:
        """Creates a PandoraClient.

        Establishes a channel with the Pandora gRPC server.

        Args:
          grpc_target: Server address for the gRPC channel.
          name: Client name handed to the logger adapter as `client_name`.
        """
        self.grpc_target = grpc_target
        self.log = PandoraClientLoggerAdapter(logging.getLogger(), {'client': self, 'client_name': name})
        self._channel = grpc.insecure_channel(grpc_target)  # type: ignore
        # Placeholder all-zero address until `reset()` or the setter provides one.
        self._address = Address(b'\x00\x00\x00\x00\x00\x00')
        # The asynchronous channel is created lazily by the `aio` property.
        self._aio = None

    def close(self) -> None:
        """Closes the gRPC channels."""
        self._channel.close()
        if self._aio is not None:
            avatar.aio.run_until_complete(self._aio.channel.close())

    @property
    def address(self) -> Address:
        """Returns the BD address."""
        return self._address

    @address.setter
    def address(self, address: Union[bytes, str, BumbleAddress]) -> None:
        """Sets the BD address."""
        self._address = Address(address)

    async def reset(self) -> None:
        """Factory reset the device & read its BD address.

        The whole sequence is retried when the server answers with
        `UNAVAILABLE` or `DEADLINE_EXCEEDED`; once the retries are exhausted,
        or on any other gRPC status, the error is re-raised.

        Raises:
          grpc.aio.AioRpcError: if the server stays unreachable or fails with
            a non-retryable status code.
        """
        attempts, max_attempts = 1, 3
        while True:
            try:
                await self.aio.host.FactoryReset(wait_for_ready=True, timeout=15.0)

                # Factory reset stopped the server, close the client too.
                assert self._aio
                await self._aio.channel.close()
                self._aio = None

                # This call might fail if the server is unavailable.
                # `self.aio` lazily opens a fresh asynchronous channel here.
                self._address = Address(
                    (await self.aio.host.ReadLocalAddress(wait_for_ready=True, timeout=15.0)).address
                )
                return
            except grpc.aio.AioRpcError as e:
                if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
                    if attempts <= max_attempts:
                        self.log.debug(f'Server unavailable, retry [{attempts}/{max_attempts}].')
                        attempts += 1
                        continue
                    self.log.exception(f'Server still unavailable after {attempts} attempts, abort.')
                # Bare `raise` re-raises the active error with its original traceback.
                raise

    @property
    def channel(self) -> grpc.Channel:
        """Returns the synchronous gRPC channel.

        Raises:
          RuntimeError: if called while an asyncio event loop is running in
            the current thread.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop: the caller is synchronous code.
            return self._channel
        raise RuntimeError('Trying to use the synchronous gRPC channel from asynchronous code.')

    # Pandora interfaces

    @property
    def host(self) -> host_grpc.Host:
        """Returns the Pandora Host gRPC interface."""
        return host_grpc.Host(self.channel)

    @property
    def security(self) -> security_grpc.Security:
        """Returns the Pandora Security gRPC interface."""
        return security_grpc.Security(self.channel)

    @property
    def security_storage(self) -> security_grpc.SecurityStorage:
        """Returns the Pandora SecurityStorage gRPC interface."""
        return security_grpc.SecurityStorage(self.channel)

    @dataclass
    class Aio:
        """Asynchronous Pandora interfaces bound to one `grpc.aio` channel."""

        channel: grpc.aio.Channel

        @property
        def host(self) -> host_grpc_aio.Host:
            """Returns the Pandora Host gRPC interface."""
            return host_grpc_aio.Host(self.channel)

        @property
        def security(self) -> security_grpc_aio.Security:
            """Returns the Pandora Security gRPC interface."""
            return security_grpc_aio.Security(self.channel)

        @property
        def security_storage(self) -> security_grpc_aio.SecurityStorage:
            """Returns the Pandora SecurityStorage gRPC interface."""
            return security_grpc_aio.SecurityStorage(self.channel)

    @property
    def aio(self) -> 'PandoraClient.Aio':
        """Returns the asynchronous interfaces, opening the channel on first use."""
        if self._aio is None:
            self._aio = PandoraClient.Aio(grpc.aio.insecure_channel(self.grpc_target))
        return self._aio
171
172
class PandoraClientLoggerAdapter(logging.LoggerAdapter):  # type: ignore
    """Prefixes log messages with the client name and the end of its address."""

    def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]:
        """Prepends `[<client name>:<last address octets>]` to `msg`.

        The client is read from the adapter's `extra` mapping; the name falls
        back to the client's class name when no `client_name` entry exists.
        `kwargs` is returned untouched.
        """
        extra = self.extra
        assert extra
        owner = extra['client']
        assert isinstance(owner, PandoraClient)
        label = extra.get('client_name', owner.__class__.__name__)
        # Only the bytes from index 4 onwards are shown to keep the prefix short.
        tail = ':'.join(f'{octet:02X}' for octet in owner.address[4:])
        prefix = f'[{label}:{tail}]'
        return (f'{prefix} {msg}', kwargs)
183
184
class BumblePandoraClient(PandoraClient):
    """Pandora client that also exposes the Bumble device instance behind it."""

    _bumble: BumbleDevice  # Wrapper around the Bumble device.

    def __init__(self, grpc_target: str, bumble: BumbleDevice) -> None:
        """Creates the client under the fixed name 'bumble'.

        Args:
          grpc_target: Server address for the gRPC channel.
          bumble: Bumble device wrapper made reachable through this client.
        """
        super().__init__(grpc_target, name='bumble')
        self._bumble = bumble

    @property
    def config(self) -> Dict[str, Any]:
        """Returns the `config` of the wrapped Bumble device."""
        wrapper = self._bumble
        return wrapper.config

    @property
    def device(self) -> bumble.device.Device:
        """Returns the `device` held by the Bumble wrapper."""
        wrapper = self._bumble
        return wrapper.device

    @property
    def random_address(self) -> Address:
        """Returns the device's `random_address` converted to an `Address`."""
        raw_address = self.device.random_address
        return Address(raw_address)
205