1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import bumble.device 17import grpc 18import grpc.aio 19import logging 20import struct 21 22from . import utils 23from bumble.core import ( 24 BT_BR_EDR_TRANSPORT, 25 BT_LE_TRANSPORT, 26 BT_PERIPHERAL_ROLE, 27 UUID, 28 AdvertisingData, 29 ConnectionError, 30) 31from bumble.device import ( 32 DEVICE_DEFAULT_SCAN_INTERVAL, 33 DEVICE_DEFAULT_SCAN_WINDOW, 34 Advertisement, 35 AdvertisingType, 36 Device, 37) 38from bumble.gatt import Service 39from bumble.hci import ( 40 HCI_CONNECTION_ALREADY_EXISTS_ERROR, 41 HCI_PAGE_TIMEOUT_ERROR, 42 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 43 Address, 44) 45from google.protobuf import any_pb2, empty_pb2 # pytype: disable=pyi-error 46from pandora.host_grpc_aio import HostServicer 47from pandora.host_pb2 import ( 48 NOT_CONNECTABLE, 49 NOT_DISCOVERABLE, 50 PRIMARY_1M, 51 PRIMARY_CODED, 52 SECONDARY_1M, 53 SECONDARY_2M, 54 SECONDARY_CODED, 55 SECONDARY_NONE, 56 AdvertiseRequest, 57 AdvertiseResponse, 58 Connection, 59 ConnectLERequest, 60 ConnectLEResponse, 61 ConnectRequest, 62 ConnectResponse, 63 DataTypes, 64 DisconnectRequest, 65 InquiryResponse, 66 PrimaryPhy, 67 ReadLocalAddressResponse, 68 ScanningResponse, 69 ScanRequest, 70 SecondaryPhy, 71 SetConnectabilityModeRequest, 72 SetDiscoverabilityModeRequest, 73 WaitConnectionRequest, 74 WaitConnectionResponse, 75 WaitDisconnectionRequest, 76) 77from typing import AsyncGenerator, Dict, List, Optional, Set, Tuple, cast 78 79PRIMARY_PHY_MAP: Dict[int, PrimaryPhy] = { 80 # Default value reported by Bumble for legacy Advertising reports. 81 # FIXME(uael): `None` might be a better value, but Bumble need to change accordingly. 82 0: PRIMARY_1M, 83 1: PRIMARY_1M, 84 3: PRIMARY_CODED, 85} 86 87SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = { 88 0: SECONDARY_NONE, 89 1: SECONDARY_1M, 90 2: SECONDARY_2M, 91 3: SECONDARY_CODED, 92} 93 94 95class HostService(HostServicer): 96 grpc_server: grpc.aio.Server 97 device: Device 98 waited_connections: Set[int] 99 100 def __init__(self, grpc_server: grpc.aio.Server, device: Device) -> None: 101 super().__init__() 102 self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {'service_name': 'Host', 'device': device}) 103 self.grpc_server = grpc_server 104 self.device = device 105 self.waited_connections = set() 106 107 @utils.rpc 108 async def FactoryReset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: 109 self.log.info('FactoryReset') 110 111 # delete all bonds 112 if self.device.keystore is not None: 113 await self.device.keystore.delete_all() 114 115 # trigger gRCP server stop then return 116 asyncio.create_task(self.grpc_server.stop(None)) 117 return empty_pb2.Empty() 118 119 @utils.rpc 120 async def Reset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: 121 self.log.info('Reset') 122 123 # clear service. 124 self.waited_connections.clear() 125 126 # (re) power device on 127 await self.device.power_on() 128 return empty_pb2.Empty() 129 130 @utils.rpc 131 async def ReadLocalAddress( 132 self, request: empty_pb2.Empty, context: grpc.ServicerContext 133 ) -> ReadLocalAddressResponse: 134 self.log.info('ReadLocalAddress') 135 return ReadLocalAddressResponse(address=bytes(reversed(bytes(self.device.public_address)))) 136 137 @utils.rpc 138 async def Connect(self, request: ConnectRequest, context: grpc.ServicerContext) -> ConnectResponse: 139 # Need to reverse bytes order since Bumble Address is using MSB. 140 address = Address(bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS) 141 self.log.info(f"Connect to {address}") 142 143 try: 144 connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT) 145 except ConnectionError as e: 146 if e.error_code == HCI_PAGE_TIMEOUT_ERROR: 147 self.log.warning(f"Peer not found: {e}") 148 return ConnectResponse(peer_not_found=empty_pb2.Empty()) 149 if e.error_code == HCI_CONNECTION_ALREADY_EXISTS_ERROR: 150 self.log.warning(f"Connection already exists: {e}") 151 return ConnectResponse(connection_already_exists=empty_pb2.Empty()) 152 raise e 153 154 self.log.info(f"Connect to {address} done (handle={connection.handle})") 155 156 cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big')) 157 return ConnectResponse(connection=Connection(cookie=cookie)) 158 159 @utils.rpc 160 async def WaitConnection( 161 self, request: WaitConnectionRequest, context: grpc.ServicerContext 162 ) -> WaitConnectionResponse: 163 if not request.address: 164 raise ValueError('Request address field must be set') 165 166 # Need to reverse bytes order since Bumble Address is using MSB. 167 address = Address(bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS) 168 if address in (Address.NIL, Address.ANY): 169 raise ValueError('Invalid address') 170 171 self.log.info(f"WaitConnection from {address}...") 172 173 connection = self.device.find_connection_by_bd_addr(address, transport=BT_BR_EDR_TRANSPORT) 174 if connection and id(connection) in self.waited_connections: 175 # this connection was already returned: wait for a new one. 176 connection = None 177 178 if not connection: 179 connection = await self.device.accept(address) 180 181 # save connection has waited and respond. 182 self.waited_connections.add(id(connection)) 183 184 self.log.info(f"WaitConnection from {address} done (handle={connection.handle})") 185 186 cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big')) 187 return WaitConnectionResponse(connection=Connection(cookie=cookie)) 188 189 @utils.rpc 190 async def ConnectLE(self, request: ConnectLERequest, context: grpc.ServicerContext) -> ConnectLEResponse: 191 address = utils.address_from_request(request, request.WhichOneof("address")) 192 if address in (Address.NIL, Address.ANY): 193 raise ValueError('Invalid address') 194 195 self.log.info(f"ConnectLE to {address}...") 196 197 try: 198 connection = await self.device.connect( 199 address, transport=BT_LE_TRANSPORT, own_address_type=request.own_address_type 200 ) 201 except ConnectionError as e: 202 if e.error_code == HCI_PAGE_TIMEOUT_ERROR: 203 self.log.warning(f"Peer not found: {e}") 204 return ConnectLEResponse(peer_not_found=empty_pb2.Empty()) 205 if e.error_code == HCI_CONNECTION_ALREADY_EXISTS_ERROR: 206 self.log.warning(f"Connection already exists: {e}") 207 return ConnectLEResponse(connection_already_exists=empty_pb2.Empty()) 208 raise e 209 210 self.log.info(f"ConnectLE to {address} done (handle={connection.handle})") 211 212 cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big')) 213 return ConnectLEResponse(connection=Connection(cookie=cookie)) 214 215 @utils.rpc 216 async def Disconnect(self, request: DisconnectRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: 217 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 218 self.log.info(f"Disconnect: {connection_handle}") 219 220 self.log.info("Disconnecting...") 221 if connection := self.device.lookup_connection(connection_handle): 222 await connection.disconnect(HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR) 223 self.log.info("Disconnected") 224 225 return empty_pb2.Empty() 226 227 @utils.rpc 228 async def WaitDisconnection( 229 self, request: WaitDisconnectionRequest, context: grpc.ServicerContext 230 ) -> empty_pb2.Empty: 231 connection_handle = int.from_bytes(request.connection.cookie.value, 'big') 232 self.log.info(f"WaitDisconnection: {connection_handle}") 233 234 if connection := self.device.lookup_connection(connection_handle): 235 disconnection_future: asyncio.Future[None] = asyncio.get_running_loop().create_future() 236 237 def on_disconnection(_: None) -> None: 238 disconnection_future.set_result(None) 239 240 connection.on('disconnection', on_disconnection) 241 try: 242 await disconnection_future 243 self.log.info("Disconnected") 244 finally: 245 connection.remove_listener('disconnection', on_disconnection) # type: ignore 246 247 return empty_pb2.Empty() 248 249 @utils.rpc 250 async def Advertise( 251 self, request: AdvertiseRequest, context: grpc.ServicerContext 252 ) -> AsyncGenerator[AdvertiseResponse, None]: 253 if not request.legacy: 254 raise NotImplementedError("TODO: add support for extended advertising in Bumble") 255 if request.interval: 256 raise NotImplementedError("TODO: add support for `request.interval`") 257 if request.interval_range: 258 raise NotImplementedError("TODO: add support for `request.interval_range`") 259 if request.primary_phy: 260 raise NotImplementedError("TODO: add support for `request.primary_phy`") 261 if request.secondary_phy: 262 raise NotImplementedError("TODO: add support for `request.secondary_phy`") 263 264 if self.device.is_advertising: 265 raise NotImplementedError('TODO: add support for advertising sets') 266 267 if data := request.data: 268 self.device.advertising_data = bytes(self.unpack_data_types(data)) 269 270 if scan_response_data := request.scan_response_data: 271 self.device.scan_response_data = bytes(self.unpack_data_types(scan_response_data)) 272 scannable = True 273 else: 274 scannable = False 275 276 # Retrieve services data 277 for service in self.device.gatt_server.attributes: 278 if isinstance(service, Service) and (service_data := service.get_advertising_data()): 279 service_uuid = service.uuid.to_hex_str() 280 if ( 281 service_uuid in request.data.incomplete_service_class_uuids16 282 or service_uuid in request.data.complete_service_class_uuids16 283 or service_uuid in request.data.incomplete_service_class_uuids32 284 or service_uuid in request.data.complete_service_class_uuids32 285 or service_uuid in request.data.incomplete_service_class_uuids128 286 or service_uuid in request.data.complete_service_class_uuids128 287 ): 288 self.device.advertising_data += service_data 289 if ( 290 service_uuid in scan_response_data.incomplete_service_class_uuids16 291 or service_uuid in scan_response_data.complete_service_class_uuids16 292 or service_uuid in scan_response_data.incomplete_service_class_uuids32 293 or service_uuid in scan_response_data.complete_service_class_uuids32 294 or service_uuid in scan_response_data.incomplete_service_class_uuids128 295 or service_uuid in scan_response_data.complete_service_class_uuids128 296 ): 297 self.device.scan_response_data += service_data 298 299 target = None 300 if request.connectable and scannable: 301 advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE 302 elif scannable: 303 advertising_type = AdvertisingType.UNDIRECTED_SCANNABLE 304 else: 305 advertising_type = AdvertisingType.UNDIRECTED 306 else: 307 target = None 308 advertising_type = AdvertisingType.UNDIRECTED 309 310 if request.target: 311 # Need to reverse bytes order since Bumble Address is using MSB. 312 target_bytes = bytes(reversed(request.target)) 313 if request.target_variant() == "public": 314 target = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS) 315 advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY # FIXME: HIGH_DUTY ? 316 else: 317 target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS) 318 advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY # FIXME: HIGH_DUTY ? 319 320 if request.connectable: 321 322 def on_connection(connection: bumble.device.Connection) -> None: 323 if connection.transport == BT_LE_TRANSPORT and connection.role == BT_PERIPHERAL_ROLE: 324 pending_connection.set_result(connection) 325 326 self.device.on('connection', on_connection) 327 328 try: 329 while True: 330 if not self.device.is_advertising: 331 self.log.info('Advertise') 332 await self.device.start_advertising( 333 target=target, advertising_type=advertising_type, own_address_type=request.own_address_type 334 ) 335 336 if not request.connectable: 337 await asyncio.sleep(1) 338 continue 339 340 pending_connection: asyncio.Future[ 341 bumble.device.Connection 342 ] = asyncio.get_running_loop().create_future() 343 344 self.log.info('Wait for LE connection...') 345 connection = await pending_connection 346 347 self.log.info(f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})") 348 349 cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big')) 350 yield AdvertiseResponse(connection=Connection(cookie=cookie)) 351 352 # wait a small delay before restarting the advertisement. 353 await asyncio.sleep(1) 354 finally: 355 if request.connectable: 356 self.device.remove_listener('connection', on_connection) # type: ignore 357 358 try: 359 self.log.info('Stop advertising') 360 await self.device.abort_on('flush', self.device.stop_advertising()) 361 except: 362 pass 363 364 @utils.rpc 365 async def Scan( 366 self, request: ScanRequest, context: grpc.ServicerContext 367 ) -> AsyncGenerator[ScanningResponse, None]: 368 # TODO: modify `start_scanning` to accept floats instead of int for ms values 369 if request.phys: 370 raise NotImplementedError("TODO: add support for `request.phys`") 371 372 self.log.info('Scan') 373 374 scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue() 375 handler = self.device.on('advertisement', scan_queue.put_nowait) 376 await self.device.start_scanning( 377 legacy=request.legacy, 378 active=not request.passive, 379 own_address_type=request.own_address_type, 380 scan_interval=int(request.interval) if request.interval else DEVICE_DEFAULT_SCAN_INTERVAL, 381 scan_window=int(request.window) if request.window else DEVICE_DEFAULT_SCAN_WINDOW, 382 ) 383 384 try: 385 # TODO: add support for `direct_address` in Bumble 386 # TODO: add support for `periodic_advertising_interval` in Bumble 387 while adv := await scan_queue.get(): 388 sr = ScanningResponse( 389 legacy=adv.is_legacy, 390 connectable=adv.is_connectable, 391 scannable=adv.is_scannable, 392 truncated=adv.is_truncated, 393 sid=adv.sid, 394 primary_phy=PRIMARY_PHY_MAP[adv.primary_phy], 395 secondary_phy=SECONDARY_PHY_MAP[adv.secondary_phy], 396 tx_power=adv.tx_power, 397 rssi=adv.rssi, 398 data=self.pack_data_types(adv.data), 399 ) 400 401 if adv.address.address_type == Address.PUBLIC_DEVICE_ADDRESS: 402 sr.public = bytes(reversed(bytes(adv.address))) 403 elif adv.address.address_type == Address.RANDOM_DEVICE_ADDRESS: 404 sr.random = bytes(reversed(bytes(adv.address))) 405 elif adv.address.address_type == Address.PUBLIC_IDENTITY_ADDRESS: 406 sr.public_identity = bytes(reversed(bytes(adv.address))) 407 else: 408 sr.random_static_identity = bytes(reversed(bytes(adv.address))) 409 410 yield sr 411 412 finally: 413 self.device.remove_listener('advertisement', handler) # type: ignore 414 try: 415 self.log.info('Stop scanning') 416 await self.device.abort_on('flush', self.device.stop_scanning()) 417 except: 418 pass 419 420 @utils.rpc 421 async def Inquiry( 422 self, request: empty_pb2.Empty, context: grpc.ServicerContext 423 ) -> AsyncGenerator[InquiryResponse, None]: 424 self.log.info('Inquiry') 425 426 inquiry_queue: asyncio.Queue[Optional[Tuple[Address, int, AdvertisingData, int]]] = asyncio.Queue() 427 complete_handler = self.device.on('inquiry_complete', lambda: inquiry_queue.put_nowait(None)) 428 result_handler = self.device.on( # type: ignore 429 'inquiry_result', 430 lambda address, class_of_device, eir_data, rssi: inquiry_queue.put_nowait( # type: ignore 431 (address, class_of_device, eir_data, rssi) # type: ignore 432 ), 433 ) 434 435 await self.device.start_discovery(auto_restart=False) 436 try: 437 while inquiry_result := await inquiry_queue.get(): 438 (address, class_of_device, eir_data, rssi) = inquiry_result 439 # FIXME: if needed, add support for `page_scan_repetition_mode` and `clock_offset` in Bumble 440 yield InquiryResponse( 441 address=bytes(reversed(bytes(address))), 442 class_of_device=class_of_device, 443 rssi=rssi, 444 data=self.pack_data_types(eir_data), 445 ) 446 447 finally: 448 self.device.remove_listener('inquiry_complete', complete_handler) # type: ignore 449 self.device.remove_listener('inquiry_result', result_handler) # type: ignore 450 try: 451 self.log.info('Stop inquiry') 452 await self.device.abort_on('flush', self.device.stop_discovery()) 453 except: 454 pass 455 456 @utils.rpc 457 async def SetDiscoverabilityMode( 458 self, request: SetDiscoverabilityModeRequest, context: grpc.ServicerContext 459 ) -> empty_pb2.Empty: 460 self.log.info("SetDiscoverabilityMode") 461 await self.device.set_discoverable(request.mode != NOT_DISCOVERABLE) 462 return empty_pb2.Empty() 463 464 @utils.rpc 465 async def SetConnectabilityMode( 466 self, request: SetConnectabilityModeRequest, context: grpc.ServicerContext 467 ) -> empty_pb2.Empty: 468 self.log.info("SetConnectabilityMode") 469 await self.device.set_connectable(request.mode != NOT_CONNECTABLE) 470 return empty_pb2.Empty() 471 472 def unpack_data_types(self, dt: DataTypes) -> AdvertisingData: 473 ad_structures: List[Tuple[int, bytes]] = [] 474 475 uuids: List[str] 476 datas: Dict[str, bytes] 477 478 def uuid128_from_str(uuid: str) -> bytes: 479 """Decode a 128-bit uuid encoded as XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX 480 to byte format.""" 481 return bytes(reversed(bytes.fromhex(uuid.replace('-', '')))) 482 483 def uuid32_from_str(uuid: str) -> bytes: 484 """Decode a 32-bit uuid encoded as XXXXXXXX to byte format.""" 485 return bytes(reversed(bytes.fromhex(uuid))) 486 487 def uuid16_from_str(uuid: str) -> bytes: 488 """Decode a 16-bit uuid encoded as XXXX to byte format.""" 489 return bytes(reversed(bytes.fromhex(uuid))) 490 491 if uuids := dt.incomplete_service_class_uuids16: 492 ad_structures.append( 493 ( 494 AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, 495 b''.join([uuid16_from_str(uuid) for uuid in uuids]), 496 ) 497 ) 498 if uuids := dt.complete_service_class_uuids16: 499 ad_structures.append( 500 ( 501 AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, 502 b''.join([uuid16_from_str(uuid) for uuid in uuids]), 503 ) 504 ) 505 if uuids := dt.incomplete_service_class_uuids32: 506 ad_structures.append( 507 ( 508 AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, 509 b''.join([uuid32_from_str(uuid) for uuid in uuids]), 510 ) 511 ) 512 if uuids := dt.complete_service_class_uuids32: 513 ad_structures.append( 514 ( 515 AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, 516 b''.join([uuid32_from_str(uuid) for uuid in uuids]), 517 ) 518 ) 519 if uuids := dt.incomplete_service_class_uuids128: 520 ad_structures.append( 521 ( 522 AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, 523 b''.join([uuid128_from_str(uuid) for uuid in uuids]), 524 ) 525 ) 526 if uuids := dt.complete_service_class_uuids128: 527 ad_structures.append( 528 ( 529 AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, 530 b''.join([uuid128_from_str(uuid) for uuid in uuids]), 531 ) 532 ) 533 if dt.HasField('include_shortened_local_name'): 534 ad_structures.append((AdvertisingData.SHORTENED_LOCAL_NAME, bytes(self.device.name[:8], 'utf-8'))) 535 elif dt.shortened_local_name: 536 ad_structures.append((AdvertisingData.SHORTENED_LOCAL_NAME, bytes(dt.shortened_local_name, 'utf-8'))) 537 if dt.HasField('include_complete_local_name'): 538 ad_structures.append((AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.device.name, 'utf-8'))) 539 elif dt.complete_local_name: 540 ad_structures.append((AdvertisingData.COMPLETE_LOCAL_NAME, bytes(dt.complete_local_name, 'utf-8'))) 541 if dt.HasField('include_tx_power_level'): 542 raise ValueError('unsupported data type') 543 elif dt.tx_power_level: 544 ad_structures.append((AdvertisingData.TX_POWER_LEVEL, bytes(struct.pack('<I', dt.tx_power_level)[:1]))) 545 if dt.HasField('include_class_of_device'): 546 ad_structures.append( 547 (AdvertisingData.CLASS_OF_DEVICE, bytes(struct.pack('<I', self.device.class_of_device)[:-1])) 548 ) 549 elif dt.class_of_device: 550 ad_structures.append((AdvertisingData.CLASS_OF_DEVICE, bytes(struct.pack('<I', dt.class_of_device)[:-1]))) 551 if dt.peripheral_connection_interval_min: 552 ad_structures.append( 553 ( 554 AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE, 555 bytes( 556 [ 557 *struct.pack('<H', dt.peripheral_connection_interval_min), 558 *struct.pack( 559 '<H', 560 dt.peripheral_connection_interval_max 561 if dt.peripheral_connection_interval_max 562 else dt.peripheral_connection_interval_min, 563 ), 564 ] 565 ), 566 ) 567 ) 568 if uuids := dt.service_solicitation_uuids16: 569 ad_structures.append( 570 ( 571 AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS, 572 b''.join([uuid16_from_str(uuid) for uuid in uuids]), 573 ) 574 ) 575 if uuids := dt.service_solicitation_uuids32: 576 ad_structures.append( 577 ( 578 AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS, 579 b''.join([uuid32_from_str(uuid) for uuid in uuids]), 580 ) 581 ) 582 if uuids := dt.service_solicitation_uuids128: 583 ad_structures.append( 584 ( 585 AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS, 586 b''.join([uuid128_from_str(uuid) for uuid in uuids]), 587 ) 588 ) 589 if datas := dt.service_data_uuid16: 590 ad_structures.extend( 591 [ 592 (AdvertisingData.SERVICE_DATA_16_BIT_UUID, uuid16_from_str(uuid) + data) 593 for uuid, data in datas.items() 594 ] 595 ) 596 if datas := dt.service_data_uuid32: 597 ad_structures.extend( 598 [ 599 (AdvertisingData.SERVICE_DATA_32_BIT_UUID, uuid32_from_str(uuid) + data) 600 for uuid, data in datas.items() 601 ] 602 ) 603 if datas := dt.service_data_uuid128: 604 ad_structures.extend( 605 [ 606 (AdvertisingData.SERVICE_DATA_128_BIT_UUID, uuid128_from_str(uuid) + data) 607 for uuid, data in datas.items() 608 ] 609 ) 610 if dt.appearance: 611 ad_structures.append((AdvertisingData.APPEARANCE, struct.pack('<H', dt.appearance))) 612 if dt.advertising_interval: 613 ad_structures.append((AdvertisingData.ADVERTISING_INTERVAL, struct.pack('<H', dt.advertising_interval))) 614 if dt.uri: 615 ad_structures.append((AdvertisingData.URI, bytes(dt.uri, 'utf-8'))) 616 if dt.le_supported_features: 617 ad_structures.append((AdvertisingData.LE_SUPPORTED_FEATURES, dt.le_supported_features)) 618 if dt.manufacturer_specific_data: 619 ad_structures.append((AdvertisingData.MANUFACTURER_SPECIFIC_DATA, dt.manufacturer_specific_data)) 620 621 return AdvertisingData(ad_structures) 622 623 def pack_data_types(self, ad: AdvertisingData) -> DataTypes: 624 dt = DataTypes() 625 uuids: List[UUID] 626 s: str 627 i: int 628 ij: Tuple[int, int] 629 uuid_data: Tuple[UUID, bytes] 630 data: bytes 631 632 if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS)): 633 dt.incomplete_service_class_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids))) 634 if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS)): 635 dt.complete_service_class_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids))) 636 if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS)): 637 dt.incomplete_service_class_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids))) 638 if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS)): 639 dt.complete_service_class_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids))) 640 if uuids := cast(List[UUID], ad.get(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS)): 641 dt.incomplete_service_class_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids))) 642 if uuids := cast(List[UUID], ad.get(AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS)): 643 dt.complete_service_class_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids))) 644 if s := cast(str, ad.get(AdvertisingData.SHORTENED_LOCAL_NAME)): 645 dt.shortened_local_name = s 646 if s := cast(str, ad.get(AdvertisingData.COMPLETE_LOCAL_NAME)): 647 dt.complete_local_name = s 648 if i := cast(int, ad.get(AdvertisingData.TX_POWER_LEVEL)): 649 dt.tx_power_level = i 650 if i := cast(int, ad.get(AdvertisingData.CLASS_OF_DEVICE)): 651 dt.class_of_device = i 652 if ij := cast(Tuple[int, int], ad.get(AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE)): 653 dt.peripheral_connection_interval_min = ij[0] 654 dt.peripheral_connection_interval_max = ij[1] 655 if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS)): 656 dt.service_solicitation_uuids16.extend(list(map(lambda x: x.to_hex_str(), uuids))) 657 if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS)): 658 dt.service_solicitation_uuids32.extend(list(map(lambda x: x.to_hex_str(), uuids))) 659 if uuids := cast(List[UUID], ad.get(AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS)): 660 dt.service_solicitation_uuids128.extend(list(map(lambda x: x.to_hex_str(), uuids))) 661 if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_16_BIT_UUID)): 662 dt.service_data_uuid16[uuid_data[0].to_hex_str()] = uuid_data[1] 663 if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_32_BIT_UUID)): 664 dt.service_data_uuid32[uuid_data[0].to_hex_str()] = uuid_data[1] 665 if uuid_data := cast(Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_128_BIT_UUID)): 666 dt.service_data_uuid128[uuid_data[0].to_hex_str()] = uuid_data[1] 667 if data := cast(bytes, ad.get(AdvertisingData.PUBLIC_TARGET_ADDRESS, raw=True)): 668 dt.public_target_addresses.extend([data[i * 6 :: i * 6 + 6] for i in range(int(len(data) / 6))]) 669 if data := cast(bytes, ad.get(AdvertisingData.RANDOM_TARGET_ADDRESS, raw=True)): 670 dt.random_target_addresses.extend([data[i * 6 :: i * 6 + 6] for i in range(int(len(data) / 6))]) 671 if i := cast(int, ad.get(AdvertisingData.APPEARANCE)): 672 dt.appearance = i 673 if i := cast(int, ad.get(AdvertisingData.ADVERTISING_INTERVAL)): 674 dt.advertising_interval = i 675 if s := cast(str, ad.get(AdvertisingData.URI)): 676 dt.uri = s 677 if data := cast(bytes, ad.get(AdvertisingData.LE_SUPPORTED_FEATURES, raw=True)): 678 dt.le_supported_features = data 679 if data := cast(bytes, ad.get(AdvertisingData.MANUFACTURER_SPECIFIC_DATA, raw=True)): 680 dt.manufacturer_specific_data = data 681 682 return dt 683