1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# ATT - Attribute Protocol 17# 18# See Bluetooth spec @ Vol 3, Part F 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26import enum 27import functools 28import inspect 29import struct 30from typing import ( 31 Any, 32 Awaitable, 33 Callable, 34 Dict, 35 List, 36 Optional, 37 Type, 38 Union, 39 TYPE_CHECKING, 40) 41 42from pyee import EventEmitter 43 44from bumble.core import UUID, name_or_number, ProtocolError 45from bumble.hci import HCI_Object, key_with_value 46from bumble.colors import color 47 48if TYPE_CHECKING: 49 from bumble.device import Connection 50 51# ----------------------------------------------------------------------------- 52# Constants 53# ----------------------------------------------------------------------------- 54# fmt: off 55# pylint: disable=line-too-long 56 57ATT_CID = 0x04 58 59ATT_ERROR_RESPONSE = 0x01 60ATT_EXCHANGE_MTU_REQUEST = 0x02 61ATT_EXCHANGE_MTU_RESPONSE = 0x03 62ATT_FIND_INFORMATION_REQUEST = 0x04 63ATT_FIND_INFORMATION_RESPONSE = 0x05 64ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06 65ATT_FIND_BY_TYPE_VALUE_RESPONSE = 
0x07 66ATT_READ_BY_TYPE_REQUEST = 0x08 67ATT_READ_BY_TYPE_RESPONSE = 0x09 68ATT_READ_REQUEST = 0x0A 69ATT_READ_RESPONSE = 0x0B 70ATT_READ_BLOB_REQUEST = 0x0C 71ATT_READ_BLOB_RESPONSE = 0x0D 72ATT_READ_MULTIPLE_REQUEST = 0x0E 73ATT_READ_MULTIPLE_RESPONSE = 0x0F 74ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10 75ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11 76ATT_WRITE_REQUEST = 0x12 77ATT_WRITE_RESPONSE = 0x13 78ATT_WRITE_COMMAND = 0x52 79ATT_SIGNED_WRITE_COMMAND = 0xD2 80ATT_PREPARE_WRITE_REQUEST = 0x16 81ATT_PREPARE_WRITE_RESPONSE = 0x17 82ATT_EXECUTE_WRITE_REQUEST = 0x18 83ATT_EXECUTE_WRITE_RESPONSE = 0x19 84ATT_HANDLE_VALUE_NOTIFICATION = 0x1B 85ATT_HANDLE_VALUE_INDICATION = 0x1D 86ATT_HANDLE_VALUE_CONFIRMATION = 0x1E 87 88ATT_PDU_NAMES = { 89 ATT_ERROR_RESPONSE: 'ATT_ERROR_RESPONSE', 90 ATT_EXCHANGE_MTU_REQUEST: 'ATT_EXCHANGE_MTU_REQUEST', 91 ATT_EXCHANGE_MTU_RESPONSE: 'ATT_EXCHANGE_MTU_RESPONSE', 92 ATT_FIND_INFORMATION_REQUEST: 'ATT_FIND_INFORMATION_REQUEST', 93 ATT_FIND_INFORMATION_RESPONSE: 'ATT_FIND_INFORMATION_RESPONSE', 94 ATT_FIND_BY_TYPE_VALUE_REQUEST: 'ATT_FIND_BY_TYPE_VALUE_REQUEST', 95 ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE', 96 ATT_READ_BY_TYPE_REQUEST: 'ATT_READ_BY_TYPE_REQUEST', 97 ATT_READ_BY_TYPE_RESPONSE: 'ATT_READ_BY_TYPE_RESPONSE', 98 ATT_READ_REQUEST: 'ATT_READ_REQUEST', 99 ATT_READ_RESPONSE: 'ATT_READ_RESPONSE', 100 ATT_READ_BLOB_REQUEST: 'ATT_READ_BLOB_REQUEST', 101 ATT_READ_BLOB_RESPONSE: 'ATT_READ_BLOB_RESPONSE', 102 ATT_READ_MULTIPLE_REQUEST: 'ATT_READ_MULTIPLE_REQUEST', 103 ATT_READ_MULTIPLE_RESPONSE: 'ATT_READ_MULTIPLE_RESPONSE', 104 ATT_READ_BY_GROUP_TYPE_REQUEST: 'ATT_READ_BY_GROUP_TYPE_REQUEST', 105 ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE', 106 ATT_WRITE_REQUEST: 'ATT_WRITE_REQUEST', 107 ATT_WRITE_RESPONSE: 'ATT_WRITE_RESPONSE', 108 ATT_WRITE_COMMAND: 'ATT_WRITE_COMMAND', 109 ATT_SIGNED_WRITE_COMMAND: 'ATT_SIGNED_WRITE_COMMAND', 110 ATT_PREPARE_WRITE_REQUEST: 'ATT_PREPARE_WRITE_REQUEST', 111 
ATT_PREPARE_WRITE_RESPONSE: 'ATT_PREPARE_WRITE_RESPONSE', 112 ATT_EXECUTE_WRITE_REQUEST: 'ATT_EXECUTE_WRITE_REQUEST', 113 ATT_EXECUTE_WRITE_RESPONSE: 'ATT_EXECUTE_WRITE_RESPONSE', 114 ATT_HANDLE_VALUE_NOTIFICATION: 'ATT_HANDLE_VALUE_NOTIFICATION', 115 ATT_HANDLE_VALUE_INDICATION: 'ATT_HANDLE_VALUE_INDICATION', 116 ATT_HANDLE_VALUE_CONFIRMATION: 'ATT_HANDLE_VALUE_CONFIRMATION' 117} 118 119ATT_REQUESTS = [ 120 ATT_EXCHANGE_MTU_REQUEST, 121 ATT_FIND_INFORMATION_REQUEST, 122 ATT_FIND_BY_TYPE_VALUE_REQUEST, 123 ATT_READ_BY_TYPE_REQUEST, 124 ATT_READ_REQUEST, 125 ATT_READ_BLOB_REQUEST, 126 ATT_READ_MULTIPLE_REQUEST, 127 ATT_READ_BY_GROUP_TYPE_REQUEST, 128 ATT_WRITE_REQUEST, 129 ATT_PREPARE_WRITE_REQUEST, 130 ATT_EXECUTE_WRITE_REQUEST 131] 132 133ATT_RESPONSES = [ 134 ATT_ERROR_RESPONSE, 135 ATT_EXCHANGE_MTU_RESPONSE, 136 ATT_FIND_INFORMATION_RESPONSE, 137 ATT_FIND_BY_TYPE_VALUE_RESPONSE, 138 ATT_READ_BY_TYPE_RESPONSE, 139 ATT_READ_RESPONSE, 140 ATT_READ_BLOB_RESPONSE, 141 ATT_READ_MULTIPLE_RESPONSE, 142 ATT_READ_BY_GROUP_TYPE_RESPONSE, 143 ATT_WRITE_RESPONSE, 144 ATT_PREPARE_WRITE_RESPONSE, 145 ATT_EXECUTE_WRITE_RESPONSE 146] 147 148ATT_INVALID_HANDLE_ERROR = 0x01 149ATT_READ_NOT_PERMITTED_ERROR = 0x02 150ATT_WRITE_NOT_PERMITTED_ERROR = 0x03 151ATT_INVALID_PDU_ERROR = 0x04 152ATT_INSUFFICIENT_AUTHENTICATION_ERROR = 0x05 153ATT_REQUEST_NOT_SUPPORTED_ERROR = 0x06 154ATT_INVALID_OFFSET_ERROR = 0x07 155ATT_INSUFFICIENT_AUTHORIZATION_ERROR = 0x08 156ATT_PREPARE_QUEUE_FULL_ERROR = 0x09 157ATT_ATTRIBUTE_NOT_FOUND_ERROR = 0x0A 158ATT_ATTRIBUTE_NOT_LONG_ERROR = 0x0B 159ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C 160ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = 0x0D 161ATT_UNLIKELY_ERROR_ERROR = 0x0E 162ATT_INSUFFICIENT_ENCRYPTION_ERROR = 0x0F 163ATT_UNSUPPORTED_GROUP_TYPE_ERROR = 0x10 164ATT_INSUFFICIENT_RESOURCES_ERROR = 0x11 165 166ATT_ERROR_NAMES = { 167 ATT_INVALID_HANDLE_ERROR: 'ATT_INVALID_HANDLE_ERROR', 168 ATT_READ_NOT_PERMITTED_ERROR: 'ATT_READ_NOT_PERMITTED_ERROR', 169 
ATT_WRITE_NOT_PERMITTED_ERROR: 'ATT_WRITE_NOT_PERMITTED_ERROR', 170 ATT_INVALID_PDU_ERROR: 'ATT_INVALID_PDU_ERROR', 171 ATT_INSUFFICIENT_AUTHENTICATION_ERROR: 'ATT_INSUFFICIENT_AUTHENTICATION_ERROR', 172 ATT_REQUEST_NOT_SUPPORTED_ERROR: 'ATT_REQUEST_NOT_SUPPORTED_ERROR', 173 ATT_INVALID_OFFSET_ERROR: 'ATT_INVALID_OFFSET_ERROR', 174 ATT_INSUFFICIENT_AUTHORIZATION_ERROR: 'ATT_INSUFFICIENT_AUTHORIZATION_ERROR', 175 ATT_PREPARE_QUEUE_FULL_ERROR: 'ATT_PREPARE_QUEUE_FULL_ERROR', 176 ATT_ATTRIBUTE_NOT_FOUND_ERROR: 'ATT_ATTRIBUTE_NOT_FOUND_ERROR', 177 ATT_ATTRIBUTE_NOT_LONG_ERROR: 'ATT_ATTRIBUTE_NOT_LONG_ERROR', 178 ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR', 179 ATT_INVALID_ATTRIBUTE_LENGTH_ERROR: 'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR', 180 ATT_UNLIKELY_ERROR_ERROR: 'ATT_UNLIKELY_ERROR_ERROR', 181 ATT_INSUFFICIENT_ENCRYPTION_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_ERROR', 182 ATT_UNSUPPORTED_GROUP_TYPE_ERROR: 'ATT_UNSUPPORTED_GROUP_TYPE_ERROR', 183 ATT_INSUFFICIENT_RESOURCES_ERROR: 'ATT_INSUFFICIENT_RESOURCES_ERROR' 184} 185 186ATT_DEFAULT_MTU = 23 187 188HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'} 189# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda 190UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y) 191# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda 192UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731 193 194# fmt: on 195# pylint: enable=line-too-long 196# pylint: disable=invalid-name 197 198 199# ----------------------------------------------------------------------------- 200# Exceptions 201# ----------------------------------------------------------------------------- 202class ATT_Error(ProtocolError): 203 def __init__(self, error_code, att_handle=0x0000, message=''): 204 super().__init__( 205 error_code, 206 error_namespace='att', 207 error_name=ATT_PDU.error_name(error_code), 208 ) 209 self.att_handle = att_handle 210 self.message 
= message 211 212 def __str__(self): 213 return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}' 214 215 216# ----------------------------------------------------------------------------- 217# Attribute Protocol 218# ----------------------------------------------------------------------------- 219class ATT_PDU: 220 ''' 221 See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU 222 ''' 223 224 pdu_classes: Dict[int, Type[ATT_PDU]] = {} 225 op_code = 0 226 name: str 227 228 @staticmethod 229 def from_bytes(pdu): 230 op_code = pdu[0] 231 232 cls = ATT_PDU.pdu_classes.get(op_code) 233 if cls is None: 234 instance = ATT_PDU(pdu) 235 instance.name = ATT_PDU.pdu_name(op_code) 236 instance.op_code = op_code 237 return instance 238 self = cls.__new__(cls) 239 ATT_PDU.__init__(self, pdu) 240 if hasattr(self, 'fields'): 241 self.init_from_bytes(pdu, 1) 242 return self 243 244 @staticmethod 245 def pdu_name(op_code): 246 return name_or_number(ATT_PDU_NAMES, op_code, 2) 247 248 @staticmethod 249 def error_name(error_code): 250 return name_or_number(ATT_ERROR_NAMES, error_code, 2) 251 252 @staticmethod 253 def subclass(fields): 254 def inner(cls): 255 cls.name = cls.__name__.upper() 256 cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name) 257 if cls.op_code is None: 258 raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES') 259 cls.fields = fields 260 261 # Register a factory for this class 262 ATT_PDU.pdu_classes[cls.op_code] = cls 263 264 return cls 265 266 return inner 267 268 def __init__(self, pdu=None, **kwargs): 269 if hasattr(self, 'fields') and kwargs: 270 HCI_Object.init_from_fields(self, self.fields, kwargs) 271 if pdu is None: 272 pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) 273 self.pdu = pdu 274 275 def init_from_bytes(self, pdu, offset): 276 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 277 278 def to_bytes(self): 279 return self.pdu 280 281 @property 282 def 
is_command(self): 283 return ((self.op_code >> 6) & 1) == 1 284 285 @property 286 def has_authentication_signature(self): 287 return ((self.op_code >> 7) & 1) == 1 288 289 def __bytes__(self): 290 return self.to_bytes() 291 292 def __str__(self): 293 result = color(self.name, 'yellow') 294 if fields := getattr(self, 'fields', None): 295 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 296 else: 297 if len(self.pdu) > 1: 298 result += f': {self.pdu.hex()}' 299 return result 300 301 302# ----------------------------------------------------------------------------- 303@ATT_PDU.subclass( 304 [ 305 ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}), 306 ('attribute_handle_in_error', HANDLE_FIELD_SPEC), 307 ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}), 308 ] 309) 310class ATT_Error_Response(ATT_PDU): 311 ''' 312 See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response 313 ''' 314 315 316# ----------------------------------------------------------------------------- 317@ATT_PDU.subclass([('client_rx_mtu', 2)]) 318class ATT_Exchange_MTU_Request(ATT_PDU): 319 ''' 320 See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request 321 ''' 322 323 324# ----------------------------------------------------------------------------- 325@ATT_PDU.subclass([('server_rx_mtu', 2)]) 326class ATT_Exchange_MTU_Response(ATT_PDU): 327 ''' 328 See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response 329 ''' 330 331 332# ----------------------------------------------------------------------------- 333@ATT_PDU.subclass( 334 [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)] 335) 336class ATT_Find_Information_Request(ATT_PDU): 337 ''' 338 See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request 339 ''' 340 341 342# ----------------------------------------------------------------------------- 343@ATT_PDU.subclass([('format', 1), ('information_data', '*')]) 344class 
ATT_Find_Information_Response(ATT_PDU): 345 ''' 346 See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response 347 ''' 348 349 def parse_information_data(self): 350 self.information = [] 351 offset = 0 352 uuid_size = 2 if self.format == 1 else 16 353 while offset + uuid_size <= len(self.information_data): 354 handle = struct.unpack_from('<H', self.information_data, offset)[0] 355 uuid = self.information_data[2 + offset : 2 + offset + uuid_size] 356 self.information.append((handle, uuid)) 357 offset += 2 + uuid_size 358 359 def __init__(self, *args, **kwargs): 360 super().__init__(*args, **kwargs) 361 self.parse_information_data() 362 363 def init_from_bytes(self, pdu, offset): 364 super().init_from_bytes(pdu, offset) 365 self.parse_information_data() 366 367 def __str__(self): 368 result = color(self.name, 'yellow') 369 result += ':\n' + HCI_Object.format_fields( 370 self.__dict__, 371 [ 372 ('format', 1), 373 ( 374 'information', 375 { 376 'mapper': lambda x: ', '.join( 377 [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x] 378 ) 379 }, 380 ), 381 ], 382 ' ', 383 ) 384 return result 385 386 387# ----------------------------------------------------------------------------- 388@ATT_PDU.subclass( 389 [ 390 ('starting_handle', HANDLE_FIELD_SPEC), 391 ('ending_handle', HANDLE_FIELD_SPEC), 392 ('attribute_type', UUID_2_FIELD_SPEC), 393 ('attribute_value', '*'), 394 ] 395) 396class ATT_Find_By_Type_Value_Request(ATT_PDU): 397 ''' 398 See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request 399 ''' 400 401 402# ----------------------------------------------------------------------------- 403@ATT_PDU.subclass([('handles_information_list', '*')]) 404class ATT_Find_By_Type_Value_Response(ATT_PDU): 405 ''' 406 See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response 407 ''' 408 409 def parse_handles_information_list(self): 410 self.handles_information = [] 411 offset = 0 412 while offset + 4 <= 
len(self.handles_information_list): 413 found_attribute_handle, group_end_handle = struct.unpack_from( 414 '<HH', self.handles_information_list, offset 415 ) 416 self.handles_information.append((found_attribute_handle, group_end_handle)) 417 offset += 4 418 419 def __init__(self, *args, **kwargs): 420 super().__init__(*args, **kwargs) 421 self.parse_handles_information_list() 422 423 def init_from_bytes(self, pdu, offset): 424 super().init_from_bytes(pdu, offset) 425 self.parse_handles_information_list() 426 427 def __str__(self): 428 result = color(self.name, 'yellow') 429 result += ':\n' + HCI_Object.format_fields( 430 self.__dict__, 431 [ 432 ( 433 'handles_information', 434 { 435 'mapper': lambda x: ', '.join( 436 [ 437 f'0x{handle1:04X}-0x{handle2:04X}' 438 for handle1, handle2 in x 439 ] 440 ) 441 }, 442 ) 443 ], 444 ' ', 445 ) 446 return result 447 448 449# ----------------------------------------------------------------------------- 450@ATT_PDU.subclass( 451 [ 452 ('starting_handle', HANDLE_FIELD_SPEC), 453 ('ending_handle', HANDLE_FIELD_SPEC), 454 ('attribute_type', UUID_2_16_FIELD_SPEC), 455 ] 456) 457class ATT_Read_By_Type_Request(ATT_PDU): 458 ''' 459 See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request 460 ''' 461 462 463# ----------------------------------------------------------------------------- 464@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')]) 465class ATT_Read_By_Type_Response(ATT_PDU): 466 ''' 467 See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response 468 ''' 469 470 def parse_attribute_data_list(self): 471 self.attributes = [] 472 offset = 0 473 while self.length != 0 and offset + self.length <= len( 474 self.attribute_data_list 475 ): 476 (attribute_handle,) = struct.unpack_from( 477 '<H', self.attribute_data_list, offset 478 ) 479 attribute_value = self.attribute_data_list[ 480 offset + 2 : offset + self.length 481 ] 482 self.attributes.append((attribute_handle, attribute_value)) 483 offset += 
self.length 484 485 def __init__(self, *args, **kwargs): 486 super().__init__(*args, **kwargs) 487 self.parse_attribute_data_list() 488 489 def init_from_bytes(self, pdu, offset): 490 super().init_from_bytes(pdu, offset) 491 self.parse_attribute_data_list() 492 493 def __str__(self): 494 result = color(self.name, 'yellow') 495 result += ':\n' + HCI_Object.format_fields( 496 self.__dict__, 497 [ 498 ('length', 1), 499 ( 500 'attributes', 501 { 502 'mapper': lambda x: ', '.join( 503 [f'0x{handle:04X}:{value.hex()}' for handle, value in x] 504 ) 505 }, 506 ), 507 ], 508 ' ', 509 ) 510 return result 511 512 513# ----------------------------------------------------------------------------- 514@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)]) 515class ATT_Read_Request(ATT_PDU): 516 ''' 517 See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request 518 ''' 519 520 521# ----------------------------------------------------------------------------- 522@ATT_PDU.subclass([('attribute_value', '*')]) 523class ATT_Read_Response(ATT_PDU): 524 ''' 525 See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response 526 ''' 527 528 529# ----------------------------------------------------------------------------- 530@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)]) 531class ATT_Read_Blob_Request(ATT_PDU): 532 ''' 533 See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request 534 ''' 535 536 537# ----------------------------------------------------------------------------- 538@ATT_PDU.subclass([('part_attribute_value', '*')]) 539class ATT_Read_Blob_Response(ATT_PDU): 540 ''' 541 See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response 542 ''' 543 544 545# ----------------------------------------------------------------------------- 546@ATT_PDU.subclass([('set_of_handles', '*')]) 547class ATT_Read_Multiple_Request(ATT_PDU): 548 ''' 549 See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request 550 ''' 551 552 553# 
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_values', '*')])
class ATT_Read_Multiple_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Group_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response

    In addition to the raw fields, instances expose `attributes`: a list of
    (attribute_handle, end_group_handle, attribute_value) tuples.
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into entries of `length` bytes each: two
        2-byte little-endian handles followed by the value bytes.

        A `length` below 4 cannot hold both handles, so nothing is parsed in
        that case. Trailing bytes that do not form a complete entry are
        ignored.
        '''
        self.attributes = []
        # Guard against a malformed length: 0 would never advance the offset
        # and 1..3 would make the 4-byte handle unpack run past the buffer.
        if self.length < 4:
            return
        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            attribute_handle, end_group_handle = struct.unpack_from(
                '<HH', self.attribute_data_list, offset
            )
            attribute_value = self.attribute_data_list[
                offset + 4 : offset + self.length
            ]
            self.attributes.append(
                (attribute_handle, end_group_handle, attribute_value)
            )
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
                                for handle, end, value in x
                            ]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('attribute_value', '*'),
        # ('authentication_signature', 'TODO')
    ]
)
class ATT_Signed_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
    '''


# -----------------------------------------------------------------------------
# NOTE(review): no fields are declared for this PDU; the spec section referenced
# below appears to define a 1-byte Flags parameter - confirm whether it should
# be modeled here.
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Notification(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Indication(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
    '''


# -----------------------------------------------------------------------------
class AttributeValue:
    '''
    Attribute value where reading and/or writing is delegated to functions
    passed as arguments to the constructor.
    '''

    def __init__(
        self,
        read: Union[
            Callable[[Optional[Connection]], bytes],
            Callable[[Optional[Connection]], Awaitable[bytes]],
            None,
        ] = None,
        write: Union[
            Callable[[Optional[Connection], bytes], None],
            Callable[[Optional[Connection], bytes], Awaitable[None]],
            None,
        ] = None,
    ):
        '''
        read: function (sync or async) called with the connection to produce
            the value; when omitted, reads return b''.
        write: function (sync or async) called with the connection and the new
            value; when omitted, writes are ignored.
        '''
        self._read = read
        self._write = write

    def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
        '''Return whatever the read function returns, or b'' if there is none.'''
        return self._read(connection) if self._read else b''

    def write(
        self, connection: Optional[Connection], value: bytes
    ) -> Union[Awaitable[None], None]:
        '''Return whatever the write function returns, or None if there is none.'''
        if self._write:
            return self._write(connection, value)

        return None


# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
    '''
    An attribute: a type (UUID), a set of permissions and a value, identified
    by a handle.

    The value is either plain bytes or an object with `read` and/or `write`
    methods (such as AttributeValue) to which access is delegated. A 'write'
    event is emitted after each successful write_value().
    '''

    class Permissions(enum.IntFlag):
        READABLE = 0x01
        WRITEABLE = 0x02
        READ_REQUIRES_ENCRYPTION = 0x04
        WRITE_REQUIRES_ENCRYPTION = 0x08
        READ_REQUIRES_AUTHENTICATION = 0x10
        WRITE_REQUIRES_AUTHENTICATION = 0x20
        READ_REQUIRES_AUTHORIZATION = 0x40
        WRITE_REQUIRES_AUTHORIZATION = 0x80

        @classmethod
        def from_string(cls, permissions_str: str) -> Attribute.Permissions:
            '''
            Parse a list of permission names separated by ',' or '|'
            (for example 'READABLE,WRITEABLE') into the combined flags.
            '''
            # NOTE(review): looking up an unknown name in an enum raises
            # KeyError, which is not caught below, so the descriptive message
            # is not produced for misspelled names - confirm whether that is
            # intended before widening the except clause (it would change the
            # exception type seen by callers).
            try:
                return functools.reduce(
                    lambda x, y: x | Attribute.Permissions[y],
                    permissions_str.replace('|', ',').split(","),
                    Attribute.Permissions(0),
                )
            except TypeError as exc:
                # The check for `p.name is not None` here is needed because for IntFlag
                # enums, the .name property can be None, when the enum value is 0,
                # so the type hint for .name is Optional[str].
                enum_list: List[str] = [p.name for p in cls if p.name is not None]
                enum_list_str = ",".join(enum_list)
                raise TypeError(
                    f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {enum_list_str }\nGot: {permissions_str}"
                ) from exc

    # Permission flags(legacy-use only)
    READABLE = Permissions.READABLE
    WRITEABLE = Permissions.WRITEABLE
    READ_REQUIRES_ENCRYPTION = Permissions.READ_REQUIRES_ENCRYPTION
    WRITE_REQUIRES_ENCRYPTION = Permissions.WRITE_REQUIRES_ENCRYPTION
    READ_REQUIRES_AUTHENTICATION = Permissions.READ_REQUIRES_AUTHENTICATION
    WRITE_REQUIRES_AUTHENTICATION = Permissions.WRITE_REQUIRES_AUTHENTICATION
    READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
    WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION

    value: Union[bytes, AttributeValue]

    def __init__(
        self,
        attribute_type: Union[str, bytes, UUID],
        permissions: Union[str, Attribute.Permissions],
        value: Union[str, bytes, AttributeValue] = b'',
    ) -> None:
        '''
        attribute_type: the attribute's type, as a UUID or as a string/bytes
            from which a UUID is built.
        permissions: permission flags, or a string accepted by
            Permissions.from_string.
        value: initial value; a str is stored as its UTF-8 encoding.
        '''
        EventEmitter.__init__(self)
        self.handle = 0
        self.end_group_handle = 0
        if isinstance(permissions, str):
            self.permissions = Attribute.Permissions.from_string(permissions)
        else:
            self.permissions = permissions

        # Convert the type to a UUID object if it isn't already
        if isinstance(attribute_type, str):
            self.type = UUID(attribute_type)
        elif isinstance(attribute_type, bytes):
            self.type = UUID.from_bytes(attribute_type)
        else:
            self.type = attribute_type

        # Convert the value to a byte array
        if isinstance(value, str):
            self.value = bytes(value, 'utf-8')
        else:
            self.value = value

    def encode_value(self, value: Any) -> bytes:
        '''Hook applied to a value before read_value returns it (identity here).'''
        return value

    def decode_value(self, value_bytes: bytes) -> Any:
        '''Hook applied to bytes received by write_value (identity here).'''
        return value_bytes

    async def read_value(self, connection: Optional[Connection]) -> bytes:
        '''
        Check the read permissions against `connection`, then return the
        (encoded) value.

        The encryption and authentication checks are skipped when `connection`
        is None. Raises ATT_Error, carrying this attribute's handle, when a
        permission check fails or when the delegated read raises ATT_Error.
        '''
        if (
            (self.permissions & self.READ_REQUIRES_ENCRYPTION)
            and connection is not None
            and not connection.encryption
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            (self.permissions & self.READ_REQUIRES_AUTHENTICATION)
            and connection is not None
            and not connection.authenticated
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        if hasattr(self.value, 'read'):
            try:
                value = self.value.read(connection)
                if inspect.isawaitable(value):
                    value = await value
            except ATT_Error as error:
                # Re-raise with this attribute's handle attached
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            value = self.value

        return self.encode_value(value)

    async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
        '''
        Check the write permissions against `connection`, store or delegate
        the (decoded) value, then emit a 'write' event with the connection and
        the decoded value.

        Raises ATT_Error, carrying this attribute's handle, when a permission
        check fails or when the delegated write raises ATT_Error.
        '''
        if (
            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
        ) and not connection.encryption:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
        ) and not connection.authenticated:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        value = self.decode_value(value_bytes)

        if hasattr(self.value, 'write'):
            try:
                result = self.value.write(connection, value)
                if inspect.isawaitable(result):
                    await result
            except ATT_Error as error:
                # Re-raise with this attribute's handle attached
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            self.value = value

        self.emit('write', connection, value)

    def __repr__(self):
        if isinstance(self.value, bytes):
            value_str = self.value.hex()
        else:
            value_str = str(self.value)
        if value_str:
            # Use the string computed above: the value may not be bytes (for
            # example an AttributeValue), in which case it has no .hex().
            value_string = f', value={value_str}'
        else:
            value_string = ''
        return (
            f'Attribute(handle=0x{self.handle:04X}, '
            f'type={self.type}, '
            f'permissions={self.permissions}{value_string})'
        )