# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# ATT - Attribute Protocol
#
# See Bluetooth spec @ Vol 3, Part F
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
# `struct` is used by the response parsers in this module; import it explicitly
# instead of relying on one of the star imports below to provide the name.
import struct

from colors import color
from pyee import EventEmitter

from .core import *
from .hci import *

# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ATT_CID = 0x04

ATT_ERROR_RESPONSE              = 0x01
ATT_EXCHANGE_MTU_REQUEST        = 0x02
ATT_EXCHANGE_MTU_RESPONSE       = 0x03
ATT_FIND_INFORMATION_REQUEST    = 0x04
ATT_FIND_INFORMATION_RESPONSE   = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST  = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST        = 0x08
ATT_READ_BY_TYPE_RESPONSE       = 0x09
ATT_READ_REQUEST                = 0x0A
ATT_READ_RESPONSE               = 0x0B
ATT_READ_BLOB_REQUEST           = 0x0C
ATT_READ_BLOB_RESPONSE          = 0x0D
ATT_READ_MULTIPLE_REQUEST       = 0x0E
ATT_READ_MULTIPLE_RESPONSE      = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST               = 0x12
ATT_WRITE_RESPONSE              = 0x13
ATT_WRITE_COMMAND               = 0x52
ATT_SIGNED_WRITE_COMMAND        = 0xD2
ATT_PREPARE_WRITE_REQUEST       = 0x16
ATT_PREPARE_WRITE_RESPONSE      = 0x17
ATT_EXECUTE_WRITE_REQUEST       = 0x18
ATT_EXECUTE_WRITE_RESPONSE      = 0x19
ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
ATT_HANDLE_VALUE_INDICATION     = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E

# Maps each PDU op code to its symbolic name
ATT_PDU_NAMES = {
    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
}

# Op codes of the request PDUs
ATT_REQUESTS = [
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_FIND_INFORMATION_REQUEST,
    ATT_FIND_BY_TYPE_VALUE_REQUEST,
    ATT_READ_BY_TYPE_REQUEST,
    ATT_READ_REQUEST,
    ATT_READ_BLOB_REQUEST,
    ATT_READ_MULTIPLE_REQUEST,
    ATT_READ_BY_GROUP_TYPE_REQUEST,
    ATT_WRITE_REQUEST,
    ATT_PREPARE_WRITE_REQUEST,
    ATT_EXECUTE_WRITE_REQUEST
]

# Op codes of the response PDUs
ATT_RESPONSES = [
    ATT_ERROR_RESPONSE,
    ATT_EXCHANGE_MTU_RESPONSE,
    ATT_FIND_INFORMATION_RESPONSE,
    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
    ATT_READ_BY_TYPE_RESPONSE,
    ATT_READ_RESPONSE,
    ATT_READ_BLOB_RESPONSE,
    ATT_READ_MULTIPLE_RESPONSE,
    ATT_READ_BY_GROUP_TYPE_RESPONSE,
    ATT_WRITE_RESPONSE,
    ATT_PREPARE_WRITE_RESPONSE,
    ATT_EXECUTE_WRITE_RESPONSE
]

ATT_INVALID_HANDLE_ERROR                   = 0x01
ATT_READ_NOT_PERMITTED_ERROR               = 0x02
ATT_WRITE_NOT_PERMITTED_ERROR              = 0x03
ATT_INVALID_PDU_ERROR                      = 0x04
ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = 0x05
ATT_REQUEST_NOT_SUPPORTED_ERROR            = 0x06
ATT_INVALID_OFFSET_ERROR                   = 0x07
ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = 0x08
ATT_PREPARE_QUEUE_FULL_ERROR               = 0x09
ATT_ATTRIBUTE_NOT_FOUND_ERROR              = 0x0A
ATT_ATTRIBUTE_NOT_LONG_ERROR               = 0x0B
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = 0x0D
ATT_UNLIKELY_ERROR_ERROR                   = 0x0E
ATT_INSUFFICIENT_ENCRYPTION_ERROR          = 0x0F
ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = 0x10
ATT_INSUFFICIENT_RESOURCES_ERROR           = 0x11

# Maps each error code to its symbolic name
ATT_ERROR_NAMES = {
    ATT_INVALID_HANDLE_ERROR:                   'ATT_INVALID_HANDLE_ERROR',
    ATT_READ_NOT_PERMITTED_ERROR:               'ATT_READ_NOT_PERMITTED_ERROR',
    ATT_WRITE_NOT_PERMITTED_ERROR:              'ATT_WRITE_NOT_PERMITTED_ERROR',
    ATT_INVALID_PDU_ERROR:                      'ATT_INVALID_PDU_ERROR',
    ATT_INSUFFICIENT_AUTHENTICATION_ERROR:      'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
    ATT_REQUEST_NOT_SUPPORTED_ERROR:            'ATT_REQUEST_NOT_SUPPORTED_ERROR',
    ATT_INVALID_OFFSET_ERROR:                   'ATT_INVALID_OFFSET_ERROR',
    ATT_INSUFFICIENT_AUTHORIZATION_ERROR:       'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
    ATT_PREPARE_QUEUE_FULL_ERROR:               'ATT_PREPARE_QUEUE_FULL_ERROR',
    ATT_ATTRIBUTE_NOT_FOUND_ERROR:              'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
    ATT_ATTRIBUTE_NOT_LONG_ERROR:               'ATT_ATTRIBUTE_NOT_LONG_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
    ATT_INVALID_ATTRIBUTE_LENGTH_ERROR:         'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
    ATT_UNLIKELY_ERROR_ERROR:                   'ATT_UNLIKELY_ERROR_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_ERROR:          'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
    ATT_UNSUPPORTED_GROUP_TYPE_ERROR:           'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
    ATT_INSUFFICIENT_RESOURCES_ERROR:           'ATT_INSUFFICIENT_RESOURCES_ERROR'
}

ATT_DEFAULT_MTU = 23

# Field specs shared by the PDU field lists below
HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)    # noqa: E731
UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731


# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def key_with_value(dictionary, target_value):
    '''
    Reverse lookup: return the first key of `dictionary` whose value equals
    `target_value`, or None when no value matches.
    '''
    for key, value in dictionary.items():
        if value == target_value:
            return key
    return None


# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(Exception):
    '''
    Exception carrying an ATT error code (`error_code`) and the handle of the
    attribute it relates to (`att_handle`, 0x0000 by default).
    '''

    def __init__(self, error_code, att_handle=0x0000):
        # Initialize the base Exception so that `args` and `repr()` carry the
        # error details instead of being empty.
        super().__init__(error_code, att_handle)
        self.error_code = error_code
        self.att_handle = att_handle

    def __str__(self):
        return f'ATT_Error({ATT_PDU.error_name(self.error_code)})'


# -----------------------------------------------------------------------------
# Attribute Protocol
# -----------------------------------------------------------------------------
class ATT_PDU:
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU

    Base class for all ATT PDUs. Subclasses are declared with the
    `ATT_PDU.subclass` decorator, which assigns their name, op code and field
    list and registers them so that `from_bytes` can instantiate them.
    '''
    # Registry of PDU classes, indexed by op code (filled in by `subclass`)
    pdu_classes = {}
    op_code     = 0

    @staticmethod
    def from_bytes(pdu):
        '''
        Build a PDU object from its serialized form.

        The op code is read from the first byte. When no class is registered
        for that op code, a generic ATT_PDU wrapping the raw bytes is returned.
        '''
        op_code = pdu[0]

        cls = ATT_PDU.pdu_classes.get(op_code)
        if cls is None:
            instance = ATT_PDU(pdu)
            instance.name = ATT_PDU.pdu_name(op_code)
            instance.op_code = op_code
            return instance
        # Bypass the subclass constructor: only the base initializer runs here,
        # then the fields are parsed from the bytes that follow the op code.
        self = cls.__new__(cls)
        ATT_PDU.__init__(self, pdu)
        if hasattr(self, 'fields'):
            self.init_from_bytes(pdu, 1)
        return self

    @staticmethod
    def pdu_name(op_code):
        '''Return the display name for a PDU op code.'''
        return name_or_number(ATT_PDU_NAMES, op_code, 2)

    @staticmethod
    def error_name(error_code):
        '''Return the display name for an ATT error code.'''
        return name_or_number(ATT_ERROR_NAMES, error_code, 2)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator that declares a PDU class with the given field list.

        The class name, upper-cased, must be one of the values of
        ATT_PDU_NAMES; the matching key becomes the class op code.

        Raises KeyError when the upper-cased class name is not in ATT_PDU_NAMES.
        '''
        def inner(cls):
            cls.name = cls.__name__.upper()
            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
            if cls.op_code is None:
                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
            cls.fields = fields

            # Register a factory for this class
            ATT_PDU.pdu_classes[cls.op_code] = cls

            return cls

        return inner

    def __init__(self, pdu=None, **kwargs):
        '''
        Initialize from raw bytes (`pdu`) and/or from field values (`kwargs`).

        When `pdu` is None, the serialized form is built from the class op code
        followed by the fields serialized from `kwargs`.
        '''
        if hasattr(self, 'fields') and kwargs:
            HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
        self.pdu = pdu

    def init_from_bytes(self, pdu, offset):
        '''Parse this object's fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the serialized PDU.'''
        return self.pdu

    @property
    def is_command(self):
        '''True when bit 6 of the op code is set.'''
        return ((self.op_code >> 6) & 1) == 1

    @property
    def has_authentication_signature(self):
        '''True when bit 7 of the op code is set.'''
        return ((self.op_code >> 7) & 1) == 1

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        result = color(self.name, 'yellow')
        if fields := getattr(self, 'fields', None):
            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
        else:
            # No declared fields: show the raw payload, if there is one
            if len(self.pdu) > 1:
                result += f': {self.pdu.hex()}'
        return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('request_opcode_in_error',   {'size': 1, 'mapper': ATT_PDU.pdu_name}),
    ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
    ('error_code',                {'size': 1, 'mapper': ATT_PDU.error_name})
])
class ATT_Error_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('client_rx_mtu', 2)
])
class ATT_Exchange_MTU_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('server_rx_mtu', 2)
])
class ATT_Exchange_MTU_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('starting_handle', HANDLE_FIELD_SPEC),
    ('ending_handle',   HANDLE_FIELD_SPEC)
])
class ATT_Find_Information_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('format',           1),
    ('information_data', '*')
])
class ATT_Find_Information_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response

    In addition to the declared fields, `information` holds the parsed
    `information_data` as a list of (handle, uuid_bytes) tuples.
    '''

    def parse_information_data(self):
        '''
        Split `information_data` into (handle, uuid_bytes) entries.

        Each entry is a 2-byte little-endian handle followed by a UUID of
        2 bytes when `format` is 1, 16 bytes otherwise. Trailing bytes that do
        not form a complete entry are ignored.
        '''
        self.information = []
        offset = 0
        uuid_size = 2 if self.format == 1 else 16
        # An entry is only complete when both the handle and the UUID are
        # present; checking for the UUID size alone would accept a trailing
        # fragment and produce a truncated UUID.
        entry_size = 2 + uuid_size
        while offset + entry_size <= len(self.information_data):
            handle = struct.unpack_from('<H', self.information_data, offset)[0]
            uuid = self.information_data[2 + offset:2 + offset + uuid_size]
            self.information.append((handle, uuid))
            offset += entry_size

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_information_data()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_information_data()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
            ('format', 1),
            ('information', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x])})
        ], ' ')
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('starting_handle', HANDLE_FIELD_SPEC),
    ('ending_handle',   HANDLE_FIELD_SPEC),
    ('attribute_type',  UUID_2_FIELD_SPEC),
    ('attribute_value', '*')
])
class ATT_Find_By_Type_Value_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('handles_information_list', '*')
])
class ATT_Find_By_Type_Value_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response

    In addition to the declared fields, `handles_information` holds the parsed
    `handles_information_list` as a list of
    (found_attribute_handle, group_end_handle) tuples.
    '''

    def parse_handles_information_list(self):
        '''
        Split `handles_information_list` into pairs of 2-byte little-endian
        handles. Trailing bytes that do not form a complete pair are ignored.
        '''
        self.handles_information = []
        offset = 0
        while offset + 4 <= len(self.handles_information_list):
            found_attribute_handle, group_end_handle = struct.unpack_from('<HH', self.handles_information_list, offset)
            self.handles_information.append((found_attribute_handle, group_end_handle))
            offset += 4

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_handles_information_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_handles_information_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
            ('handles_information', {'mapper': lambda x: ', '.join([f'0x{handle1:04X}-0x{handle2:04X}' for handle1, handle2 in x])})
        ], ' ')
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('starting_handle', HANDLE_FIELD_SPEC),
    ('ending_handle',   HANDLE_FIELD_SPEC),
    ('attribute_type',  UUID_2_16_FIELD_SPEC)
])
class ATT_Read_By_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('length',              1),
    ('attribute_data_list', '*')
])
class ATT_Read_By_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response

    In addition to the declared fields, `attributes` holds the parsed
    `attribute_data_list` as a list of (attribute_handle, attribute_value)
    tuples.
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into entries of `length` bytes each: a
        2-byte little-endian handle followed by the value bytes. Trailing bytes
        that do not form a complete entry are ignored.
        '''
        self.attributes = []
        # Every entry starts with a 2-byte handle, so a declared length below 2
        # cannot describe a valid entry (and 0 would never advance the offset).
        if self.length < 2:
            return
        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            attribute_handle, = struct.unpack_from('<H', self.attribute_data_list, offset)
            attribute_value = self.attribute_data_list[offset + 2:offset + self.length]
            self.attributes.append((attribute_handle, attribute_value))
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
            ('length', 1),
            ('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{value.hex()}' for handle, value in x])})
        ], ' ')
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC)
])
class ATT_Read_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_value', '*')
])
class ATT_Read_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('value_offset',     2)
])
class ATT_Read_Blob_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('part_attribute_value', '*')
])
class ATT_Read_Blob_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('set_of_handles', '*')
])
class ATT_Read_Multiple_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('set_of_values', '*')
])
class ATT_Read_Multiple_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('starting_handle',      HANDLE_FIELD_SPEC),
    ('ending_handle',        HANDLE_FIELD_SPEC),
    ('attribute_group_type', UUID_2_16_FIELD_SPEC)
])
class ATT_Read_By_Group_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('length',              1),
    ('attribute_data_list', '*')
])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response

    In addition to the declared fields, `attributes` holds the parsed
    `attribute_data_list` as a list of
    (attribute_handle, end_group_handle, attribute_value) tuples.
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into entries of `length` bytes each: two
        2-byte little-endian handles followed by the value bytes. Trailing
        bytes that do not form a complete entry are ignored.
        '''
        self.attributes = []
        # Every entry starts with two 2-byte handles, so a declared length
        # below 4 cannot describe a valid entry (and 0 would never advance the
        # offset).
        if self.length < 4:
            return
        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            attribute_handle, end_group_handle = struct.unpack_from('<HH', self.attribute_data_list, offset)
            attribute_value = self.attribute_data_list[offset + 4:offset + self.length]
            self.attributes.append((attribute_handle, end_group_handle, attribute_value))
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
            ('length', 1),
            ('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}-0x{end:04X}:{value.hex()}' for handle, end, value in x])})
        ], ' ')
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('attribute_value',  '*')
])
class ATT_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('attribute_value',  '*')
])
class ATT_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('attribute_value',  '*')
    # ('authentication_signature', 'TODO')
])
class ATT_Signed_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle',     HANDLE_FIELD_SPEC),
    ('value_offset',         2),
    ('part_attribute_value', '*')
])
class ATT_Prepare_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle',     HANDLE_FIELD_SPEC),
    ('value_offset',         2),
    ('part_attribute_value', '*')
])
class ATT_Prepare_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('attribute_value',  '*')
])
class ATT_Handle_Value_Notification(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
    ('attribute_handle', HANDLE_FIELD_SPEC),
    ('attribute_value',  '*')
])
class ATT_Handle_Value_Indication(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
    '''


# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
    '''
    An attribute: a typed value with a handle and permission flags.

    The value is either plain bytes, or an object exposing `read(connection)`
    and/or `write(connection, value)` methods, which are then used to read and
    write the value dynamically.

    Emits a 'write' event, with the connection and the written value, after
    each successful `write_value`.
    '''

    # Permission flags
    READABLE                      = 0x01
    WRITEABLE                     = 0x02
    READ_REQUIRES_ENCRYPTION      = 0x04
    WRITE_REQUIRES_ENCRYPTION     = 0x08
    READ_REQUIRES_AUTHENTICATION  = 0x10
    WRITE_REQUIRES_AUTHENTICATION = 0x20
    READ_REQUIRES_AUTHORIZATION   = 0x40
    WRITE_REQUIRES_AUTHORIZATION  = 0x80

    def __init__(self, attribute_type, permissions, value=b''):
        '''
        attribute_type: a str (passed to the UUID constructor), bytes (passed
            to UUID.from_bytes), or any other object, which is used as-is.
        permissions: stored as-is in `permissions`.
        value: a str (stored UTF-8 encoded) or any other object, stored as-is.
        '''
        EventEmitter.__init__(self)
        self.handle           = 0
        self.end_group_handle = 0
        self.permissions      = permissions

        # Convert the type to a UUID object if it isn't already
        if isinstance(attribute_type, str):
            self.type = UUID(attribute_type)
        elif isinstance(attribute_type, bytes):
            self.type = UUID.from_bytes(attribute_type)
        else:
            self.type = attribute_type

        # Convert the value to a byte array
        if isinstance(value, str):
            self.value = bytes(value, 'utf-8')
        else:
            self.value = value

    def read_value(self, connection):
        '''
        Return the attribute value, obtained from the value's `read` method
        when it has one.

        Raises ATT_Error when `read` raises one; the error is re-raised with
        this attribute's handle.
        '''
        if read := getattr(self.value, 'read', None):
            try:
                return read(connection)
            except ATT_Error as error:
                # Re-raise with this attribute's handle, keeping the original
                # error as the cause.
                raise ATT_Error(error_code=error.error_code, att_handle=self.handle) from error
        else:
            return self.value

    def write_value(self, connection, value):
        '''
        Set the attribute value, through the value's `write` method when it has
        one, then emit a 'write' event.

        Raises ATT_Error when `write` raises one; the error is re-raised with
        this attribute's handle, and no event is emitted.
        '''
        if write := getattr(self.value, 'write', None):
            try:
                write(connection, value)
            except ATT_Error as error:
                # Re-raise with this attribute's handle, keeping the original
                # error as the cause.
                raise ATT_Error(error_code=error.error_code, att_handle=self.handle) from error
        else:
            self.value = value

        self.emit('write', connection, value)

    def __repr__(self):
        # The value is not necessarily bytes: it may be an object with
        # read/write methods, which has neither a length nor a hex() method.
        if isinstance(self.value, (bytes, bytearray)):
            value_text = self.value.hex()
        else:
            value_text = str(self.value)
        value_string = f', value={value_text}' if value_text else ''
        return f'Attribute(handle=0x{self.handle:04X}, type={self.type}, permissions={self.permissions}{value_string})'