• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# ATT - Attribute Protocol
17#
18# See Bluetooth spec @ Vol 3, Part F
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import functools
27import struct
28from pyee import EventEmitter
29from typing import Dict, Type, TYPE_CHECKING
30
31from bumble.core import UUID, name_or_number, get_dict_key_by_value, ProtocolError
32from bumble.hci import HCI_Object, key_with_value, HCI_Constant
33from bumble.colors import color
34
35if TYPE_CHECKING:
36    from bumble.device import Connection
37
38# -----------------------------------------------------------------------------
39# Constants
40# -----------------------------------------------------------------------------
41# fmt: off
42# pylint: disable=line-too-long
43
44ATT_CID = 0x04
45
46ATT_ERROR_RESPONSE              = 0x01
47ATT_EXCHANGE_MTU_REQUEST        = 0x02
48ATT_EXCHANGE_MTU_RESPONSE       = 0x03
49ATT_FIND_INFORMATION_REQUEST    = 0x04
50ATT_FIND_INFORMATION_RESPONSE   = 0x05
51ATT_FIND_BY_TYPE_VALUE_REQUEST  = 0x06
52ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
53ATT_READ_BY_TYPE_REQUEST        = 0x08
54ATT_READ_BY_TYPE_RESPONSE       = 0x09
55ATT_READ_REQUEST                = 0x0A
56ATT_READ_RESPONSE               = 0x0B
57ATT_READ_BLOB_REQUEST           = 0x0C
58ATT_READ_BLOB_RESPONSE          = 0x0D
59ATT_READ_MULTIPLE_REQUEST       = 0x0E
60ATT_READ_MULTIPLE_RESPONSE      = 0x0F
61ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
62ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
63ATT_WRITE_REQUEST               = 0x12
64ATT_WRITE_RESPONSE              = 0x13
65ATT_WRITE_COMMAND               = 0x52
66ATT_SIGNED_WRITE_COMMAND        = 0xD2
67ATT_PREPARE_WRITE_REQUEST       = 0x16
68ATT_PREPARE_WRITE_RESPONSE      = 0x17
69ATT_EXECUTE_WRITE_REQUEST       = 0x18
70ATT_EXECUTE_WRITE_RESPONSE      = 0x19
71ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
72ATT_HANDLE_VALUE_INDICATION     = 0x1D
73ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E
74
75ATT_PDU_NAMES = {
76    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
77    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
78    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
79    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
80    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
81    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
82    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
83    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
84    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
85    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
86    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
87    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
88    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
89    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
90    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
91    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
92    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
93    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
94    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
95    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
96    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
97    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
98    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
99    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
100    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
101    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
102    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
103    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
104}
105
106ATT_REQUESTS = [
107    ATT_EXCHANGE_MTU_REQUEST,
108    ATT_FIND_INFORMATION_REQUEST,
109    ATT_FIND_BY_TYPE_VALUE_REQUEST,
110    ATT_READ_BY_TYPE_REQUEST,
111    ATT_READ_REQUEST,
112    ATT_READ_BLOB_REQUEST,
113    ATT_READ_MULTIPLE_REQUEST,
114    ATT_READ_BY_GROUP_TYPE_REQUEST,
115    ATT_WRITE_REQUEST,
116    ATT_PREPARE_WRITE_REQUEST,
117    ATT_EXECUTE_WRITE_REQUEST
118]
119
120ATT_RESPONSES = [
121    ATT_ERROR_RESPONSE,
122    ATT_EXCHANGE_MTU_RESPONSE,
123    ATT_FIND_INFORMATION_RESPONSE,
124    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
125    ATT_READ_BY_TYPE_RESPONSE,
126    ATT_READ_RESPONSE,
127    ATT_READ_BLOB_RESPONSE,
128    ATT_READ_MULTIPLE_RESPONSE,
129    ATT_READ_BY_GROUP_TYPE_RESPONSE,
130    ATT_WRITE_RESPONSE,
131    ATT_PREPARE_WRITE_RESPONSE,
132    ATT_EXECUTE_WRITE_RESPONSE
133]
134
135ATT_INVALID_HANDLE_ERROR                   = 0x01
136ATT_READ_NOT_PERMITTED_ERROR               = 0x02
137ATT_WRITE_NOT_PERMITTED_ERROR              = 0x03
138ATT_INVALID_PDU_ERROR                      = 0x04
139ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = 0x05
140ATT_REQUEST_NOT_SUPPORTED_ERROR            = 0x06
141ATT_INVALID_OFFSET_ERROR                   = 0x07
142ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = 0x08
143ATT_PREPARE_QUEUE_FULL_ERROR               = 0x09
144ATT_ATTRIBUTE_NOT_FOUND_ERROR              = 0x0A
145ATT_ATTRIBUTE_NOT_LONG_ERROR               = 0x0B
146ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
147ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = 0x0D
148ATT_UNLIKELY_ERROR_ERROR                   = 0x0E
149ATT_INSUFFICIENT_ENCRYPTION_ERROR          = 0x0F
150ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = 0x10
151ATT_INSUFFICIENT_RESOURCES_ERROR           = 0x11
152
153ATT_ERROR_NAMES = {
154    ATT_INVALID_HANDLE_ERROR:                   'ATT_INVALID_HANDLE_ERROR',
155    ATT_READ_NOT_PERMITTED_ERROR:               'ATT_READ_NOT_PERMITTED_ERROR',
156    ATT_WRITE_NOT_PERMITTED_ERROR:              'ATT_WRITE_NOT_PERMITTED_ERROR',
157    ATT_INVALID_PDU_ERROR:                      'ATT_INVALID_PDU_ERROR',
158    ATT_INSUFFICIENT_AUTHENTICATION_ERROR:      'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
159    ATT_REQUEST_NOT_SUPPORTED_ERROR:            'ATT_REQUEST_NOT_SUPPORTED_ERROR',
160    ATT_INVALID_OFFSET_ERROR:                   'ATT_INVALID_OFFSET_ERROR',
161    ATT_INSUFFICIENT_AUTHORIZATION_ERROR:       'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
162    ATT_PREPARE_QUEUE_FULL_ERROR:               'ATT_PREPARE_QUEUE_FULL_ERROR',
163    ATT_ATTRIBUTE_NOT_FOUND_ERROR:              'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
164    ATT_ATTRIBUTE_NOT_LONG_ERROR:               'ATT_ATTRIBUTE_NOT_LONG_ERROR',
165    ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
166    ATT_INVALID_ATTRIBUTE_LENGTH_ERROR:         'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
167    ATT_UNLIKELY_ERROR_ERROR:                   'ATT_UNLIKELY_ERROR_ERROR',
168    ATT_INSUFFICIENT_ENCRYPTION_ERROR:          'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
169    ATT_UNSUPPORTED_GROUP_TYPE_ERROR:           'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
170    ATT_INSUFFICIENT_RESOURCES_ERROR:           'ATT_INSUFFICIENT_RESOURCES_ERROR'
171}
172
173ATT_DEFAULT_MTU = 23
174
175HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
176# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
177UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)
178# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
179UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731
180
181# fmt: on
182# pylint: enable=line-too-long
183# pylint: disable=invalid-name
184
185# -----------------------------------------------------------------------------
186# Exceptions
187# -----------------------------------------------------------------------------
188class ATT_Error(ProtocolError):
189    def __init__(self, error_code, att_handle=0x0000, message=''):
190        super().__init__(
191            error_code,
192            error_namespace='att',
193            error_name=ATT_PDU.error_name(error_code),
194        )
195        self.att_handle = att_handle
196        self.message = message
197
198    def __str__(self):
199        return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
200
201
202# -----------------------------------------------------------------------------
203# Attribute Protocol
204# -----------------------------------------------------------------------------
205class ATT_PDU:
206    '''
207    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
208    '''
209
210    pdu_classes: Dict[int, Type[ATT_PDU]] = {}
211    op_code = 0
212    name = None
213
214    @staticmethod
215    def from_bytes(pdu):
216        op_code = pdu[0]
217
218        cls = ATT_PDU.pdu_classes.get(op_code)
219        if cls is None:
220            instance = ATT_PDU(pdu)
221            instance.name = ATT_PDU.pdu_name(op_code)
222            instance.op_code = op_code
223            return instance
224        self = cls.__new__(cls)
225        ATT_PDU.__init__(self, pdu)
226        if hasattr(self, 'fields'):
227            self.init_from_bytes(pdu, 1)
228        return self
229
230    @staticmethod
231    def pdu_name(op_code):
232        return name_or_number(ATT_PDU_NAMES, op_code, 2)
233
234    @staticmethod
235    def error_name(error_code):
236        return name_or_number(ATT_ERROR_NAMES, error_code, 2)
237
238    @staticmethod
239    def subclass(fields):
240        def inner(cls):
241            cls.name = cls.__name__.upper()
242            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
243            if cls.op_code is None:
244                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
245            cls.fields = fields
246
247            # Register a factory for this class
248            ATT_PDU.pdu_classes[cls.op_code] = cls
249
250            return cls
251
252        return inner
253
254    def __init__(self, pdu=None, **kwargs):
255        if hasattr(self, 'fields') and kwargs:
256            HCI_Object.init_from_fields(self, self.fields, kwargs)
257        if pdu is None:
258            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
259        self.pdu = pdu
260
261    def init_from_bytes(self, pdu, offset):
262        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
263
264    def to_bytes(self):
265        return self.pdu
266
267    @property
268    def is_command(self):
269        return ((self.op_code >> 6) & 1) == 1
270
271    @property
272    def has_authentication_signature(self):
273        return ((self.op_code >> 7) & 1) == 1
274
275    def __bytes__(self):
276        return self.to_bytes()
277
278    def __str__(self):
279        result = color(self.name, 'yellow')
280        if fields := getattr(self, 'fields', None):
281            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
282        else:
283            if len(self.pdu) > 1:
284                result += f': {self.pdu.hex()}'
285        return result
286
287
288# -----------------------------------------------------------------------------
289@ATT_PDU.subclass(
290    [
291        ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
292        ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
293        ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
294    ]
295)
296class ATT_Error_Response(ATT_PDU):
297    '''
298    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
299    '''
300
301
302# -----------------------------------------------------------------------------
303@ATT_PDU.subclass([('client_rx_mtu', 2)])
304class ATT_Exchange_MTU_Request(ATT_PDU):
305    '''
306    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
307    '''
308
309
310# -----------------------------------------------------------------------------
311@ATT_PDU.subclass([('server_rx_mtu', 2)])
312class ATT_Exchange_MTU_Response(ATT_PDU):
313    '''
314    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
315    '''
316
317
318# -----------------------------------------------------------------------------
319@ATT_PDU.subclass(
320    [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
321)
322class ATT_Find_Information_Request(ATT_PDU):
323    '''
324    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
325    '''
326
327
328# -----------------------------------------------------------------------------
329@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
330class ATT_Find_Information_Response(ATT_PDU):
331    '''
332    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
333    '''
334
335    def parse_information_data(self):
336        self.information = []
337        offset = 0
338        uuid_size = 2 if self.format == 1 else 16
339        while offset + uuid_size <= len(self.information_data):
340            handle = struct.unpack_from('<H', self.information_data, offset)[0]
341            uuid = self.information_data[2 + offset : 2 + offset + uuid_size]
342            self.information.append((handle, uuid))
343            offset += 2 + uuid_size
344
345    def __init__(self, *args, **kwargs):
346        super().__init__(*args, **kwargs)
347        self.parse_information_data()
348
349    def init_from_bytes(self, pdu, offset):
350        super().init_from_bytes(pdu, offset)
351        self.parse_information_data()
352
353    def __str__(self):
354        result = color(self.name, 'yellow')
355        result += ':\n' + HCI_Object.format_fields(
356            self.__dict__,
357            [
358                ('format', 1),
359                (
360                    'information',
361                    {
362                        'mapper': lambda x: ', '.join(
363                            [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x]
364                        )
365                    },
366                ),
367            ],
368            '  ',
369        )
370        return result
371
372
373# -----------------------------------------------------------------------------
374@ATT_PDU.subclass(
375    [
376        ('starting_handle', HANDLE_FIELD_SPEC),
377        ('ending_handle', HANDLE_FIELD_SPEC),
378        ('attribute_type', UUID_2_FIELD_SPEC),
379        ('attribute_value', '*'),
380    ]
381)
382class ATT_Find_By_Type_Value_Request(ATT_PDU):
383    '''
384    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
385    '''
386
387
388# -----------------------------------------------------------------------------
389@ATT_PDU.subclass([('handles_information_list', '*')])
390class ATT_Find_By_Type_Value_Response(ATT_PDU):
391    '''
392    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
393    '''
394
395    def parse_handles_information_list(self):
396        self.handles_information = []
397        offset = 0
398        while offset + 4 <= len(self.handles_information_list):
399            found_attribute_handle, group_end_handle = struct.unpack_from(
400                '<HH', self.handles_information_list, offset
401            )
402            self.handles_information.append((found_attribute_handle, group_end_handle))
403            offset += 4
404
405    def __init__(self, *args, **kwargs):
406        super().__init__(*args, **kwargs)
407        self.parse_handles_information_list()
408
409    def init_from_bytes(self, pdu, offset):
410        super().init_from_bytes(pdu, offset)
411        self.parse_handles_information_list()
412
413    def __str__(self):
414        result = color(self.name, 'yellow')
415        result += ':\n' + HCI_Object.format_fields(
416            self.__dict__,
417            [
418                (
419                    'handles_information',
420                    {
421                        'mapper': lambda x: ', '.join(
422                            [
423                                f'0x{handle1:04X}-0x{handle2:04X}'
424                                for handle1, handle2 in x
425                            ]
426                        )
427                    },
428                )
429            ],
430            '  ',
431        )
432        return result
433
434
435# -----------------------------------------------------------------------------
436@ATT_PDU.subclass(
437    [
438        ('starting_handle', HANDLE_FIELD_SPEC),
439        ('ending_handle', HANDLE_FIELD_SPEC),
440        ('attribute_type', UUID_2_16_FIELD_SPEC),
441    ]
442)
443class ATT_Read_By_Type_Request(ATT_PDU):
444    '''
445    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
446    '''
447
448
449# -----------------------------------------------------------------------------
450@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
451class ATT_Read_By_Type_Response(ATT_PDU):
452    '''
453    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
454    '''
455
456    def parse_attribute_data_list(self):
457        self.attributes = []
458        offset = 0
459        while self.length != 0 and offset + self.length <= len(
460            self.attribute_data_list
461        ):
462            (attribute_handle,) = struct.unpack_from(
463                '<H', self.attribute_data_list, offset
464            )
465            attribute_value = self.attribute_data_list[
466                offset + 2 : offset + self.length
467            ]
468            self.attributes.append((attribute_handle, attribute_value))
469            offset += self.length
470
471    def __init__(self, *args, **kwargs):
472        super().__init__(*args, **kwargs)
473        self.parse_attribute_data_list()
474
475    def init_from_bytes(self, pdu, offset):
476        super().init_from_bytes(pdu, offset)
477        self.parse_attribute_data_list()
478
479    def __str__(self):
480        result = color(self.name, 'yellow')
481        result += ':\n' + HCI_Object.format_fields(
482            self.__dict__,
483            [
484                ('length', 1),
485                (
486                    'attributes',
487                    {
488                        'mapper': lambda x: ', '.join(
489                            [f'0x{handle:04X}:{value.hex()}' for handle, value in x]
490                        )
491                    },
492                ),
493            ],
494            '  ',
495        )
496        return result
497
498
499# -----------------------------------------------------------------------------
500@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
501class ATT_Read_Request(ATT_PDU):
502    '''
503    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
504    '''
505
506
507# -----------------------------------------------------------------------------
508@ATT_PDU.subclass([('attribute_value', '*')])
509class ATT_Read_Response(ATT_PDU):
510    '''
511    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
512    '''
513
514
515# -----------------------------------------------------------------------------
516@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
517class ATT_Read_Blob_Request(ATT_PDU):
518    '''
519    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
520    '''
521
522
523# -----------------------------------------------------------------------------
524@ATT_PDU.subclass([('part_attribute_value', '*')])
525class ATT_Read_Blob_Response(ATT_PDU):
526    '''
527    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
528    '''
529
530
531# -----------------------------------------------------------------------------
532@ATT_PDU.subclass([('set_of_handles', '*')])
533class ATT_Read_Multiple_Request(ATT_PDU):
534    '''
535    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
536    '''
537
538
539# -----------------------------------------------------------------------------
540@ATT_PDU.subclass([('set_of_values', '*')])
541class ATT_Read_Multiple_Response(ATT_PDU):
542    '''
543    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
544    '''
545
546
547# -----------------------------------------------------------------------------
548@ATT_PDU.subclass(
549    [
550        ('starting_handle', HANDLE_FIELD_SPEC),
551        ('ending_handle', HANDLE_FIELD_SPEC),
552        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
553    ]
554)
555class ATT_Read_By_Group_Type_Request(ATT_PDU):
556    '''
557    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
558    '''
559
560
561# -----------------------------------------------------------------------------
562@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
563class ATT_Read_By_Group_Type_Response(ATT_PDU):
564    '''
565    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
566    '''
567
568    def parse_attribute_data_list(self):
569        self.attributes = []
570        offset = 0
571        while self.length != 0 and offset + self.length <= len(
572            self.attribute_data_list
573        ):
574            attribute_handle, end_group_handle = struct.unpack_from(
575                '<HH', self.attribute_data_list, offset
576            )
577            attribute_value = self.attribute_data_list[
578                offset + 4 : offset + self.length
579            ]
580            self.attributes.append(
581                (attribute_handle, end_group_handle, attribute_value)
582            )
583            offset += self.length
584
585    def __init__(self, *args, **kwargs):
586        super().__init__(*args, **kwargs)
587        self.parse_attribute_data_list()
588
589    def init_from_bytes(self, pdu, offset):
590        super().init_from_bytes(pdu, offset)
591        self.parse_attribute_data_list()
592
593    def __str__(self):
594        result = color(self.name, 'yellow')
595        result += ':\n' + HCI_Object.format_fields(
596            self.__dict__,
597            [
598                ('length', 1),
599                (
600                    'attributes',
601                    {
602                        'mapper': lambda x: ', '.join(
603                            [
604                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
605                                for handle, end, value in x
606                            ]
607                        )
608                    },
609                ),
610            ],
611            '  ',
612        )
613        return result
614
615
616# -----------------------------------------------------------------------------
617@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
618class ATT_Write_Request(ATT_PDU):
619    '''
620    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
621    '''
622
623
624# -----------------------------------------------------------------------------
625@ATT_PDU.subclass([])
626class ATT_Write_Response(ATT_PDU):
627    '''
628    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
629    '''
630
631
632# -----------------------------------------------------------------------------
633@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
634class ATT_Write_Command(ATT_PDU):
635    '''
636    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
637    '''
638
639
640# -----------------------------------------------------------------------------
641@ATT_PDU.subclass(
642    [
643        ('attribute_handle', HANDLE_FIELD_SPEC),
644        ('attribute_value', '*')
645        # ('authentication_signature', 'TODO')
646    ]
647)
648class ATT_Signed_Write_Command(ATT_PDU):
649    '''
650    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
651    '''
652
653
654# -----------------------------------------------------------------------------
655@ATT_PDU.subclass(
656    [
657        ('attribute_handle', HANDLE_FIELD_SPEC),
658        ('value_offset', 2),
659        ('part_attribute_value', '*'),
660    ]
661)
662class ATT_Prepare_Write_Request(ATT_PDU):
663    '''
664    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
665    '''
666
667
668# -----------------------------------------------------------------------------
669@ATT_PDU.subclass(
670    [
671        ('attribute_handle', HANDLE_FIELD_SPEC),
672        ('value_offset', 2),
673        ('part_attribute_value', '*'),
674    ]
675)
676class ATT_Prepare_Write_Response(ATT_PDU):
677    '''
678    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
679    '''
680
681
682# -----------------------------------------------------------------------------
683@ATT_PDU.subclass([])
684class ATT_Execute_Write_Request(ATT_PDU):
685    '''
686    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
687    '''
688
689
690# -----------------------------------------------------------------------------
691@ATT_PDU.subclass([])
692class ATT_Execute_Write_Response(ATT_PDU):
693    '''
694    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
695    '''
696
697
698# -----------------------------------------------------------------------------
699@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
700class ATT_Handle_Value_Notification(ATT_PDU):
701    '''
702    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
703    '''
704
705
706# -----------------------------------------------------------------------------
707@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
708class ATT_Handle_Value_Indication(ATT_PDU):
709    '''
710    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
711    '''
712
713
714# -----------------------------------------------------------------------------
715@ATT_PDU.subclass([])
716class ATT_Handle_Value_Confirmation(ATT_PDU):
717    '''
718    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
719    '''
720
721
722# -----------------------------------------------------------------------------
723class Attribute(EventEmitter):
724    # Permission flags
725    READABLE = 0x01
726    WRITEABLE = 0x02
727    READ_REQUIRES_ENCRYPTION = 0x04
728    WRITE_REQUIRES_ENCRYPTION = 0x08
729    READ_REQUIRES_AUTHENTICATION = 0x10
730    WRITE_REQUIRES_AUTHENTICATION = 0x20
731    READ_REQUIRES_AUTHORIZATION = 0x40
732    WRITE_REQUIRES_AUTHORIZATION = 0x80
733
734    PERMISSION_NAMES = {
735        READABLE: 'READABLE',
736        WRITEABLE: 'WRITEABLE',
737        READ_REQUIRES_ENCRYPTION: 'READ_REQUIRES_ENCRYPTION',
738        WRITE_REQUIRES_ENCRYPTION: 'WRITE_REQUIRES_ENCRYPTION',
739        READ_REQUIRES_AUTHENTICATION: 'READ_REQUIRES_AUTHENTICATION',
740        WRITE_REQUIRES_AUTHENTICATION: 'WRITE_REQUIRES_AUTHENTICATION',
741        READ_REQUIRES_AUTHORIZATION: 'READ_REQUIRES_AUTHORIZATION',
742        WRITE_REQUIRES_AUTHORIZATION: 'WRITE_REQUIRES_AUTHORIZATION',
743    }
744
745    @staticmethod
746    def string_to_permissions(permissions_str: str):
747        try:
748            return functools.reduce(
749                lambda x, y: x | get_dict_key_by_value(Attribute.PERMISSION_NAMES, y),
750                permissions_str.split(","),
751                0,
752            )
753        except TypeError:
754            raise TypeError(
755                f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
756            )
757
758    def __init__(self, attribute_type, permissions, value=b''):
759        EventEmitter.__init__(self)
760        self.handle = 0
761        self.end_group_handle = 0
762        if isinstance(permissions, str):
763            self.permissions = self.string_to_permissions(permissions)
764        else:
765            self.permissions = permissions
766
767        # Convert the type to a UUID object if it isn't already
768        if isinstance(attribute_type, str):
769            self.type = UUID(attribute_type)
770        elif isinstance(attribute_type, bytes):
771            self.type = UUID.from_bytes(attribute_type)
772        else:
773            self.type = attribute_type
774
775        # Convert the value to a byte array
776        if isinstance(value, str):
777            self.value = bytes(value, 'utf-8')
778        else:
779            self.value = value
780
781    def encode_value(self, value):
782        return value
783
784    def decode_value(self, value_bytes):
785        return value_bytes
786
787    def read_value(self, connection: Connection):
788        if (
789            self.permissions & self.READ_REQUIRES_ENCRYPTION
790        ) and not connection.encryption:
791            raise ATT_Error(
792                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
793            )
794        if (
795            self.permissions & self.READ_REQUIRES_AUTHENTICATION
796        ) and not connection.authenticated:
797            raise ATT_Error(
798                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
799            )
800        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
801            # TODO: handle authorization better
802            raise ATT_Error(
803                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
804            )
805
806        if read := getattr(self.value, 'read', None):
807            try:
808                value = read(connection)  # pylint: disable=not-callable
809            except ATT_Error as error:
810                raise ATT_Error(
811                    error_code=error.error_code, att_handle=self.handle
812                ) from error
813        else:
814            value = self.value
815
816        return self.encode_value(value)
817
818    def write_value(self, connection: Connection, value_bytes):
819        if (
820            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
821        ) and not connection.encryption:
822            raise ATT_Error(
823                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
824            )
825        if (
826            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
827        ) and not connection.authenticated:
828            raise ATT_Error(
829                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
830            )
831        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
832            # TODO: handle authorization better
833            raise ATT_Error(
834                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
835            )
836
837        value = self.decode_value(value_bytes)
838
839        if write := getattr(self.value, 'write', None):
840            try:
841                write(connection, value)  # pylint: disable=not-callable
842            except ATT_Error as error:
843                raise ATT_Error(
844                    error_code=error.error_code, att_handle=self.handle
845                ) from error
846        else:
847            self.value = value
848
849        self.emit('write', connection, value)
850
851    def __repr__(self):
852        if isinstance(self.value, bytes):
853            value_str = self.value.hex()
854        else:
855            value_str = str(self.value)
856        if value_str:
857            value_string = f', value={self.value.hex()}'
858        else:
859            value_string = ''
860        return (
861            f'Attribute(handle=0x{self.handle:04X}, '
862            f'type={self.type}, '
863            f'permissions={self.permissions}{value_string})'
864        )
865