# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Pandora client interface for Avatar tests."""

import asyncio
import avatar.aio
import bumble
import bumble.device
import grpc
import grpc.aio
import logging

from avatar.bumble_device import BumbleDevice
from bumble.hci import Address as BumbleAddress
from dataclasses import dataclass
from pandora import host_grpc, host_grpc_aio, security_grpc, security_grpc_aio
from typing import Any, Dict, MutableMapping, Optional, Tuple, Union


class Address(bytes):
    """A 6-byte Bluetooth device address stored as `bytes`.

    Printing an `Address` yields colon-separated uppercase hex pairs,
    e.g. `00:11:22:AA:BB:CC`.
    """

    def __new__(cls, address: Union[bytes, str, BumbleAddress]) -> 'Address':
        """Builds an Address from one of the supported representations.

        Args:
            address: One of:
                * `bytes` (or a subclass such as `Address`): used as-is.
                * `str`: hexadecimal digits, with optional ':' separators.
                * Bumble `Address`: its byte representation is reversed.

        Raises:
            ValueError: if the type is unsupported, the string is not valid
                hexadecimal, or the result is not exactly 6 bytes long.
        """
        # The Bumble address is tested first so that it always takes its dedicated
        # conversion path, whatever its base classes are.
        if isinstance(address, BumbleAddress):
            address_bytes = bytes(reversed(bytes(address)))
        elif isinstance(address, bytes):
            # `isinstance` (rather than an exact type match) lets an existing
            # `Address` be passed back in, e.g. `a.address = b.address`.
            address_bytes = bytes(address)
        elif isinstance(address, str):
            address_bytes = bytes.fromhex(address.replace(':', ''))
        else:
            raise ValueError('Invalid address format')

        if len(address_bytes) != 6:
            raise ValueError('Invalid address length')

        return bytes.__new__(cls, address_bytes)

    def __str__(self) -> str:
        """Returns the address as colon-separated uppercase hex bytes."""
        return ':'.join(f'{x:02X}' for x in self)


class PandoraClient:
    """Provides Pandora interface access to a device via gRPC."""

    # public fields
    grpc_target: str  # Server address for the gRPC channel.
    log: 'PandoraClientLoggerAdapter'  # Logger adapter.

    # private fields
    _channel: grpc.Channel  # Synchronous gRPC channel.
    _address: Address  # Bluetooth device address
    _aio: Optional['PandoraClient.Aio']  # Asynchronous gRPC channel.

    def __init__(self, grpc_target: str, name: str = '..') -> None:
        """Creates a PandoraClient.

        Establishes a channel with the Pandora gRPC server.

        Args:
            grpc_target: Server address for the gRPC channel.
            name: Client name, used as the prefix of this client's log messages.
        """
        self.grpc_target = grpc_target
        self.log = PandoraClientLoggerAdapter(logging.getLogger(), {'client': self, 'client_name': name})
        self._channel = grpc.insecure_channel(grpc_target)  # type: ignore
        # Placeholder all-zero address until one is set or read by `reset`.
        self._address = Address(b'\x00\x00\x00\x00\x00\x00')
        # The asynchronous channel is created lazily by the `aio` property.
        self._aio = None

    def close(self) -> None:
        """Closes the gRPC channels."""
        self._channel.close()
        if self._aio:
            avatar.aio.run_until_complete(self._aio.channel.close())

    @property
    def address(self) -> Address:
        """Returns the BD address."""
        return self._address

    @address.setter
    def address(self, address: Union[bytes, str, BumbleAddress]) -> None:
        """Sets the BD address."""
        self._address = Address(address)

    async def reset(self) -> None:
        """Factory reset the device & read its BD address.

        The whole sequence (factory reset, then address read) is retried up to
        `max_attempts` times when the server answers `UNAVAILABLE` or
        `DEADLINE_EXCEEDED`.

        Raises:
            grpc.aio.AioRpcError: on any other gRPC error, or when the server is
                still unavailable once the retries are exhausted.
        """
        attempts, max_attempts = 1, 3
        while True:
            try:
                await self.aio.host.FactoryReset(wait_for_ready=True, timeout=15.0)

                # Factory reset stopped the server, close the client too.
                assert self._aio
                await self._aio.channel.close()
                self._aio = None

                # This call might fail if the server is unavailable.
                self._address = Address(
                    (await self.aio.host.ReadLocalAddress(wait_for_ready=True, timeout=15.0)).address
                )
                return
            except grpc.aio.AioRpcError as e:
                if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
                    if attempts <= max_attempts:
                        self.log.debug(f'Server unavailable, retry [{attempts}/{max_attempts}].')
                        attempts += 1
                        continue
                    self.log.exception(f'Server still unavailable after {attempts} attempts, abort.')
                # Bare `raise` re-raises the active exception with its traceback intact.
                raise

    @property
    def channel(self) -> grpc.Channel:
        """Returns the synchronous gRPC channel.

        Raises:
            RuntimeError: if an asyncio event loop is running in the current
                thread, i.e. the caller is asynchronous code.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # `get_running_loop` raises RuntimeError when no loop is running:
            # the caller is synchronous code. Only that exception is caught so
            # that e.g. KeyboardInterrupt is never swallowed here.
            return self._channel
        raise RuntimeError('Trying to use the synchronous gRPC channel from asynchronous code.')

    # Pandora interfaces

    @property
    def host(self) -> host_grpc.Host:
        """Returns the Pandora Host gRPC interface."""
        return host_grpc.Host(self.channel)

    @property
    def security(self) -> security_grpc.Security:
        """Returns the Pandora Security gRPC interface."""
        return security_grpc.Security(self.channel)

    @property
    def security_storage(self) -> security_grpc.SecurityStorage:
        """Returns the Pandora SecurityStorage gRPC interface."""
        return security_grpc.SecurityStorage(self.channel)

    @dataclass
    class Aio:
        """Asynchronous counterpart of the Pandora interfaces, bound to one aio channel."""

        channel: grpc.aio.Channel

        @property
        def host(self) -> host_grpc_aio.Host:
            """Returns the Pandora Host gRPC interface."""
            return host_grpc_aio.Host(self.channel)

        @property
        def security(self) -> security_grpc_aio.Security:
            """Returns the Pandora Security gRPC interface."""
            return security_grpc_aio.Security(self.channel)

        @property
        def security_storage(self) -> security_grpc_aio.SecurityStorage:
            """Returns the Pandora SecurityStorage gRPC interface."""
            return security_grpc_aio.SecurityStorage(self.channel)

    @property
    def aio(self) -> 'PandoraClient.Aio':
        """Returns the asynchronous Pandora interfaces, creating the aio channel on first use."""
        if not self._aio:
            self._aio = PandoraClient.Aio(grpc.aio.insecure_channel(self.grpc_target))
        return self._aio


class PandoraClientLoggerAdapter(logging.LoggerAdapter):  # type: ignore
    """Formats logs from the PandoraClient."""

    def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]:
        """Prefixes `msg` with `[<client name>:<address suffix>]`.

        The address suffix is made of the address bytes from index 4 onwards
        (the last two bytes of a 6-byte address), as colon-separated hex.
        """
        assert self.extra
        client = self.extra['client']
        assert isinstance(client, PandoraClient)
        client_name = self.extra.get('client_name', client.__class__.__name__)
        addr = ':'.join(f'{x:02X}' for x in client.address[4:])
        return (f'[{client_name}:{addr}] {msg}', kwargs)


class BumblePandoraClient(PandoraClient):
    """Special Pandora client which also gives access to a Bumble device instance."""

    _bumble: BumbleDevice  # Bumble device wrapper.

    def __init__(self, grpc_target: str, bumble: BumbleDevice) -> None:
        """Creates a Pandora client named 'bumble' wrapping the given Bumble device.

        Args:
            grpc_target: Server address for the gRPC channel.
            bumble: Bumble device wrapper exposed through this client.
        """
        super().__init__(grpc_target, 'bumble')
        self._bumble = bumble

    @property
    def config(self) -> Dict[str, Any]:
        """Returns the configuration of the wrapped Bumble device."""
        return self._bumble.config

    @property
    def device(self) -> bumble.device.Device:
        """Returns the underlying Bumble `Device` instance."""
        return self._bumble.device

    @property
    def random_address(self) -> Address:
        """Returns the Bumble device's random address converted to an `Address`."""
        return Address(self.device.random_address)