• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import logging
21import struct
22
23from collections import deque
24from pyee import EventEmitter
25from typing import Dict, Type
26
27from .colors import color
28from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
29from .hci import (
30    HCI_LE_Connection_Update_Command,
31    HCI_Object,
32    key_with_value,
33    name_or_number,
34)
35
36# -----------------------------------------------------------------------------
37# Logging
38# -----------------------------------------------------------------------------
39logger = logging.getLogger(__name__)
40
41
42# -----------------------------------------------------------------------------
43# Constants
44# -----------------------------------------------------------------------------
45# fmt: off
46# pylint: disable=line-too-long
47
48L2CAP_SIGNALING_CID    = 0x01
49L2CAP_LE_SIGNALING_CID = 0x05
50
51L2CAP_MIN_LE_MTU     = 23
52L2CAP_MIN_BR_EDR_MTU = 48
53
54L2CAP_DEFAULT_MTU = 2048  # Default value for the MTU we are willing to accept
55
56L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024
57
58# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links
59L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040
60L2CAP_ACL_U_DYNAMIC_CID_RANGE_END   = 0xFFFF
61
62# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link
63L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040
64L2CAP_LE_U_DYNAMIC_CID_RANGE_END   = 0x007F
65
66# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage
67L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001
68L2CAP_PSM_DYNAMIC_RANGE_END   = 0xFFFF
69
70# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges
71L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080
72L2CAP_LE_PSM_DYNAMIC_RANGE_END   = 0x00FF
73
74# Frame types
75L2CAP_COMMAND_REJECT                       = 0x01
76L2CAP_CONNECTION_REQUEST                   = 0x02
77L2CAP_CONNECTION_RESPONSE                  = 0x03
78L2CAP_CONFIGURE_REQUEST                    = 0x04
79L2CAP_CONFIGURE_RESPONSE                   = 0x05
80L2CAP_DISCONNECTION_REQUEST                = 0x06
81L2CAP_DISCONNECTION_RESPONSE               = 0x07
82L2CAP_ECHO_REQUEST                         = 0x08
83L2CAP_ECHO_RESPONSE                        = 0x09
84L2CAP_INFORMATION_REQUEST                  = 0x0A
85L2CAP_INFORMATION_RESPONSE                 = 0x0B
86L2CAP_CREATE_CHANNEL_REQUEST               = 0x0C
87L2CAP_CREATE_CHANNEL_RESPONSE              = 0x0D
88L2CAP_MOVE_CHANNEL_REQUEST                 = 0x0E
89L2CAP_MOVE_CHANNEL_RESPONSE                = 0x0F
90L2CAP_MOVE_CHANNEL_CONFIRMATION            = 0x10
91L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE   = 0x11
92L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST  = 0x12
93L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13
94L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST   = 0x14
95L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE  = 0x15
96L2CAP_LE_FLOW_CONTROL_CREDIT               = 0x16
97
98L2CAP_CONTROL_FRAME_NAMES = {
99    L2CAP_COMMAND_REJECT:                       'L2CAP_COMMAND_REJECT',
100    L2CAP_CONNECTION_REQUEST:                   'L2CAP_CONNECTION_REQUEST',
101    L2CAP_CONNECTION_RESPONSE:                  'L2CAP_CONNECTION_RESPONSE',
102    L2CAP_CONFIGURE_REQUEST:                    'L2CAP_CONFIGURE_REQUEST',
103    L2CAP_CONFIGURE_RESPONSE:                   'L2CAP_CONFIGURE_RESPONSE',
104    L2CAP_DISCONNECTION_REQUEST:                'L2CAP_DISCONNECTION_REQUEST',
105    L2CAP_DISCONNECTION_RESPONSE:               'L2CAP_DISCONNECTION_RESPONSE',
106    L2CAP_ECHO_REQUEST:                         'L2CAP_ECHO_REQUEST',
107    L2CAP_ECHO_RESPONSE:                        'L2CAP_ECHO_RESPONSE',
108    L2CAP_INFORMATION_REQUEST:                  'L2CAP_INFORMATION_REQUEST',
109    L2CAP_INFORMATION_RESPONSE:                 'L2CAP_INFORMATION_RESPONSE',
110    L2CAP_CREATE_CHANNEL_REQUEST:               'L2CAP_CREATE_CHANNEL_REQUEST',
111    L2CAP_CREATE_CHANNEL_RESPONSE:              'L2CAP_CREATE_CHANNEL_RESPONSE',
112    L2CAP_MOVE_CHANNEL_REQUEST:                 'L2CAP_MOVE_CHANNEL_REQUEST',
113    L2CAP_MOVE_CHANNEL_RESPONSE:                'L2CAP_MOVE_CHANNEL_RESPONSE',
114    L2CAP_MOVE_CHANNEL_CONFIRMATION:            'L2CAP_MOVE_CHANNEL_CONFIRMATION',
115    L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE:   'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE',
116    L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST:  'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST',
117    L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE',
118    L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST:   'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST',
119    L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE:  'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE',
120    L2CAP_LE_FLOW_CONTROL_CREDIT:               'L2CAP_LE_FLOW_CONTROL_CREDIT'
121}
122
123L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
124L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
125
126L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000
127L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001
128L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
129
130L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS             = 65535
131L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU                 = 23
132L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS                 = 23
133L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS                 = 65533
134L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU             = 2046
135L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS             = 2048
136L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
137
138L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
139
140L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
141
142# fmt: on
143# pylint: enable=line-too-long
144
145
146# -----------------------------------------------------------------------------
147# Classes
148# -----------------------------------------------------------------------------
149# pylint: disable=invalid-name
150
151
152class L2CAP_PDU:
153    '''
154    See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT
155    '''
156
157    @staticmethod
158    def from_bytes(data):
159        # Sanity check
160        if len(data) < 4:
161            raise ValueError('not enough data for L2CAP header')
162
163        _, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0)
164        l2cap_pdu_payload = data[4:]
165
166        return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
167
168    def to_bytes(self):
169        header = struct.pack('<HH', len(self.payload), self.cid)
170        return header + self.payload
171
172    def __init__(self, cid, payload):
173        self.cid = cid
174        self.payload = payload
175
176    def __bytes__(self):
177        return self.to_bytes()
178
179    def __str__(self):
180        return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
181
182
183# -----------------------------------------------------------------------------
184class L2CAP_Control_Frame:
185    '''
186    See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
187    '''
188
189    classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
190    code = 0
191    name = None
192
193    @staticmethod
194    def from_bytes(pdu):
195        code = pdu[0]
196
197        cls = L2CAP_Control_Frame.classes.get(code)
198        if cls is None:
199            instance = L2CAP_Control_Frame(pdu)
200            instance.name = L2CAP_Control_Frame.code_name(code)
201            instance.code = code
202            return instance
203        self = cls.__new__(cls)
204        L2CAP_Control_Frame.__init__(self, pdu)
205        self.identifier = pdu[1]
206        length = struct.unpack_from('<H', pdu, 2)[0]
207        if length + 4 != len(pdu):
208            logger.warning(
209                color(
210                    f'!!! length mismatch: expected {len(pdu) - 4} but got {length}',
211                    'red',
212                )
213            )
214        if hasattr(self, 'fields'):
215            self.init_from_bytes(pdu, 4)
216        return self
217
218    @staticmethod
219    def code_name(code):
220        return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code)
221
222    @staticmethod
223    def decode_configuration_options(data):
224        options = []
225        while len(data) >= 2:
226            value_type = data[0]
227            length = data[1]
228            value = data[2 : 2 + length]
229            data = data[2 + length :]
230            options.append((value_type, value))
231
232        return options
233
234    @staticmethod
235    def encode_configuration_options(options):
236        return b''.join(
237            [bytes([option[0], len(option[1])]) + option[1] for option in options]
238        )
239
240    @staticmethod
241    def subclass(fields):
242        def inner(cls):
243            cls.name = cls.__name__.upper()
244            cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name)
245            if cls.code is None:
246                raise KeyError(
247                    f'Control Frame name {cls.name} '
248                    'not found in L2CAP_CONTROL_FRAME_NAMES'
249                )
250            cls.fields = fields
251
252            # Register a factory for this class
253            L2CAP_Control_Frame.classes[cls.code] = cls
254
255            return cls
256
257        return inner
258
259    def __init__(self, pdu=None, **kwargs):
260        self.identifier = kwargs.get('identifier', 0)
261        if hasattr(self, 'fields') and kwargs:
262            HCI_Object.init_from_fields(self, self.fields, kwargs)
263        if pdu is None:
264            data = HCI_Object.dict_to_bytes(kwargs, self.fields)
265            pdu = (
266                bytes([self.code, self.identifier])
267                + struct.pack('<H', len(data))
268                + data
269            )
270        self.pdu = pdu
271
272    def init_from_bytes(self, pdu, offset):
273        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
274
275    def to_bytes(self):
276        return self.pdu
277
278    def __bytes__(self):
279        return self.to_bytes()
280
281    def __str__(self):
282        result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
283        if fields := getattr(self, 'fields', None):
284            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
285        else:
286            if len(self.pdu) > 1:
287                result += f': {self.pdu.hex()}'
288        return result
289
290
291# -----------------------------------------------------------------------------
292@L2CAP_Control_Frame.subclass(
293    # pylint: disable=unnecessary-lambda
294    [
295        (
296            'reason',
297            {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)},
298        ),
299        ('data', '*'),
300    ]
301)
302class L2CAP_Command_Reject(L2CAP_Control_Frame):
303    '''
304    See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT
305    '''
306
307    COMMAND_NOT_UNDERSTOOD = 0x0000
308    SIGNALING_MTU_EXCEEDED = 0x0001
309    INVALID_CID_IN_REQUEST = 0x0002
310
311    REASON_NAMES = {
312        COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD',
313        SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED',
314        INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST',
315    }
316
317    @staticmethod
318    def reason_name(reason):
319        return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
320
321
322# -----------------------------------------------------------------------------
323@L2CAP_Control_Frame.subclass(
324    # pylint: disable=unnecessary-lambda
325    [
326        (
327            'psm',
328            {
329                'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm(
330                    data, offset
331                ),
332                'serializer': lambda value: L2CAP_Connection_Request.serialize_psm(
333                    value
334                ),
335            },
336        ),
337        ('source_cid', 2),
338    ]
339)
340class L2CAP_Connection_Request(L2CAP_Control_Frame):
341    '''
342    See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
343    '''
344
345    @staticmethod
346    def parse_psm(data, offset=0):
347        psm_length = 2
348        psm = data[offset] | data[offset + 1] << 8
349
350        # The PSM field extends until the first even octet (inclusive)
351        while data[offset + psm_length - 1] % 2 == 1:
352            psm |= data[offset + psm_length] << (8 * psm_length)
353            psm_length += 1
354
355        return offset + psm_length, psm
356
357    @staticmethod
358    def serialize_psm(psm):
359        serialized = struct.pack('<H', psm & 0xFFFF)
360        psm >>= 16
361        while psm:
362            serialized += bytes([psm & 0xFF])
363            psm >>= 8
364
365        return serialized
366
367
368# -----------------------------------------------------------------------------
369@L2CAP_Control_Frame.subclass(
370    # pylint: disable=unnecessary-lambda
371    [
372        ('destination_cid', 2),
373        ('source_cid', 2),
374        (
375            'result',
376            {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)},
377        ),
378        ('status', 2),
379    ]
380)
381class L2CAP_Connection_Response(L2CAP_Control_Frame):
382    '''
383    See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
384    '''
385
386    CONNECTION_SUCCESSFUL = 0x0000
387    CONNECTION_PENDING = 0x0001
388    CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
389    CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003
390    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
391    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006
392    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007
393    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
394
395    # pylint: disable=line-too-long
396    RESULT_NAMES = {
397        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
398        CONNECTION_PENDING: 'CONNECTION_PENDING',
399        CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED',
400        CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK',
401        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
402        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
403        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
404        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
405    }
406
407    @staticmethod
408    def result_name(result):
409        return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result)
410
411
412# -----------------------------------------------------------------------------
413@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')])
414class L2CAP_Configure_Request(L2CAP_Control_Frame):
415    '''
416    See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST
417    '''
418
419
420# -----------------------------------------------------------------------------
421@L2CAP_Control_Frame.subclass(
422    # pylint: disable=unnecessary-lambda
423    [
424        ('source_cid', 2),
425        ('flags', 2),
426        (
427            'result',
428            {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)},
429        ),
430        ('options', '*'),
431    ]
432)
433class L2CAP_Configure_Response(L2CAP_Control_Frame):
434    '''
435    See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE
436    '''
437
438    SUCCESS = 0x0000
439    FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001
440    FAILURE_REJECTED = 0x0002
441    FAILURE_UNKNOWN_OPTIONS = 0x0003
442    PENDING = 0x0004
443    FAILURE_FLOW_SPEC_REJECTED = 0x0005
444
445    RESULT_NAMES = {
446        SUCCESS: 'SUCCESS',
447        FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS',
448        FAILURE_REJECTED: 'FAILURE_REJECTED',
449        FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS',
450        PENDING: 'PENDING',
451        FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED',
452    }
453
454    @staticmethod
455    def result_name(result):
456        return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
457
458
459# -----------------------------------------------------------------------------
460@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
461class L2CAP_Disconnection_Request(L2CAP_Control_Frame):
462    '''
463    See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST
464    '''
465
466
467# -----------------------------------------------------------------------------
468@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
469class L2CAP_Disconnection_Response(L2CAP_Control_Frame):
470    '''
471    See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE
472    '''
473
474
475# -----------------------------------------------------------------------------
476@L2CAP_Control_Frame.subclass([('data', '*')])
477class L2CAP_Echo_Request(L2CAP_Control_Frame):
478    '''
479    See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST
480    '''
481
482
483# -----------------------------------------------------------------------------
484@L2CAP_Control_Frame.subclass([('data', '*')])
485class L2CAP_Echo_Response(L2CAP_Control_Frame):
486    '''
487    See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE
488    '''
489
490
491# -----------------------------------------------------------------------------
492@L2CAP_Control_Frame.subclass(
493    [
494        (
495            'info_type',
496            {
497                'size': 2,
498                # pylint: disable-next=unnecessary-lambda
499                'mapper': lambda x: L2CAP_Information_Request.info_type_name(x),
500            },
501        )
502    ]
503)
504class L2CAP_Information_Request(L2CAP_Control_Frame):
505    '''
506    See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
507    '''
508
509    CONNECTIONLESS_MTU = 0x0001
510    EXTENDED_FEATURES_SUPPORTED = 0x0002
511    FIXED_CHANNELS_SUPPORTED = 0x0003
512
513    EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
514    EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
515    EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
516    EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
517    EXTENDED_FEATURE_STREAMING_MODE = 0x0010
518    EXTENDED_FEATURE_FCS_OPTION = 0x0020
519    EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
520    EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
521    EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
522    EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
523    EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
524
525    INFO_TYPE_NAMES = {
526        CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
527        EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
528        FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED',
529    }
530
531    @staticmethod
532    def info_type_name(info_type):
533        return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
534
535
536# -----------------------------------------------------------------------------
537@L2CAP_Control_Frame.subclass(
538    [
539        ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}),
540        (
541            'result',
542            # pylint: disable-next=unnecessary-lambda
543            {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)},
544        ),
545        ('data', '*'),
546    ]
547)
548class L2CAP_Information_Response(L2CAP_Control_Frame):
549    '''
550    See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
551    '''
552
553    SUCCESS = 0x00
554    NOT_SUPPORTED = 0x01
555
556    RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'}
557
558    @staticmethod
559    def result_name(result):
560        return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
561
562
563# -----------------------------------------------------------------------------
564@L2CAP_Control_Frame.subclass(
565    [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)]
566)
567class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
568    '''
569    See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST
570    '''
571
572
573# -----------------------------------------------------------------------------
574@L2CAP_Control_Frame.subclass([('result', 2)])
575class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame):
576    '''
577    See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE
578    '''
579
580
581# -----------------------------------------------------------------------------
582@L2CAP_Control_Frame.subclass(
583    [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)]
584)
585class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
586    '''
587    See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST
588    (CODE 0x14)
589    '''
590
591
592# -----------------------------------------------------------------------------
593@L2CAP_Control_Frame.subclass(
594    # pylint: disable=unnecessary-lambda,line-too-long
595    [
596        ('destination_cid', 2),
597        ('mtu', 2),
598        ('mps', 2),
599        ('initial_credits', 2),
600        (
601            'result',
602            {
603                'size': 2,
604                'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name(
605                    x
606                ),
607            },
608        ),
609    ]
610)
611class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
612    '''
613    See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE
614    (CODE 0x15)
615    '''
616
617    CONNECTION_SUCCESSFUL = 0x0000
618    CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002
619    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
620    CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
621    CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
622    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007
623    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
624    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009
625    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
626    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
627
628    # pylint: disable=line-too-long
629    RESULT_NAMES = {
630        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
631        CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
632        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
633        CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION',
634        CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION',
635        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE',
636        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION',
637        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
638        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
639        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
640    }
641
642    @staticmethod
643    def result_name(result):
644        return name_or_number(
645            L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result
646        )
647
648
649# -----------------------------------------------------------------------------
650@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)])
651class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
652    '''
653    See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16)
654    '''
655
656
657# -----------------------------------------------------------------------------
658class Channel(EventEmitter):
659    # States
660    CLOSED = 0x00
661    WAIT_CONNECT = 0x01
662    WAIT_CONNECT_RSP = 0x02
663    OPEN = 0x03
664    WAIT_DISCONNECT = 0x04
665    WAIT_CREATE = 0x05
666    WAIT_CREATE_RSP = 0x06
667    WAIT_MOVE = 0x07
668    WAIT_MOVE_RSP = 0x08
669    WAIT_MOVE_CONFIRM = 0x09
670    WAIT_CONFIRM_RSP = 0x0A
671
672    # CONFIG substates
673    WAIT_CONFIG = 0x10
674    WAIT_SEND_CONFIG = 0x11
675    WAIT_CONFIG_REQ_RSP = 0x12
676    WAIT_CONFIG_RSP = 0x13
677    WAIT_CONFIG_REQ = 0x14
678    WAIT_IND_FINAL_RSP = 0x15
679    WAIT_FINAL_RSP = 0x16
680    WAIT_CONTROL_IND = 0x17
681
682    STATE_NAMES = {
683        CLOSED: 'CLOSED',
684        WAIT_CONNECT: 'WAIT_CONNECT',
685        WAIT_CONNECT_RSP: 'WAIT_CONNECT_RSP',
686        OPEN: 'OPEN',
687        WAIT_DISCONNECT: 'WAIT_DISCONNECT',
688        WAIT_CREATE: 'WAIT_CREATE',
689        WAIT_CREATE_RSP: 'WAIT_CREATE_RSP',
690        WAIT_MOVE: 'WAIT_MOVE',
691        WAIT_MOVE_RSP: 'WAIT_MOVE_RSP',
692        WAIT_MOVE_CONFIRM: 'WAIT_MOVE_CONFIRM',
693        WAIT_CONFIRM_RSP: 'WAIT_CONFIRM_RSP',
694        WAIT_CONFIG: 'WAIT_CONFIG',
695        WAIT_SEND_CONFIG: 'WAIT_SEND_CONFIG',
696        WAIT_CONFIG_REQ_RSP: 'WAIT_CONFIG_REQ_RSP',
697        WAIT_CONFIG_RSP: 'WAIT_CONFIG_RSP',
698        WAIT_CONFIG_REQ: 'WAIT_CONFIG_REQ',
699        WAIT_IND_FINAL_RSP: 'WAIT_IND_FINAL_RSP',
700        WAIT_FINAL_RSP: 'WAIT_FINAL_RSP',
701        WAIT_CONTROL_IND: 'WAIT_CONTROL_IND',
702    }
703
704    def __init__(self, manager, connection, signaling_cid, psm, source_cid, mtu):
705        super().__init__()
706        self.manager = manager
707        self.connection = connection
708        self.signaling_cid = signaling_cid
709        self.state = Channel.CLOSED
710        self.mtu = mtu
711        self.psm = psm
712        self.source_cid = source_cid
713        self.destination_cid = 0
714        self.response = None
715        self.connection_result = None
716        self.disconnection_result = None
717        self.sink = None
718
719    def change_state(self, new_state):
720        logger.debug(
721            f'{self} state change -> {color(Channel.STATE_NAMES[new_state], "cyan")}'
722        )
723        self.state = new_state
724
725    def send_pdu(self, pdu):
726        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
727
728    def send_control_frame(self, frame):
729        self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
730
731    async def send_request(self, request):
732        # Check that there isn't already a request pending
733        if self.response:
734            raise InvalidStateError('request already pending')
735        if self.state != Channel.OPEN:
736            raise InvalidStateError('channel not open')
737
738        self.response = asyncio.get_running_loop().create_future()
739        self.send_pdu(request)
740        return await self.response
741
742    def on_pdu(self, pdu):
743        if self.response:
744            self.response.set_result(pdu)
745            self.response = None
746        elif self.sink:
747            # pylint: disable=not-callable
748            self.sink(pdu)
749        else:
750            logger.warning(
751                color('received pdu without a pending request or sink', 'red')
752            )
753
754    async def connect(self):
755        if self.state != Channel.CLOSED:
756            raise InvalidStateError('invalid state')
757
758        # Check that we can start a new connection
759        if self.connection_result:
760            raise RuntimeError('connection already pending')
761
762        self.change_state(Channel.WAIT_CONNECT_RSP)
763        self.send_control_frame(
764            L2CAP_Connection_Request(
765                identifier=self.manager.next_identifier(self.connection),
766                psm=self.psm,
767                source_cid=self.source_cid,
768            )
769        )
770
771        # Create a future to wait for the state machine to get to a success or error
772        # state
773        self.connection_result = asyncio.get_running_loop().create_future()
774
775        # Wait for the connection to succeed or fail
776        try:
777            return await self.connection_result
778        finally:
779            self.connection_result = None
780
781    async def disconnect(self):
782        if self.state != Channel.OPEN:
783            raise InvalidStateError('invalid state')
784
785        self.change_state(Channel.WAIT_DISCONNECT)
786        self.send_control_frame(
787            L2CAP_Disconnection_Request(
788                identifier=self.manager.next_identifier(self.connection),
789                destination_cid=self.destination_cid,
790                source_cid=self.source_cid,
791            )
792        )
793
794        # Create a future to wait for the state machine to get to a success or error
795        # state
796        self.disconnection_result = asyncio.get_running_loop().create_future()
797        return await self.disconnection_result
798
799    def abort(self):
800        if self.state == self.OPEN:
801            self.change_state(self.CLOSED)
802            self.emit('close')
803
804    def send_configure_request(self):
805        options = L2CAP_Control_Frame.encode_configuration_options(
806            [
807                (
808                    L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
809                    struct.pack('<H', L2CAP_DEFAULT_MTU),
810                )
811            ]
812        )
813        self.send_control_frame(
814            L2CAP_Configure_Request(
815                identifier=self.manager.next_identifier(self.connection),
816                destination_cid=self.destination_cid,
817                flags=0x0000,
818                options=options,
819            )
820        )
821
822    def on_connection_request(self, request):
823        self.destination_cid = request.source_cid
824        self.change_state(Channel.WAIT_CONNECT)
825        self.send_control_frame(
826            L2CAP_Connection_Response(
827                identifier=request.identifier,
828                destination_cid=self.source_cid,
829                source_cid=self.destination_cid,
830                result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL,
831                status=0x0000,
832            )
833        )
834        self.change_state(Channel.WAIT_CONFIG)
835        self.send_configure_request()
836        self.change_state(Channel.WAIT_CONFIG_REQ_RSP)
837
838    def on_connection_response(self, response):
839        if self.state != Channel.WAIT_CONNECT_RSP:
840            logger.warning(color('invalid state', 'red'))
841            return
842
843        if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL:
844            self.destination_cid = response.destination_cid
845            self.change_state(Channel.WAIT_CONFIG)
846            self.send_configure_request()
847            self.change_state(Channel.WAIT_CONFIG_REQ_RSP)
848        elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING:
849            pass
850        else:
851            self.change_state(Channel.CLOSED)
852            self.connection_result.set_exception(
853                ProtocolError(
854                    response.result,
855                    'l2cap',
856                    L2CAP_Connection_Response.result_name(response.result),
857                )
858            )
859            self.connection_result = None
860
861    def on_configure_request(self, request):
862        if self.state not in (
863            Channel.WAIT_CONFIG,
864            Channel.WAIT_CONFIG_REQ,
865            Channel.WAIT_CONFIG_REQ_RSP,
866        ):
867            logger.warning(color('invalid state', 'red'))
868            return
869
870        # Decode the options
871        options = L2CAP_Control_Frame.decode_configuration_options(request.options)
872        for option in options:
873            if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
874                self.mtu = struct.unpack('<H', option[1])[0]
875                logger.debug(f'MTU = {self.mtu}')
876
877        self.send_control_frame(
878            L2CAP_Configure_Response(
879                identifier=request.identifier,
880                source_cid=self.destination_cid,
881                flags=0x0000,
882                result=L2CAP_Configure_Response.SUCCESS,
883                options=request.options,  # TODO: don't accept everything blindly
884            )
885        )
886        if self.state == Channel.WAIT_CONFIG:
887            self.change_state(Channel.WAIT_SEND_CONFIG)
888            self.send_configure_request()
889            self.change_state(Channel.WAIT_CONFIG_RSP)
890        elif self.state == Channel.WAIT_CONFIG_REQ:
891            self.change_state(Channel.OPEN)
892            if self.connection_result:
893                self.connection_result.set_result(None)
894                self.connection_result = None
895            self.emit('open')
896        elif self.state == Channel.WAIT_CONFIG_REQ_RSP:
897            self.change_state(Channel.WAIT_CONFIG_RSP)
898
899    def on_configure_response(self, response):
900        if response.result == L2CAP_Configure_Response.SUCCESS:
901            if self.state == Channel.WAIT_CONFIG_REQ_RSP:
902                self.change_state(Channel.WAIT_CONFIG_REQ)
903            elif self.state in (Channel.WAIT_CONFIG_RSP, Channel.WAIT_CONTROL_IND):
904                self.change_state(Channel.OPEN)
905                if self.connection_result:
906                    self.connection_result.set_result(None)
907                    self.connection_result = None
908                self.emit('open')
909            else:
910                logger.warning(color('invalid state', 'red'))
911        elif (
912            response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS
913        ):
914            # Re-configure with what's suggested in the response
915            self.send_control_frame(
916                L2CAP_Configure_Request(
917                    identifier=self.manager.next_identifier(self.connection),
918                    destination_cid=self.destination_cid,
919                    flags=0x0000,
920                    options=response.options,
921                )
922            )
923        else:
924            logger.warning(
925                color(
926                    '!!! configuration rejected: '
927                    f'{L2CAP_Configure_Response.result_name(response.result)}',
928                    'red',
929                )
930            )
931            # TODO: decide how to fail gracefully
932
933    def on_disconnection_request(self, request):
934        if self.state in (Channel.OPEN, Channel.WAIT_DISCONNECT):
935            self.send_control_frame(
936                L2CAP_Disconnection_Response(
937                    identifier=request.identifier,
938                    destination_cid=request.destination_cid,
939                    source_cid=request.source_cid,
940                )
941            )
942            self.change_state(Channel.CLOSED)
943            self.emit('close')
944            self.manager.on_channel_closed(self)
945        else:
946            logger.warning(color('invalid state', 'red'))
947
948    def on_disconnection_response(self, response):
949        if self.state != Channel.WAIT_DISCONNECT:
950            logger.warning(color('invalid state', 'red'))
951            return
952
953        if (
954            response.destination_cid != self.destination_cid
955            or response.source_cid != self.source_cid
956        ):
957            logger.warning('unexpected source or destination CID')
958            return
959
960        self.change_state(Channel.CLOSED)
961        if self.disconnection_result:
962            self.disconnection_result.set_result(None)
963            self.disconnection_result = None
964        self.emit('close')
965        self.manager.on_channel_closed(self)
966
967    def __str__(self):
968        return (
969            f'Channel({self.source_cid}->{self.destination_cid}, '
970            f'PSM={self.psm}, '
971            f'MTU={self.mtu}, '
972            f'state={Channel.STATE_NAMES[self.state]})'
973        )
974
975
976# -----------------------------------------------------------------------------
977class LeConnectionOrientedChannel(EventEmitter):
978    """
979    LE Credit-based Connection Oriented Channel
980    """
981
982    INIT = 0
983    CONNECTED = 1
984    CONNECTING = 2
985    DISCONNECTING = 3
986    DISCONNECTED = 4
987    CONNECTION_ERROR = 5
988
989    STATE_NAMES = {
990        INIT: 'INIT',
991        CONNECTED: 'CONNECTED',
992        CONNECTING: 'CONNECTING',
993        DISCONNECTING: 'DISCONNECTING',
994        DISCONNECTED: 'DISCONNECTED',
995        CONNECTION_ERROR: 'CONNECTION_ERROR',
996    }
997
998    @staticmethod
999    def state_name(state):
1000        return name_or_number(LeConnectionOrientedChannel.STATE_NAMES, state)
1001
1002    def __init__(
1003        self,
1004        manager,
1005        connection,
1006        le_psm,
1007        source_cid,
1008        destination_cid,
1009        mtu,
1010        mps,
1011        credits,  # pylint: disable=redefined-builtin
1012        peer_mtu,
1013        peer_mps,
1014        peer_credits,
1015        connected,
1016    ):
1017        super().__init__()
1018        self.manager = manager
1019        self.connection = connection
1020        self.le_psm = le_psm
1021        self.source_cid = source_cid
1022        self.destination_cid = destination_cid
1023        self.mtu = mtu
1024        self.mps = mps
1025        self.credits = credits
1026        self.peer_mtu = peer_mtu
1027        self.peer_mps = peer_mps
1028        self.peer_credits = peer_credits
1029        self.peer_max_credits = self.peer_credits
1030        self.peer_credits_threshold = self.peer_max_credits // 2
1031        self.in_sdu = None
1032        self.in_sdu_length = 0
1033        self.out_queue = deque()
1034        self.out_sdu = None
1035        self.sink = None
1036        self.connected = False
1037        self.connection_result = None
1038        self.disconnection_result = None
1039        self.drained = asyncio.Event()
1040
1041        self.drained.set()
1042
1043        if connected:
1044            self.state = LeConnectionOrientedChannel.CONNECTED
1045        else:
1046            self.state = LeConnectionOrientedChannel.INIT
1047
1048    def change_state(self, new_state):
1049        logger.debug(
1050            f'{self} state change -> {color(self.state_name(new_state), "cyan")}'
1051        )
1052        self.state = new_state
1053
1054        if new_state == self.CONNECTED:
1055            self.emit('open')
1056        elif new_state == self.DISCONNECTED:
1057            self.emit('close')
1058
1059    def send_pdu(self, pdu):
1060        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
1061
1062    def send_control_frame(self, frame):
1063        self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
1064
1065    async def connect(self):
1066        # Check that we're in the right state
1067        if self.state != self.INIT:
1068            raise InvalidStateError('not in a connectable state')
1069
1070        # Check that we can start a new connection
1071        identifier = self.manager.next_identifier(self.connection)
1072        if identifier in self.manager.le_coc_requests:
1073            raise RuntimeError('too many concurrent connection requests')
1074
1075        self.change_state(self.CONNECTING)
1076        request = L2CAP_LE_Credit_Based_Connection_Request(
1077            identifier=identifier,
1078            le_psm=self.le_psm,
1079            source_cid=self.source_cid,
1080            mtu=self.mtu,
1081            mps=self.mps,
1082            initial_credits=self.peer_credits,
1083        )
1084        self.manager.le_coc_requests[identifier] = request
1085        self.send_control_frame(request)
1086
1087        # Create a future to wait for the response
1088        self.connection_result = asyncio.get_running_loop().create_future()
1089
1090        # Wait for the connection to succeed or fail
1091        return await self.connection_result
1092
1093    async def disconnect(self):
1094        # Check that we're connected
1095        if self.state != self.CONNECTED:
1096            raise InvalidStateError('not connected')
1097
1098        self.change_state(self.DISCONNECTING)
1099        self.flush_output()
1100        self.send_control_frame(
1101            L2CAP_Disconnection_Request(
1102                identifier=self.manager.next_identifier(self.connection),
1103                destination_cid=self.destination_cid,
1104                source_cid=self.source_cid,
1105            )
1106        )
1107
1108        # Create a future to wait for the state machine to get to a success or error
1109        # state
1110        self.disconnection_result = asyncio.get_running_loop().create_future()
1111        return await self.disconnection_result
1112
1113    def abort(self):
1114        if self.state == self.CONNECTED:
1115            self.change_state(self.DISCONNECTED)
1116
1117    def on_pdu(self, pdu):
1118        if self.sink is None:
1119            logger.warning('received pdu without a sink')
1120            return
1121
1122        if self.state != self.CONNECTED:
1123            logger.warning('received PDU while not connected, dropping')
1124
1125        # Manage the peer credits
1126        if self.peer_credits == 0:
1127            logger.warning('received LE frame when peer out of credits')
1128        else:
1129            self.peer_credits -= 1
1130            if self.peer_credits <= self.peer_credits_threshold:
1131                # The credits fell below the threshold, replenish them to the max
1132                self.send_control_frame(
1133                    L2CAP_LE_Flow_Control_Credit(
1134                        identifier=self.manager.next_identifier(self.connection),
1135                        cid=self.source_cid,
1136                        credits=self.peer_max_credits - self.peer_credits,
1137                    )
1138                )
1139                self.peer_credits = self.peer_max_credits
1140
1141        # Check if this starts a new SDU
1142        if self.in_sdu is None:
1143            # Start a new SDU
1144            self.in_sdu = pdu
1145        else:
1146            # Continue an SDU
1147            self.in_sdu += pdu
1148
1149        # Check if the SDU is complete
1150        if self.in_sdu_length == 0:
1151            # We don't know the size yet, check if we have received the header to
1152            # compute it
1153            if len(self.in_sdu) >= 2:
1154                self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0]
1155        if self.in_sdu_length == 0:
1156            # We'll compute it later
1157            return
1158        if len(self.in_sdu) < 2 + self.in_sdu_length:
1159            # Not complete yet
1160            logger.debug(
1161                f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received'
1162            )
1163            return
1164        if len(self.in_sdu) != 2 + self.in_sdu_length:
1165            # Overflow
1166            logger.warning(
1167                f'SDU overflow: sdu_length={self.in_sdu_length}, '
1168                f'received {len(self.in_sdu) - 2}'
1169            )
1170            # TODO: we should disconnect
1171            self.in_sdu = None
1172            self.in_sdu_length = 0
1173            return
1174
1175        # Send the SDU to the sink
1176        logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes')
1177        self.sink(self.in_sdu[2:])  # pylint: disable=not-callable
1178
1179        # Prepare for a new SDU
1180        self.in_sdu = None
1181        self.in_sdu_length = 0
1182
1183    def on_connection_response(self, response):
1184        # Look for a matching pending response result
1185        if self.connection_result is None:
1186            logger.warning(
1187                f'received unexpected connection response (id={response.identifier})'
1188            )
1189            return
1190
1191        if (
1192            response.result
1193            == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL
1194        ):
1195            self.destination_cid = response.destination_cid
1196            self.peer_mtu = response.mtu
1197            self.peer_mps = response.mps
1198            self.credits = response.initial_credits
1199            self.connected = True
1200            self.connection_result.set_result(self)
1201            self.change_state(self.CONNECTED)
1202        else:
1203            self.connection_result.set_exception(
1204                ProtocolError(
1205                    response.result,
1206                    'l2cap',
1207                    L2CAP_LE_Credit_Based_Connection_Response.result_name(
1208                        response.result
1209                    ),
1210                )
1211            )
1212            self.change_state(self.CONNECTION_ERROR)
1213
1214        # Cleanup
1215        self.connection_result = None
1216
1217    def on_credits(self, credits):  # pylint: disable=redefined-builtin
1218        self.credits += credits
1219        logger.debug(f'received {credits} credits, total = {self.credits}')
1220
1221        # Try to send more data if we have any queued up
1222        self.process_output()
1223
1224    def on_disconnection_request(self, request):
1225        self.send_control_frame(
1226            L2CAP_Disconnection_Response(
1227                identifier=request.identifier,
1228                destination_cid=request.destination_cid,
1229                source_cid=request.source_cid,
1230            )
1231        )
1232        self.change_state(self.DISCONNECTED)
1233        self.flush_output()
1234
1235    def on_disconnection_response(self, response):
1236        if self.state != self.DISCONNECTING:
1237            logger.warning(color('invalid state', 'red'))
1238            return
1239
1240        if (
1241            response.destination_cid != self.destination_cid
1242            or response.source_cid != self.source_cid
1243        ):
1244            logger.warning('unexpected source or destination CID')
1245            return
1246
1247        self.change_state(self.DISCONNECTED)
1248        if self.disconnection_result:
1249            self.disconnection_result.set_result(None)
1250            self.disconnection_result = None
1251
1252    def flush_output(self):
1253        self.out_queue.clear()
1254        self.out_sdu = None
1255
1256    def process_output(self):
1257        while self.credits > 0:
1258            if self.out_sdu is not None:
1259                # Finish the current SDU
1260                packet = self.out_sdu[: self.peer_mps]
1261                self.send_pdu(packet)
1262                self.credits -= 1
1263                logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left')
1264                if len(packet) == len(self.out_sdu):
1265                    # We sent everything
1266                    self.out_sdu = None
1267                else:
1268                    # Keep what's still left to send
1269                    self.out_sdu = self.out_sdu[len(packet) :]
1270                continue
1271
1272            if self.out_queue:
1273                # Create the next SDU (2 bytes header plus up to MTU bytes payload)
1274                logger.debug(
1275                    f'assembling SDU from {len(self.out_queue)} packets in output queue'
1276                )
1277                payload = b''
1278                while self.out_queue and len(payload) < self.peer_mtu:
1279                    # We can add more data to the payload
1280                    chunk = self.out_queue[0][: self.peer_mtu - len(payload)]
1281                    payload += chunk
1282                    self.out_queue[0] = self.out_queue[0][len(chunk) :]
1283                    if len(self.out_queue[0]) == 0:
1284                        # We consumed the entire buffer, remove it
1285                        self.out_queue.popleft()
1286                        logger.debug(
1287                            f'packet completed, {len(self.out_queue)} left in queue'
1288                        )
1289
1290                # Construct the SDU with its header
1291                assert len(payload) != 0
1292                logger.debug(f'SDU complete: {len(payload)} payload bytes')
1293                self.out_sdu = struct.pack('<H', len(payload)) + payload
1294            else:
1295                # Nothing left to send for now
1296                self.drained.set()
1297                return
1298
1299    def write(self, data):
1300        if self.state != self.CONNECTED:
1301            logger.warning('not connected, dropping data')
1302            return
1303
1304        # Queue the data
1305        self.out_queue.append(data)
1306        self.drained.clear()
1307        logger.debug(
1308            f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue'
1309        )
1310
1311        # Send what we can
1312        self.process_output()
1313
1314    async def drain(self):
1315        await self.drained.wait()
1316
1317    def pause_reading(self):
1318        # TODO: not implemented yet
1319        pass
1320
1321    def resume_reading(self):
1322        # TODO: not implemented yet
1323        pass
1324
1325    def __str__(self):
1326        return (
1327            f'CoC({self.source_cid}->{self.destination_cid}, '
1328            f'State={self.state_name(self.state)}, '
1329            f'PSM={self.le_psm}, '
1330            f'MTU={self.mtu}/{self.peer_mtu}, '
1331            f'MPS={self.mps}/{self.peer_mps}, '
1332            f'credits={self.credits}/{self.peer_credits})'
1333        )
1334
1335
1336# -----------------------------------------------------------------------------
1337class ChannelManager:
1338    def __init__(
1339        self, extended_features=(), connectionless_mtu=L2CAP_DEFAULT_CONNECTIONLESS_MTU
1340    ):
1341        self._host = None
1342        self.identifiers = {}  # Incrementing identifier values by connection
1343        self.channels = {}  # All channels, mapped by connection and source cid
1344        self.fixed_channels = {  # Fixed channel handlers, mapped by cid
1345            L2CAP_SIGNALING_CID: None,
1346            L2CAP_LE_SIGNALING_CID: None,
1347        }
1348        self.servers = {}  # Servers accepting connections, by PSM
1349        self.le_coc_channels = (
1350            {}
1351        )  # LE CoC channels, mapped by connection and destination cid
1352        self.le_coc_servers = {}  # LE CoC - Servers accepting connections, by PSM
1353        self.le_coc_requests = {}  # LE CoC connection requests, by identifier
1354        self.extended_features = extended_features
1355        self.connectionless_mtu = connectionless_mtu
1356
1357    @property
1358    def host(self):
1359        return self._host
1360
1361    @host.setter
1362    def host(self, host):
1363        if self._host is not None:
1364            self._host.remove_listener('disconnection', self.on_disconnection)
1365        self._host = host
1366        if host is not None:
1367            host.on('disconnection', self.on_disconnection)
1368
1369    def find_channel(self, connection_handle, cid):
1370        if connection_channels := self.channels.get(connection_handle):
1371            return connection_channels.get(cid)
1372
1373        return None
1374
1375    def find_le_coc_channel(self, connection_handle, cid):
1376        if connection_channels := self.le_coc_channels.get(connection_handle):
1377            return connection_channels.get(cid)
1378
1379        return None
1380
1381    @staticmethod
1382    def find_free_br_edr_cid(channels):
1383        # Pick the smallest valid CID that's not already in the list
1384        # (not necessarily the most efficient algorithm, but the list of CID is
1385        # very small in practice)
1386        for cid in range(
1387            L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1
1388        ):
1389            if cid not in channels:
1390                return cid
1391
1392        raise RuntimeError('no free CID available')
1393
1394    @staticmethod
1395    def find_free_le_cid(channels):
1396        # Pick the smallest valid CID that's not already in the list
1397        # (not necessarily the most efficient algorithm, but the list of CID is
1398        # very small in practice)
1399        for cid in range(
1400            L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1
1401        ):
1402            if cid not in channels:
1403                return cid
1404
1405        raise RuntimeError('no free CID')
1406
1407    @staticmethod
1408    def check_le_coc_parameters(max_credits, mtu, mps):
1409        if (
1410            max_credits < 1
1411            or max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
1412        ):
1413            raise ValueError('max credits out of range')
1414        if mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU:
1415            raise ValueError('MTU too small')
1416        if (
1417            mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
1418            or mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
1419        ):
1420            raise ValueError('MPS out of range')
1421
1422    def next_identifier(self, connection):
1423        identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
1424        self.identifiers[connection.handle] = identifier
1425        return identifier
1426
1427    def register_fixed_channel(self, cid, handler):
1428        self.fixed_channels[cid] = handler
1429
1430    def deregister_fixed_channel(self, cid):
1431        if cid in self.fixed_channels:
1432            del self.fixed_channels[cid]
1433
1434    def register_server(self, psm, server):
1435        if psm == 0:
1436            # Find a free PSM
1437            for candidate in range(
1438                L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
1439            ):
1440                if (candidate >> 8) % 2 == 1:
1441                    continue
1442                if candidate in self.servers:
1443                    continue
1444                psm = candidate
1445                break
1446            else:
1447                raise InvalidStateError('no free PSM')
1448        else:
1449            # Check that the PSM isn't already in use
1450            if psm in self.servers:
1451                raise ValueError('PSM already in use')
1452
1453            # Check that the PSM is valid
1454            if psm % 2 == 0:
1455                raise ValueError('invalid PSM (not odd)')
1456            check = psm >> 8
1457            while check:
1458                if check % 2 != 0:
1459                    raise ValueError('invalid PSM')
1460                check >>= 8
1461
1462        self.servers[psm] = server
1463
1464        return psm
1465
1466    def register_le_coc_server(
1467        self,
1468        psm,
1469        server,
1470        max_credits=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
1471        mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
1472        mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
1473    ):
1474        self.check_le_coc_parameters(max_credits, mtu, mps)
1475
1476        if psm == 0:
1477            # Find a free PSM
1478            for candidate in range(
1479                L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
1480            ):
1481                if candidate in self.le_coc_servers:
1482                    continue
1483                psm = candidate
1484                break
1485            else:
1486                raise InvalidStateError('no free PSM')
1487        else:
1488            # Check that the PSM isn't already in use
1489            if psm in self.le_coc_servers:
1490                raise ValueError('PSM already in use')
1491
1492        self.le_coc_servers[psm] = (
1493            server,
1494            max_credits or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
1495            mtu or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
1496            mps or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
1497        )
1498
1499        return psm
1500
1501    def on_disconnection(self, connection_handle, _reason):
1502        logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
1503        if connection_handle in self.channels:
1504            for _, channel in self.channels[connection_handle].items():
1505                channel.abort()
1506            del self.channels[connection_handle]
1507        if connection_handle in self.le_coc_channels:
1508            for _, channel in self.le_coc_channels[connection_handle].items():
1509                channel.abort()
1510            del self.le_coc_channels[connection_handle]
1511        if connection_handle in self.identifiers:
1512            del self.identifiers[connection_handle]
1513
1514    def send_pdu(self, connection, cid, pdu):
1515        pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
1516        logger.debug(
1517            f'{color(">>> Sending L2CAP PDU", "blue")} '
1518            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1519            f'{connection.peer_address}: {pdu_str}'
1520        )
1521        self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
1522
1523    def on_pdu(self, connection, cid, pdu):
1524        if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
1525            # Parse the L2CAP payload into a Control Frame object
1526            control_frame = L2CAP_Control_Frame.from_bytes(pdu)
1527
1528            self.on_control_frame(connection, cid, control_frame)
1529        elif cid in self.fixed_channels:
1530            self.fixed_channels[cid](connection.handle, pdu)
1531        else:
1532            if (channel := self.find_channel(connection.handle, cid)) is None:
1533                logger.warning(
1534                    color(
1535                        f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'
1536                    )
1537                )
1538                return
1539
1540            channel.on_pdu(pdu)
1541
1542    def send_control_frame(self, connection, cid, control_frame):
1543        logger.debug(
1544            f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
1545            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1546            f'{connection.peer_address}:\n{control_frame}'
1547        )
1548        self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
1549
1550    def on_control_frame(self, connection, cid, control_frame):
1551        logger.debug(
1552            f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
1553            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1554            f'{connection.peer_address}:\n{control_frame}'
1555        )
1556
1557        # Find the handler method
1558        handler_name = f'on_{control_frame.name.lower()}'
1559        handler = getattr(self, handler_name, None)
1560        if handler:
1561            try:
1562                handler(connection, cid, control_frame)
1563            except Exception as error:
1564                logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
1565                self.send_control_frame(
1566                    connection,
1567                    cid,
1568                    L2CAP_Command_Reject(
1569                        identifier=control_frame.identifier,
1570                        reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1571                        data=b'',
1572                    ),
1573                )
1574                raise error
1575        else:
1576            logger.error(color('Channel Manager command not handled???', 'red'))
1577            self.send_control_frame(
1578                connection,
1579                cid,
1580                L2CAP_Command_Reject(
1581                    identifier=control_frame.identifier,
1582                    reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1583                    data=b'',
1584                ),
1585            )
1586
1587    def on_l2cap_command_reject(self, _connection, _cid, packet):
1588        logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
1589
1590    def on_l2cap_connection_request(self, connection, cid, request):
1591        # Check if there's a server for this PSM
1592        server = self.servers.get(request.psm)
1593        if server:
1594            # Find a free CID for this new channel
1595            connection_channels = self.channels.setdefault(connection.handle, {})
1596            source_cid = self.find_free_br_edr_cid(connection_channels)
1597            if source_cid is None:  # Should never happen!
1598                self.send_control_frame(
1599                    connection,
1600                    cid,
1601                    L2CAP_Connection_Response(
1602                        identifier=request.identifier,
1603                        destination_cid=request.source_cid,
1604                        source_cid=0,
1605                        # pylint: disable=line-too-long
1606                        result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
1607                        status=0x0000,
1608                    ),
1609                )
1610                return
1611
1612            # Create a new channel
1613            logger.debug(
1614                f'creating server channel with cid={source_cid} for psm {request.psm}'
1615            )
1616            channel = Channel(
1617                self, connection, cid, request.psm, source_cid, L2CAP_MIN_BR_EDR_MTU
1618            )
1619            connection_channels[source_cid] = channel
1620
1621            # Notify
1622            server(channel)
1623            channel.on_connection_request(request)
1624        else:
1625            logger.warning(
1626                f'No server for connection 0x{connection.handle:04X} '
1627                f'on PSM {request.psm}'
1628            )
1629            self.send_control_frame(
1630                connection,
1631                cid,
1632                L2CAP_Connection_Response(
1633                    identifier=request.identifier,
1634                    destination_cid=request.source_cid,
1635                    source_cid=0,
1636                    # pylint: disable=line-too-long
1637                    result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
1638                    status=0x0000,
1639                ),
1640            )
1641
1642    def on_l2cap_connection_response(self, connection, cid, response):
1643        if (
1644            channel := self.find_channel(connection.handle, response.source_cid)
1645        ) is None:
1646            logger.warning(
1647                color(
1648                    f'channel {response.source_cid} not found for '
1649                    f'0x{connection.handle:04X}:{cid}',
1650                    'red',
1651                )
1652            )
1653            return
1654
1655        channel.on_connection_response(response)
1656
1657    def on_l2cap_configure_request(self, connection, cid, request):
1658        if (
1659            channel := self.find_channel(connection.handle, request.destination_cid)
1660        ) is None:
1661            logger.warning(
1662                color(
1663                    f'channel {request.destination_cid} not found for '
1664                    f'0x{connection.handle:04X}:{cid}',
1665                    'red',
1666                )
1667            )
1668            return
1669
1670        channel.on_configure_request(request)
1671
1672    def on_l2cap_configure_response(self, connection, cid, response):
1673        if (
1674            channel := self.find_channel(connection.handle, response.source_cid)
1675        ) is None:
1676            logger.warning(
1677                color(
1678                    f'channel {response.source_cid} not found for '
1679                    f'0x{connection.handle:04X}:{cid}',
1680                    'red',
1681                )
1682            )
1683            return
1684
1685        channel.on_configure_response(response)
1686
1687    def on_l2cap_disconnection_request(self, connection, cid, request):
1688        if (
1689            channel := self.find_channel(connection.handle, request.destination_cid)
1690        ) is None:
1691            logger.warning(
1692                color(
1693                    f'channel {request.destination_cid} not found for '
1694                    f'0x{connection.handle:04X}:{cid}',
1695                    'red',
1696                )
1697            )
1698            return
1699
1700        channel.on_disconnection_request(request)
1701
1702    def on_l2cap_disconnection_response(self, connection, cid, response):
1703        if (
1704            channel := self.find_channel(connection.handle, response.source_cid)
1705        ) is None:
1706            logger.warning(
1707                color(
1708                    f'channel {response.source_cid} not found for '
1709                    f'0x{connection.handle:04X}:{cid}',
1710                    'red',
1711                )
1712            )
1713            return
1714
1715        channel.on_disconnection_response(response)
1716
1717    def on_l2cap_echo_request(self, connection, cid, request):
1718        logger.debug(f'<<< Echo request: data={request.data.hex()}')
1719        self.send_control_frame(
1720            connection,
1721            cid,
1722            L2CAP_Echo_Response(identifier=request.identifier, data=request.data),
1723        )
1724
1725    def on_l2cap_echo_response(self, _connection, _cid, response):
1726        logger.debug(f'<<< Echo response: data={response.data.hex()}')
1727        # TODO notify listeners
1728
1729    def on_l2cap_information_request(self, connection, cid, request):
1730        if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
1731            result = L2CAP_Information_Response.SUCCESS
1732            data = self.connectionless_mtu.to_bytes(2, 'little')
1733        elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
1734            result = L2CAP_Information_Response.SUCCESS
1735            data = sum(self.extended_features).to_bytes(4, 'little')
1736        elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
1737            result = L2CAP_Information_Response.SUCCESS
1738            data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
1739        else:
1740            result = L2CAP_Information_Response.NOT_SUPPORTED
1741
1742        self.send_control_frame(
1743            connection,
1744            cid,
1745            L2CAP_Information_Response(
1746                identifier=request.identifier,
1747                info_type=request.info_type,
1748                result=result,
1749                data=data,
1750            ),
1751        )
1752
1753    def on_l2cap_connection_parameter_update_request(self, connection, cid, request):
1754        if connection.role == BT_CENTRAL_ROLE:
1755            self.send_control_frame(
1756                connection,
1757                cid,
1758                L2CAP_Connection_Parameter_Update_Response(
1759                    identifier=request.identifier,
1760                    result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT,
1761                ),
1762            )
1763            self.host.send_command_sync(
1764                HCI_LE_Connection_Update_Command(
1765                    connection_handle=connection.handle,
1766                    connection_interval_min=request.interval_min,
1767                    connection_interval_max=request.interval_max,
1768                    max_latency=request.latency,
1769                    supervision_timeout=request.timeout,
1770                    min_ce_length=0,
1771                    max_ce_length=0,
1772                )
1773            )
1774        else:
1775            self.send_control_frame(
1776                connection,
1777                cid,
1778                L2CAP_Connection_Parameter_Update_Response(
1779                    identifier=request.identifier,
1780                    result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT,
1781                ),
1782            )
1783
1784    def on_l2cap_connection_parameter_update_response(self, connection, cid, response):
1785        # TODO: check response
1786        pass
1787
1788    def on_l2cap_le_credit_based_connection_request(self, connection, cid, request):
1789        if request.le_psm in self.le_coc_servers:
1790            (server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm]
1791
1792            # Check that the CID isn't already used
1793            le_connection_channels = self.le_coc_channels.setdefault(
1794                connection.handle, {}
1795            )
1796            if request.source_cid in le_connection_channels:
1797                logger.warning(f'source CID {request.source_cid} already in use')
1798                self.send_control_frame(
1799                    connection,
1800                    cid,
1801                    L2CAP_LE_Credit_Based_Connection_Response(
1802                        identifier=request.identifier,
1803                        destination_cid=0,
1804                        mtu=mtu,
1805                        mps=mps,
1806                        initial_credits=0,
1807                        # pylint: disable=line-too-long
1808                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
1809                    ),
1810                )
1811                return
1812
1813            # Find a free CID for this new channel
1814            connection_channels = self.channels.setdefault(connection.handle, {})
1815            source_cid = self.find_free_le_cid(connection_channels)
1816            if source_cid is None:  # Should never happen!
1817                self.send_control_frame(
1818                    connection,
1819                    cid,
1820                    L2CAP_LE_Credit_Based_Connection_Response(
1821                        identifier=request.identifier,
1822                        destination_cid=0,
1823                        mtu=mtu,
1824                        mps=mps,
1825                        initial_credits=0,
1826                        # pylint: disable=line-too-long
1827                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
1828                    ),
1829                )
1830                return
1831
1832            # Create a new channel
1833            logger.debug(
1834                f'creating LE CoC server channel with cid={source_cid} for psm '
1835                f'{request.le_psm}'
1836            )
1837            channel = LeConnectionOrientedChannel(
1838                self,
1839                connection,
1840                request.le_psm,
1841                source_cid,
1842                request.source_cid,
1843                mtu,
1844                mps,
1845                request.initial_credits,
1846                request.mtu,
1847                request.mps,
1848                max_credits,
1849                True,
1850            )
1851            connection_channels[source_cid] = channel
1852            le_connection_channels[request.source_cid] = channel
1853
1854            # Respond
1855            self.send_control_frame(
1856                connection,
1857                cid,
1858                L2CAP_LE_Credit_Based_Connection_Response(
1859                    identifier=request.identifier,
1860                    destination_cid=source_cid,
1861                    mtu=mtu,
1862                    mps=mps,
1863                    initial_credits=max_credits,
1864                    # pylint: disable=line-too-long
1865                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL,
1866                ),
1867            )
1868
1869            # Notify
1870            server(channel)
1871        else:
1872            logger.info(
1873                f'No LE server for connection 0x{connection.handle:04X} '
1874                f'on PSM {request.le_psm}'
1875            )
1876            self.send_control_frame(
1877                connection,
1878                cid,
1879                L2CAP_LE_Credit_Based_Connection_Response(
1880                    identifier=request.identifier,
1881                    destination_cid=0,
1882                    mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
1883                    mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
1884                    initial_credits=0,
1885                    # pylint: disable=line-too-long
1886                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
1887                ),
1888            )
1889
1890    def on_l2cap_le_credit_based_connection_response(self, connection, _cid, response):
1891        # Find the pending request by identifier
1892        request = self.le_coc_requests.get(response.identifier)
1893        if request is None:
1894            logger.warning(color('!!! received response for unknown request', 'red'))
1895            return
1896        del self.le_coc_requests[response.identifier]
1897
1898        # Find the channel for this request
1899        channel = self.find_channel(connection.handle, request.source_cid)
1900        if channel is None:
1901            logger.warning(
1902                color(
1903                    'received connection response for an unknown channel '
1904                    f'(cid={request.source_cid})',
1905                    'red',
1906                )
1907            )
1908            return
1909
1910        # Process the response
1911        channel.on_connection_response(response)
1912
1913    def on_l2cap_le_flow_control_credit(self, connection, _cid, credit):
1914        channel = self.find_le_coc_channel(connection.handle, credit.cid)
1915        if channel is None:
1916            logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
1917            return
1918
1919        channel.on_credits(credit.credits)
1920
1921    def on_channel_closed(self, channel):
1922        connection_channels = self.channels.get(channel.connection.handle)
1923        if connection_channels:
1924            if channel.source_cid in connection_channels:
1925                del connection_channels[channel.source_cid]
1926
1927    async def open_le_coc(self, connection, psm, max_credits, mtu, mps):
1928        self.check_le_coc_parameters(max_credits, mtu, mps)
1929
1930        # Find a free CID for the new channel
1931        connection_channels = self.channels.setdefault(connection.handle, {})
1932        source_cid = self.find_free_le_cid(connection_channels)
1933        if source_cid is None:  # Should never happen!
1934            raise RuntimeError('all CIDs already in use')
1935
1936        # Create the channel
1937        logger.debug(f'creating coc channel with cid={source_cid} for psm {psm}')
1938        channel = LeConnectionOrientedChannel(
1939            manager=self,
1940            connection=connection,
1941            le_psm=psm,
1942            source_cid=source_cid,
1943            destination_cid=0,
1944            mtu=mtu,
1945            mps=mps,
1946            credits=0,
1947            peer_mtu=0,
1948            peer_mps=0,
1949            peer_credits=max_credits,
1950            connected=False,
1951        )
1952        connection_channels[source_cid] = channel
1953
1954        # Connect
1955        try:
1956            await channel.connect()
1957        except Exception as error:
1958            logger.warning(f'connection failed: {error}')
1959            del connection_channels[source_cid]
1960            raise
1961
1962        # Remember the channel by source CID and destination CID
1963        le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
1964        le_connection_channels[channel.destination_cid] = channel
1965
1966        return channel
1967
1968    async def connect(self, connection, psm):
1969        # NOTE: this implementation hard-codes BR/EDR
1970
1971        # Find a free CID for a new channel
1972        connection_channels = self.channels.setdefault(connection.handle, {})
1973        source_cid = self.find_free_br_edr_cid(connection_channels)
1974        if source_cid is None:  # Should never happen!
1975            raise RuntimeError('all CIDs already in use')
1976
1977        # Create the channel
1978        logger.debug(f'creating client channel with cid={source_cid} for psm {psm}')
1979        channel = Channel(
1980            self, connection, L2CAP_SIGNALING_CID, psm, source_cid, L2CAP_MIN_BR_EDR_MTU
1981        )
1982        connection_channels[source_cid] = channel
1983
1984        # Connect
1985        try:
1986            await channel.connect()
1987        except Exception:
1988            del connection_channels[source_cid]
1989
1990        return channel
1991