1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import grpc 17import logging 18 19from bumble.att import Attribute 20from bumble.core import ProtocolError 21from bumble.device import Connection as BumbleConnection, Device, Peer 22from bumble.gatt import Characteristic, Descriptor, Service 23from bumble.gatt_client import CharacteristicProxy, ServiceProxy 24from bumble.pandora import utils 25from pandora_experimental.gatt_grpc_aio import GATTServicer 26from pandora_experimental.gatt_pb2 import ( 27 ATTRIBUTE_NOT_FOUND, 28 SUCCESS, 29 AttStatusCode, 30 AttValue, 31 ClearCacheRequest, 32 ClearCacheResponse, 33 DiscoverServiceByUuidRequest, 34 DiscoverServicesRequest, 35 DiscoverServicesResponse, 36 ExchangeMTURequest, 37 ExchangeMTUResponse, 38 GattCharacteristic, 39 GattCharacteristicDescriptor, 40 GattService, 41 IndicateOnCharacteristicRequest, 42 IndicateOnCharacteristicResponse, 43 NotifyOnCharacteristicRequest, 44 NotifyOnCharacteristicResponse, 45 ReadCharacteristicDescriptorRequest, 46 ReadCharacteristicDescriptorResponse, 47 ReadCharacteristicRequest, 48 ReadCharacteristicResponse, 49 ReadCharacteristicsFromUuidRequest, 50 ReadCharacteristicsFromUuidResponse, 51 RegisterServiceRequest, 52 RegisterServiceResponse, 53 WriteRequest, 54 WriteResponse, 55) 56from typing import Dict, List 57 58 59class GATTService(GATTServicer): 60 device: Device 61 peers: Dict[int, Peer] 62 63 def __init__(self, device: Device) -> None: 64 super().__init__() 65 self.device = device 66 self.peers: Dict[int, Peer] = {} 67 self.device.on('connection', self.on_connection) # type: ignore 68 self.device.on('disconnection', self.on_disconnection) # type: ignore 69 70 def __del__(self) -> None: 71 self.device.remove_listener('connection', self.on_connection) # type: ignore 72 self.device.remove_listener('disconnection', self.on_disconnection) # type: ignore 73 74 def on_connection(self, connection: BumbleConnection) -> None: 75 self.peers[connection.handle] = Peer(connection) # type: ignore[no-untyped-call] 76 77 def on_disconnection(self, connection: BumbleConnection) -> None: 78 del self.peers[connection.handle] 79 80 @utils.rpc 81 async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse: 82 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 83 logging.info(f"ExchangeMTU: {connection_handle}") 84 85 connection = self.device.lookup_connection(connection_handle) 86 assert connection 87 peer = self.peers[connection.handle] 88 89 mtu = await peer.request_mtu(request.mtu) # type: ignore 90 assert mtu == request.mtu 91 92 return ExchangeMTUResponse() 93 94 @utils.rpc 95 async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse: 96 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 97 logging.info(f"WriteAttFromHandle: {connection_handle}") 98 99 connection = self.device.lookup_connection(connection_handle) 100 assert connection 101 peer = self.peers[connection.handle] 102 103 try: 104 await peer.write_value(request.handle, request.value, with_response=True) # type: ignore 105 status: AttStatusCode = SUCCESS 106 except ProtocolError as e: 107 status = e.error_code # type: ignore 108 109 return WriteResponse(handle=request.handle, status=status) 110 111 @utils.rpc 112 async def DiscoverServiceByUuid(self, request: DiscoverServiceByUuidRequest, 113 context: grpc.ServicerContext) -> DiscoverServicesResponse: 114 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 115 logging.info(f"DiscoverServiceByUuid: {connection_handle}") 116 117 connection = self.device.lookup_connection(connection_handle) 118 assert connection 119 peer = self.peers[connection.handle] 120 121 services: List[ServiceProxy] = await peer.discover_service(request.uuid) # type: ignore 122 123 async def feed_service(service: ServiceProxy) -> None: 124 characteristic: CharacteristicProxy 125 for characteristic in await peer.discover_characteristics(service=service): # type: ignore 126 await characteristic.discover_descriptors() # type: ignore[no-untyped-call] 127 128 await asyncio.gather(*(feed_service(service) for service in services)) 129 130 return DiscoverServicesResponse(services=[ 131 GattService( 132 handle=service.handle, 133 type=int.from_bytes(bytes(service.type), 'little'), 134 uuid=service.uuid.to_hex_str('-'), # type: ignore 135 characteristics=[ 136 GattCharacteristic( 137 properties=characteristic.properties, # type: ignore 138 permissions=0, # TODO 139 uuid=characteristic.uuid.to_hex_str('-'), # type: ignore 140 handle=characteristic.handle, # type: ignore 141 descriptors=[ 142 GattCharacteristicDescriptor( 143 handle=descriptor.handle, # type: ignore 144 permissions=0, # TODO 145 uuid=str(descriptor.type), # type: ignore 146 ) for descriptor in characteristic.descriptors # type: ignore 147 ], 148 ) for characteristic in service.characteristics # type: ignore 149 ], 150 ) for service in services 151 ]) 152 153 @utils.rpc 154 async def DiscoverServices(self, request: DiscoverServicesRequest, 155 context: grpc.ServicerContext) -> DiscoverServicesResponse: 156 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 157 logging.info(f"DiscoverServices: {connection_handle}") 158 159 connection = self.device.lookup_connection(connection_handle) 160 assert connection 161 peer = self.peers[connection.handle] 162 163 services: List[ServiceProxy] = await peer.discover_services() # type: ignore 164 165 async def feed_service(service: ServiceProxy) -> None: 166 for characteristic in await peer.discover_characteristics(service=service): # type: ignore 167 await characteristic.discover_descriptors() # type: ignore 168 169 await asyncio.gather(*(feed_service(service) for service in services)) 170 171 return DiscoverServicesResponse(services=[ 172 GattService( 173 handle=service.handle, 174 type=int.from_bytes(bytes(service.type), 'little'), 175 uuid=service.uuid.to_hex_str('-'), # type: ignore 176 characteristics=[ 177 GattCharacteristic( 178 properties=characteristic.properties, # type: ignore 179 permissions=0, # TODO 180 uuid=characteristic.uuid.to_hex_str('-'), # type: ignore 181 handle=characteristic.handle, # type: ignore 182 descriptors=[ 183 GattCharacteristicDescriptor( 184 handle=descriptor.handle, # type: ignore 185 permissions=0, # TODO 186 uuid=str(descriptor.type), # type: ignore 187 ) for descriptor in characteristic.descriptors # type: ignore 188 ], 189 ) for characteristic in service.characteristics # type: ignore 190 ], 191 ) for service in services 192 ]) 193 194 # TODO: implement `DiscoverServicesSdp` 195 196 @utils.rpc 197 async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse: 198 logging.info("ClearCache") 199 return ClearCacheResponse() 200 201 @utils.rpc 202 async def ReadCharacteristicFromHandle(self, request: ReadCharacteristicRequest, 203 context: grpc.ServicerContext) -> ReadCharacteristicResponse: 204 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 205 logging.info(f"ReadCharacteristicFromHandle: {connection_handle}") 206 207 connection = self.device.lookup_connection(connection_handle) 208 assert connection 209 peer = self.peers[connection.handle] 210 211 try: 212 value = await peer.read_value(request.handle) # type: ignore 213 status: AttStatusCode = SUCCESS 214 except ProtocolError as e: 215 value = bytes() 216 status = e.error_code # type: ignore 217 218 return ReadCharacteristicResponse(value=AttValue(value=value), status=status) 219 220 @utils.rpc 221 async def ReadCharacteristicsFromUuid(self, request: ReadCharacteristicsFromUuidRequest, 222 context: grpc.ServicerContext) -> ReadCharacteristicsFromUuidResponse: 223 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 224 logging.info(f"ReadCharacteristicsFromUuid: {connection_handle}") 225 226 connection = self.device.lookup_connection(connection_handle) 227 assert connection 228 peer = self.peers[connection.handle] 229 230 service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})() 231 232 try: 233 characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock) # type: ignore 234 235 return ReadCharacteristicsFromUuidResponse(characteristics_read=[ 236 ReadCharacteristicResponse( 237 value=AttValue(value=value, handle=handle), # type: ignore 238 status=SUCCESS, 239 ) for handle, value in characteristics # type: ignore 240 ]) 241 242 except ProtocolError as e: 243 return ReadCharacteristicsFromUuidResponse( 244 characteristics_read=[ReadCharacteristicResponse(status=e.error_code)] # type: ignore 245 ) 246 247 @utils.rpc 248 async def ReadCharacteristicDescriptorFromHandle( 249 self, request: ReadCharacteristicDescriptorRequest, 250 context: grpc.ServicerContext) -> ReadCharacteristicDescriptorResponse: 251 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 252 logging.info(f"ReadCharacteristicDescriptorFromHandle: {connection_handle}") 253 254 connection = self.device.lookup_connection(connection_handle) 255 assert connection 256 peer = self.peers[connection.handle] 257 258 try: 259 value = await peer.read_value(request.handle) # type: ignore 260 status: AttStatusCode = SUCCESS 261 except ProtocolError as e: 262 value = bytes() 263 status = e.error_code # type: ignore 264 265 return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status) 266 267 @utils.rpc 268 def RegisterService(self, request: RegisterServiceRequest, 269 context: grpc.ServicerContext) -> RegisterServiceResponse: 270 logging.info(f"RegisterService") 271 272 serviceUUID = request.service.uuid 273 characteristics = [ 274 Characteristic( 275 properties=Characteristic.Properties(characteristicParam.properties), 276 permissions=Attribute.Permissions(characteristicParam.permissions), 277 uuid=characteristicParam.uuid, 278 descriptors=[ 279 Descriptor( 280 attribute_type=descParam.uuid, 281 permissions=Attribute.Permissions(descParam.permissions), 282 ) for descParam in characteristicParam.descriptors 283 ], 284 ) for characteristicParam in request.service.characteristics 285 ] 286 service = Service(serviceUUID, characteristics) 287 self.device.add_service(service) # type: ignore[no-untyped-call] 288 289 logging.info(f"RegisterService complete") 290 return RegisterServiceResponse() 291 292 @utils.rpc 293 async def NotifyOnCharacteristic(self, request: NotifyOnCharacteristicRequest, 294 context: grpc.ServicerContext) -> NotifyOnCharacteristicResponse: 295 logging.info(f"NotifyOnCharacteristic") 296 297 attr = self.device.gatt_server.get_attribute(request.handle) 298 if not attr: 299 return NotifyOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND) 300 await self.device.notify_subscribers(attr, request.value) 301 return NotifyOnCharacteristicResponse(status=SUCCESS) 302 303 @utils.rpc 304 async def IndicateOnCharacteristic(self, request: IndicateOnCharacteristicRequest, 305 context: grpc.ServicerContext) -> IndicateOnCharacteristicResponse: 306 logging.info(f"IndicateOnCharacteristic") 307 308 attr = self.device.gatt_server.get_attribute(request.handle) 309 if not attr: 310 return IndicateOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND) 311 await self.device.indicate_subscribers(attr, request.value) 312 return IndicateOnCharacteristicResponse(status=SUCCESS) 313