• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import dataclasses
21import enum
22import logging
23import struct
24
25from collections import deque
26from pyee import EventEmitter
27from typing import (
28    Dict,
29    Type,
30    List,
31    Optional,
32    Tuple,
33    Callable,
34    Any,
35    Union,
36    Deque,
37    Iterable,
38    SupportsBytes,
39    TYPE_CHECKING,
40)
41
42from .utils import deprecated
43from .colors import color
44from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
45from .hci import (
46    HCI_LE_Connection_Update_Command,
47    HCI_Object,
48    key_with_value,
49    name_or_number,
50)
51
52if TYPE_CHECKING:
53    from bumble.device import Connection
54    from bumble.host import Host
55
56# -----------------------------------------------------------------------------
57# Logging
58# -----------------------------------------------------------------------------
59logger = logging.getLogger(__name__)
60
61
62# -----------------------------------------------------------------------------
63# Constants
64# -----------------------------------------------------------------------------
65# fmt: off
66# pylint: disable=line-too-long
67
68L2CAP_SIGNALING_CID    = 0x01
69L2CAP_LE_SIGNALING_CID = 0x05
70
71L2CAP_MIN_LE_MTU     = 23
72L2CAP_MIN_BR_EDR_MTU = 48
73L2CAP_MAX_BR_EDR_MTU = 65535
74
75L2CAP_DEFAULT_MTU = 2048  # Default value for the MTU we are willing to accept
76
77L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024
78
79# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links
80L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040
81L2CAP_ACL_U_DYNAMIC_CID_RANGE_END   = 0xFFFF
82
83# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link
84L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040
85L2CAP_LE_U_DYNAMIC_CID_RANGE_END   = 0x007F
86
87# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage
88L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001
89L2CAP_PSM_DYNAMIC_RANGE_END   = 0xFFFF
90
91# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges
92L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080
93L2CAP_LE_PSM_DYNAMIC_RANGE_END   = 0x00FF
94
95# Frame types
96L2CAP_COMMAND_REJECT                       = 0x01
97L2CAP_CONNECTION_REQUEST                   = 0x02
98L2CAP_CONNECTION_RESPONSE                  = 0x03
99L2CAP_CONFIGURE_REQUEST                    = 0x04
100L2CAP_CONFIGURE_RESPONSE                   = 0x05
101L2CAP_DISCONNECTION_REQUEST                = 0x06
102L2CAP_DISCONNECTION_RESPONSE               = 0x07
103L2CAP_ECHO_REQUEST                         = 0x08
104L2CAP_ECHO_RESPONSE                        = 0x09
105L2CAP_INFORMATION_REQUEST                  = 0x0A
106L2CAP_INFORMATION_RESPONSE                 = 0x0B
107L2CAP_CREATE_CHANNEL_REQUEST               = 0x0C
108L2CAP_CREATE_CHANNEL_RESPONSE              = 0x0D
109L2CAP_MOVE_CHANNEL_REQUEST                 = 0x0E
110L2CAP_MOVE_CHANNEL_RESPONSE                = 0x0F
111L2CAP_MOVE_CHANNEL_CONFIRMATION            = 0x10
112L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE   = 0x11
113L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST  = 0x12
114L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13
115L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST   = 0x14
116L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE  = 0x15
117L2CAP_LE_FLOW_CONTROL_CREDIT               = 0x16
118
119L2CAP_CONTROL_FRAME_NAMES = {
120    L2CAP_COMMAND_REJECT:                       'L2CAP_COMMAND_REJECT',
121    L2CAP_CONNECTION_REQUEST:                   'L2CAP_CONNECTION_REQUEST',
122    L2CAP_CONNECTION_RESPONSE:                  'L2CAP_CONNECTION_RESPONSE',
123    L2CAP_CONFIGURE_REQUEST:                    'L2CAP_CONFIGURE_REQUEST',
124    L2CAP_CONFIGURE_RESPONSE:                   'L2CAP_CONFIGURE_RESPONSE',
125    L2CAP_DISCONNECTION_REQUEST:                'L2CAP_DISCONNECTION_REQUEST',
126    L2CAP_DISCONNECTION_RESPONSE:               'L2CAP_DISCONNECTION_RESPONSE',
127    L2CAP_ECHO_REQUEST:                         'L2CAP_ECHO_REQUEST',
128    L2CAP_ECHO_RESPONSE:                        'L2CAP_ECHO_RESPONSE',
129    L2CAP_INFORMATION_REQUEST:                  'L2CAP_INFORMATION_REQUEST',
130    L2CAP_INFORMATION_RESPONSE:                 'L2CAP_INFORMATION_RESPONSE',
131    L2CAP_CREATE_CHANNEL_REQUEST:               'L2CAP_CREATE_CHANNEL_REQUEST',
132    L2CAP_CREATE_CHANNEL_RESPONSE:              'L2CAP_CREATE_CHANNEL_RESPONSE',
133    L2CAP_MOVE_CHANNEL_REQUEST:                 'L2CAP_MOVE_CHANNEL_REQUEST',
134    L2CAP_MOVE_CHANNEL_RESPONSE:                'L2CAP_MOVE_CHANNEL_RESPONSE',
135    L2CAP_MOVE_CHANNEL_CONFIRMATION:            'L2CAP_MOVE_CHANNEL_CONFIRMATION',
136    L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE:   'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE',
137    L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST:  'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST',
138    L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE',
139    L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST:   'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST',
140    L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE:  'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE',
141    L2CAP_LE_FLOW_CONTROL_CREDIT:               'L2CAP_LE_FLOW_CONTROL_CREDIT'
142}
143
144L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
145L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
146
147L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000
148L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001
149L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
150
151L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS             = 65535
152L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU                 = 23
153L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU                 = 65535
154L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS                 = 23
155L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS                 = 65533
156L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU             = 2048
157L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS             = 2048
158L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
159
160L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
161
162L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
163
164# fmt: on
165# pylint: enable=line-too-long
166
167
168# -----------------------------------------------------------------------------
169# Classes
170# -----------------------------------------------------------------------------
171# pylint: disable=invalid-name
172
173
174@dataclasses.dataclass
175class ClassicChannelSpec:
176    psm: Optional[int] = None
177    mtu: int = L2CAP_DEFAULT_MTU
178
179
180@dataclasses.dataclass
181class LeCreditBasedChannelSpec:
182    psm: Optional[int] = None
183    mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
184    mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
185    max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
186
187    def __post_init__(self):
188        if (
189            self.max_credits < 1
190            or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
191        ):
192            raise ValueError('max credits out of range')
193        if (
194            self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
195            or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
196        ):
197            raise ValueError('MTU out of range')
198        if (
199            self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
200            or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
201        ):
202            raise ValueError('MPS out of range')
203
204
205class L2CAP_PDU:
206    '''
207    See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT
208    '''
209
210    @staticmethod
211    def from_bytes(data: bytes) -> L2CAP_PDU:
212        # Check parameters
213        if len(data) < 4:
214            raise ValueError('not enough data for L2CAP header')
215
216        _, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0)
217        l2cap_pdu_payload = data[4:]
218
219        return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
220
221    def to_bytes(self) -> bytes:
222        header = struct.pack('<HH', len(self.payload), self.cid)
223        return header + self.payload
224
225    def __init__(self, cid: int, payload: bytes) -> None:
226        self.cid = cid
227        self.payload = payload
228
229    def __bytes__(self) -> bytes:
230        return self.to_bytes()
231
232    def __str__(self) -> str:
233        return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
234
235
236# -----------------------------------------------------------------------------
237class L2CAP_Control_Frame:
238    '''
239    See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
240    '''
241
242    classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
243    code = 0
244    name: str
245
246    @staticmethod
247    def from_bytes(pdu: bytes) -> L2CAP_Control_Frame:
248        code = pdu[0]
249
250        cls = L2CAP_Control_Frame.classes.get(code)
251        if cls is None:
252            instance = L2CAP_Control_Frame(pdu)
253            instance.name = L2CAP_Control_Frame.code_name(code)
254            instance.code = code
255            return instance
256        self = cls.__new__(cls)
257        L2CAP_Control_Frame.__init__(self, pdu)
258        self.identifier = pdu[1]
259        length = struct.unpack_from('<H', pdu, 2)[0]
260        if length + 4 != len(pdu):
261            logger.warning(
262                color(
263                    f'!!! length mismatch: expected {len(pdu) - 4} but got {length}',
264                    'red',
265                )
266            )
267        if hasattr(self, 'fields'):
268            self.init_from_bytes(pdu, 4)
269        return self
270
271    @staticmethod
272    def code_name(code: int) -> str:
273        return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code)
274
275    @staticmethod
276    def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]:
277        options = []
278        while len(data) >= 2:
279            value_type = data[0]
280            length = data[1]
281            value = data[2 : 2 + length]
282            data = data[2 + length :]
283            options.append((value_type, value))
284
285        return options
286
287    @staticmethod
288    def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes:
289        return b''.join(
290            [bytes([option[0], len(option[1])]) + option[1] for option in options]
291        )
292
293    @staticmethod
294    def subclass(fields):
295        def inner(cls):
296            cls.name = cls.__name__.upper()
297            cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name)
298            if cls.code is None:
299                raise KeyError(
300                    f'Control Frame name {cls.name} '
301                    'not found in L2CAP_CONTROL_FRAME_NAMES'
302                )
303            cls.fields = fields
304
305            # Register a factory for this class
306            L2CAP_Control_Frame.classes[cls.code] = cls
307
308            return cls
309
310        return inner
311
312    def __init__(self, pdu=None, **kwargs) -> None:
313        self.identifier = kwargs.get('identifier', 0)
314        if hasattr(self, 'fields'):
315            if kwargs:
316                HCI_Object.init_from_fields(self, self.fields, kwargs)
317            if pdu is None:
318                data = HCI_Object.dict_to_bytes(kwargs, self.fields)
319                pdu = (
320                    bytes([self.code, self.identifier])
321                    + struct.pack('<H', len(data))
322                    + data
323                )
324        self.pdu = pdu
325
326    def init_from_bytes(self, pdu, offset):
327        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
328
329    def to_bytes(self) -> bytes:
330        return self.pdu
331
332    def __bytes__(self) -> bytes:
333        return self.to_bytes()
334
335    def __str__(self) -> str:
336        result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
337        if fields := getattr(self, 'fields', None):
338            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
339        else:
340            if len(self.pdu) > 1:
341                result += f': {self.pdu.hex()}'
342        return result
343
344
345# -----------------------------------------------------------------------------
346@L2CAP_Control_Frame.subclass(
347    # pylint: disable=unnecessary-lambda
348    [
349        (
350            'reason',
351            {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)},
352        ),
353        ('data', '*'),
354    ]
355)
356class L2CAP_Command_Reject(L2CAP_Control_Frame):
357    '''
358    See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT
359    '''
360
361    COMMAND_NOT_UNDERSTOOD = 0x0000
362    SIGNALING_MTU_EXCEEDED = 0x0001
363    INVALID_CID_IN_REQUEST = 0x0002
364
365    REASON_NAMES = {
366        COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD',
367        SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED',
368        INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST',
369    }
370
371    @staticmethod
372    def reason_name(reason: int) -> str:
373        return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
374
375
376# -----------------------------------------------------------------------------
377@L2CAP_Control_Frame.subclass(
378    # pylint: disable=unnecessary-lambda
379    [
380        (
381            'psm',
382            {
383                'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm(
384                    data, offset
385                ),
386                'serializer': lambda value: L2CAP_Connection_Request.serialize_psm(
387                    value
388                ),
389            },
390        ),
391        ('source_cid', 2),
392    ]
393)
394class L2CAP_Connection_Request(L2CAP_Control_Frame):
395    '''
396    See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
397    '''
398
399    psm: int
400    source_cid: int
401
402    @staticmethod
403    def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
404        psm_length = 2
405        psm = data[offset] | data[offset + 1] << 8
406
407        # The PSM field extends until the first even octet (inclusive)
408        while data[offset + psm_length - 1] % 2 == 1:
409            psm |= data[offset + psm_length] << (8 * psm_length)
410            psm_length += 1
411
412        return offset + psm_length, psm
413
414    @staticmethod
415    def serialize_psm(psm: int) -> bytes:
416        serialized = struct.pack('<H', psm & 0xFFFF)
417        psm >>= 16
418        while psm:
419            serialized += bytes([psm & 0xFF])
420            psm >>= 8
421
422        return serialized
423
424
425# -----------------------------------------------------------------------------
426@L2CAP_Control_Frame.subclass(
427    # pylint: disable=unnecessary-lambda
428    [
429        ('destination_cid', 2),
430        ('source_cid', 2),
431        (
432            'result',
433            {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)},
434        ),
435        ('status', 2),
436    ]
437)
438class L2CAP_Connection_Response(L2CAP_Control_Frame):
439    '''
440    See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
441    '''
442
443    source_cid: int
444    destination_cid: int
445    status: int
446    result: int
447
448    CONNECTION_SUCCESSFUL = 0x0000
449    CONNECTION_PENDING = 0x0001
450    CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
451    CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003
452    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
453    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006
454    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007
455    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
456
457    # pylint: disable=line-too-long
458    RESULT_NAMES = {
459        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
460        CONNECTION_PENDING: 'CONNECTION_PENDING',
461        CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED',
462        CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK',
463        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
464        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
465        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
466        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
467    }
468
469    @staticmethod
470    def result_name(result: int) -> str:
471        return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result)
472
473
474# -----------------------------------------------------------------------------
475@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')])
476class L2CAP_Configure_Request(L2CAP_Control_Frame):
477    '''
478    See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST
479    '''
480
481
482# -----------------------------------------------------------------------------
483@L2CAP_Control_Frame.subclass(
484    # pylint: disable=unnecessary-lambda
485    [
486        ('source_cid', 2),
487        ('flags', 2),
488        (
489            'result',
490            {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)},
491        ),
492        ('options', '*'),
493    ]
494)
495class L2CAP_Configure_Response(L2CAP_Control_Frame):
496    '''
497    See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE
498    '''
499
500    SUCCESS = 0x0000
501    FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001
502    FAILURE_REJECTED = 0x0002
503    FAILURE_UNKNOWN_OPTIONS = 0x0003
504    PENDING = 0x0004
505    FAILURE_FLOW_SPEC_REJECTED = 0x0005
506
507    RESULT_NAMES = {
508        SUCCESS: 'SUCCESS',
509        FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS',
510        FAILURE_REJECTED: 'FAILURE_REJECTED',
511        FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS',
512        PENDING: 'PENDING',
513        FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED',
514    }
515
516    @staticmethod
517    def result_name(result: int) -> str:
518        return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
519
520
521# -----------------------------------------------------------------------------
522@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
523class L2CAP_Disconnection_Request(L2CAP_Control_Frame):
524    '''
525    See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST
526    '''
527
528
529# -----------------------------------------------------------------------------
530@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
531class L2CAP_Disconnection_Response(L2CAP_Control_Frame):
532    '''
533    See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE
534    '''
535
536
537# -----------------------------------------------------------------------------
538@L2CAP_Control_Frame.subclass([('data', '*')])
539class L2CAP_Echo_Request(L2CAP_Control_Frame):
540    '''
541    See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST
542    '''
543
544
545# -----------------------------------------------------------------------------
546@L2CAP_Control_Frame.subclass([('data', '*')])
547class L2CAP_Echo_Response(L2CAP_Control_Frame):
548    '''
549    See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE
550    '''
551
552
553# -----------------------------------------------------------------------------
554@L2CAP_Control_Frame.subclass(
555    [
556        (
557            'info_type',
558            {
559                'size': 2,
560                # pylint: disable-next=unnecessary-lambda
561                'mapper': lambda x: L2CAP_Information_Request.info_type_name(x),
562            },
563        )
564    ]
565)
566class L2CAP_Information_Request(L2CAP_Control_Frame):
567    '''
568    See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
569    '''
570
571    CONNECTIONLESS_MTU = 0x0001
572    EXTENDED_FEATURES_SUPPORTED = 0x0002
573    FIXED_CHANNELS_SUPPORTED = 0x0003
574
575    EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
576    EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
577    EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
578    EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
579    EXTENDED_FEATURE_STREAMING_MODE = 0x0010
580    EXTENDED_FEATURE_FCS_OPTION = 0x0020
581    EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
582    EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
583    EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
584    EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
585    EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
586
587    INFO_TYPE_NAMES = {
588        CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
589        EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
590        FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED',
591    }
592
593    @staticmethod
594    def info_type_name(info_type: int) -> str:
595        return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
596
597
598# -----------------------------------------------------------------------------
599@L2CAP_Control_Frame.subclass(
600    [
601        ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}),
602        (
603            'result',
604            # pylint: disable-next=unnecessary-lambda
605            {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)},
606        ),
607        ('data', '*'),
608    ]
609)
610class L2CAP_Information_Response(L2CAP_Control_Frame):
611    '''
612    See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
613    '''
614
615    SUCCESS = 0x00
616    NOT_SUPPORTED = 0x01
617
618    RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'}
619
620    @staticmethod
621    def result_name(result: int) -> str:
622        return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
623
624
625# -----------------------------------------------------------------------------
626@L2CAP_Control_Frame.subclass(
627    [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)]
628)
629class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
630    '''
631    See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST
632    '''
633
634
635# -----------------------------------------------------------------------------
636@L2CAP_Control_Frame.subclass([('result', 2)])
637class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame):
638    '''
639    See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE
640    '''
641
642
643# -----------------------------------------------------------------------------
644@L2CAP_Control_Frame.subclass(
645    [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)]
646)
647class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
648    '''
649    See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST
650    (CODE 0x14)
651    '''
652
653    source_cid: int
654
655
656# -----------------------------------------------------------------------------
657@L2CAP_Control_Frame.subclass(
658    # pylint: disable=unnecessary-lambda,line-too-long
659    [
660        ('destination_cid', 2),
661        ('mtu', 2),
662        ('mps', 2),
663        ('initial_credits', 2),
664        (
665            'result',
666            {
667                'size': 2,
668                'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name(
669                    x
670                ),
671            },
672        ),
673    ]
674)
675class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
676    '''
677    See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE
678    (CODE 0x15)
679    '''
680
681    CONNECTION_SUCCESSFUL = 0x0000
682    CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002
683    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
684    CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
685    CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
686    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007
687    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
688    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009
689    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
690    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
691
692    # pylint: disable=line-too-long
693    RESULT_NAMES = {
694        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
695        CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
696        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
697        CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION',
698        CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION',
699        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE',
700        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION',
701        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
702        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
703        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
704    }
705
706    @staticmethod
707    def result_name(result: int) -> str:
708        return name_or_number(
709            L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result
710        )
711
712
713# -----------------------------------------------------------------------------
714@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)])
715class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
716    '''
717    See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16)
718    '''
719
720
721# -----------------------------------------------------------------------------
722class ClassicChannel(EventEmitter):
723    class State(enum.IntEnum):
724        # States
725        CLOSED = 0x00
726        WAIT_CONNECT = 0x01
727        WAIT_CONNECT_RSP = 0x02
728        OPEN = 0x03
729        WAIT_DISCONNECT = 0x04
730        WAIT_CREATE = 0x05
731        WAIT_CREATE_RSP = 0x06
732        WAIT_MOVE = 0x07
733        WAIT_MOVE_RSP = 0x08
734        WAIT_MOVE_CONFIRM = 0x09
735        WAIT_CONFIRM_RSP = 0x0A
736
737        # CONFIG substates
738        WAIT_CONFIG = 0x10
739        WAIT_SEND_CONFIG = 0x11
740        WAIT_CONFIG_REQ_RSP = 0x12
741        WAIT_CONFIG_RSP = 0x13
742        WAIT_CONFIG_REQ = 0x14
743        WAIT_IND_FINAL_RSP = 0x15
744        WAIT_FINAL_RSP = 0x16
745        WAIT_CONTROL_IND = 0x17
746
747    connection_result: Optional[asyncio.Future[None]]
748    disconnection_result: Optional[asyncio.Future[None]]
749    response: Optional[asyncio.Future[bytes]]
750    sink: Optional[Callable[[bytes], Any]]
751    state: State
752    connection: Connection
753    mtu: int
754    peer_mtu: int
755
756    def __init__(
757        self,
758        manager: ChannelManager,
759        connection: Connection,
760        signaling_cid: int,
761        psm: int,
762        source_cid: int,
763        mtu: int,
764    ) -> None:
765        super().__init__()
766        self.manager = manager
767        self.connection = connection
768        self.signaling_cid = signaling_cid
769        self.state = self.State.CLOSED
770        self.mtu = mtu
771        self.peer_mtu = L2CAP_MIN_BR_EDR_MTU
772        self.psm = psm
773        self.source_cid = source_cid
774        self.destination_cid = 0
775        self.response = None
776        self.connection_result = None
777        self.disconnection_result = None
778        self.sink = None
779
780    def _change_state(self, new_state: State) -> None:
781        logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
782        self.state = new_state
783
784    def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
785        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
786
787    def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
788        self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
789
790    async def send_request(self, request: SupportsBytes) -> bytes:
791        # Check that there isn't already a request pending
792        if self.response:
793            raise InvalidStateError('request already pending')
794        if self.state != self.State.OPEN:
795            raise InvalidStateError('channel not open')
796
797        self.response = asyncio.get_running_loop().create_future()
798        self.send_pdu(request)
799        return await self.response
800
801    def on_pdu(self, pdu: bytes) -> None:
802        if self.response:
803            self.response.set_result(pdu)
804            self.response = None
805        elif self.sink:
806            # pylint: disable=not-callable
807            self.sink(pdu)
808        else:
809            logger.warning(
810                color('received pdu without a pending request or sink', 'red')
811            )
812
813    async def connect(self) -> None:
814        if self.state != self.State.CLOSED:
815            raise InvalidStateError('invalid state')
816
817        # Check that we can start a new connection
818        if self.connection_result:
819            raise RuntimeError('connection already pending')
820
821        self._change_state(self.State.WAIT_CONNECT_RSP)
822        self.send_control_frame(
823            L2CAP_Connection_Request(
824                identifier=self.manager.next_identifier(self.connection),
825                psm=self.psm,
826                source_cid=self.source_cid,
827            )
828        )
829
830        # Create a future to wait for the state machine to get to a success or error
831        # state
832        self.connection_result = asyncio.get_running_loop().create_future()
833
834        # Wait for the connection to succeed or fail
835        try:
836            return await self.connection.abort_on(
837                'disconnection', self.connection_result
838            )
839        finally:
840            self.connection_result = None
841
842    async def disconnect(self) -> None:
843        if self.state != self.State.OPEN:
844            raise InvalidStateError('invalid state')
845
846        self._change_state(self.State.WAIT_DISCONNECT)
847        self.send_control_frame(
848            L2CAP_Disconnection_Request(
849                identifier=self.manager.next_identifier(self.connection),
850                destination_cid=self.destination_cid,
851                source_cid=self.source_cid,
852            )
853        )
854
855        # Create a future to wait for the state machine to get to a success or error
856        # state
857        self.disconnection_result = asyncio.get_running_loop().create_future()
858        return await self.disconnection_result
859
860    def abort(self) -> None:
861        if self.state == self.State.OPEN:
862            self._change_state(self.State.CLOSED)
863            self.emit('close')
864
865    def send_configure_request(self) -> None:
866        options = L2CAP_Control_Frame.encode_configuration_options(
867            [
868                (
869                    L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
870                    struct.pack('<H', self.mtu),
871                )
872            ]
873        )
874        self.send_control_frame(
875            L2CAP_Configure_Request(
876                identifier=self.manager.next_identifier(self.connection),
877                destination_cid=self.destination_cid,
878                flags=0x0000,
879                options=options,
880            )
881        )
882
883    def on_connection_request(self, request) -> None:
884        self.destination_cid = request.source_cid
885        self._change_state(self.State.WAIT_CONNECT)
886        self.send_control_frame(
887            L2CAP_Connection_Response(
888                identifier=request.identifier,
889                destination_cid=self.source_cid,
890                source_cid=self.destination_cid,
891                result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL,
892                status=0x0000,
893            )
894        )
895        self._change_state(self.State.WAIT_CONFIG)
896        self.send_configure_request()
897        self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
898
899    def on_connection_response(self, response):
900        if self.state != self.State.WAIT_CONNECT_RSP:
901            logger.warning(color('invalid state', 'red'))
902            return
903
904        if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL:
905            self.destination_cid = response.destination_cid
906            self._change_state(self.State.WAIT_CONFIG)
907            self.send_configure_request()
908            self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
909        elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING:
910            pass
911        else:
912            self._change_state(self.State.CLOSED)
913            self.connection_result.set_exception(
914                ProtocolError(
915                    response.result,
916                    'l2cap',
917                    L2CAP_Connection_Response.result_name(response.result),
918                )
919            )
920            self.connection_result = None
921
922    def on_configure_request(self, request) -> None:
923        if self.state not in (
924            self.State.WAIT_CONFIG,
925            self.State.WAIT_CONFIG_REQ,
926            self.State.WAIT_CONFIG_REQ_RSP,
927        ):
928            logger.warning(color('invalid state', 'red'))
929            return
930
931        # Decode the options
932        options = L2CAP_Control_Frame.decode_configuration_options(request.options)
933        for option in options:
934            if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
935                self.peer_mtu = struct.unpack('<H', option[1])[0]
936                logger.debug(f'peer MTU = {self.peer_mtu}')
937
938        self.send_control_frame(
939            L2CAP_Configure_Response(
940                identifier=request.identifier,
941                source_cid=self.destination_cid,
942                flags=0x0000,
943                result=L2CAP_Configure_Response.SUCCESS,
944                options=request.options,  # TODO: don't accept everything blindly
945            )
946        )
947        if self.state == self.State.WAIT_CONFIG:
948            self._change_state(self.State.WAIT_SEND_CONFIG)
949            self.send_configure_request()
950            self._change_state(self.State.WAIT_CONFIG_RSP)
951        elif self.state == self.State.WAIT_CONFIG_REQ:
952            self._change_state(self.State.OPEN)
953            if self.connection_result:
954                self.connection_result.set_result(None)
955                self.connection_result = None
956            self.emit('open')
957        elif self.state == self.State.WAIT_CONFIG_REQ_RSP:
958            self._change_state(self.State.WAIT_CONFIG_RSP)
959
960    def on_configure_response(self, response) -> None:
961        if response.result == L2CAP_Configure_Response.SUCCESS:
962            if self.state == self.State.WAIT_CONFIG_REQ_RSP:
963                self._change_state(self.State.WAIT_CONFIG_REQ)
964            elif self.state in (
965                self.State.WAIT_CONFIG_RSP,
966                self.State.WAIT_CONTROL_IND,
967            ):
968                self._change_state(self.State.OPEN)
969                if self.connection_result:
970                    self.connection_result.set_result(None)
971                    self.connection_result = None
972                self.emit('open')
973            else:
974                logger.warning(color('invalid state', 'red'))
975        elif (
976            response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS
977        ):
978            # Re-configure with what's suggested in the response
979            self.send_control_frame(
980                L2CAP_Configure_Request(
981                    identifier=self.manager.next_identifier(self.connection),
982                    destination_cid=self.destination_cid,
983                    flags=0x0000,
984                    options=response.options,
985                )
986            )
987        else:
988            logger.warning(
989                color(
990                    '!!! configuration rejected: '
991                    f'{L2CAP_Configure_Response.result_name(response.result)}',
992                    'red',
993                )
994            )
995            # TODO: decide how to fail gracefully
996
997    def on_disconnection_request(self, request) -> None:
998        if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT):
999            self.send_control_frame(
1000                L2CAP_Disconnection_Response(
1001                    identifier=request.identifier,
1002                    destination_cid=request.destination_cid,
1003                    source_cid=request.source_cid,
1004                )
1005            )
1006            self._change_state(self.State.CLOSED)
1007            self.emit('close')
1008            self.manager.on_channel_closed(self)
1009        else:
1010            logger.warning(color('invalid state', 'red'))
1011
1012    def on_disconnection_response(self, response) -> None:
1013        if self.state != self.State.WAIT_DISCONNECT:
1014            logger.warning(color('invalid state', 'red'))
1015            return
1016
1017        if (
1018            response.destination_cid != self.destination_cid
1019            or response.source_cid != self.source_cid
1020        ):
1021            logger.warning('unexpected source or destination CID')
1022            return
1023
1024        self._change_state(self.State.CLOSED)
1025        if self.disconnection_result:
1026            self.disconnection_result.set_result(None)
1027            self.disconnection_result = None
1028        self.emit('close')
1029        self.manager.on_channel_closed(self)
1030
1031    def __str__(self) -> str:
1032        return (
1033            f'Channel({self.source_cid}->{self.destination_cid}, '
1034            f'PSM={self.psm}, '
1035            f'MTU={self.mtu}/{self.peer_mtu}, '
1036            f'state={self.state.name})'
1037        )
1038
1039
1040# -----------------------------------------------------------------------------
1041class LeCreditBasedChannel(EventEmitter):
1042    """
1043    LE Credit-based Connection Oriented Channel
1044    """
1045
1046    class State(enum.IntEnum):
1047        INIT = 0
1048        CONNECTED = 1
1049        CONNECTING = 2
1050        DISCONNECTING = 3
1051        DISCONNECTED = 4
1052        CONNECTION_ERROR = 5
1053
1054    out_queue: Deque[bytes]
1055    connection_result: Optional[asyncio.Future[LeCreditBasedChannel]]
1056    disconnection_result: Optional[asyncio.Future[None]]
1057    in_sdu: Optional[bytes]
1058    out_sdu: Optional[bytes]
1059    state: State
1060    connection: Connection
1061    sink: Optional[Callable[[bytes], Any]]
1062
1063    def __init__(
1064        self,
1065        manager: ChannelManager,
1066        connection: Connection,
1067        le_psm: int,
1068        source_cid: int,
1069        destination_cid: int,
1070        mtu: int,
1071        mps: int,
1072        credits: int,  # pylint: disable=redefined-builtin
1073        peer_mtu: int,
1074        peer_mps: int,
1075        peer_credits: int,
1076        connected: bool,
1077    ) -> None:
1078        super().__init__()
1079        self.manager = manager
1080        self.connection = connection
1081        self.le_psm = le_psm
1082        self.source_cid = source_cid
1083        self.destination_cid = destination_cid
1084        self.mtu = mtu
1085        self.mps = mps
1086        self.credits = credits
1087        self.peer_mtu = peer_mtu
1088        self.peer_mps = peer_mps
1089        self.peer_credits = peer_credits
1090        self.peer_max_credits = self.peer_credits
1091        self.peer_credits_threshold = self.peer_max_credits // 2
1092        self.in_sdu = None
1093        self.in_sdu_length = 0
1094        self.out_queue = deque()
1095        self.out_sdu = None
1096        self.sink = None
1097        self.connected = False
1098        self.connection_result = None
1099        self.disconnection_result = None
1100        self.drained = asyncio.Event()
1101
1102        self.drained.set()
1103
1104        if connected:
1105            self.state = self.State.CONNECTED
1106        else:
1107            self.state = self.State.INIT
1108
1109    def _change_state(self, new_state: State) -> None:
1110        logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
1111        self.state = new_state
1112
1113        if new_state == self.State.CONNECTED:
1114            self.emit('open')
1115        elif new_state == self.State.DISCONNECTED:
1116            self.emit('close')
1117
1118    def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
1119        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
1120
1121    def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
1122        self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
1123
1124    async def connect(self) -> LeCreditBasedChannel:
1125        # Check that we're in the right state
1126        if self.state != self.State.INIT:
1127            raise InvalidStateError('not in a connectable state')
1128
1129        # Check that we can start a new connection
1130        identifier = self.manager.next_identifier(self.connection)
1131        if identifier in self.manager.le_coc_requests:
1132            raise RuntimeError('too many concurrent connection requests')
1133
1134        self._change_state(self.State.CONNECTING)
1135        request = L2CAP_LE_Credit_Based_Connection_Request(
1136            identifier=identifier,
1137            le_psm=self.le_psm,
1138            source_cid=self.source_cid,
1139            mtu=self.mtu,
1140            mps=self.mps,
1141            initial_credits=self.peer_credits,
1142        )
1143        self.manager.le_coc_requests[identifier] = request
1144        self.send_control_frame(request)
1145
1146        # Create a future to wait for the response
1147        self.connection_result = asyncio.get_running_loop().create_future()
1148
1149        # Wait for the connection to succeed or fail
1150        return await self.connection_result
1151
1152    async def disconnect(self) -> None:
1153        # Check that we're connected
1154        if self.state != self.State.CONNECTED:
1155            raise InvalidStateError('not connected')
1156
1157        self._change_state(self.State.DISCONNECTING)
1158        self.flush_output()
1159        self.send_control_frame(
1160            L2CAP_Disconnection_Request(
1161                identifier=self.manager.next_identifier(self.connection),
1162                destination_cid=self.destination_cid,
1163                source_cid=self.source_cid,
1164            )
1165        )
1166
1167        # Create a future to wait for the state machine to get to a success or error
1168        # state
1169        self.disconnection_result = asyncio.get_running_loop().create_future()
1170        return await self.disconnection_result
1171
1172    def abort(self) -> None:
1173        if self.state == self.State.CONNECTED:
1174            self._change_state(self.State.DISCONNECTED)
1175
1176    def on_pdu(self, pdu: bytes) -> None:
1177        if self.sink is None:
1178            logger.warning('received pdu without a sink')
1179            return
1180
1181        if self.state != self.State.CONNECTED:
1182            logger.warning('received PDU while not connected, dropping')
1183
1184        # Manage the peer credits
1185        if self.peer_credits == 0:
1186            logger.warning('received LE frame when peer out of credits')
1187        else:
1188            self.peer_credits -= 1
1189            if self.peer_credits <= self.peer_credits_threshold:
1190                # The credits fell below the threshold, replenish them to the max
1191                self.send_control_frame(
1192                    L2CAP_LE_Flow_Control_Credit(
1193                        identifier=self.manager.next_identifier(self.connection),
1194                        cid=self.source_cid,
1195                        credits=self.peer_max_credits - self.peer_credits,
1196                    )
1197                )
1198                self.peer_credits = self.peer_max_credits
1199
1200        # Check if this starts a new SDU
1201        if self.in_sdu is None:
1202            # Start a new SDU
1203            self.in_sdu = pdu
1204        else:
1205            # Continue an SDU
1206            self.in_sdu += pdu
1207
1208        # Check if the SDU is complete
1209        if self.in_sdu_length == 0:
1210            # We don't know the size yet, check if we have received the header to
1211            # compute it
1212            if len(self.in_sdu) >= 2:
1213                self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0]
1214        if self.in_sdu_length == 0:
1215            # We'll compute it later
1216            return
1217        if len(self.in_sdu) < 2 + self.in_sdu_length:
1218            # Not complete yet
1219            logger.debug(
1220                f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received'
1221            )
1222            return
1223        if len(self.in_sdu) != 2 + self.in_sdu_length:
1224            # Overflow
1225            logger.warning(
1226                f'SDU overflow: sdu_length={self.in_sdu_length}, '
1227                f'received {len(self.in_sdu) - 2}'
1228            )
1229            # TODO: we should disconnect
1230            self.in_sdu = None
1231            self.in_sdu_length = 0
1232            return
1233
1234        # Send the SDU to the sink
1235        logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes')
1236        self.sink(self.in_sdu[2:])  # pylint: disable=not-callable
1237
1238        # Prepare for a new SDU
1239        self.in_sdu = None
1240        self.in_sdu_length = 0
1241
1242    def on_connection_response(self, response) -> None:
1243        # Look for a matching pending response result
1244        if self.connection_result is None:
1245            logger.warning(
1246                f'received unexpected connection response (id={response.identifier})'
1247            )
1248            return
1249
1250        if (
1251            response.result
1252            == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL
1253        ):
1254            self.destination_cid = response.destination_cid
1255            self.peer_mtu = response.mtu
1256            self.peer_mps = response.mps
1257            self.credits = response.initial_credits
1258            self.connected = True
1259            self.connection_result.set_result(self)
1260            self._change_state(self.State.CONNECTED)
1261        else:
1262            self.connection_result.set_exception(
1263                ProtocolError(
1264                    response.result,
1265                    'l2cap',
1266                    L2CAP_LE_Credit_Based_Connection_Response.result_name(
1267                        response.result
1268                    ),
1269                )
1270            )
1271            self._change_state(self.State.CONNECTION_ERROR)
1272
1273        # Cleanup
1274        self.connection_result = None
1275
1276    def on_credits(self, credits: int) -> None:  # pylint: disable=redefined-builtin
1277        self.credits += credits
1278        logger.debug(f'received {credits} credits, total = {self.credits}')
1279
1280        # Try to send more data if we have any queued up
1281        self.process_output()
1282
1283    def on_disconnection_request(self, request) -> None:
1284        self.send_control_frame(
1285            L2CAP_Disconnection_Response(
1286                identifier=request.identifier,
1287                destination_cid=request.destination_cid,
1288                source_cid=request.source_cid,
1289            )
1290        )
1291        self._change_state(self.State.DISCONNECTED)
1292        self.flush_output()
1293
1294    def on_disconnection_response(self, response) -> None:
1295        if self.state != self.State.DISCONNECTING:
1296            logger.warning(color('invalid state', 'red'))
1297            return
1298
1299        if (
1300            response.destination_cid != self.destination_cid
1301            or response.source_cid != self.source_cid
1302        ):
1303            logger.warning('unexpected source or destination CID')
1304            return
1305
1306        self._change_state(self.State.DISCONNECTED)
1307        if self.disconnection_result:
1308            self.disconnection_result.set_result(None)
1309            self.disconnection_result = None
1310
1311    def flush_output(self) -> None:
1312        self.out_queue.clear()
1313        self.out_sdu = None
1314
1315    def process_output(self) -> None:
1316        while self.credits > 0:
1317            if self.out_sdu is not None:
1318                # Finish the current SDU
1319                packet = self.out_sdu[: self.peer_mps]
1320                self.send_pdu(packet)
1321                self.credits -= 1
1322                logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left')
1323                if len(packet) == len(self.out_sdu):
1324                    # We sent everything
1325                    self.out_sdu = None
1326                else:
1327                    # Keep what's still left to send
1328                    self.out_sdu = self.out_sdu[len(packet) :]
1329                continue
1330
1331            if self.out_queue:
1332                # Create the next SDU (2 bytes header plus up to MTU bytes payload)
1333                logger.debug(
1334                    f'assembling SDU from {len(self.out_queue)} packets in output queue'
1335                )
1336                payload = b''
1337                while self.out_queue and len(payload) < self.peer_mtu:
1338                    # We can add more data to the payload
1339                    chunk = self.out_queue[0][: self.peer_mtu - len(payload)]
1340                    payload += chunk
1341                    self.out_queue[0] = self.out_queue[0][len(chunk) :]
1342                    if len(self.out_queue[0]) == 0:
1343                        # We consumed the entire buffer, remove it
1344                        self.out_queue.popleft()
1345                        logger.debug(
1346                            f'packet completed, {len(self.out_queue)} left in queue'
1347                        )
1348
1349                # Construct the SDU with its header
1350                assert len(payload) != 0
1351                logger.debug(f'SDU complete: {len(payload)} payload bytes')
1352                self.out_sdu = struct.pack('<H', len(payload)) + payload
1353            else:
1354                # Nothing left to send for now
1355                self.drained.set()
1356                return
1357
1358    def write(self, data: bytes) -> None:
1359        if self.state != self.State.CONNECTED:
1360            logger.warning('not connected, dropping data')
1361            return
1362
1363        # Queue the data
1364        self.out_queue.append(data)
1365        self.drained.clear()
1366        logger.debug(
1367            f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue'
1368        )
1369
1370        # Send what we can
1371        self.process_output()
1372
1373    async def drain(self) -> None:
1374        await self.drained.wait()
1375
1376    def pause_reading(self) -> None:
1377        # TODO: not implemented yet
1378        pass
1379
1380    def resume_reading(self) -> None:
1381        # TODO: not implemented yet
1382        pass
1383
1384    def __str__(self) -> str:
1385        return (
1386            f'CoC({self.source_cid}->{self.destination_cid}, '
1387            f'State={self.state.name}, '
1388            f'PSM={self.le_psm}, '
1389            f'MTU={self.mtu}/{self.peer_mtu}, '
1390            f'MPS={self.mps}/{self.peer_mps}, '
1391            f'credits={self.credits}/{self.peer_credits})'
1392        )
1393
1394
1395# -----------------------------------------------------------------------------
1396class ClassicChannelServer(EventEmitter):
1397    def __init__(
1398        self,
1399        manager: ChannelManager,
1400        psm: int,
1401        handler: Optional[Callable[[ClassicChannel], Any]],
1402        mtu: int,
1403    ) -> None:
1404        super().__init__()
1405        self.manager = manager
1406        self.handler = handler
1407        self.psm = psm
1408        self.mtu = mtu
1409
1410    def on_connection(self, channel: ClassicChannel) -> None:
1411        self.emit('connection', channel)
1412        if self.handler:
1413            self.handler(channel)
1414
1415    def close(self) -> None:
1416        if self.psm in self.manager.servers:
1417            del self.manager.servers[self.psm]
1418
1419
1420# -----------------------------------------------------------------------------
1421class LeCreditBasedChannelServer(EventEmitter):
1422    def __init__(
1423        self,
1424        manager: ChannelManager,
1425        psm: int,
1426        handler: Optional[Callable[[LeCreditBasedChannel], Any]],
1427        max_credits: int,
1428        mtu: int,
1429        mps: int,
1430    ) -> None:
1431        super().__init__()
1432        self.manager = manager
1433        self.handler = handler
1434        self.psm = psm
1435        self.max_credits = max_credits
1436        self.mtu = mtu
1437        self.mps = mps
1438
1439    def on_connection(self, channel: LeCreditBasedChannel) -> None:
1440        self.emit('connection', channel)
1441        if self.handler:
1442            self.handler(channel)
1443
1444    def close(self) -> None:
1445        if self.psm in self.manager.le_coc_servers:
1446            del self.manager.le_coc_servers[self.psm]
1447
1448
1449# -----------------------------------------------------------------------------
1450class ChannelManager:
1451    identifiers: Dict[int, int]
1452    channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
1453    servers: Dict[int, ClassicChannelServer]
1454    le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]]
1455    le_coc_servers: Dict[int, LeCreditBasedChannelServer]
1456    le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
1457    fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
1458    _host: Optional[Host]
1459    connection_parameters_update_response: Optional[asyncio.Future[int]]
1460
1461    def __init__(
1462        self,
1463        extended_features: Iterable[int] = (),
1464        connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU,
1465    ) -> None:
1466        self._host = None
1467        self.identifiers = {}  # Incrementing identifier values by connection
1468        self.channels = {}  # All channels, mapped by connection and source cid
1469        self.fixed_channels = {  # Fixed channel handlers, mapped by cid
1470            L2CAP_SIGNALING_CID: None,
1471            L2CAP_LE_SIGNALING_CID: None,
1472        }
1473        self.servers = {}  # Servers accepting connections, by PSM
1474        self.le_coc_channels = (
1475            {}
1476        )  # LE CoC channels, mapped by connection and destination cid
1477        self.le_coc_servers = {}  # LE CoC - Servers accepting connections, by PSM
1478        self.le_coc_requests = {}  # LE CoC connection requests, by identifier
1479        self.extended_features = extended_features
1480        self.connectionless_mtu = connectionless_mtu
1481        self.connection_parameters_update_response = None
1482
1483    @property
1484    def host(self) -> Host:
1485        assert self._host
1486        return self._host
1487
1488    @host.setter
1489    def host(self, host: Host) -> None:
1490        if self._host is not None:
1491            self._host.remove_listener('disconnection', self.on_disconnection)
1492        self._host = host
1493        if host is not None:
1494            host.on('disconnection', self.on_disconnection)
1495
1496    def find_channel(self, connection_handle: int, cid: int):
1497        if connection_channels := self.channels.get(connection_handle):
1498            return connection_channels.get(cid)
1499
1500        return None
1501
1502    def find_le_coc_channel(self, connection_handle: int, cid: int):
1503        if connection_channels := self.le_coc_channels.get(connection_handle):
1504            return connection_channels.get(cid)
1505
1506        return None
1507
1508    @staticmethod
1509    def find_free_br_edr_cid(channels: Iterable[int]) -> int:
1510        # Pick the smallest valid CID that's not already in the list
1511        # (not necessarily the most efficient algorithm, but the list of CID is
1512        # very small in practice)
1513        for cid in range(
1514            L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1
1515        ):
1516            if cid not in channels:
1517                return cid
1518
1519        raise RuntimeError('no free CID available')
1520
1521    @staticmethod
1522    def find_free_le_cid(channels: Iterable[int]) -> int:
1523        # Pick the smallest valid CID that's not already in the list
1524        # (not necessarily the most efficient algorithm, but the list of CID is
1525        # very small in practice)
1526        for cid in range(
1527            L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1
1528        ):
1529            if cid not in channels:
1530                return cid
1531
1532        raise RuntimeError('no free CID')
1533
1534    def next_identifier(self, connection: Connection) -> int:
1535        identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
1536        self.identifiers[connection.handle] = identifier
1537        return identifier
1538
1539    def register_fixed_channel(
1540        self, cid: int, handler: Callable[[int, bytes], Any]
1541    ) -> None:
1542        self.fixed_channels[cid] = handler
1543
1544    def deregister_fixed_channel(self, cid: int) -> None:
1545        if cid in self.fixed_channels:
1546            del self.fixed_channels[cid]
1547
1548    @deprecated("Please use create_classic_server")
1549    def register_server(
1550        self,
1551        psm: int,
1552        server: Callable[[ClassicChannel], Any],
1553    ) -> int:
1554        return self.create_classic_server(
1555            handler=server, spec=ClassicChannelSpec(psm=psm)
1556        ).psm
1557
1558    def create_classic_server(
1559        self,
1560        spec: ClassicChannelSpec,
1561        handler: Optional[Callable[[ClassicChannel], Any]] = None,
1562    ) -> ClassicChannelServer:
1563        if not spec.psm:
1564            # Find a free PSM
1565            for candidate in range(
1566                L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
1567            ):
1568                if (candidate >> 8) % 2 == 1:
1569                    continue
1570                if candidate in self.servers:
1571                    continue
1572                spec.psm = candidate
1573                break
1574            else:
1575                raise InvalidStateError('no free PSM')
1576        else:
1577            # Check that the PSM isn't already in use
1578            if spec.psm in self.servers:
1579                raise ValueError('PSM already in use')
1580
1581            # Check that the PSM is valid
1582            if spec.psm % 2 == 0:
1583                raise ValueError('invalid PSM (not odd)')
1584            check = spec.psm >> 8
1585            while check:
1586                if check % 2 != 0:
1587                    raise ValueError('invalid PSM')
1588                check >>= 8
1589
1590        self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu)
1591
1592        return self.servers[spec.psm]
1593
1594    @deprecated("Please use create_le_credit_based_server()")
1595    def register_le_coc_server(
1596        self,
1597        psm: int,
1598        server: Callable[[LeCreditBasedChannel], Any],
1599        max_credits: int,
1600        mtu: int,
1601        mps: int,
1602    ) -> int:
1603        return self.create_le_credit_based_server(
1604            spec=LeCreditBasedChannelSpec(
1605                psm=None if psm == 0 else psm, mtu=mtu, mps=mps, max_credits=max_credits
1606            ),
1607            handler=server,
1608        ).psm
1609
1610    def create_le_credit_based_server(
1611        self,
1612        spec: LeCreditBasedChannelSpec,
1613        handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None,
1614    ) -> LeCreditBasedChannelServer:
1615        if not spec.psm:
1616            # Find a free PSM
1617            for candidate in range(
1618                L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
1619            ):
1620                if candidate in self.le_coc_servers:
1621                    continue
1622                spec.psm = candidate
1623                break
1624            else:
1625                raise InvalidStateError('no free PSM')
1626        else:
1627            # Check that the PSM isn't already in use
1628            if spec.psm in self.le_coc_servers:
1629                raise ValueError('PSM already in use')
1630
1631        self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer(
1632            self,
1633            spec.psm,
1634            handler,
1635            max_credits=spec.max_credits,
1636            mtu=spec.mtu,
1637            mps=spec.mps,
1638        )
1639
1640        return self.le_coc_servers[spec.psm]
1641
1642    def on_disconnection(self, connection_handle: int, _reason: int) -> None:
1643        logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
1644        if connection_handle in self.channels:
1645            for _, channel in self.channels[connection_handle].items():
1646                channel.abort()
1647            del self.channels[connection_handle]
1648        if connection_handle in self.le_coc_channels:
1649            for _, channel in self.le_coc_channels[connection_handle].items():
1650                channel.abort()
1651            del self.le_coc_channels[connection_handle]
1652        if connection_handle in self.identifiers:
1653            del self.identifiers[connection_handle]
1654
1655    def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
1656        pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
1657        pdu_bytes = bytes(pdu)
1658        logger.debug(
1659            f'{color(">>> Sending L2CAP PDU", "blue")} '
1660            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1661            f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
1662        )
1663        self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
1664
1665    def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
1666        if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
1667            # Parse the L2CAP payload into a Control Frame object
1668            control_frame = L2CAP_Control_Frame.from_bytes(pdu)
1669
1670            self.on_control_frame(connection, cid, control_frame)
1671        elif cid in self.fixed_channels:
1672            handler = self.fixed_channels[cid]
1673            assert handler is not None
1674            handler(connection.handle, pdu)
1675        else:
1676            if (channel := self.find_channel(connection.handle, cid)) is None:
1677                logger.warning(
1678                    color(
1679                        f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'
1680                    )
1681                )
1682                return
1683
1684            channel.on_pdu(pdu)
1685
1686    def send_control_frame(
1687        self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
1688    ) -> None:
1689        logger.debug(
1690            f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
1691            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1692            f'{connection.peer_address}:\n{control_frame}'
1693        )
1694        self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
1695
1696    def on_control_frame(
1697        self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
1698    ) -> None:
1699        logger.debug(
1700            f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
1701            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1702            f'{connection.peer_address}:\n{control_frame}'
1703        )
1704
1705        # Find the handler method
1706        handler_name = f'on_{control_frame.name.lower()}'
1707        handler = getattr(self, handler_name, None)
1708        if handler:
1709            try:
1710                handler(connection, cid, control_frame)
1711            except Exception as error:
1712                logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
1713                self.send_control_frame(
1714                    connection,
1715                    cid,
1716                    L2CAP_Command_Reject(
1717                        identifier=control_frame.identifier,
1718                        reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1719                        data=b'',
1720                    ),
1721                )
1722                raise error
1723        else:
1724            logger.error(color('Channel Manager command not handled???', 'red'))
1725            self.send_control_frame(
1726                connection,
1727                cid,
1728                L2CAP_Command_Reject(
1729                    identifier=control_frame.identifier,
1730                    reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1731                    data=b'',
1732                ),
1733            )
1734
1735    def on_l2cap_command_reject(
1736        self, _connection: Connection, _cid: int, packet
1737    ) -> None:
1738        logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
1739
1740    def on_l2cap_connection_request(
1741        self, connection: Connection, cid: int, request
1742    ) -> None:
1743        # Check if there's a server for this PSM
1744        server = self.servers.get(request.psm)
1745        if server:
1746            # Find a free CID for this new channel
1747            connection_channels = self.channels.setdefault(connection.handle, {})
1748            source_cid = self.find_free_br_edr_cid(connection_channels)
1749            if source_cid is None:  # Should never happen!
1750                self.send_control_frame(
1751                    connection,
1752                    cid,
1753                    L2CAP_Connection_Response(
1754                        identifier=request.identifier,
1755                        destination_cid=request.source_cid,
1756                        source_cid=0,
1757                        # pylint: disable=line-too-long
1758                        result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
1759                        status=0x0000,
1760                    ),
1761                )
1762                return
1763
1764            # Create a new channel
1765            logger.debug(
1766                f'creating server channel with cid={source_cid} for psm {request.psm}'
1767            )
1768            channel = ClassicChannel(
1769                self, connection, cid, request.psm, source_cid, server.mtu
1770            )
1771            connection_channels[source_cid] = channel
1772
1773            # Notify
1774            server.on_connection(channel)
1775            channel.on_connection_request(request)
1776        else:
1777            logger.warning(
1778                f'No server for connection 0x{connection.handle:04X} '
1779                f'on PSM {request.psm}'
1780            )
1781            self.send_control_frame(
1782                connection,
1783                cid,
1784                L2CAP_Connection_Response(
1785                    identifier=request.identifier,
1786                    destination_cid=request.source_cid,
1787                    source_cid=0,
1788                    # pylint: disable=line-too-long
1789                    result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
1790                    status=0x0000,
1791                ),
1792            )
1793
1794    def on_l2cap_connection_response(
1795        self, connection: Connection, cid: int, response
1796    ) -> None:
1797        if (
1798            channel := self.find_channel(connection.handle, response.source_cid)
1799        ) is None:
1800            logger.warning(
1801                color(
1802                    f'channel {response.source_cid} not found for '
1803                    f'0x{connection.handle:04X}:{cid}',
1804                    'red',
1805                )
1806            )
1807            return
1808
1809        channel.on_connection_response(response)
1810
1811    def on_l2cap_configure_request(
1812        self, connection: Connection, cid: int, request
1813    ) -> None:
1814        if (
1815            channel := self.find_channel(connection.handle, request.destination_cid)
1816        ) is None:
1817            logger.warning(
1818                color(
1819                    f'channel {request.destination_cid} not found for '
1820                    f'0x{connection.handle:04X}:{cid}',
1821                    'red',
1822                )
1823            )
1824            return
1825
1826        channel.on_configure_request(request)
1827
1828    def on_l2cap_configure_response(
1829        self, connection: Connection, cid: int, response
1830    ) -> None:
1831        if (
1832            channel := self.find_channel(connection.handle, response.source_cid)
1833        ) is None:
1834            logger.warning(
1835                color(
1836                    f'channel {response.source_cid} not found for '
1837                    f'0x{connection.handle:04X}:{cid}',
1838                    'red',
1839                )
1840            )
1841            return
1842
1843        channel.on_configure_response(response)
1844
1845    def on_l2cap_disconnection_request(
1846        self, connection: Connection, cid: int, request
1847    ) -> None:
1848        if (
1849            channel := self.find_channel(connection.handle, request.destination_cid)
1850        ) is None:
1851            logger.warning(
1852                color(
1853                    f'channel {request.destination_cid} not found for '
1854                    f'0x{connection.handle:04X}:{cid}',
1855                    'red',
1856                )
1857            )
1858            return
1859
1860        channel.on_disconnection_request(request)
1861
1862    def on_l2cap_disconnection_response(
1863        self, connection: Connection, cid: int, response
1864    ) -> None:
1865        if (
1866            channel := self.find_channel(connection.handle, response.source_cid)
1867        ) is None:
1868            logger.warning(
1869                color(
1870                    f'channel {response.source_cid} not found for '
1871                    f'0x{connection.handle:04X}:{cid}',
1872                    'red',
1873                )
1874            )
1875            return
1876
1877        channel.on_disconnection_response(response)
1878
1879    def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None:
1880        logger.debug(f'<<< Echo request: data={request.data.hex()}')
1881        self.send_control_frame(
1882            connection,
1883            cid,
1884            L2CAP_Echo_Response(identifier=request.identifier, data=request.data),
1885        )
1886
1887    def on_l2cap_echo_response(
1888        self, _connection: Connection, _cid: int, response
1889    ) -> None:
1890        logger.debug(f'<<< Echo response: data={response.data.hex()}')
1891        # TODO notify listeners
1892
1893    def on_l2cap_information_request(
1894        self, connection: Connection, cid: int, request
1895    ) -> None:
1896        if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
1897            result = L2CAP_Information_Response.SUCCESS
1898            data = self.connectionless_mtu.to_bytes(2, 'little')
1899        elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
1900            result = L2CAP_Information_Response.SUCCESS
1901            data = sum(self.extended_features).to_bytes(4, 'little')
1902        elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
1903            result = L2CAP_Information_Response.SUCCESS
1904            data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
1905        else:
1906            result = L2CAP_Information_Response.NOT_SUPPORTED
1907
1908        self.send_control_frame(
1909            connection,
1910            cid,
1911            L2CAP_Information_Response(
1912                identifier=request.identifier,
1913                info_type=request.info_type,
1914                result=result,
1915                data=data,
1916            ),
1917        )
1918
1919    def on_l2cap_connection_parameter_update_request(
1920        self, connection: Connection, cid: int, request
1921    ):
1922        if connection.role == BT_CENTRAL_ROLE:
1923            self.send_control_frame(
1924                connection,
1925                cid,
1926                L2CAP_Connection_Parameter_Update_Response(
1927                    identifier=request.identifier,
1928                    result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT,
1929                ),
1930            )
1931            self.host.send_command_sync(
1932                HCI_LE_Connection_Update_Command(
1933                    connection_handle=connection.handle,
1934                    connection_interval_min=request.interval_min,
1935                    connection_interval_max=request.interval_max,
1936                    max_latency=request.latency,
1937                    supervision_timeout=request.timeout,
1938                    min_ce_length=0,
1939                    max_ce_length=0,
1940                )
1941            )
1942        else:
1943            self.send_control_frame(
1944                connection,
1945                cid,
1946                L2CAP_Connection_Parameter_Update_Response(
1947                    identifier=request.identifier,
1948                    result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT,
1949                ),
1950            )
1951
1952    async def update_connection_parameters(
1953        self,
1954        connection: Connection,
1955        interval_min: int,
1956        interval_max: int,
1957        latency: int,
1958        timeout: int,
1959    ) -> int:
1960        # Check that there isn't already a request pending
1961        if self.connection_parameters_update_response:
1962            raise InvalidStateError('request already pending')
1963        self.connection_parameters_update_response = (
1964            asyncio.get_running_loop().create_future()
1965        )
1966        self.send_control_frame(
1967            connection,
1968            L2CAP_LE_SIGNALING_CID,
1969            L2CAP_Connection_Parameter_Update_Request(
1970                interval_min=interval_min,
1971                interval_max=interval_max,
1972                latency=latency,
1973                timeout=timeout,
1974            ),
1975        )
1976        return await self.connection_parameters_update_response
1977
1978    def on_l2cap_connection_parameter_update_response(
1979        self, connection: Connection, cid: int, response
1980    ) -> None:
1981        if self.connection_parameters_update_response:
1982            self.connection_parameters_update_response.set_result(response.result)
1983            self.connection_parameters_update_response = None
1984        else:
1985            logger.warning(
1986                color(
1987                    'received l2cap_connection_parameter_update_response without a pending request',
1988                    'red',
1989                )
1990            )
1991
1992    def on_l2cap_le_credit_based_connection_request(
1993        self, connection: Connection, cid: int, request
1994    ) -> None:
1995        if request.le_psm in self.le_coc_servers:
1996            server = self.le_coc_servers[request.le_psm]
1997
1998            # Check that the CID isn't already used
1999            le_connection_channels = self.le_coc_channels.setdefault(
2000                connection.handle, {}
2001            )
2002            if request.source_cid in le_connection_channels:
2003                logger.warning(f'source CID {request.source_cid} already in use')
2004                self.send_control_frame(
2005                    connection,
2006                    cid,
2007                    L2CAP_LE_Credit_Based_Connection_Response(
2008                        identifier=request.identifier,
2009                        destination_cid=0,
2010                        mtu=server.mtu,
2011                        mps=server.mps,
2012                        initial_credits=0,
2013                        # pylint: disable=line-too-long
2014                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
2015                    ),
2016                )
2017                return
2018
2019            # Find a free CID for this new channel
2020            connection_channels = self.channels.setdefault(connection.handle, {})
2021            source_cid = self.find_free_le_cid(connection_channels)
2022            if source_cid is None:  # Should never happen!
2023                self.send_control_frame(
2024                    connection,
2025                    cid,
2026                    L2CAP_LE_Credit_Based_Connection_Response(
2027                        identifier=request.identifier,
2028                        destination_cid=0,
2029                        mtu=server.mtu,
2030                        mps=server.mps,
2031                        initial_credits=0,
2032                        # pylint: disable=line-too-long
2033                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
2034                    ),
2035                )
2036                return
2037
2038            # Create a new channel
2039            logger.debug(
2040                f'creating LE CoC server channel with cid={source_cid} for psm '
2041                f'{request.le_psm}'
2042            )
2043            channel = LeCreditBasedChannel(
2044                self,
2045                connection,
2046                request.le_psm,
2047                source_cid,
2048                request.source_cid,
2049                server.mtu,
2050                server.mps,
2051                request.initial_credits,
2052                request.mtu,
2053                request.mps,
2054                server.max_credits,
2055                True,
2056            )
2057            connection_channels[source_cid] = channel
2058            le_connection_channels[request.source_cid] = channel
2059
2060            # Respond
2061            self.send_control_frame(
2062                connection,
2063                cid,
2064                L2CAP_LE_Credit_Based_Connection_Response(
2065                    identifier=request.identifier,
2066                    destination_cid=source_cid,
2067                    mtu=server.mtu,
2068                    mps=server.mps,
2069                    initial_credits=server.max_credits,
2070                    # pylint: disable=line-too-long
2071                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL,
2072                ),
2073            )
2074
2075            # Notify
2076            server.on_connection(channel)
2077        else:
2078            logger.info(
2079                f'No LE server for connection 0x{connection.handle:04X} '
2080                f'on PSM {request.le_psm}'
2081            )
2082            self.send_control_frame(
2083                connection,
2084                cid,
2085                L2CAP_LE_Credit_Based_Connection_Response(
2086                    identifier=request.identifier,
2087                    destination_cid=0,
2088                    mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
2089                    mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
2090                    initial_credits=0,
2091                    # pylint: disable=line-too-long
2092                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
2093                ),
2094            )
2095
2096    def on_l2cap_le_credit_based_connection_response(
2097        self, connection: Connection, _cid: int, response
2098    ) -> None:
2099        # Find the pending request by identifier
2100        request = self.le_coc_requests.get(response.identifier)
2101        if request is None:
2102            logger.warning(color('!!! received response for unknown request', 'red'))
2103            return
2104        del self.le_coc_requests[response.identifier]
2105
2106        # Find the channel for this request
2107        channel = self.find_channel(connection.handle, request.source_cid)
2108        if channel is None:
2109            logger.warning(
2110                color(
2111                    'received connection response for an unknown channel '
2112                    f'(cid={request.source_cid})',
2113                    'red',
2114                )
2115            )
2116            return
2117
2118        # Process the response
2119        channel.on_connection_response(response)
2120
2121    def on_l2cap_le_flow_control_credit(
2122        self, connection: Connection, _cid: int, credit
2123    ) -> None:
2124        channel = self.find_le_coc_channel(connection.handle, credit.cid)
2125        if channel is None:
2126            logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
2127            return
2128
2129        channel.on_credits(credit.credits)
2130
2131    def on_channel_closed(self, channel: ClassicChannel) -> None:
2132        connection_channels = self.channels.get(channel.connection.handle)
2133        if connection_channels:
2134            if channel.source_cid in connection_channels:
2135                del connection_channels[channel.source_cid]
2136
2137    @deprecated("Please use create_le_credit_based_channel()")
2138    async def open_le_coc(
2139        self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int
2140    ) -> LeCreditBasedChannel:
2141        return await self.create_le_credit_based_channel(
2142            connection=connection,
2143            spec=LeCreditBasedChannelSpec(
2144                psm=psm, max_credits=max_credits, mtu=mtu, mps=mps
2145            ),
2146        )
2147
2148    async def create_le_credit_based_channel(
2149        self,
2150        connection: Connection,
2151        spec: LeCreditBasedChannelSpec,
2152    ) -> LeCreditBasedChannel:
2153        # Find a free CID for the new channel
2154        connection_channels = self.channels.setdefault(connection.handle, {})
2155        source_cid = self.find_free_le_cid(connection_channels)
2156        if source_cid is None:  # Should never happen!
2157            raise RuntimeError('all CIDs already in use')
2158
2159        if spec.psm is None:
2160            raise ValueError('PSM cannot be None')
2161
2162        # Create the channel
2163        logger.debug(f'creating coc channel with cid={source_cid} for psm {spec.psm}')
2164        channel = LeCreditBasedChannel(
2165            manager=self,
2166            connection=connection,
2167            le_psm=spec.psm,
2168            source_cid=source_cid,
2169            destination_cid=0,
2170            mtu=spec.mtu,
2171            mps=spec.mps,
2172            credits=0,
2173            peer_mtu=0,
2174            peer_mps=0,
2175            peer_credits=spec.max_credits,
2176            connected=False,
2177        )
2178        connection_channels[source_cid] = channel
2179
2180        # Connect
2181        try:
2182            await channel.connect()
2183        except Exception as error:
2184            logger.warning(f'connection failed: {error}')
2185            del connection_channels[source_cid]
2186            raise
2187
2188        # Remember the channel by source CID and destination CID
2189        le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
2190        le_connection_channels[channel.destination_cid] = channel
2191
2192        return channel
2193
2194    @deprecated("Please use create_classic_channel()")
2195    async def connect(self, connection: Connection, psm: int) -> ClassicChannel:
2196        return await self.create_classic_channel(
2197            connection=connection, spec=ClassicChannelSpec(psm=psm)
2198        )
2199
2200    async def create_classic_channel(
2201        self, connection: Connection, spec: ClassicChannelSpec
2202    ) -> ClassicChannel:
2203        # NOTE: this implementation hard-codes BR/EDR
2204
2205        # Find a free CID for a new channel
2206        connection_channels = self.channels.setdefault(connection.handle, {})
2207        source_cid = self.find_free_br_edr_cid(connection_channels)
2208        if source_cid is None:  # Should never happen!
2209            raise RuntimeError('all CIDs already in use')
2210
2211        if spec.psm is None:
2212            raise ValueError('PSM cannot be None')
2213
2214        # Create the channel
2215        logger.debug(
2216            f'creating client channel with cid={source_cid} for psm {spec.psm}'
2217        )
2218        channel = ClassicChannel(
2219            self,
2220            connection,
2221            L2CAP_SIGNALING_CID,
2222            spec.psm,
2223            source_cid,
2224            spec.mtu,
2225        )
2226        connection_channels[source_cid] = channel
2227
2228        # Connect
2229        try:
2230            await channel.connect()
2231        except BaseException as e:
2232            del connection_channels[source_cid]
2233            raise e
2234
2235        return channel
2236
2237
2238# -----------------------------------------------------------------------------
2239# Deprecated Classes
2240# -----------------------------------------------------------------------------
2241
2242
2243class Channel(ClassicChannel):
2244    @deprecated("Please use ClassicChannel")
2245    def __init__(self, *args, **kwargs) -> None:
2246        super().__init__(*args, **kwargs)
2247
2248
2249class LeConnectionOrientedChannel(LeCreditBasedChannel):
2250    @deprecated("Please use LeCreditBasedChannel")
2251    def __init__(self, *args, **kwargs) -> None:
2252        super().__init__(*args, **kwargs)
2253