• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# ATT - Attribute Protocol
17#
18# See Bluetooth spec @ Vol 3, Part F
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from colors import color
26from pyee import EventEmitter
27
28from .core import *
29from .hci import *
30
31# -----------------------------------------------------------------------------
32# Constants
33# -----------------------------------------------------------------------------
34ATT_CID = 0x04
35
36ATT_ERROR_RESPONSE              = 0x01
37ATT_EXCHANGE_MTU_REQUEST        = 0x02
38ATT_EXCHANGE_MTU_RESPONSE       = 0x03
39ATT_FIND_INFORMATION_REQUEST    = 0x04
40ATT_FIND_INFORMATION_RESPONSE   = 0x05
41ATT_FIND_BY_TYPE_VALUE_REQUEST  = 0x06
42ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
43ATT_READ_BY_TYPE_REQUEST        = 0x08
44ATT_READ_BY_TYPE_RESPONSE       = 0x09
45ATT_READ_REQUEST                = 0x0A
46ATT_READ_RESPONSE               = 0x0B
47ATT_READ_BLOB_REQUEST           = 0x0C
48ATT_READ_BLOB_RESPONSE          = 0x0D
49ATT_READ_MULTIPLE_REQUEST       = 0x0E
50ATT_READ_MULTIPLE_RESPONSE      = 0x0F
51ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
52ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
53ATT_WRITE_REQUEST               = 0x12
54ATT_WRITE_RESPONSE              = 0x13
55ATT_WRITE_COMMAND               = 0x52
56ATT_SIGNED_WRITE_COMMAND        = 0xD2
57ATT_PREPARE_WRITE_REQUEST       = 0x16
58ATT_PREPARE_WRITE_RESPONSE      = 0x17
59ATT_EXECUTE_WRITE_REQUEST       = 0x18
60ATT_EXECUTE_WRITE_RESPONSE      = 0x19
61ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
62ATT_HANDLE_VALUE_INDICATION     = 0x1D
63ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E
64
65ATT_PDU_NAMES = {
66    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
67    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
68    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
69    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
70    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
71    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
72    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
73    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
74    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
75    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
76    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
77    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
78    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
79    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
80    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
81    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
82    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
83    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
84    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
85    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
86    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
87    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
88    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
89    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
90    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
91    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
92    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
93    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
94}
95
96ATT_REQUESTS = [
97    ATT_EXCHANGE_MTU_REQUEST,
98    ATT_FIND_INFORMATION_REQUEST,
99    ATT_FIND_BY_TYPE_VALUE_REQUEST,
100    ATT_READ_BY_TYPE_REQUEST,
101    ATT_READ_REQUEST,
102    ATT_READ_BLOB_REQUEST,
103    ATT_READ_MULTIPLE_REQUEST,
104    ATT_READ_BY_GROUP_TYPE_REQUEST,
105    ATT_WRITE_REQUEST,
106    ATT_PREPARE_WRITE_REQUEST,
107    ATT_EXECUTE_WRITE_REQUEST
108]
109
110ATT_RESPONSES = [
111    ATT_ERROR_RESPONSE,
112    ATT_EXCHANGE_MTU_RESPONSE,
113    ATT_FIND_INFORMATION_RESPONSE,
114    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
115    ATT_READ_BY_TYPE_RESPONSE,
116    ATT_READ_RESPONSE,
117    ATT_READ_BLOB_RESPONSE,
118    ATT_READ_MULTIPLE_RESPONSE,
119    ATT_READ_BY_GROUP_TYPE_RESPONSE,
120    ATT_WRITE_RESPONSE,
121    ATT_PREPARE_WRITE_RESPONSE,
122    ATT_EXECUTE_WRITE_RESPONSE
123]
124
125ATT_INVALID_HANDLE_ERROR                   = 0x01
126ATT_READ_NOT_PERMITTED_ERROR               = 0x02
127ATT_WRITE_NOT_PERMITTED_ERROR              = 0x03
128ATT_INVALID_PDU_ERROR                      = 0x04
129ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = 0x05
130ATT_REQUEST_NOT_SUPPORTED_ERROR            = 0x06
131ATT_INVALID_OFFSET_ERROR                   = 0x07
132ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = 0x08
133ATT_PREPARE_QUEUE_FULL_ERROR               = 0x09
134ATT_ATTRIBUTE_NOT_FOUND_ERROR              = 0x0A
135ATT_ATTRIBUTE_NOT_LONG_ERROR               = 0x0B
136ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
137ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = 0x0D
138ATT_UNLIKELY_ERROR_ERROR                   = 0x0E
139ATT_INSUFFICIENT_ENCRYPTION_ERROR          = 0x0F
140ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = 0x10
141ATT_INSUFFICIENT_RESOURCES_ERROR           = 0x11
142
143ATT_ERROR_NAMES = {
144    ATT_INVALID_HANDLE_ERROR:                   'ATT_INVALID_HANDLE_ERROR',
145    ATT_READ_NOT_PERMITTED_ERROR:               'ATT_READ_NOT_PERMITTED_ERROR',
146    ATT_WRITE_NOT_PERMITTED_ERROR:              'ATT_WRITE_NOT_PERMITTED_ERROR',
147    ATT_INVALID_PDU_ERROR:                      'ATT_INVALID_PDU_ERROR',
148    ATT_INSUFFICIENT_AUTHENTICATION_ERROR:      'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
149    ATT_REQUEST_NOT_SUPPORTED_ERROR:            'ATT_REQUEST_NOT_SUPPORTED_ERROR',
150    ATT_INVALID_OFFSET_ERROR:                   'ATT_INVALID_OFFSET_ERROR',
151    ATT_INSUFFICIENT_AUTHORIZATION_ERROR:       'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
152    ATT_PREPARE_QUEUE_FULL_ERROR:               'ATT_PREPARE_QUEUE_FULL_ERROR',
153    ATT_ATTRIBUTE_NOT_FOUND_ERROR:              'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
154    ATT_ATTRIBUTE_NOT_LONG_ERROR:               'ATT_ATTRIBUTE_NOT_LONG_ERROR',
155    ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
156    ATT_INVALID_ATTRIBUTE_LENGTH_ERROR:         'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
157    ATT_UNLIKELY_ERROR_ERROR:                   'ATT_UNLIKELY_ERROR_ERROR',
158    ATT_INSUFFICIENT_ENCRYPTION_ERROR:          'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
159    ATT_UNSUPPORTED_GROUP_TYPE_ERROR:           'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
160    ATT_INSUFFICIENT_RESOURCES_ERROR:           'ATT_INSUFFICIENT_RESOURCES_ERROR'
161}
162
163ATT_DEFAULT_MTU = 23
164
165HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
166UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)    # noqa: E731
167UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731
168
169
170# -----------------------------------------------------------------------------
171# Utils
172# -----------------------------------------------------------------------------
173def key_with_value(dictionary, target_value):
174    for key, value in dictionary.items():
175        if value == target_value:
176            return key
177    return None
178
179
180# -----------------------------------------------------------------------------
181# Exceptions
182# -----------------------------------------------------------------------------
183class ATT_Error(Exception):
184    def __init__(self, error_code, att_handle=0x0000):
185        self.error_code = error_code
186        self.att_handle = att_handle
187
188    def __str__(self):
189        return f'ATT_Error({ATT_PDU.error_name(self.error_code)})'
190
191
192# -----------------------------------------------------------------------------
193# Attribute Protocol
194# -----------------------------------------------------------------------------
195class ATT_PDU:
196    '''
197    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
198    '''
199    pdu_classes = {}
200    op_code = 0
201
202    @staticmethod
203    def from_bytes(pdu):
204        op_code = pdu[0]
205
206        cls = ATT_PDU.pdu_classes.get(op_code)
207        if cls is None:
208            instance = ATT_PDU(pdu)
209            instance.name = ATT_PDU.pdu_name(op_code)
210            instance.op_code = op_code
211            return instance
212        self = cls.__new__(cls)
213        ATT_PDU.__init__(self, pdu)
214        if hasattr(self, 'fields'):
215            self.init_from_bytes(pdu, 1)
216        return self
217
218    @staticmethod
219    def pdu_name(op_code):
220        return name_or_number(ATT_PDU_NAMES, op_code, 2)
221
222    @staticmethod
223    def error_name(error_code):
224        return name_or_number(ATT_ERROR_NAMES, error_code, 2)
225
226    @staticmethod
227    def subclass(fields):
228        def inner(cls):
229            cls.name = cls.__name__.upper()
230            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
231            if cls.op_code is None:
232                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
233            cls.fields = fields
234
235            # Register a factory for this class
236            ATT_PDU.pdu_classes[cls.op_code] = cls
237
238            return cls
239
240        return inner
241
242    def __init__(self, pdu=None, **kwargs):
243        if hasattr(self, 'fields') and kwargs:
244            HCI_Object.init_from_fields(self, self.fields, kwargs)
245        if pdu is None:
246            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
247        self.pdu = pdu
248
249    def init_from_bytes(self, pdu, offset):
250        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
251
252    def to_bytes(self):
253        return self.pdu
254
255    @property
256    def is_command(self):
257        return ((self.op_code >> 6) & 1) == 1
258
259    @property
260    def has_authentication_signature(self):
261        return ((self.op_code >> 7) & 1) == 1
262
263    def __bytes__(self):
264        return self.to_bytes()
265
266    def __str__(self):
267        result = color(self.name, 'yellow')
268        if fields := getattr(self, 'fields', None):
269            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
270        else:
271            if len(self.pdu) > 1:
272                result += f': {self.pdu.hex()}'
273        return result
274
275
276# -----------------------------------------------------------------------------
277@ATT_PDU.subclass([
278    ('request_opcode_in_error',   {'size': 1, 'mapper': ATT_PDU.pdu_name}),
279    ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
280    ('error_code',                {'size': 1, 'mapper': ATT_PDU.error_name})
281])
282class ATT_Error_Response(ATT_PDU):
283    '''
284    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
285    '''
286
287
288# -----------------------------------------------------------------------------
289@ATT_PDU.subclass([
290    ('client_rx_mtu', 2)
291])
292class ATT_Exchange_MTU_Request(ATT_PDU):
293    '''
294    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
295    '''
296
297
298# -----------------------------------------------------------------------------
299@ATT_PDU.subclass([
300    ('server_rx_mtu', 2)
301])
302class ATT_Exchange_MTU_Response(ATT_PDU):
303    '''
304    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
305    '''
306
307
308# -----------------------------------------------------------------------------
309@ATT_PDU.subclass([
310    ('starting_handle', HANDLE_FIELD_SPEC),
311    ('ending_handle',   HANDLE_FIELD_SPEC)
312])
313class ATT_Find_Information_Request(ATT_PDU):
314    '''
315    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
316    '''
317
318
319# -----------------------------------------------------------------------------
320@ATT_PDU.subclass([
321    ('format',           1),
322    ('information_data', '*')
323])
324class ATT_Find_Information_Response(ATT_PDU):
325    '''
326    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
327    '''
328
329    def parse_information_data(self):
330        self.information = []
331        offset = 0
332        uuid_size = 2 if self.format == 1 else 16
333        while offset + uuid_size <= len(self.information_data):
334            handle = struct.unpack_from('<H', self.information_data, offset)[0]
335            uuid   = self.information_data[2 + offset:2 + offset + uuid_size]
336            self.information.append((handle, uuid))
337            offset += 2 + uuid_size
338
339    def __init__(self, *args, **kwargs):
340        super().__init__(*args, **kwargs)
341        self.parse_information_data()
342
343    def init_from_bytes(self, pdu, offset):
344        super().init_from_bytes(pdu, offset)
345        self.parse_information_data()
346
347    def __str__(self):
348        result = color(self.name, 'yellow')
349        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
350            ('format',       1),
351            ('information', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x])})
352        ], '  ')
353        return result
354
355
356# -----------------------------------------------------------------------------
357@ATT_PDU.subclass([
358    ('starting_handle', HANDLE_FIELD_SPEC),
359    ('ending_handle',   HANDLE_FIELD_SPEC),
360    ('attribute_type',  UUID_2_FIELD_SPEC),
361    ('attribute_value', '*')
362])
363class ATT_Find_By_Type_Value_Request(ATT_PDU):
364    '''
365    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
366    '''
367
368
369# -----------------------------------------------------------------------------
370@ATT_PDU.subclass([
371    ('handles_information_list', '*')
372])
373class ATT_Find_By_Type_Value_Response(ATT_PDU):
374    '''
375    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
376    '''
377
378    def parse_handles_information_list(self):
379        self.handles_information = []
380        offset = 0
381        while offset + 4 <= len(self.handles_information_list):
382            found_attribute_handle, group_end_handle = struct.unpack_from('<HH', self.handles_information_list, offset)
383            self.handles_information.append((found_attribute_handle, group_end_handle))
384            offset += 4
385
386    def __init__(self, *args, **kwargs):
387        super().__init__(*args, **kwargs)
388        self.parse_handles_information_list()
389
390    def init_from_bytes(self, pdu, offset):
391        super().init_from_bytes(pdu, offset)
392        self.parse_handles_information_list()
393
394    def __str__(self):
395        result = color(self.name, 'yellow')
396        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
397            ('handles_information', {'mapper': lambda x: ', '.join([f'0x{handle1:04X}-0x{handle2:04X}' for handle1, handle2 in x])})
398        ], '  ')
399        return result
400
401
402# -----------------------------------------------------------------------------
403@ATT_PDU.subclass([
404    ('starting_handle', HANDLE_FIELD_SPEC),
405    ('ending_handle',   HANDLE_FIELD_SPEC),
406    ('attribute_type',  UUID_2_16_FIELD_SPEC)
407])
408class ATT_Read_By_Type_Request(ATT_PDU):
409    '''
410    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
411    '''
412
413
414# -----------------------------------------------------------------------------
415@ATT_PDU.subclass([
416    ('length',              1),
417    ('attribute_data_list', '*')
418])
419class ATT_Read_By_Type_Response(ATT_PDU):
420    '''
421    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
422    '''
423
424    def parse_attribute_data_list(self):
425        self.attributes = []
426        offset = 0
427        while self.length != 0 and offset + self.length <= len(self.attribute_data_list):
428            attribute_handle, = struct.unpack_from('<H', self.attribute_data_list, offset)
429            attribute_value = self.attribute_data_list[offset + 2:offset + self.length]
430            self.attributes.append((attribute_handle, attribute_value))
431            offset += self.length
432
433    def __init__(self, *args, **kwargs):
434        super().__init__(*args, **kwargs)
435        self.parse_attribute_data_list()
436
437    def init_from_bytes(self, pdu, offset):
438        super().init_from_bytes(pdu, offset)
439        self.parse_attribute_data_list()
440
441    def __str__(self):
442        result = color(self.name, 'yellow')
443        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
444            ('length',     1),
445            ('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{value.hex()}' for handle, value in x])})
446        ], '  ')
447        return result
448
449
450# -----------------------------------------------------------------------------
451@ATT_PDU.subclass([
452    ('attribute_handle', HANDLE_FIELD_SPEC)
453])
454class ATT_Read_Request(ATT_PDU):
455    '''
456    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
457    '''
458
459
460# -----------------------------------------------------------------------------
461@ATT_PDU.subclass([
462    ('attribute_value', '*')
463])
464class ATT_Read_Response(ATT_PDU):
465    '''
466    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
467    '''
468
469
470# -----------------------------------------------------------------------------
471@ATT_PDU.subclass([
472    ('attribute_handle', HANDLE_FIELD_SPEC),
473    ('value_offset',     2)
474])
475class ATT_Read_Blob_Request(ATT_PDU):
476    '''
477    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
478    '''
479
480
481# -----------------------------------------------------------------------------
482@ATT_PDU.subclass([
483    ('part_attribute_value', '*')
484])
485class ATT_Read_Blob_Response(ATT_PDU):
486    '''
487    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
488    '''
489
490
491# -----------------------------------------------------------------------------
492@ATT_PDU.subclass([
493    ('set_of_handles', '*')
494])
495class ATT_Read_Multiple_Request(ATT_PDU):
496    '''
497    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
498    '''
499
500
501# -----------------------------------------------------------------------------
502@ATT_PDU.subclass([
503    ('set_of_values', '*')
504])
505class ATT_Read_Multiple_Response(ATT_PDU):
506    '''
507    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
508    '''
509
510
511# -----------------------------------------------------------------------------
512@ATT_PDU.subclass([
513    ('starting_handle',      HANDLE_FIELD_SPEC),
514    ('ending_handle',        HANDLE_FIELD_SPEC),
515    ('attribute_group_type', UUID_2_16_FIELD_SPEC)
516])
517class ATT_Read_By_Group_Type_Request(ATT_PDU):
518    '''
519    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
520    '''
521
522
523# -----------------------------------------------------------------------------
524@ATT_PDU.subclass([
525    ('length',              1),
526    ('attribute_data_list', '*')
527])
528class ATT_Read_By_Group_Type_Response(ATT_PDU):
529    '''
530    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
531    '''
532
533    def parse_attribute_data_list(self):
534        self.attributes = []
535        offset = 0
536        while self.length != 0 and offset + self.length <= len(self.attribute_data_list):
537            attribute_handle, end_group_handle = struct.unpack_from('<HH', self.attribute_data_list, offset)
538            attribute_value = self.attribute_data_list[offset + 4:offset + self.length]
539            self.attributes.append((attribute_handle, end_group_handle, attribute_value))
540            offset += self.length
541
542    def __init__(self, *args, **kwargs):
543        super().__init__(*args, **kwargs)
544        self.parse_attribute_data_list()
545
546    def init_from_bytes(self, pdu, offset):
547        super().init_from_bytes(pdu, offset)
548        self.parse_attribute_data_list()
549
550    def __str__(self):
551        result = color(self.name, 'yellow')
552        result += ':\n' + HCI_Object.format_fields(self.__dict__, [
553            ('length',     1),
554            ('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}-0x{end:04X}:{value.hex()}' for handle, end, value in x])})
555        ], '  ')
556        return result
557
558
559# -----------------------------------------------------------------------------
560@ATT_PDU.subclass([
561    ('attribute_handle', HANDLE_FIELD_SPEC),
562    ('attribute_value',  '*')
563])
564class ATT_Write_Request(ATT_PDU):
565    '''
566    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
567    '''
568
569
570# -----------------------------------------------------------------------------
571@ATT_PDU.subclass([])
572class ATT_Write_Response(ATT_PDU):
573    '''
574    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
575    '''
576
577
578# -----------------------------------------------------------------------------
579@ATT_PDU.subclass([
580    ('attribute_handle', HANDLE_FIELD_SPEC),
581    ('attribute_value',  '*')
582])
583class ATT_Write_Command(ATT_PDU):
584    '''
585    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
586    '''
587
588
589# -----------------------------------------------------------------------------
590@ATT_PDU.subclass([
591    ('attribute_handle', HANDLE_FIELD_SPEC),
592    ('attribute_value',  '*')
593    # ('authentication_signature', 'TODO')
594])
595class ATT_Signed_Write_Command(ATT_PDU):
596    '''
597    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
598    '''
599
600
601# -----------------------------------------------------------------------------
602@ATT_PDU.subclass([
603    ('attribute_handle',     HANDLE_FIELD_SPEC),
604    ('value_offset',         2),
605    ('part_attribute_value', '*')
606])
607class ATT_Prepare_Write_Request(ATT_PDU):
608    '''
609    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
610    '''
611
612
613# -----------------------------------------------------------------------------
614@ATT_PDU.subclass([
615    ('attribute_handle',     HANDLE_FIELD_SPEC),
616    ('value_offset',         2),
617    ('part_attribute_value', '*')
618])
619class ATT_Prepare_Write_Response(ATT_PDU):
620    '''
621    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
622    '''
623
624
625# -----------------------------------------------------------------------------
626@ATT_PDU.subclass([])
627class ATT_Execute_Write_Request(ATT_PDU):
628    '''
629    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
630    '''
631
632
633# -----------------------------------------------------------------------------
634@ATT_PDU.subclass([])
635class ATT_Execute_Write_Response(ATT_PDU):
636    '''
637    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
638    '''
639
640
641# -----------------------------------------------------------------------------
642@ATT_PDU.subclass([
643    ('attribute_handle', HANDLE_FIELD_SPEC),
644    ('attribute_value',  '*')
645])
646class ATT_Handle_Value_Notification(ATT_PDU):
647    '''
648    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
649    '''
650
651
652# -----------------------------------------------------------------------------
653@ATT_PDU.subclass([
654    ('attribute_handle', HANDLE_FIELD_SPEC),
655    ('attribute_value',  '*')
656])
657class ATT_Handle_Value_Indication(ATT_PDU):
658    '''
659    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
660    '''
661
662
663# -----------------------------------------------------------------------------
664@ATT_PDU.subclass([])
665class ATT_Handle_Value_Confirmation(ATT_PDU):
666    '''
667    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
668    '''
669
670
671# -----------------------------------------------------------------------------
672class Attribute(EventEmitter):
673    # Permission flags
674    READABLE                      = 0x01
675    WRITEABLE                     = 0x02
676    READ_REQUIRES_ENCRYPTION      = 0x04
677    WRITE_REQUIRES_ENCRYPTION     = 0x08
678    READ_REQUIRES_AUTHENTICATION  = 0x10
679    WRITE_REQUIRES_AUTHENTICATION = 0x20
680    READ_REQUIRES_AUTHORIZATION   = 0x40
681    WRITE_REQUIRES_AUTHORIZATION  = 0x80
682
683    def __init__(self, attribute_type, permissions, value = b''):
684        EventEmitter.__init__(self)
685        self.handle           = 0
686        self.end_group_handle = 0
687        self.permissions      = permissions
688
689        # Convert the type to a UUID object if it isn't already
690        if type(attribute_type) is str:
691            self.type = UUID(attribute_type)
692        elif type(attribute_type) is bytes:
693            self.type = UUID.from_bytes(attribute_type)
694        else:
695            self.type = attribute_type
696
697        # Convert the value to a byte array
698        if type(value) is str:
699            self.value = bytes(value, 'utf-8')
700        else:
701            self.value = value
702
703    def read_value(self, connection):
704        if read := getattr(self.value, 'read', None):
705            try:
706                return read(connection)
707            except ATT_Error as error:
708                raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
709        else:
710            return self.value
711
712    def write_value(self, connection, value):
713        if write := getattr(self.value, 'write', None):
714            try:
715                write(connection, value)
716            except ATT_Error as error:
717                raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
718        else:
719            self.value = value
720
721        self.emit('write', connection, value)
722
723    def __repr__(self):
724        if len(self.value) > 0:
725            value_string = f', value={self.value.hex()}'
726        else:
727            value_string = ''
728        return f'Attribute(handle=0x{self.handle:04X}, type={self.type}, permissions={self.permissions}{value_string})'
729