1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import grpc 17import logging 18 19from avatar.bumble_server import utils 20from bumble.core import ProtocolError 21from bumble.device import Connection as BumbleConnection, Device, Peer 22from bumble.gatt_client import CharacteristicProxy, ServiceProxy 23from pandora_experimental.gatt_grpc_aio import GATTServicer 24from pandora_experimental.gatt_pb2 import ( 25 SUCCESS, 26 AttValue, 27 ClearCacheRequest, 28 ClearCacheResponse, 29 DiscoverServiceByUuidRequest, 30 DiscoverServicesRequest, 31 DiscoverServicesResponse, 32 ExchangeMTURequest, 33 ExchangeMTUResponse, 34 GattCharacteristic, 35 GattCharacteristicDescriptor, 36 GattService, 37 ReadCharacteristicDescriptorRequest, 38 ReadCharacteristicDescriptorResponse, 39 ReadCharacteristicRequest, 40 ReadCharacteristicResponse, 41 ReadCharacteristicsFromUuidRequest, 42 ReadCharacteristicsFromUuidResponse, 43 WriteRequest, 44 WriteResponse, 45) 46from typing import Dict, List 47 48 49class GATTService(GATTServicer): 50 device: Device 51 peers: Dict[int, Peer] 52 53 def __init__(self, device: Device) -> None: 54 super().__init__() 55 self.device = device 56 self.peers: Dict[int, Peer] = {} 57 self.device.on('connection', self.on_connection) # type: ignore 58 self.device.on('disconnection', self.on_disconnection) # type: ignore 59 60 def __del__(self) -> None: 61 self.device.remove_listener('connection', self.on_connection) # type: ignore 62 self.device.remove_listener('disconnection', self.on_disconnection) # type: ignore 63 64 def on_connection(self, connection: BumbleConnection) -> None: 65 self.peers[connection.handle] = Peer(connection) # type: ignore[no-untyped-call] 66 67 def on_disconnection(self, connection: BumbleConnection) -> None: 68 del self.peers[connection.handle] 69 70 @utils.rpc 71 async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse: 72 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 73 logging.info(f"ExchangeMTU: {connection_handle}") 74 75 connection = self.device.lookup_connection(connection_handle) 76 assert connection 77 peer = self.peers[connection.handle] 78 79 mtu = await peer.request_mtu(request.mtu) # type: ignore 80 assert mtu == request.mtu 81 82 return ExchangeMTUResponse() 83 84 @utils.rpc 85 async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse: 86 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 87 logging.info(f"WriteAttFromHandle: {connection_handle}") 88 89 connection = self.device.lookup_connection(connection_handle) 90 assert connection 91 peer = self.peers[connection.handle] 92 93 try: 94 await peer.write_value(request.handle, request.value, with_response=True) # type: ignore 95 status = SUCCESS 96 except ProtocolError as e: 97 status = e.error_code 98 99 return WriteResponse(handle=request.handle, status=status) 100 101 @utils.rpc 102 async def DiscoverServiceByUuid( 103 self, request: DiscoverServiceByUuidRequest, context: grpc.ServicerContext 104 ) -> DiscoverServicesResponse: 105 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 106 logging.info(f"DiscoverServiceByUuid: {connection_handle}") 107 108 connection = self.device.lookup_connection(connection_handle) 109 assert connection 110 peer = self.peers[connection.handle] 111 112 services: List[ServiceProxy] = await peer.discover_service(request.uuid) # type: ignore 113 114 async def feed_service(service: ServiceProxy) -> None: 115 characteristic: CharacteristicProxy 116 for characteristic in await peer.discover_characteristics(service=service): # type: ignore 117 await characteristic.discover_descriptors() # type: ignore[no-untyped-call] 118 119 await asyncio.gather(*(feed_service(service) for service in services)) 120 121 return DiscoverServicesResponse( 122 services=[ 123 GattService( 124 handle=service.handle, 125 type=int.from_bytes(bytes(service.type), 'little'), 126 uuid=service.uuid.to_hex_str('-'), # type: ignore 127 characteristics=[ 128 GattCharacteristic( 129 properties=characteristic.properties, # type: ignore 130 permissions=0, # TODO 131 uuid=characteristic.uuid.to_hex_str('-'), # type: ignore 132 handle=characteristic.handle, # type: ignore 133 descriptors=[ 134 GattCharacteristicDescriptor( 135 handle=descriptor.handle, # type: ignore 136 permissions=0, # TODO 137 uuid=str(descriptor.type), # type: ignore 138 ) 139 for descriptor in characteristic.descriptors # type: ignore 140 ], 141 ) 142 for characteristic in service.characteristics # type: ignore 143 ], 144 ) 145 for service in services 146 ] 147 ) 148 149 @utils.rpc 150 async def DiscoverServices( 151 self, request: DiscoverServicesRequest, context: grpc.ServicerContext 152 ) -> DiscoverServicesResponse: 153 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 154 logging.info(f"DiscoverServices: {connection_handle}") 155 156 connection = self.device.lookup_connection(connection_handle) 157 assert connection 158 peer = self.peers[connection.handle] 159 160 services: List[ServiceProxy] = await peer.discover_services() # type: ignore 161 162 async def feed_service(service: ServiceProxy) -> None: 163 for characteristic in await peer.discover_characteristics(service=service): # type: ignore 164 await characteristic.discover_descriptors() # type: ignore 165 166 await asyncio.gather(*(feed_service(service) for service in services)) 167 168 return DiscoverServicesResponse( 169 services=[ 170 GattService( 171 handle=service.handle, 172 type=int.from_bytes(bytes(service.type), 'little'), 173 uuid=service.uuid.to_hex_str('-'), # type: ignore 174 characteristics=[ 175 GattCharacteristic( 176 properties=characteristic.properties, # type: ignore 177 permissions=0, # TODO 178 uuid=characteristic.uuid.to_hex_str('-'), # type: ignore 179 handle=characteristic.handle, # type: ignore 180 descriptors=[ 181 GattCharacteristicDescriptor( 182 handle=descriptor.handle, # type: ignore 183 permissions=0, # TODO 184 uuid=str(descriptor.type), # type: ignore 185 ) 186 for descriptor in characteristic.descriptors # type: ignore 187 ], 188 ) 189 for characteristic in service.characteristics # type: ignore 190 ], 191 ) 192 for service in services 193 ] 194 ) 195 196 # TODO: implement `DiscoverServicesSdp` 197 198 @utils.rpc 199 async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse: 200 logging.info("ClearCache") 201 return ClearCacheResponse() 202 203 @utils.rpc 204 async def ReadCharacteristicFromHandle( 205 self, request: ReadCharacteristicRequest, context: grpc.ServicerContext 206 ) -> ReadCharacteristicResponse: 207 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 208 logging.info(f"ReadCharacteristicFromHandle: {connection_handle}") 209 210 connection = self.device.lookup_connection(connection_handle) 211 assert connection 212 peer = self.peers[connection.handle] 213 214 try: 215 value = await peer.read_value(request.handle) # type: ignore 216 status = SUCCESS 217 except ProtocolError as e: 218 value = bytes() 219 status = e.error_code 220 221 return ReadCharacteristicResponse(value=AttValue(value=value), status=status) 222 223 @utils.rpc 224 async def ReadCharacteristicsFromUuid( 225 self, request: ReadCharacteristicsFromUuidRequest, context: grpc.ServicerContext 226 ) -> ReadCharacteristicsFromUuidResponse: 227 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 228 logging.info(f"ReadCharacteristicsFromUuid: {connection_handle}") 229 230 connection = self.device.lookup_connection(connection_handle) 231 assert connection 232 peer = self.peers[connection.handle] 233 234 service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})() 235 236 try: 237 characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock) # type: ignore 238 239 return ReadCharacteristicsFromUuidResponse( 240 characteristics_read=[ 241 ReadCharacteristicResponse( 242 value=AttValue(value=value, handle=handle), # type: ignore 243 status=SUCCESS, 244 ) 245 for handle, value in characteristics # type: ignore 246 ] 247 ) 248 249 except ProtocolError as e: 250 return ReadCharacteristicsFromUuidResponse( 251 characteristics_read=[ReadCharacteristicResponse(status=e.error_code)] 252 ) 253 254 @utils.rpc 255 async def ReadCharacteristicDescriptorFromHandle( 256 self, request: ReadCharacteristicDescriptorRequest, context: grpc.ServicerContext 257 ) -> ReadCharacteristicDescriptorResponse: 258 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 259 logging.info(f"ReadCharacteristicDescriptorFromHandle: {connection_handle}") 260 261 connection = self.device.lookup_connection(connection_handle) 262 assert connection 263 peer = self.peers[connection.handle] 264 265 try: 266 value = await peer.read_value(request.handle) # type: ignore 267 status = SUCCESS 268 except ProtocolError as e: 269 value = bytes() 270 status = e.error_code 271 272 return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status) 273