• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# ATT - Attribute Protocol
17#
18# See Bluetooth spec @ Vol 3, Part F
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import enum
27import functools
28import inspect
29import struct
30from typing import (
31    Any,
32    Awaitable,
33    Callable,
34    Dict,
35    List,
36    Optional,
37    Type,
38    Union,
39    TYPE_CHECKING,
40)
41
42from pyee import EventEmitter
43
44from bumble.core import UUID, name_or_number, ProtocolError
45from bumble.hci import HCI_Object, key_with_value
46from bumble.colors import color
47
48if TYPE_CHECKING:
49    from bumble.device import Connection
50
51# -----------------------------------------------------------------------------
52# Constants
53# -----------------------------------------------------------------------------
54# fmt: off
55# pylint: disable=line-too-long
56
# Channel identifier on which ATT PDUs are exchanged
# (presumably the fixed L2CAP CID assigned to ATT - confirm against the spec)
ATT_CID = 0x04

# ATT PDU opcodes (first byte of every PDU, see ATT_PDU.from_bytes).
# Bit 6 of the opcode marks a command and bit 7 marks a PDU that carries an
# authentication signature, as tested by ATT_PDU.is_command and
# ATT_PDU.has_authentication_signature: this is why ATT_WRITE_COMMAND (0x52)
# and ATT_SIGNED_WRITE_COMMAND (0xD2) are out of numerical sequence.
ATT_ERROR_RESPONSE              = 0x01
ATT_EXCHANGE_MTU_REQUEST        = 0x02
ATT_EXCHANGE_MTU_RESPONSE       = 0x03
ATT_FIND_INFORMATION_REQUEST    = 0x04
ATT_FIND_INFORMATION_RESPONSE   = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST  = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST        = 0x08
ATT_READ_BY_TYPE_RESPONSE       = 0x09
ATT_READ_REQUEST                = 0x0A
ATT_READ_RESPONSE               = 0x0B
ATT_READ_BLOB_REQUEST           = 0x0C
ATT_READ_BLOB_RESPONSE          = 0x0D
ATT_READ_MULTIPLE_REQUEST       = 0x0E
ATT_READ_MULTIPLE_RESPONSE      = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST               = 0x12
ATT_WRITE_RESPONSE              = 0x13
ATT_WRITE_COMMAND               = 0x52
ATT_SIGNED_WRITE_COMMAND        = 0xD2
ATT_PREPARE_WRITE_REQUEST       = 0x16
ATT_PREPARE_WRITE_RESPONSE      = 0x17
ATT_EXECUTE_WRITE_REQUEST       = 0x18
ATT_EXECUTE_WRITE_RESPONSE      = 0x19
ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
ATT_HANDLE_VALUE_INDICATION     = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E
87
# Opcode -> symbolic name.
# Used by ATT_PDU.pdu_name() for display, and by the ATT_PDU.subclass()
# decorator, which finds a class's opcode by looking up the upper-cased class
# name among the values of this table. Each PDU class name must therefore
# match one of these names, ignoring case.
ATT_PDU_NAMES = {
    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
}
118
# Opcodes of the request PDUs. Commands, notifications, indications and
# confirmations are not listed here.
ATT_REQUESTS = [
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_FIND_INFORMATION_REQUEST,
    ATT_FIND_BY_TYPE_VALUE_REQUEST,
    ATT_READ_BY_TYPE_REQUEST,
    ATT_READ_REQUEST,
    ATT_READ_BLOB_REQUEST,
    ATT_READ_MULTIPLE_REQUEST,
    ATT_READ_BY_GROUP_TYPE_REQUEST,
    ATT_WRITE_REQUEST,
    ATT_PREPARE_WRITE_REQUEST,
    ATT_EXECUTE_WRITE_REQUEST
]
132
# Opcodes of the response PDUs: one per entry of ATT_REQUESTS, plus
# ATT_ERROR_RESPONSE.
ATT_RESPONSES = [
    ATT_ERROR_RESPONSE,
    ATT_EXCHANGE_MTU_RESPONSE,
    ATT_FIND_INFORMATION_RESPONSE,
    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
    ATT_READ_BY_TYPE_RESPONSE,
    ATT_READ_RESPONSE,
    ATT_READ_BLOB_RESPONSE,
    ATT_READ_MULTIPLE_RESPONSE,
    ATT_READ_BY_GROUP_TYPE_RESPONSE,
    ATT_WRITE_RESPONSE,
    ATT_PREPARE_WRITE_RESPONSE,
    ATT_EXECUTE_WRITE_RESPONSE
]
147
# ATT error codes: the values carried by the `error_code` field of
# ATT_Error_Response, and used as the error code of ATT_Error exceptions.
ATT_INVALID_HANDLE_ERROR                   = 0x01
ATT_READ_NOT_PERMITTED_ERROR               = 0x02
ATT_WRITE_NOT_PERMITTED_ERROR              = 0x03
ATT_INVALID_PDU_ERROR                      = 0x04
ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = 0x05
ATT_REQUEST_NOT_SUPPORTED_ERROR            = 0x06
ATT_INVALID_OFFSET_ERROR                   = 0x07
ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = 0x08
ATT_PREPARE_QUEUE_FULL_ERROR               = 0x09
ATT_ATTRIBUTE_NOT_FOUND_ERROR              = 0x0A
ATT_ATTRIBUTE_NOT_LONG_ERROR               = 0x0B
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = 0x0D
ATT_UNLIKELY_ERROR_ERROR                   = 0x0E
ATT_INSUFFICIENT_ENCRYPTION_ERROR          = 0x0F
ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = 0x10
ATT_INSUFFICIENT_RESOURCES_ERROR           = 0x11
165
# Error code -> symbolic name, used by ATT_PDU.error_name() for display.
ATT_ERROR_NAMES = {
    ATT_INVALID_HANDLE_ERROR:                   'ATT_INVALID_HANDLE_ERROR',
    ATT_READ_NOT_PERMITTED_ERROR:               'ATT_READ_NOT_PERMITTED_ERROR',
    ATT_WRITE_NOT_PERMITTED_ERROR:              'ATT_WRITE_NOT_PERMITTED_ERROR',
    ATT_INVALID_PDU_ERROR:                      'ATT_INVALID_PDU_ERROR',
    ATT_INSUFFICIENT_AUTHENTICATION_ERROR:      'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
    ATT_REQUEST_NOT_SUPPORTED_ERROR:            'ATT_REQUEST_NOT_SUPPORTED_ERROR',
    ATT_INVALID_OFFSET_ERROR:                   'ATT_INVALID_OFFSET_ERROR',
    ATT_INSUFFICIENT_AUTHORIZATION_ERROR:       'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
    ATT_PREPARE_QUEUE_FULL_ERROR:               'ATT_PREPARE_QUEUE_FULL_ERROR',
    ATT_ATTRIBUTE_NOT_FOUND_ERROR:              'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
    ATT_ATTRIBUTE_NOT_LONG_ERROR:               'ATT_ATTRIBUTE_NOT_LONG_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
    ATT_INVALID_ATTRIBUTE_LENGTH_ERROR:         'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
    ATT_UNLIKELY_ERROR_ERROR:                   'ATT_UNLIKELY_ERROR_ERROR',
    ATT_INSUFFICIENT_ENCRYPTION_ERROR:          'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
    ATT_UNSUPPORTED_GROUP_TYPE_ERROR:           'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
    ATT_INSUFFICIENT_RESOURCES_ERROR:           'ATT_INSUFFICIENT_RESOURCES_ERROR'
}
185
# Default ATT MTU
# (presumably the value in effect until an MTU exchange completes - confirm)
ATT_DEFAULT_MTU = 23

# Field specifications shared by the PDU definitions below.
# Handles are 2-byte fields, displayed as 4 hex digits.
HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
# Callable field specs that delegate UUID parsing to UUID.parse_uuid /
# UUID.parse_uuid_2 (argument semantics are defined by those helpers).
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)  # noqa: E731
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731
193
194# fmt: on
195# pylint: enable=line-too-long
196# pylint: disable=invalid-name
197
198
199# -----------------------------------------------------------------------------
200# Exceptions
201# -----------------------------------------------------------------------------
class ATT_Error(ProtocolError):
    '''
    Protocol error in the 'att' namespace.

    In addition to what ProtocolError records, an ATT_Error keeps the handle
    of the attribute the error relates to (`att_handle`, 0x0000 by default)
    and a free-form `message`.
    '''

    def __init__(self, error_code, att_handle=0x0000, message=''):
        # Symbolic name of the error code, handed to the base class
        symbolic_name = ATT_PDU.error_name(error_code)
        super().__init__(
            error_code, error_namespace='att', error_name=symbolic_name
        )
        self.message = message
        self.att_handle = att_handle

    def __str__(self):
        summary = (
            f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X})'
        )
        return f'{summary}: {self.message}'
214
215
216# -----------------------------------------------------------------------------
217# Attribute Protocol
218# -----------------------------------------------------------------------------
class ATT_PDU:
    '''
    Base class for all Attribute Protocol PDUs.

    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU

    Concrete PDU classes are declared with the `subclass` decorator, which
    gives them a `name`, an `op_code` and a `fields` list, and registers them
    in `pdu_classes`. Every instance keeps its serialized form in `pdu`.
    '''

    # Registry of the decorated PDU classes, keyed by opcode
    pdu_classes: Dict[int, Type[ATT_PDU]] = {}
    op_code = 0
    name: str

    @staticmethod
    def from_bytes(pdu):
        '''
        Parse a serialized PDU.

        Returns an instance of the class registered for the opcode found in
        the first byte, or a plain ATT_PDU (with `name` and `op_code` set
        from that byte) when no class is registered for it.
        '''
        op_code = pdu[0]
        pdu_class = ATT_PDU.pdu_classes.get(op_code)

        if pdu_class is not None:
            # The subclass constructor is bypassed on purpose: the instance
            # is populated from the bytes rather than from keyword arguments.
            parsed = pdu_class.__new__(pdu_class)
            ATT_PDU.__init__(parsed, pdu)
            if hasattr(parsed, 'fields'):
                # Fields start right after the opcode byte
                parsed.init_from_bytes(pdu, 1)
            return parsed

        # Unknown opcode: keep the raw bytes in a generic PDU object
        generic = ATT_PDU(pdu)
        generic.name = ATT_PDU.pdu_name(op_code)
        generic.op_code = op_code
        return generic

    @staticmethod
    def pdu_name(op_code):
        '''Return the display name for an opcode (looked up in ATT_PDU_NAMES).'''
        return name_or_number(ATT_PDU_NAMES, op_code, 2)

    @staticmethod
    def error_name(error_code):
        '''Return the display name for an error code (looked up in ATT_ERROR_NAMES).'''
        return name_or_number(ATT_ERROR_NAMES, error_code, 2)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator factory that turns a class into a registered PDU type.

        The decorated class gets `name` (its upper-cased class name),
        `op_code` (the ATT_PDU_NAMES key whose value is that name) and
        `fields`, and is recorded in `ATT_PDU.pdu_classes`.

        Raises KeyError when the name is not found in ATT_PDU_NAMES.
        '''

        def register(pdu_class):
            pdu_class.name = pdu_class.__name__.upper()
            pdu_class.op_code = key_with_value(ATT_PDU_NAMES, pdu_class.name)
            if pdu_class.op_code is None:
                raise KeyError(f'PDU name {pdu_class.name} not found in ATT_PDU_NAMES')
            pdu_class.fields = fields

            # Make the class discoverable by from_bytes()
            ATT_PDU.pdu_classes[pdu_class.op_code] = pdu_class

            return pdu_class

        return register

    def __init__(self, pdu=None, **kwargs):
        '''
        Create a PDU either from its serialized form (`pdu`) or from field
        values passed as keyword arguments, in which case the serialized form
        is built from the opcode followed by the encoded fields.
        '''
        if kwargs and hasattr(self, 'fields'):
            HCI_Object.init_from_fields(self, self.fields, kwargs)

        if pdu is not None:
            self.pdu = pdu
        else:
            encoded_fields = HCI_Object.dict_to_bytes(kwargs, self.fields)
            self.pdu = bytes([self.op_code]) + encoded_fields

    def init_from_bytes(self, pdu, offset):
        '''Populate the declared fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the serialized PDU.'''
        return self.pdu

    @property
    def is_command(self):
        '''True when bit 6 of the opcode is set.'''
        return (self.op_code & 0x40) != 0

    @property
    def has_authentication_signature(self):
        '''True when bit 7 of the opcode is set.'''
        return (self.op_code & 0x80) != 0

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        title = color(self.name, 'yellow')

        fields = getattr(self, 'fields', None)
        if fields:
            details = HCI_Object.format_fields(self.__dict__, fields, '  ')
            return title + ':\n' + details

        # No (or empty) field list: show the payload as hex, if there is one
        if len(self.pdu) > 1:
            return title + f': {self.pdu.hex()}'

        return title
300
301
302# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        # Displayed as a PDU name (ATT_PDU.pdu_name)
        ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
        ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
        # Displayed as an error name (ATT_PDU.error_name)
        ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
    ]
)
class ATT_Error_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response

    Fields, following the opcode byte: `request_opcode_in_error` (1 byte),
    `attribute_handle_in_error` (2-byte handle) and `error_code` (1 byte).
    '''
314
315
316# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('client_rx_mtu', 2)])
class ATT_Exchange_MTU_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request

    Single field: `client_rx_mtu`, declared with size 2.
    '''
322
323
324# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('server_rx_mtu', 2)])
class ATT_Exchange_MTU_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response

    Single field: `server_rx_mtu`, declared with size 2.
    '''
330
331
332# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
)
class ATT_Find_Information_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request

    Fields: `starting_handle` and `ending_handle`, both 2-byte handles.
    '''
340
341
342# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
class ATT_Find_Information_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response

    In addition to the declared `format` and `information_data` fields,
    instances expose `information`: the list of (handle, uuid_bytes) tuples
    decoded from `information_data`.
    '''

    def parse_information_data(self):
        '''
        Decode `information_data` into `self.information`.

        Each entry is a 2-byte little-endian handle followed by a UUID that
        is 2 bytes long when `format` is 1, and 16 bytes long otherwise.
        Trailing bytes that do not form a complete entry are ignored.
        '''
        self.information = []
        data = self.information_data
        uuid_size = 2 if self.format == 1 else 16
        entry_size = 2 + uuid_size

        # The bound accounts for the handle as well as the UUID, so that a
        # truncated trailing entry is never reported with a short or empty
        # UUID.
        for offset in range(0, len(data) - entry_size + 1, entry_size):
            (handle,) = struct.unpack_from('<H', data, offset)
            uuid = data[offset + 2 : offset + entry_size]
            self.information.append((handle, uuid))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_information_data()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        # Keep the decoded list in sync with the freshly parsed fields
        self.parse_information_data()

    def __str__(self):
        # Show the decoded (handle, uuid) pairs rather than the raw data
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('format', 1),
                (
                    'information',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x]
                        )
                    },
                ),
            ],
            '  ',
        )
        return result
385
386
387# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        # Parsed with UUID.parse_uuid_2 (see UUID_2_FIELD_SPEC)
        ('attribute_type', UUID_2_FIELD_SPEC),
        ('attribute_value', '*'),
    ]
)
class ATT_Find_By_Type_Value_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request

    Fields: `starting_handle` and `ending_handle` (2-byte handles),
    `attribute_type` (UUID) and `attribute_value` ('*' field).
    '''
400
401
402# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('handles_information_list', '*')])
class ATT_Find_By_Type_Value_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response

    In addition to the declared `handles_information_list` field, instances
    expose `handles_information`: the list of
    (found_attribute_handle, group_end_handle) tuples decoded from it.
    '''

    def parse_handles_information_list(self):
        '''
        Decode `handles_information_list` into `self.handles_information`.

        The data is read as consecutive pairs of 2-byte little-endian
        handles; trailing bytes that do not form a complete pair are ignored.
        '''
        self.handles_information = []
        raw = self.handles_information_list
        self.handles_information.extend(
            struct.unpack_from('<HH', raw, position)
            for position in range(0, len(raw) - 3, 4)
        )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_handles_information_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_handles_information_list()

    def __str__(self):
        def format_ranges(handle_pairs):
            # One 'first-last' range per decoded pair
            return ', '.join(
                [f'0x{first:04X}-0x{last:04X}' for first, last in handle_pairs]
            )

        title = color(self.name, 'yellow')
        details = HCI_Object.format_fields(
            self.__dict__,
            [('handles_information', {'mapper': format_ranges})],
            '  ',
        )
        return title + ':\n' + details
447
448
449# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        # Parsed with UUID.parse_uuid (see UUID_2_16_FIELD_SPEC)
        ('attribute_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request

    Fields: `starting_handle` and `ending_handle` (2-byte handles) and
    `attribute_type` (UUID).
    '''
461
462
463# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response

    In addition to the declared `length` and `attribute_data_list` fields,
    instances expose `attributes`: the list of (handle, value_bytes) tuples
    decoded from `attribute_data_list`.
    '''

    def parse_attribute_data_list(self):
        '''
        Decode `attribute_data_list` into `self.attributes`.

        The data is a sequence of entries of `length` bytes each: a 2-byte
        little-endian handle followed by the value. Trailing bytes that do
        not form a complete entry are ignored. A `length` too small to hold
        a handle is treated as malformed and yields no attributes.
        '''
        self.attributes = []
        data = self.attribute_data_list
        entry_size = self.length

        # An entry must at least contain the handle. This also covers the
        # zero length, which would otherwise never advance through the data.
        if entry_size < 2:
            return

        for offset in range(0, len(data) - entry_size + 1, entry_size):
            (attribute_handle,) = struct.unpack_from('<H', data, offset)
            attribute_value = data[offset + 2 : offset + entry_size]
            self.attributes.append((attribute_handle, attribute_value))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        # Keep the decoded list in sync with the freshly parsed fields
        self.parse_attribute_data_list()

    def __str__(self):
        # Show the decoded (handle, value) pairs rather than the raw data
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{value.hex()}' for handle, value in x]
                        )
                    },
                ),
            ],
            '  ',
        )
        return result
511
512
513# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
class ATT_Read_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request

    Single field: `attribute_handle` (2-byte handle).
    '''
519
520
521# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_value', '*')])
class ATT_Read_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response

    Single field: `attribute_value` ('*' field).
    '''
527
528
529# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
class ATT_Read_Blob_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request

    Fields: `attribute_handle` (2-byte handle) and `value_offset`, declared
    with size 2.
    '''
535
536
537# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('part_attribute_value', '*')])
class ATT_Read_Blob_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response

    Single field: `part_attribute_value` ('*' field).
    '''
543
544
545# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_handles', '*')])
class ATT_Read_Multiple_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request

    Single field: `set_of_handles` ('*' field). This class does not decode
    it into individual handles.
    '''
551
552
553# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_values', '*')])
class ATT_Read_Multiple_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response

    Single field: `set_of_values` ('*' field). This class does not decode
    it into individual values.
    '''
559
560
561# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        # Parsed with UUID.parse_uuid (see UUID_2_16_FIELD_SPEC)
        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Group_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request

    Fields: `starting_handle` and `ending_handle` (2-byte handles) and
    `attribute_group_type` (UUID).
    '''
573
574
575# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response

    In addition to the declared `length` and `attribute_data_list` fields,
    instances expose `attributes`: the list of
    (handle, end_group_handle, value_bytes) tuples decoded from
    `attribute_data_list`.
    '''

    def parse_attribute_data_list(self):
        '''
        Decode `attribute_data_list` into `self.attributes`.

        The data is a sequence of entries of `length` bytes each: two 2-byte
        little-endian handles (attribute handle, end group handle) followed
        by the value. Trailing bytes that do not form a complete entry are
        ignored. A `length` too small to hold both handles is treated as
        malformed and yields no attributes.
        '''
        self.attributes = []
        data = self.attribute_data_list
        entry_size = self.length

        # An entry must at least contain the two handles. This also covers
        # the zero length, which would otherwise never advance through the
        # data.
        if entry_size < 4:
            return

        for offset in range(0, len(data) - entry_size + 1, entry_size):
            attribute_handle, end_group_handle = struct.unpack_from(
                '<HH', data, offset
            )
            attribute_value = data[offset + 4 : offset + entry_size]
            self.attributes.append(
                (attribute_handle, end_group_handle, attribute_value)
            )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        # Keep the decoded list in sync with the freshly parsed fields
        self.parse_attribute_data_list()

    def __str__(self):
        # Show the decoded (handle, end, value) entries rather than the raw data
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
                                for handle, end, value in x
                            ]
                        )
                    },
                ),
            ],
            '  ',
        )
        return result
628
629
630# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request

    Fields: `attribute_handle` (2-byte handle) and `attribute_value`
    ('*' field).
    '''
636
637
638# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response

    Declares no fields.
    '''
644
645
646# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command

    Fields: `attribute_handle` (2-byte handle) and `attribute_value`
    ('*' field).
    '''
652
653
654# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('attribute_value', '*'),
        # ('authentication_signature', 'TODO')
    ]
)
class ATT_Signed_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command

    Fields: `attribute_handle` (2-byte handle) and `attribute_value`
    ('*' field).

    NOTE(review): the authentication signature is not declared as a field
    (see the TODO in the field list), so it is not exposed as a separate
    attribute; presumably it ends up at the tail of `attribute_value` -
    confirm before relying on that field for this PDU.
    '''
666
667
668# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request

    Fields: `attribute_handle` (2-byte handle), `value_offset` (declared with
    size 2) and `part_attribute_value` ('*' field).
    '''
680
681
682# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response

    Same field layout as ATT_Prepare_Write_Request: `attribute_handle`
    (2-byte handle), `value_offset` (declared with size 2) and
    `part_attribute_value` ('*' field).
    '''
694
695
696# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request

    Declares no fields.

    NOTE(review): the referenced spec section appears to define a 1-octet
    Flags parameter for this PDU; it is not declared here, so it is not
    available as an attribute (any payload is only kept in the raw `pdu`
    bytes) - confirm whether a field should be added.
    '''
702
703
704# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response

    Declares no fields.
    '''
710
711
712# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Notification(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification

    Fields: `attribute_handle` (2-byte handle) and `attribute_value`
    ('*' field).
    '''
718
719
720# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Indication(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication

    Fields: `attribute_handle` (2-byte handle) and `attribute_value`
    ('*' field).
    '''
726
727
728# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation

    Declares no fields.
    '''
734
735
736# -----------------------------------------------------------------------------
class AttributeValue:
    '''
    Attribute value that is not stored, but obtained from, and handed to,
    optional `read` and `write` callables supplied to the constructor.

    Either callable may be synchronous or return an awaitable; the result is
    passed back to the caller as is.
    '''

    def __init__(
        self,
        read: Union[
            Callable[[Optional[Connection]], bytes],
            Callable[[Optional[Connection]], Awaitable[bytes]],
            None,
        ] = None,
        write: Union[
            Callable[[Optional[Connection], bytes], None],
            Callable[[Optional[Connection], bytes], Awaitable[None]],
            None,
        ] = None,
    ):
        self._write = write
        self._read = read

    def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
        '''
        Return whatever the `read` callable returns for `connection`, or an
        empty byte string when no `read` callable was supplied.
        '''
        reader = self._read
        if not reader:
            return b''
        return reader(connection)

    def write(
        self, connection: Optional[Connection], value: bytes
    ) -> Union[Awaitable[None], None]:
        '''
        Pass `value` to the `write` callable and return its result, or do
        nothing and return None when no `write` callable was supplied.
        '''
        writer = self._write
        return writer(connection, value) if writer else None
769
770
771# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
    '''
    An attribute: a value with a type (UUID), a handle and access permissions.

    The value is either stored directly as bytes, or is an object with `read`
    and/or `write` methods (such as AttributeValue) to which access is
    delegated.

    Emits a 'write' event, with the connection and the decoded value as
    arguments, at the end of each successful `write_value`.
    '''

    class Permissions(enum.IntFlag):
        '''
        Access permission flags of an attribute.
        '''

        READABLE = 0x01
        WRITEABLE = 0x02
        READ_REQUIRES_ENCRYPTION = 0x04
        WRITE_REQUIRES_ENCRYPTION = 0x08
        READ_REQUIRES_AUTHENTICATION = 0x10
        WRITE_REQUIRES_AUTHENTICATION = 0x20
        READ_REQUIRES_AUTHORIZATION = 0x40
        WRITE_REQUIRES_AUTHORIZATION = 0x80

        @classmethod
        def from_string(cls, permissions_str: str) -> Attribute.Permissions:
            '''
            Build a permission set from a string of flag names.

            Args:
              permissions_str: names of members of this enum, separated by
                ',' or '|' (names are looked up verbatim, so no whitespace is
                allowed around them).

            Returns:
              The combination (bitwise OR) of the named flags.

            Raises:
              TypeError: re-raised with a message listing the valid names
                when combining the flags raises a TypeError.

            NOTE(review): an unknown name makes the enum lookup raise
            KeyError, which is not intercepted here and so reaches the caller
            without the explanatory message.
            '''
            try:
                return functools.reduce(
                    lambda x, y: x | Attribute.Permissions[y],
                    permissions_str.replace('|', ',').split(","),
                    Attribute.Permissions(0),
                )
            except TypeError as exc:
                # The check for `p.name is not None` here is needed because for
                # IntFlag enums, the .name property can be None, when the enum
                # value is 0, so the type hint for .name is Optional[str].
                enum_list: List[str] = [p.name for p in cls if p.name is not None]
                enum_list_str = ",".join(enum_list)
                raise TypeError(
                    "Attribute::permissions error:\n"
                    "Expected a string containing any of the keys, "
                    f"separated by commas: {enum_list_str}\n"
                    f"Got: {permissions_str}"
                ) from exc

    # Permission flags(legacy-use only)
    READABLE = Permissions.READABLE
    WRITEABLE = Permissions.WRITEABLE
    READ_REQUIRES_ENCRYPTION = Permissions.READ_REQUIRES_ENCRYPTION
    WRITE_REQUIRES_ENCRYPTION = Permissions.WRITE_REQUIRES_ENCRYPTION
    READ_REQUIRES_AUTHENTICATION = Permissions.READ_REQUIRES_AUTHENTICATION
    WRITE_REQUIRES_AUTHENTICATION = Permissions.WRITE_REQUIRES_AUTHENTICATION
    READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
    WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION

    value: Union[bytes, AttributeValue]

    def __init__(
        self,
        attribute_type: Union[str, bytes, UUID],
        permissions: Union[str, Attribute.Permissions],
        value: Union[str, bytes, AttributeValue] = b'',
    ) -> None:
        '''
        Args:
          attribute_type: the attribute type, as a UUID object, or as a
            string or bytes from which a UUID is constructed.
          permissions: the access permissions, as flags or as a string
            accepted by `Permissions.from_string`.
          value: the initial value; a string is stored UTF-8 encoded, any
            other value is stored as is.
        '''
        EventEmitter.__init__(self)
        # Handles start at 0 here; they are set elsewhere (not in this class)
        self.handle = 0
        self.end_group_handle = 0
        if isinstance(permissions, str):
            self.permissions = Attribute.Permissions.from_string(permissions)
        else:
            self.permissions = permissions

        # Convert the type to a UUID object if it isn't already
        if isinstance(attribute_type, str):
            self.type = UUID(attribute_type)
        elif isinstance(attribute_type, bytes):
            self.type = UUID.from_bytes(attribute_type)
        else:
            self.type = attribute_type

        # Convert the value to a byte array
        if isinstance(value, str):
            self.value = bytes(value, 'utf-8')
        else:
            self.value = value

    def encode_value(self, value: Any) -> bytes:
        '''
        Convert a value to the bytes returned by `read_value`.

        Identity in this class (presumably a hook for subclasses).
        '''
        return value

    def decode_value(self, value_bytes: bytes) -> Any:
        '''
        Convert the bytes received by `write_value` to a value.

        Identity in this class (presumably a hook for subclasses).
        '''
        return value_bytes

    async def read_value(self, connection: Optional[Connection]) -> bytes:
        '''
        Read the attribute value on behalf of `connection`.

        The encryption and authentication requirements are only checked when
        a connection is given. A READ_REQUIRES_AUTHORIZATION permission
        always makes the read fail (see the TODO below).

        Returns:
          The value (obtained from the value's `read` method when it has one,
          awaiting the result if needed), passed through `encode_value`.

        Raises:
          ATT_Error: when a permission check fails, or when the value's
            `read` method raises an ATT_Error (re-raised with the same error
            code and this attribute's handle).
        '''
        if (
            (self.permissions & self.READ_REQUIRES_ENCRYPTION)
            and connection is not None
            and not connection.encryption
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            (self.permissions & self.READ_REQUIRES_AUTHENTICATION)
            and connection is not None
            and not connection.authenticated
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        if hasattr(self.value, 'read'):
            try:
                value = self.value.read(connection)
                if inspect.isawaitable(value):
                    value = await value
            except ATT_Error as error:
                # Report the error against this attribute's handle
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            value = self.value

        return self.encode_value(value)

    async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
        '''
        Write the attribute value on behalf of `connection`.

        The bytes are passed through `decode_value`, then either handed to
        the value's `write` method when it has one (awaiting the result if
        needed) or stored as the new value. A 'write' event is then emitted
        with the connection and the decoded value. A
        WRITE_REQUIRES_AUTHORIZATION permission always makes the write fail
        (see the TODO below).

        Raises:
          ATT_Error: when a permission check fails, or when the value's
            `write` method raises an ATT_Error (re-raised with the same error
            code and this attribute's handle).
        '''
        if (
            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
        ) and not connection.encryption:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
        ) and not connection.authenticated:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        value = self.decode_value(value_bytes)

        if hasattr(self.value, 'write'):
            try:
                result = self.value.write(connection, value)
                if inspect.isawaitable(result):
                    await result
            except ATT_Error as error:
                # Report the error against this attribute's handle
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            self.value = value

        self.emit('write', connection, value)

    def __repr__(self):
        if isinstance(self.value, bytes):
            value_str = self.value.hex()
        else:
            value_str = str(self.value)
        # Use the string computed above: the value is not necessarily bytes
        # (it may be an AttributeValue, which has no hex() method).
        value_string = f', value={value_str}' if value_str else ''
        return (
            f'Attribute(handle=0x{self.handle:04X}, '
            f'type={self.type}, '
            f'permissions={self.permissions}{value_string})'
        )
933