1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# ATT - Attribute Protocol 17# 18# See Bluetooth spec @ Vol 3, Part F 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26import functools 27import struct 28from pyee import EventEmitter 29from typing import Dict, Type, TYPE_CHECKING 30 31from bumble.core import UUID, name_or_number, get_dict_key_by_value, ProtocolError 32from bumble.hci import HCI_Object, key_with_value, HCI_Constant 33from bumble.colors import color 34 35if TYPE_CHECKING: 36 from bumble.device import Connection 37 38# ----------------------------------------------------------------------------- 39# Constants 40# ----------------------------------------------------------------------------- 41# fmt: off 42# pylint: disable=line-too-long 43 44ATT_CID = 0x04 45 46ATT_ERROR_RESPONSE = 0x01 47ATT_EXCHANGE_MTU_REQUEST = 0x02 48ATT_EXCHANGE_MTU_RESPONSE = 0x03 49ATT_FIND_INFORMATION_REQUEST = 0x04 50ATT_FIND_INFORMATION_RESPONSE = 0x05 51ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06 52ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07 53ATT_READ_BY_TYPE_REQUEST = 0x08 54ATT_READ_BY_TYPE_RESPONSE = 0x09 
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E

# Printable name for each opcode. ATT_PDU.subclass() also searches this table
# by value to find the opcode of a PDU class from its upper-cased class name.
ATT_PDU_NAMES = {
    ATT_ERROR_RESPONSE: 'ATT_ERROR_RESPONSE',
    ATT_EXCHANGE_MTU_REQUEST: 'ATT_EXCHANGE_MTU_REQUEST',
    ATT_EXCHANGE_MTU_RESPONSE: 'ATT_EXCHANGE_MTU_RESPONSE',
    ATT_FIND_INFORMATION_REQUEST: 'ATT_FIND_INFORMATION_REQUEST',
    ATT_FIND_INFORMATION_RESPONSE: 'ATT_FIND_INFORMATION_RESPONSE',
    ATT_FIND_BY_TYPE_VALUE_REQUEST: 'ATT_FIND_BY_TYPE_VALUE_REQUEST',
    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
    ATT_READ_BY_TYPE_REQUEST: 'ATT_READ_BY_TYPE_REQUEST',
    ATT_READ_BY_TYPE_RESPONSE: 'ATT_READ_BY_TYPE_RESPONSE',
    ATT_READ_REQUEST: 'ATT_READ_REQUEST',
    ATT_READ_RESPONSE: 'ATT_READ_RESPONSE',
    ATT_READ_BLOB_REQUEST: 'ATT_READ_BLOB_REQUEST',
    ATT_READ_BLOB_RESPONSE: 'ATT_READ_BLOB_RESPONSE',
    ATT_READ_MULTIPLE_REQUEST: 'ATT_READ_MULTIPLE_REQUEST',
    ATT_READ_MULTIPLE_RESPONSE: 'ATT_READ_MULTIPLE_RESPONSE',
    ATT_READ_BY_GROUP_TYPE_REQUEST: 'ATT_READ_BY_GROUP_TYPE_REQUEST',
    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
    ATT_WRITE_REQUEST: 'ATT_WRITE_REQUEST',
    ATT_WRITE_RESPONSE: 'ATT_WRITE_RESPONSE',
    ATT_WRITE_COMMAND: 'ATT_WRITE_COMMAND',
    ATT_SIGNED_WRITE_COMMAND: 'ATT_SIGNED_WRITE_COMMAND',
    ATT_PREPARE_WRITE_REQUEST: 'ATT_PREPARE_WRITE_REQUEST',
    ATT_PREPARE_WRITE_RESPONSE: 'ATT_PREPARE_WRITE_RESPONSE',
    ATT_EXECUTE_WRITE_REQUEST: 'ATT_EXECUTE_WRITE_REQUEST',
    ATT_EXECUTE_WRITE_RESPONSE: 'ATT_EXECUTE_WRITE_RESPONSE',
    ATT_HANDLE_VALUE_NOTIFICATION: 'ATT_HANDLE_VALUE_NOTIFICATION',
    ATT_HANDLE_VALUE_INDICATION: 'ATT_HANDLE_VALUE_INDICATION',
    ATT_HANDLE_VALUE_CONFIRMATION: 'ATT_HANDLE_VALUE_CONFIRMATION'
}

# Opcodes of the request PDUs
ATT_REQUESTS = [
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_FIND_INFORMATION_REQUEST,
    ATT_FIND_BY_TYPE_VALUE_REQUEST,
    ATT_READ_BY_TYPE_REQUEST,
    ATT_READ_REQUEST,
    ATT_READ_BLOB_REQUEST,
    ATT_READ_MULTIPLE_REQUEST,
    ATT_READ_BY_GROUP_TYPE_REQUEST,
    ATT_WRITE_REQUEST,
    ATT_PREPARE_WRITE_REQUEST,
    ATT_EXECUTE_WRITE_REQUEST
]

# Opcodes of the response PDUs (the error response included)
ATT_RESPONSES = [
    ATT_ERROR_RESPONSE,
    ATT_EXCHANGE_MTU_RESPONSE,
    ATT_FIND_INFORMATION_RESPONSE,
    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
    ATT_READ_BY_TYPE_RESPONSE,
    ATT_READ_RESPONSE,
    ATT_READ_BLOB_RESPONSE,
    ATT_READ_MULTIPLE_RESPONSE,
    ATT_READ_BY_GROUP_TYPE_RESPONSE,
    ATT_WRITE_RESPONSE,
    ATT_PREPARE_WRITE_RESPONSE,
    ATT_EXECUTE_WRITE_RESPONSE
]

# ATT error codes (each one is a key of ATT_ERROR_NAMES)
ATT_INVALID_HANDLE_ERROR = 0x01
ATT_READ_NOT_PERMITTED_ERROR = 0x02
ATT_WRITE_NOT_PERMITTED_ERROR = 0x03
ATT_INVALID_PDU_ERROR = 0x04
ATT_INSUFFICIENT_AUTHENTICATION_ERROR = 0x05
ATT_REQUEST_NOT_SUPPORTED_ERROR = 0x06
ATT_INVALID_OFFSET_ERROR = 0x07
ATT_INSUFFICIENT_AUTHORIZATION_ERROR = 0x08
ATT_PREPARE_QUEUE_FULL_ERROR = 0x09
ATT_ATTRIBUTE_NOT_FOUND_ERROR = 0x0A
ATT_ATTRIBUTE_NOT_LONG_ERROR = 0x0B
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = 0x0D
ATT_UNLIKELY_ERROR_ERROR = 0x0E
ATT_INSUFFICIENT_ENCRYPTION_ERROR = 0x0F
ATT_UNSUPPORTED_GROUP_TYPE_ERROR = 0x10
ATT_INSUFFICIENT_RESOURCES_ERROR = 0x11

# Printable name for each error code
ATT_ERROR_NAMES = {
    ATT_INVALID_HANDLE_ERROR: 'ATT_INVALID_HANDLE_ERROR',
    ATT_READ_NOT_PERMITTED_ERROR: 'ATT_READ_NOT_PERMITTED_ERROR',
    ATT_WRITE_NOT_PERMITTED_ERROR: 'ATT_WRITE_NOT_PERMITTED_ERROR',
    ATT_INVALID_PDU_ERROR: 'ATT_INVALID_PDU_ERROR',
    ATT_INSUFFICIENT_AUTHENTICATION_ERROR: 'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
    ATT_REQUEST_NOT_SUPPORTED_ERROR: 'ATT_REQUEST_NOT_SUPPORTED_ERROR',
    ATT_INVALID_OFFSET_ERROR: 'ATT_INVALID_OFFSET_ERROR',
    ATT_INSUFFICIENT_AUTHORIZATION_ERROR: 'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
    ATT_PREPARE_QUEUE_FULL_ERROR: 'ATT_PREPARE_QUEUE_FULL_ERROR',
    ATT_ATTRIBUTE_NOT_FOUND_ERROR: 'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
    ATT_ATTRIBUTE_NOT_LONG_ERROR: 'ATT_ATTRIBUTE_NOT_LONG_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
    ATT_INVALID_ATTRIBUTE_LENGTH_ERROR: 'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
    ATT_UNLIKELY_ERROR_ERROR: 'ATT_UNLIKELY_ERROR_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
    ATT_UNSUPPORTED_GROUP_TYPE_ERROR: 'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
    ATT_INSUFFICIENT_RESOURCES_ERROR: 'ATT_INSUFFICIENT_RESOURCES_ERROR'
}

ATT_DEFAULT_MTU = 23

# Field spec for a 2-byte attribute handle, displayed as 0xNNNN
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731

# fmt: on
# pylint: enable=line-too-long
# pylint: disable=invalid-name

# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(ProtocolError):
    '''
    Protocol error raised for an ATT error code.

    The error is created in the 'att' namespace, with its name looked up from
    the error code, and remembers the handle of the attribute it relates to
    (`att_handle`, 0x0000 by default) together with a free-form `message`.
    '''

    def __init__(self, error_code, att_handle=0x0000, message=''):
        super().__init__(
            error_code,
            error_namespace='att',
            error_name=ATT_PDU.error_name(error_code),
        )
        self.att_handle = att_handle
        self.message = message

    def __str__(self):
        return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'


# -----------------------------------------------------------------------------
# Attribute Protocol
# -----------------------------------------------------------------------------
class ATT_PDU:
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU

    Base class of all ATT PDUs. Concrete PDU classes are declared with the
    `ATT_PDU.subclass` decorator, which assigns their `name`, `op_code` and
    `fields` and registers them in `pdu_classes` so that `from_bytes` can
    instantiate the right class for a received PDU.
    '''

    # Registry of the decorated PDU classes, keyed by opcode
    pdu_classes: Dict[int, Type[ATT_PDU]] = {}
    op_code = 0
    name = None

    @staticmethod
    def from_bytes(pdu):
        '''
        Create a PDU object from its serialized form.

        The first byte selects the class. When no class is registered for that
        opcode, a plain ATT_PDU carrying the raw bytes is returned instead.
        '''
        op_code = pdu[0]

        cls = ATT_PDU.pdu_classes.get(op_code)
        if cls is None:
            instance = ATT_PDU(pdu)
            instance.name = ATT_PDU.pdu_name(op_code)
            instance.op_code = op_code
            return instance
        # Bypass the subclass constructor: the fields are populated from the
        # bytes that follow the opcode rather than from keyword arguments.
        self = cls.__new__(cls)
        ATT_PDU.__init__(self, pdu)
        if hasattr(self, 'fields'):
            self.init_from_bytes(pdu, 1)
        return self

    @staticmethod
    def pdu_name(op_code):
        '''Return the printable name of an opcode (see ATT_PDU_NAMES).'''
        return name_or_number(ATT_PDU_NAMES, op_code, 2)

    @staticmethod
    def error_name(error_code):
        '''Return the printable name of an error code (see ATT_ERROR_NAMES).'''
        return name_or_number(ATT_ERROR_NAMES, error_code, 2)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator that declares and registers a PDU class.

        The class name, upper-cased, must be one of the values of
        ATT_PDU_NAMES; the matching key becomes the class opcode. Raises
        KeyError when the name is not found.
        '''

        def inner(cls):
            cls.name = cls.__name__.upper()
            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
            if cls.op_code is None:
                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
            cls.fields = fields

            # Register a factory for this class
            ATT_PDU.pdu_classes[cls.op_code] = cls

            return cls

        return inner

    def __init__(self, pdu=None, **kwargs):
        '''
        Build a PDU either from raw bytes (`pdu`) or from field values passed
        as keyword arguments, in which case the bytes are serialized from the
        opcode followed by the fields.
        '''
        if hasattr(self, 'fields') and kwargs:
            HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
        self.pdu = pdu

    def init_from_bytes(self, pdu, offset):
        '''Populate the declared fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the serialized PDU.'''
        return self.pdu

    @property
    def is_command(self):
        '''True when bit 6 of the opcode is set.'''
        return ((self.op_code >> 6) & 1) == 1

    @property
    def has_authentication_signature(self):
        '''True when bit 7 of the opcode is set.'''
        return ((self.op_code >> 7) & 1) == 1

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        result = color(self.name, 'yellow')
        if fields := getattr(self, 'fields', None):
            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
        else:
            # No (or empty) field list: show the raw payload, if there is one
            if len(self.pdu) > 1:
                result += f': {self.pdu.hex()}'
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
        ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
        ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
    ]
)
class ATT_Error_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('client_rx_mtu', 2)])
class ATT_Exchange_MTU_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('server_rx_mtu', 2)])
class ATT_Exchange_MTU_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
)
class ATT_Find_Information_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
class ATT_Find_Information_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response

    In addition to the raw fields, the parsed entries are available as
    `information`, a list of (handle, uuid_bytes) tuples.
    '''

    def parse_information_data(self):
        '''
        Split `information_data` into (handle, uuid_bytes) tuples.

        Each entry is a 2-byte little-endian handle followed by a UUID of
        2 bytes when `format` is 1, or 16 bytes otherwise. Trailing bytes that
        do not make up a complete entry are ignored.
        '''
        self.information = []
        offset = 0
        uuid_size = 2 if self.format == 1 else 16
        # An entry is the handle AND the UUID: bounding the loop on the UUID
        # size alone would accept a trailing entry with a truncated UUID.
        entry_size = 2 + uuid_size
        while offset + entry_size <= len(self.information_data):
            handle = struct.unpack_from('<H', self.information_data, offset)[0]
            uuid = self.information_data[offset + 2 : offset + entry_size]
            self.information.append((handle, uuid))
            offset += entry_size

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_information_data()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_information_data()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('format', 1),
                (
                    'information',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_type', UUID_2_FIELD_SPEC),
        ('attribute_value', '*'),
    ]
)
class ATT_Find_By_Type_Value_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('handles_information_list', '*')])
class ATT_Find_By_Type_Value_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response

    In addition to the raw field, the parsed entries are available as
    `handles_information`, a list of
    (found_attribute_handle, group_end_handle) tuples.
    '''

    def parse_handles_information_list(self):
        '''
        Split `handles_information_list` into pairs of 2-byte little-endian
        handles. Trailing bytes that do not make up a full pair are ignored.
        '''
        self.handles_information = []
        offset = 0
        while offset + 4 <= len(self.handles_information_list):
            found_attribute_handle, group_end_handle = struct.unpack_from(
                '<HH', self.handles_information_list, offset
            )
            self.handles_information.append((found_attribute_handle, group_end_handle))
            offset += 4

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_handles_information_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_handles_information_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                (
                    'handles_information',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle1:04X}-0x{handle2:04X}'
                                for handle1, handle2 in x
                            ]
                        )
                    },
                )
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response

    In addition to the raw fields, the parsed entries are available as
    `attributes`, a list of (attribute_handle, attribute_value) tuples.
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into entries of `length` bytes each: a
        2-byte little-endian handle followed by the attribute value.

        When `length` is too small to hold a handle, nothing is parsed and
        `attributes` is left empty. Trailing bytes that do not make up a
        complete entry are ignored.
        '''
        self.attributes = []
        # Every entry starts with a 2-byte handle. A shorter declared length
        # (0 included) cannot be parsed: it would either never advance or make
        # the handle read run past the end of the data.
        if self.length < 2:
            return

        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            (attribute_handle,) = struct.unpack_from(
                '<H', self.attribute_data_list, offset
            )
            attribute_value = self.attribute_data_list[
                offset + 2 : offset + self.length
            ]
            self.attributes.append((attribute_handle, attribute_value))
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{value.hex()}' for handle, value in x]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
class ATT_Read_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_value', '*')])
class ATT_Read_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
class ATT_Read_Blob_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('part_attribute_value', '*')])
class ATT_Read_Blob_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_handles', '*')])
class ATT_Read_Multiple_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_values', '*')])
class ATT_Read_Multiple_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Group_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response

    In addition to the raw fields, the parsed entries are available as
    `attributes`, a list of
    (attribute_handle, end_group_handle, attribute_value) tuples.
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into entries of `length` bytes each: two
        2-byte little-endian handles followed by the attribute value.

        When `length` is too small to hold the two handles, nothing is parsed
        and `attributes` is left empty. Trailing bytes that do not make up a
        complete entry are ignored.
        '''
        self.attributes = []
        # Every entry starts with two 2-byte handles. A shorter declared length
        # (0 included) cannot be parsed: it would either never advance, read
        # the handles across entry boundaries, or run past the end of the data.
        if self.length < 4:
            return

        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            attribute_handle, end_group_handle = struct.unpack_from(
                '<HH', self.attribute_data_list, offset
            )
            attribute_value = self.attribute_data_list[
                offset + 4 : offset + self.length
            ]
            self.attributes.append(
                (attribute_handle, end_group_handle, attribute_value)
            )
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
                                for handle, end, value in x
                            ]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('attribute_value', '*')
        # ('authentication_signature', 'TODO')
    ]
)
class ATT_Signed_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Notification(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Indication(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
    '''


# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
    '''
    An attribute: a typed value with a handle and a set of permission flags.

    The value is either stored directly, or is an object exposing `read`
    and/or `write` methods that are called to produce or consume the value.
    A 'write' event is emitted after each successful write.
    '''

    # Permission flags
    READABLE = 0x01
    WRITEABLE = 0x02
    READ_REQUIRES_ENCRYPTION = 0x04
    WRITE_REQUIRES_ENCRYPTION = 0x08
    READ_REQUIRES_AUTHENTICATION = 0x10
    WRITE_REQUIRES_AUTHENTICATION = 0x20
    READ_REQUIRES_AUTHORIZATION = 0x40
    WRITE_REQUIRES_AUTHORIZATION = 0x80

    PERMISSION_NAMES = {
        READABLE: 'READABLE',
        WRITEABLE: 'WRITEABLE',
        READ_REQUIRES_ENCRYPTION: 'READ_REQUIRES_ENCRYPTION',
        WRITE_REQUIRES_ENCRYPTION: 'WRITE_REQUIRES_ENCRYPTION',
        READ_REQUIRES_AUTHENTICATION: 'READ_REQUIRES_AUTHENTICATION',
        WRITE_REQUIRES_AUTHENTICATION: 'WRITE_REQUIRES_AUTHENTICATION',
        READ_REQUIRES_AUTHORIZATION: 'READ_REQUIRES_AUTHORIZATION',
        WRITE_REQUIRES_AUTHORIZATION: 'WRITE_REQUIRES_AUTHORIZATION',
    }

    @staticmethod
    def string_to_permissions(permissions_str: str):
        '''
        Convert a comma-separated list of permission names (the values of
        PERMISSION_NAMES) into the OR of the matching permission flags.

        Raises TypeError, chained to the underlying error, when the flags
        cannot be combined.
        '''
        try:
            return functools.reduce(
                lambda x, y: x | get_dict_key_by_value(Attribute.PERMISSION_NAMES, y),
                permissions_str.split(","),
                0,
            )
        except TypeError as error:
            # NOTE(review): presumably raised by `|` when the lookup finds no
            # flag for a name - confirm against get_dict_key_by_value.
            raise TypeError(
                f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
            ) from error

    def __init__(self, attribute_type, permissions, value=b''):
        '''
        attribute_type: a UUID object, or a str / bytes to build one from.
        permissions: flags as an int, or a str for `string_to_permissions`.
        value: the initial value; a str is encoded as UTF-8 bytes.
        '''
        EventEmitter.__init__(self)
        self.handle = 0
        self.end_group_handle = 0
        if isinstance(permissions, str):
            self.permissions = self.string_to_permissions(permissions)
        else:
            self.permissions = permissions

        # Convert the type to a UUID object if it isn't already
        if isinstance(attribute_type, str):
            self.type = UUID(attribute_type)
        elif isinstance(attribute_type, bytes):
            self.type = UUID.from_bytes(attribute_type)
        else:
            self.type = attribute_type

        # Convert the value to a byte array
        if isinstance(value, str):
            self.value = bytes(value, 'utf-8')
        else:
            self.value = value

    def encode_value(self, value):
        '''Hook applied to a value before `read_value` returns it (identity here).'''
        return value

    def decode_value(self, value_bytes):
        '''Hook applied to the bytes received by `write_value` (identity here).'''
        return value_bytes

    def read_value(self, connection: Connection):
        '''
        Return the encoded value of the attribute, as read over `connection`.

        Raises ATT_Error, carrying this attribute's handle, when a read
        permission flag is not satisfied by the connection, or when the
        value's own `read` method raises an ATT_Error.
        '''
        if (
            self.permissions & self.READ_REQUIRES_ENCRYPTION
        ) and not connection.encryption:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            self.permissions & self.READ_REQUIRES_AUTHENTICATION
        ) and not connection.authenticated:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        if read := getattr(self.value, 'read', None):
            try:
                value = read(connection)  # pylint: disable=not-callable
            except ATT_Error as error:
                # Re-raise with the same error code, tagged with this handle
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            value = self.value

        return self.encode_value(value)

    def write_value(self, connection: Connection, value_bytes):
        '''
        Decode `value_bytes` and write the result to the attribute, then emit
        a 'write' event with the connection and the decoded value.

        Raises ATT_Error, carrying this attribute's handle, when a write
        permission flag is not satisfied by the connection, or when the
        value's own `write` method raises an ATT_Error.
        '''
        if (
            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
        ) and not connection.encryption:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
        ) and not connection.authenticated:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        value = self.decode_value(value_bytes)

        if write := getattr(self.value, 'write', None):
            try:
                write(connection, value)  # pylint: disable=not-callable
            except ATT_Error as error:
                # Re-raise with the same error code, tagged with this handle
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            self.value = value

        self.emit('write', connection, value)

    def __repr__(self):
        if isinstance(self.value, bytes):
            value_str = self.value.hex()
        else:
            value_str = str(self.value)
        if value_str:
            # Use the string computed above: the value is not always bytes
            # (it may be an object with read/write methods), so calling
            # .hex() on it directly can fail.
            value_string = f', value={value_str}'
        else:
            value_string = ''
        return (
            f'Attribute(handle=0x{self.handle:04X}, '
            f'type={self.type}, '
            f'permissions={self.permissions}{value_string})'
        )