1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import dataclasses 21import enum 22import logging 23import struct 24 25from collections import deque 26from pyee import EventEmitter 27from typing import ( 28 Dict, 29 Type, 30 List, 31 Optional, 32 Tuple, 33 Callable, 34 Any, 35 Union, 36 Deque, 37 Iterable, 38 SupportsBytes, 39 TYPE_CHECKING, 40) 41 42from .utils import deprecated 43from .colors import color 44from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError 45from .hci import ( 46 HCI_LE_Connection_Update_Command, 47 HCI_Object, 48 key_with_value, 49 name_or_number, 50) 51 52if TYPE_CHECKING: 53 from bumble.device import Connection 54 from bumble.host import Host 55 56# ----------------------------------------------------------------------------- 57# Logging 58# ----------------------------------------------------------------------------- 59logger = logging.getLogger(__name__) 60 61 62# ----------------------------------------------------------------------------- 63# Constants 64# ----------------------------------------------------------------------------- 65# fmt: off 66# pylint: disable=line-too-long 67 68L2CAP_SIGNALING_CID = 0x01 69L2CAP_LE_SIGNALING_CID = 0x05 70 71L2CAP_MIN_LE_MTU = 23 72L2CAP_MIN_BR_EDR_MTU = 48 73L2CAP_MAX_BR_EDR_MTU = 65535 74 75L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept 76 77L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024 78 79# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links 80L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040 81L2CAP_ACL_U_DYNAMIC_CID_RANGE_END = 0xFFFF 82 83# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link 84L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040 85L2CAP_LE_U_DYNAMIC_CID_RANGE_END = 0x007F 86 87# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage 88L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001 89L2CAP_PSM_DYNAMIC_RANGE_END = 0xFFFF 90 91# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges 92L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080 93L2CAP_LE_PSM_DYNAMIC_RANGE_END = 0x00FF 94 95# Frame types 96L2CAP_COMMAND_REJECT = 0x01 97L2CAP_CONNECTION_REQUEST = 0x02 98L2CAP_CONNECTION_RESPONSE = 0x03 99L2CAP_CONFIGURE_REQUEST = 0x04 100L2CAP_CONFIGURE_RESPONSE = 0x05 101L2CAP_DISCONNECTION_REQUEST = 0x06 102L2CAP_DISCONNECTION_RESPONSE = 0x07 103L2CAP_ECHO_REQUEST = 0x08 104L2CAP_ECHO_RESPONSE = 0x09 105L2CAP_INFORMATION_REQUEST = 0x0A 106L2CAP_INFORMATION_RESPONSE = 0x0B 107L2CAP_CREATE_CHANNEL_REQUEST = 0x0C 108L2CAP_CREATE_CHANNEL_RESPONSE = 0x0D 109L2CAP_MOVE_CHANNEL_REQUEST = 0x0E 110L2CAP_MOVE_CHANNEL_RESPONSE = 0x0F 111L2CAP_MOVE_CHANNEL_CONFIRMATION = 0x10 112L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE = 0x11 113L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST = 0x12 114L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13 115L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14 116L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15 117L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16 118 119L2CAP_CONTROL_FRAME_NAMES = { 120 L2CAP_COMMAND_REJECT: 'L2CAP_COMMAND_REJECT', 121 L2CAP_CONNECTION_REQUEST: 'L2CAP_CONNECTION_REQUEST', 122 L2CAP_CONNECTION_RESPONSE: 'L2CAP_CONNECTION_RESPONSE', 123 L2CAP_CONFIGURE_REQUEST: 'L2CAP_CONFIGURE_REQUEST', 124 L2CAP_CONFIGURE_RESPONSE: 'L2CAP_CONFIGURE_RESPONSE', 125 L2CAP_DISCONNECTION_REQUEST: 'L2CAP_DISCONNECTION_REQUEST', 126 L2CAP_DISCONNECTION_RESPONSE: 'L2CAP_DISCONNECTION_RESPONSE', 127 L2CAP_ECHO_REQUEST: 'L2CAP_ECHO_REQUEST', 128 L2CAP_ECHO_RESPONSE: 'L2CAP_ECHO_RESPONSE', 129 L2CAP_INFORMATION_REQUEST: 'L2CAP_INFORMATION_REQUEST', 130 L2CAP_INFORMATION_RESPONSE: 'L2CAP_INFORMATION_RESPONSE', 131 L2CAP_CREATE_CHANNEL_REQUEST: 'L2CAP_CREATE_CHANNEL_REQUEST', 132 L2CAP_CREATE_CHANNEL_RESPONSE: 'L2CAP_CREATE_CHANNEL_RESPONSE', 133 L2CAP_MOVE_CHANNEL_REQUEST: 'L2CAP_MOVE_CHANNEL_REQUEST', 134 L2CAP_MOVE_CHANNEL_RESPONSE: 'L2CAP_MOVE_CHANNEL_RESPONSE', 135 L2CAP_MOVE_CHANNEL_CONFIRMATION: 'L2CAP_MOVE_CHANNEL_CONFIRMATION', 136 L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE: 'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE', 137 L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST: 'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST', 138 L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE', 139 L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST: 'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST', 140 L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE: 'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE', 141 L2CAP_LE_FLOW_CONTROL_CREDIT: 'L2CAP_LE_FLOW_CONTROL_CREDIT' 142} 143 144L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000 145L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001 146 147L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000 148L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001 149L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002 150 151L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535 152L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23 153L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535 154L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23 155L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533 156L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048 157L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048 158L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256 159 160L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01 161 162L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01 163 164# fmt: on 165# pylint: enable=line-too-long 166 167 168# ----------------------------------------------------------------------------- 169# Classes 170# ----------------------------------------------------------------------------- 171# pylint: disable=invalid-name 172 173 174@dataclasses.dataclass 175class ClassicChannelSpec: 176 psm: Optional[int] = None 177 mtu: int = L2CAP_DEFAULT_MTU 178 179 180@dataclasses.dataclass 181class LeCreditBasedChannelSpec: 182 psm: Optional[int] = None 183 mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU 184 mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS 185 max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS 186 187 def __post_init__(self): 188 if ( 189 self.max_credits < 1 190 or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS 191 ): 192 raise ValueError('max credits out of range') 193 if ( 194 self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU 195 or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU 196 ): 197 raise ValueError('MTU out of range') 198 if ( 199 self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS 200 or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS 201 ): 202 raise ValueError('MPS out of range') 203 204 205class L2CAP_PDU: 206 ''' 207 See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT 208 ''' 209 210 @staticmethod 211 def from_bytes(data: bytes) -> L2CAP_PDU: 212 # Check parameters 213 if len(data) < 4: 214 raise ValueError('not enough data for L2CAP header') 215 216 _, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0) 217 l2cap_pdu_payload = data[4:] 218 219 return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload) 220 221 def to_bytes(self) -> bytes: 222 header = struct.pack('<HH', len(self.payload), self.cid) 223 return header + self.payload 224 225 def __init__(self, cid: int, payload: bytes) -> None: 226 self.cid = cid 227 self.payload = payload 228 229 def __bytes__(self) -> bytes: 230 return self.to_bytes() 231 232 def __str__(self) -> str: 233 return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}' 234 235 236# ----------------------------------------------------------------------------- 237class L2CAP_Control_Frame: 238 ''' 239 See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS 240 ''' 241 242 classes: Dict[int, Type[L2CAP_Control_Frame]] = {} 243 code = 0 244 name: str 245 246 @staticmethod 247 def from_bytes(pdu: bytes) -> L2CAP_Control_Frame: 248 code = pdu[0] 249 250 cls = L2CAP_Control_Frame.classes.get(code) 251 if cls is None: 252 instance = L2CAP_Control_Frame(pdu) 253 instance.name = L2CAP_Control_Frame.code_name(code) 254 instance.code = code 255 return instance 256 self = cls.__new__(cls) 257 L2CAP_Control_Frame.__init__(self, pdu) 258 self.identifier = pdu[1] 259 length = struct.unpack_from('<H', pdu, 2)[0] 260 if length + 4 != len(pdu): 261 logger.warning( 262 color( 263 f'!!! length mismatch: expected {len(pdu) - 4} but got {length}', 264 'red', 265 ) 266 ) 267 if hasattr(self, 'fields'): 268 self.init_from_bytes(pdu, 4) 269 return self 270 271 @staticmethod 272 def code_name(code: int) -> str: 273 return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code) 274 275 @staticmethod 276 def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]: 277 options = [] 278 while len(data) >= 2: 279 value_type = data[0] 280 length = data[1] 281 value = data[2 : 2 + length] 282 data = data[2 + length :] 283 options.append((value_type, value)) 284 285 return options 286 287 @staticmethod 288 def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes: 289 return b''.join( 290 [bytes([option[0], len(option[1])]) + option[1] for option in options] 291 ) 292 293 @staticmethod 294 def subclass(fields): 295 def inner(cls): 296 cls.name = cls.__name__.upper() 297 cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name) 298 if cls.code is None: 299 raise KeyError( 300 f'Control Frame name {cls.name} ' 301 'not found in L2CAP_CONTROL_FRAME_NAMES' 302 ) 303 cls.fields = fields 304 305 # Register a factory for this class 306 L2CAP_Control_Frame.classes[cls.code] = cls 307 308 return cls 309 310 return inner 311 312 def __init__(self, pdu=None, **kwargs) -> None: 313 self.identifier = kwargs.get('identifier', 0) 314 if hasattr(self, 'fields'): 315 if kwargs: 316 HCI_Object.init_from_fields(self, self.fields, kwargs) 317 if pdu is None: 318 data = HCI_Object.dict_to_bytes(kwargs, self.fields) 319 pdu = ( 320 bytes([self.code, self.identifier]) 321 + struct.pack('<H', len(data)) 322 + data 323 ) 324 self.pdu = pdu 325 326 def init_from_bytes(self, pdu, offset): 327 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 328 329 def to_bytes(self) -> bytes: 330 return self.pdu 331 332 def __bytes__(self) -> bytes: 333 return self.to_bytes() 334 335 def __str__(self) -> str: 336 result = f'{color(self.name, "yellow")} [ID={self.identifier}]' 337 if fields := getattr(self, 'fields', None): 338 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 339 else: 340 if len(self.pdu) > 1: 341 result += f': {self.pdu.hex()}' 342 return result 343 344 345# ----------------------------------------------------------------------------- 346@L2CAP_Control_Frame.subclass( 347 # pylint: disable=unnecessary-lambda 348 [ 349 ( 350 'reason', 351 {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)}, 352 ), 353 ('data', '*'), 354 ] 355) 356class L2CAP_Command_Reject(L2CAP_Control_Frame): 357 ''' 358 See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT 359 ''' 360 361 COMMAND_NOT_UNDERSTOOD = 0x0000 362 SIGNALING_MTU_EXCEEDED = 0x0001 363 INVALID_CID_IN_REQUEST = 0x0002 364 365 REASON_NAMES = { 366 COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD', 367 SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED', 368 INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST', 369 } 370 371 @staticmethod 372 def reason_name(reason: int) -> str: 373 return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason) 374 375 376# ----------------------------------------------------------------------------- 377@L2CAP_Control_Frame.subclass( 378 # pylint: disable=unnecessary-lambda 379 [ 380 ( 381 'psm', 382 { 383 'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm( 384 data, offset 385 ), 386 'serializer': lambda value: L2CAP_Connection_Request.serialize_psm( 387 value 388 ), 389 }, 390 ), 391 ('source_cid', 2), 392 ] 393) 394class L2CAP_Connection_Request(L2CAP_Control_Frame): 395 ''' 396 See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST 397 ''' 398 399 psm: int 400 source_cid: int 401 402 @staticmethod 403 def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]: 404 psm_length = 2 405 psm = data[offset] | data[offset + 1] << 8 406 407 # The PSM field extends until the first even octet (inclusive) 408 while data[offset + psm_length - 1] % 2 == 1: 409 psm |= data[offset + psm_length] << (8 * psm_length) 410 psm_length += 1 411 412 return offset + psm_length, psm 413 414 @staticmethod 415 def serialize_psm(psm: int) -> bytes: 416 serialized = struct.pack('<H', psm & 0xFFFF) 417 psm >>= 16 418 while psm: 419 serialized += bytes([psm & 0xFF]) 420 psm >>= 8 421 422 return serialized 423 424 425# ----------------------------------------------------------------------------- 426@L2CAP_Control_Frame.subclass( 427 # pylint: disable=unnecessary-lambda 428 [ 429 ('destination_cid', 2), 430 ('source_cid', 2), 431 ( 432 'result', 433 {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)}, 434 ), 435 ('status', 2), 436 ] 437) 438class L2CAP_Connection_Response(L2CAP_Control_Frame): 439 ''' 440 See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE 441 ''' 442 443 source_cid: int 444 destination_cid: int 445 status: int 446 result: int 447 448 CONNECTION_SUCCESSFUL = 0x0000 449 CONNECTION_PENDING = 0x0001 450 CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002 451 CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003 452 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 453 CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006 454 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007 455 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B 456 457 # pylint: disable=line-too-long 458 RESULT_NAMES = { 459 CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', 460 CONNECTION_PENDING: 'CONNECTION_PENDING', 461 CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED', 462 CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK', 463 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', 464 CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', 465 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', 466 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', 467 } 468 469 @staticmethod 470 def result_name(result: int) -> str: 471 return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result) 472 473 474# ----------------------------------------------------------------------------- 475@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')]) 476class L2CAP_Configure_Request(L2CAP_Control_Frame): 477 ''' 478 See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST 479 ''' 480 481 482# ----------------------------------------------------------------------------- 483@L2CAP_Control_Frame.subclass( 484 # pylint: disable=unnecessary-lambda 485 [ 486 ('source_cid', 2), 487 ('flags', 2), 488 ( 489 'result', 490 {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)}, 491 ), 492 ('options', '*'), 493 ] 494) 495class L2CAP_Configure_Response(L2CAP_Control_Frame): 496 ''' 497 See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE 498 ''' 499 500 SUCCESS = 0x0000 501 FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001 502 FAILURE_REJECTED = 0x0002 503 FAILURE_UNKNOWN_OPTIONS = 0x0003 504 PENDING = 0x0004 505 FAILURE_FLOW_SPEC_REJECTED = 0x0005 506 507 RESULT_NAMES = { 508 SUCCESS: 'SUCCESS', 509 FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS', 510 FAILURE_REJECTED: 'FAILURE_REJECTED', 511 FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS', 512 PENDING: 'PENDING', 513 FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED', 514 } 515 516 @staticmethod 517 def result_name(result: int) -> str: 518 return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result) 519 520 521# ----------------------------------------------------------------------------- 522@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) 523class L2CAP_Disconnection_Request(L2CAP_Control_Frame): 524 ''' 525 See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST 526 ''' 527 528 529# ----------------------------------------------------------------------------- 530@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) 531class L2CAP_Disconnection_Response(L2CAP_Control_Frame): 532 ''' 533 See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE 534 ''' 535 536 537# ----------------------------------------------------------------------------- 538@L2CAP_Control_Frame.subclass([('data', '*')]) 539class L2CAP_Echo_Request(L2CAP_Control_Frame): 540 ''' 541 See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST 542 ''' 543 544 545# ----------------------------------------------------------------------------- 546@L2CAP_Control_Frame.subclass([('data', '*')]) 547class L2CAP_Echo_Response(L2CAP_Control_Frame): 548 ''' 549 See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE 550 ''' 551 552 553# ----------------------------------------------------------------------------- 554@L2CAP_Control_Frame.subclass( 555 [ 556 ( 557 'info_type', 558 { 559 'size': 2, 560 # pylint: disable-next=unnecessary-lambda 561 'mapper': lambda x: L2CAP_Information_Request.info_type_name(x), 562 }, 563 ) 564 ] 565) 566class L2CAP_Information_Request(L2CAP_Control_Frame): 567 ''' 568 See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST 569 ''' 570 571 CONNECTIONLESS_MTU = 0x0001 572 EXTENDED_FEATURES_SUPPORTED = 0x0002 573 FIXED_CHANNELS_SUPPORTED = 0x0003 574 575 EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001 576 EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002 577 EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004 578 EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008 579 EXTENDED_FEATURE_STREAMING_MODE = 0x0010 580 EXTENDED_FEATURE_FCS_OPTION = 0x0020 581 EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040 582 EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080 583 EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100 584 EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200 585 EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400 586 587 INFO_TYPE_NAMES = { 588 CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU', 589 EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED', 590 FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED', 591 } 592 593 @staticmethod 594 def info_type_name(info_type: int) -> str: 595 return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type) 596 597 598# ----------------------------------------------------------------------------- 599@L2CAP_Control_Frame.subclass( 600 [ 601 ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}), 602 ( 603 'result', 604 # pylint: disable-next=unnecessary-lambda 605 {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)}, 606 ), 607 ('data', '*'), 608 ] 609) 610class L2CAP_Information_Response(L2CAP_Control_Frame): 611 ''' 612 See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE 613 ''' 614 615 SUCCESS = 0x00 616 NOT_SUPPORTED = 0x01 617 618 RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'} 619 620 @staticmethod 621 def result_name(result: int) -> str: 622 return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result) 623 624 625# ----------------------------------------------------------------------------- 626@L2CAP_Control_Frame.subclass( 627 [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)] 628) 629class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame): 630 ''' 631 See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST 632 ''' 633 634 635# ----------------------------------------------------------------------------- 636@L2CAP_Control_Frame.subclass([('result', 2)]) 637class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame): 638 ''' 639 See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE 640 ''' 641 642 643# ----------------------------------------------------------------------------- 644@L2CAP_Control_Frame.subclass( 645 [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)] 646) 647class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame): 648 ''' 649 See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST 650 (CODE 0x14) 651 ''' 652 653 source_cid: int 654 655 656# ----------------------------------------------------------------------------- 657@L2CAP_Control_Frame.subclass( 658 # pylint: disable=unnecessary-lambda,line-too-long 659 [ 660 ('destination_cid', 2), 661 ('mtu', 2), 662 ('mps', 2), 663 ('initial_credits', 2), 664 ( 665 'result', 666 { 667 'size': 2, 668 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name( 669 x 670 ), 671 }, 672 ), 673 ] 674) 675class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame): 676 ''' 677 See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE 678 (CODE 0x15) 679 ''' 680 681 CONNECTION_SUCCESSFUL = 0x0000 682 CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002 683 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 684 CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005 685 CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006 686 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007 687 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008 688 CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009 689 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A 690 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B 691 692 # pylint: disable=line-too-long 693 RESULT_NAMES = { 694 CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', 695 CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED', 696 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', 697 CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION', 698 CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION', 699 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE', 700 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION', 701 CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', 702 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', 703 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', 704 } 705 706 @staticmethod 707 def result_name(result: int) -> str: 708 return name_or_number( 709 L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result 710 ) 711 712 713# ----------------------------------------------------------------------------- 714@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)]) 715class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame): 716 ''' 717 See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16) 718 ''' 719 720 721# ----------------------------------------------------------------------------- 722class ClassicChannel(EventEmitter): 723 class State(enum.IntEnum): 724 # States 725 CLOSED = 0x00 726 WAIT_CONNECT = 0x01 727 WAIT_CONNECT_RSP = 0x02 728 OPEN = 0x03 729 WAIT_DISCONNECT = 0x04 730 WAIT_CREATE = 0x05 731 WAIT_CREATE_RSP = 0x06 732 WAIT_MOVE = 0x07 733 WAIT_MOVE_RSP = 0x08 734 WAIT_MOVE_CONFIRM = 0x09 735 WAIT_CONFIRM_RSP = 0x0A 736 737 # CONFIG substates 738 WAIT_CONFIG = 0x10 739 WAIT_SEND_CONFIG = 0x11 740 WAIT_CONFIG_REQ_RSP = 0x12 741 WAIT_CONFIG_RSP = 0x13 742 WAIT_CONFIG_REQ = 0x14 743 WAIT_IND_FINAL_RSP = 0x15 744 WAIT_FINAL_RSP = 0x16 745 WAIT_CONTROL_IND = 0x17 746 747 connection_result: Optional[asyncio.Future[None]] 748 disconnection_result: Optional[asyncio.Future[None]] 749 response: Optional[asyncio.Future[bytes]] 750 sink: Optional[Callable[[bytes], Any]] 751 state: State 752 connection: Connection 753 mtu: int 754 peer_mtu: int 755 756 def __init__( 757 self, 758 manager: ChannelManager, 759 connection: Connection, 760 signaling_cid: int, 761 psm: int, 762 source_cid: int, 763 mtu: int, 764 ) -> None: 765 super().__init__() 766 self.manager = manager 767 self.connection = connection 768 self.signaling_cid = signaling_cid 769 self.state = self.State.CLOSED 770 self.mtu = mtu 771 self.peer_mtu = L2CAP_MIN_BR_EDR_MTU 772 self.psm = psm 773 self.source_cid = source_cid 774 self.destination_cid = 0 775 self.response = None 776 self.connection_result = None 777 self.disconnection_result = None 778 self.sink = None 779 780 def _change_state(self, new_state: State) -> None: 781 logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') 782 self.state = new_state 783 784 def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: 785 self.manager.send_pdu(self.connection, self.destination_cid, pdu) 786 787 def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: 788 self.manager.send_control_frame(self.connection, self.signaling_cid, frame) 789 790 async def send_request(self, request: SupportsBytes) -> bytes: 791 # Check that there isn't already a request pending 792 if self.response: 793 raise InvalidStateError('request already pending') 794 if self.state != self.State.OPEN: 795 raise InvalidStateError('channel not open') 796 797 self.response = asyncio.get_running_loop().create_future() 798 self.send_pdu(request) 799 return await self.response 800 801 def on_pdu(self, pdu: bytes) -> None: 802 if self.response: 803 self.response.set_result(pdu) 804 self.response = None 805 elif self.sink: 806 # pylint: disable=not-callable 807 self.sink(pdu) 808 else: 809 logger.warning( 810 color('received pdu without a pending request or sink', 'red') 811 ) 812 813 async def connect(self) -> None: 814 if self.state != self.State.CLOSED: 815 raise InvalidStateError('invalid state') 816 817 # Check that we can start a new connection 818 if self.connection_result: 819 raise RuntimeError('connection already pending') 820 821 self._change_state(self.State.WAIT_CONNECT_RSP) 822 self.send_control_frame( 823 L2CAP_Connection_Request( 824 identifier=self.manager.next_identifier(self.connection), 825 psm=self.psm, 826 source_cid=self.source_cid, 827 ) 828 ) 829 830 # Create a future to wait for the state machine to get to a success or error 831 # state 832 self.connection_result = asyncio.get_running_loop().create_future() 833 834 # Wait for the connection to succeed or fail 835 try: 836 return await self.connection.abort_on( 837 'disconnection', self.connection_result 838 ) 839 finally: 840 self.connection_result = None 841 842 async def disconnect(self) -> None: 843 if self.state != self.State.OPEN: 844 raise InvalidStateError('invalid state') 845 846 self._change_state(self.State.WAIT_DISCONNECT) 847 self.send_control_frame( 848 L2CAP_Disconnection_Request( 849 identifier=self.manager.next_identifier(self.connection), 850 destination_cid=self.destination_cid, 851 source_cid=self.source_cid, 852 ) 853 ) 854 855 # Create a future to wait for the state machine to get to a success or error 856 # state 857 self.disconnection_result = asyncio.get_running_loop().create_future() 858 return await self.disconnection_result 859 860 def abort(self) -> None: 861 if self.state == self.State.OPEN: 862 self._change_state(self.State.CLOSED) 863 self.emit('close') 864 865 def send_configure_request(self) -> None: 866 options = L2CAP_Control_Frame.encode_configuration_options( 867 [ 868 ( 869 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE, 870 struct.pack('<H', self.mtu), 871 ) 872 ] 873 ) 874 self.send_control_frame( 875 L2CAP_Configure_Request( 876 identifier=self.manager.next_identifier(self.connection), 877 destination_cid=self.destination_cid, 878 flags=0x0000, 879 options=options, 880 ) 881 ) 882 883 def on_connection_request(self, request) -> None: 884 self.destination_cid = request.source_cid 885 self._change_state(self.State.WAIT_CONNECT) 886 self.send_control_frame( 887 L2CAP_Connection_Response( 888 identifier=request.identifier, 889 destination_cid=self.source_cid, 890 source_cid=self.destination_cid, 891 result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, 892 status=0x0000, 893 ) 894 ) 895 self._change_state(self.State.WAIT_CONFIG) 896 self.send_configure_request() 897 self._change_state(self.State.WAIT_CONFIG_REQ_RSP) 898 899 def on_connection_response(self, response): 900 if self.state != self.State.WAIT_CONNECT_RSP: 901 logger.warning(color('invalid state', 'red')) 902 return 903 904 if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL: 905 self.destination_cid = response.destination_cid 906 self._change_state(self.State.WAIT_CONFIG) 907 self.send_configure_request() 908 self._change_state(self.State.WAIT_CONFIG_REQ_RSP) 909 elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING: 910 pass 911 else: 912 self._change_state(self.State.CLOSED) 913 self.connection_result.set_exception( 914 ProtocolError( 915 response.result, 916 'l2cap', 917 L2CAP_Connection_Response.result_name(response.result), 918 ) 919 ) 920 self.connection_result = None 921 922 def on_configure_request(self, request) -> None: 923 if self.state not in ( 924 self.State.WAIT_CONFIG, 925 self.State.WAIT_CONFIG_REQ, 926 self.State.WAIT_CONFIG_REQ_RSP, 927 ): 928 logger.warning(color('invalid state', 'red')) 929 return 930 931 # Decode the options 932 options = L2CAP_Control_Frame.decode_configuration_options(request.options) 933 for option in options: 934 if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE: 935 self.peer_mtu = struct.unpack('<H', option[1])[0] 936 logger.debug(f'peer MTU = {self.peer_mtu}') 937 938 self.send_control_frame( 939 L2CAP_Configure_Response( 940 identifier=request.identifier, 941 source_cid=self.destination_cid, 942 flags=0x0000, 943 result=L2CAP_Configure_Response.SUCCESS, 944 options=request.options, # TODO: don't accept everything blindly 945 ) 946 ) 947 if self.state == self.State.WAIT_CONFIG: 948 self._change_state(self.State.WAIT_SEND_CONFIG) 949 self.send_configure_request() 950 self._change_state(self.State.WAIT_CONFIG_RSP) 951 elif self.state == self.State.WAIT_CONFIG_REQ: 952 self._change_state(self.State.OPEN) 953 if self.connection_result: 954 self.connection_result.set_result(None) 955 self.connection_result = None 956 self.emit('open') 957 elif self.state == self.State.WAIT_CONFIG_REQ_RSP: 958 self._change_state(self.State.WAIT_CONFIG_RSP) 959 960 def on_configure_response(self, response) -> None: 961 if response.result == L2CAP_Configure_Response.SUCCESS: 962 if self.state == self.State.WAIT_CONFIG_REQ_RSP: 963 self._change_state(self.State.WAIT_CONFIG_REQ) 964 elif self.state in ( 965 self.State.WAIT_CONFIG_RSP, 966 self.State.WAIT_CONTROL_IND, 967 ): 968 self._change_state(self.State.OPEN) 969 if self.connection_result: 970 self.connection_result.set_result(None) 971 self.connection_result = None 972 self.emit('open') 973 else: 974 logger.warning(color('invalid state', 'red')) 975 elif ( 976 response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS 977 ): 978 # Re-configure with what's suggested in the response 979 self.send_control_frame( 980 L2CAP_Configure_Request( 981 identifier=self.manager.next_identifier(self.connection), 982 destination_cid=self.destination_cid, 983 flags=0x0000, 984 options=response.options, 985 ) 986 ) 987 else: 988 logger.warning( 989 color( 990 '!!! configuration rejected: ' 991 f'{L2CAP_Configure_Response.result_name(response.result)}', 992 'red', 993 ) 994 ) 995 # TODO: decide how to fail gracefully 996 997 def on_disconnection_request(self, request) -> None: 998 if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT): 999 self.send_control_frame( 1000 L2CAP_Disconnection_Response( 1001 identifier=request.identifier, 1002 destination_cid=request.destination_cid, 1003 source_cid=request.source_cid, 1004 ) 1005 ) 1006 self._change_state(self.State.CLOSED) 1007 self.emit('close') 1008 self.manager.on_channel_closed(self) 1009 else: 1010 logger.warning(color('invalid state', 'red')) 1011 1012 def on_disconnection_response(self, response) -> None: 1013 if self.state != self.State.WAIT_DISCONNECT: 1014 logger.warning(color('invalid state', 'red')) 1015 return 1016 1017 if ( 1018 response.destination_cid != self.destination_cid 1019 or response.source_cid != self.source_cid 1020 ): 1021 logger.warning('unexpected source or destination CID') 1022 return 1023 1024 self._change_state(self.State.CLOSED) 1025 if self.disconnection_result: 1026 self.disconnection_result.set_result(None) 1027 self.disconnection_result = None 1028 self.emit('close') 1029 self.manager.on_channel_closed(self) 1030 1031 def __str__(self) -> str: 1032 return ( 1033 f'Channel({self.source_cid}->{self.destination_cid}, ' 1034 f'PSM={self.psm}, ' 1035 f'MTU={self.mtu}/{self.peer_mtu}, ' 1036 f'state={self.state.name})' 1037 ) 1038 1039 1040# ----------------------------------------------------------------------------- 1041class LeCreditBasedChannel(EventEmitter): 1042 """ 1043 LE Credit-based Connection Oriented Channel 1044 """ 1045 1046 class State(enum.IntEnum): 1047 INIT = 0 1048 CONNECTED = 1 1049 CONNECTING = 2 1050 DISCONNECTING = 3 1051 DISCONNECTED = 4 1052 CONNECTION_ERROR = 5 1053 1054 out_queue: Deque[bytes] 1055 connection_result: Optional[asyncio.Future[LeCreditBasedChannel]] 1056 disconnection_result: Optional[asyncio.Future[None]] 1057 in_sdu: Optional[bytes] 1058 out_sdu: Optional[bytes] 1059 state: State 1060 connection: Connection 1061 sink: Optional[Callable[[bytes], Any]] 1062 1063 def __init__( 1064 self, 1065 manager: ChannelManager, 1066 connection: Connection, 1067 le_psm: int, 1068 source_cid: int, 1069 destination_cid: int, 1070 mtu: int, 1071 mps: int, 1072 credits: int, # pylint: disable=redefined-builtin 1073 peer_mtu: int, 1074 peer_mps: int, 1075 peer_credits: int, 1076 connected: bool, 1077 ) -> None: 1078 super().__init__() 1079 self.manager = manager 1080 self.connection = connection 1081 self.le_psm = le_psm 1082 self.source_cid = source_cid 1083 self.destination_cid = destination_cid 1084 self.mtu = mtu 1085 self.mps = mps 1086 self.credits = credits 1087 self.peer_mtu = peer_mtu 1088 self.peer_mps = peer_mps 1089 self.peer_credits = peer_credits 1090 self.peer_max_credits = self.peer_credits 1091 self.peer_credits_threshold = self.peer_max_credits // 2 1092 self.in_sdu = None 1093 self.in_sdu_length = 0 1094 self.out_queue = deque() 1095 self.out_sdu = None 1096 self.sink = None 1097 self.connected = False 1098 self.connection_result = None 1099 self.disconnection_result = None 1100 self.drained = asyncio.Event() 1101 1102 self.drained.set() 1103 1104 if connected: 1105 self.state = self.State.CONNECTED 1106 else: 1107 self.state = self.State.INIT 1108 1109 def _change_state(self, new_state: State) -> None: 1110 logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') 1111 self.state = new_state 1112 1113 if new_state == self.State.CONNECTED: 1114 self.emit('open') 1115 elif new_state == self.State.DISCONNECTED: 1116 self.emit('close') 1117 1118 def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: 1119 self.manager.send_pdu(self.connection, self.destination_cid, pdu) 1120 1121 def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: 1122 self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame) 1123 1124 async def connect(self) -> LeCreditBasedChannel: 1125 # Check that we're in the right state 1126 if self.state != self.State.INIT: 1127 raise InvalidStateError('not in a connectable state') 1128 1129 # Check that we can start a new connection 1130 identifier = self.manager.next_identifier(self.connection) 1131 if identifier in self.manager.le_coc_requests: 1132 raise RuntimeError('too many concurrent connection requests') 1133 1134 self._change_state(self.State.CONNECTING) 1135 request = L2CAP_LE_Credit_Based_Connection_Request( 1136 identifier=identifier, 1137 le_psm=self.le_psm, 1138 source_cid=self.source_cid, 1139 mtu=self.mtu, 1140 mps=self.mps, 1141 initial_credits=self.peer_credits, 1142 ) 1143 self.manager.le_coc_requests[identifier] = request 1144 self.send_control_frame(request) 1145 1146 # Create a future to wait for the response 1147 self.connection_result = asyncio.get_running_loop().create_future() 1148 1149 # Wait for the connection to succeed or fail 1150 return await self.connection_result 1151 1152 async def disconnect(self) -> None: 1153 # Check that we're connected 1154 if self.state != self.State.CONNECTED: 1155 raise InvalidStateError('not connected') 1156 1157 self._change_state(self.State.DISCONNECTING) 1158 self.flush_output() 1159 self.send_control_frame( 1160 L2CAP_Disconnection_Request( 1161 identifier=self.manager.next_identifier(self.connection), 1162 destination_cid=self.destination_cid, 1163 source_cid=self.source_cid, 1164 ) 1165 ) 1166 1167 # Create a future to wait for the state machine to get to a success or error 1168 # state 1169 self.disconnection_result = asyncio.get_running_loop().create_future() 1170 return await self.disconnection_result 1171 1172 def abort(self) -> None: 1173 if self.state == self.State.CONNECTED: 1174 self._change_state(self.State.DISCONNECTED) 1175 1176 def on_pdu(self, pdu: bytes) -> None: 1177 if self.sink is None: 1178 logger.warning('received pdu without a sink') 1179 return 1180 1181 if self.state != self.State.CONNECTED: 1182 logger.warning('received PDU while not connected, dropping') 1183 1184 # Manage the peer credits 1185 if self.peer_credits == 0: 1186 logger.warning('received LE frame when peer out of credits') 1187 else: 1188 self.peer_credits -= 1 1189 if self.peer_credits <= self.peer_credits_threshold: 1190 # The credits fell below the threshold, replenish them to the max 1191 self.send_control_frame( 1192 L2CAP_LE_Flow_Control_Credit( 1193 identifier=self.manager.next_identifier(self.connection), 1194 cid=self.source_cid, 1195 credits=self.peer_max_credits - self.peer_credits, 1196 ) 1197 ) 1198 self.peer_credits = self.peer_max_credits 1199 1200 # Check if this starts a new SDU 1201 if self.in_sdu is None: 1202 # Start a new SDU 1203 self.in_sdu = pdu 1204 else: 1205 # Continue an SDU 1206 self.in_sdu += pdu 1207 1208 # Check if the SDU is complete 1209 if self.in_sdu_length == 0: 1210 # We don't know the size yet, check if we have received the header to 1211 # compute it 1212 if len(self.in_sdu) >= 2: 1213 self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0] 1214 if self.in_sdu_length == 0: 1215 # We'll compute it later 1216 return 1217 if len(self.in_sdu) < 2 + self.in_sdu_length: 1218 # Not complete yet 1219 logger.debug( 1220 f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received' 1221 ) 1222 return 1223 if len(self.in_sdu) != 2 + self.in_sdu_length: 1224 # Overflow 1225 logger.warning( 1226 f'SDU overflow: sdu_length={self.in_sdu_length}, ' 1227 f'received {len(self.in_sdu) - 2}' 1228 ) 1229 # TODO: we should disconnect 1230 self.in_sdu = None 1231 self.in_sdu_length = 0 1232 return 1233 1234 # Send the SDU to the sink 1235 logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes') 1236 self.sink(self.in_sdu[2:]) # pylint: disable=not-callable 1237 1238 # Prepare for a new SDU 1239 self.in_sdu = None 1240 self.in_sdu_length = 0 1241 1242 def on_connection_response(self, response) -> None: 1243 # Look for a matching pending response result 1244 if self.connection_result is None: 1245 logger.warning( 1246 f'received unexpected connection response (id={response.identifier})' 1247 ) 1248 return 1249 1250 if ( 1251 response.result 1252 == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL 1253 ): 1254 self.destination_cid = response.destination_cid 1255 self.peer_mtu = response.mtu 1256 self.peer_mps = response.mps 1257 self.credits = response.initial_credits 1258 self.connected = True 1259 self.connection_result.set_result(self) 1260 self._change_state(self.State.CONNECTED) 1261 else: 1262 self.connection_result.set_exception( 1263 ProtocolError( 1264 response.result, 1265 'l2cap', 1266 L2CAP_LE_Credit_Based_Connection_Response.result_name( 1267 response.result 1268 ), 1269 ) 1270 ) 1271 self._change_state(self.State.CONNECTION_ERROR) 1272 1273 # Cleanup 1274 self.connection_result = None 1275 1276 def on_credits(self, credits: int) -> None: # pylint: disable=redefined-builtin 1277 self.credits += credits 1278 logger.debug(f'received {credits} credits, total = {self.credits}') 1279 1280 # Try to send more data if we have any queued up 1281 self.process_output() 1282 1283 def on_disconnection_request(self, request) -> None: 1284 self.send_control_frame( 1285 L2CAP_Disconnection_Response( 1286 identifier=request.identifier, 1287 destination_cid=request.destination_cid, 1288 source_cid=request.source_cid, 1289 ) 1290 ) 1291 self._change_state(self.State.DISCONNECTED) 1292 self.flush_output() 1293 1294 def on_disconnection_response(self, response) -> None: 1295 if self.state != self.State.DISCONNECTING: 1296 logger.warning(color('invalid state', 'red')) 1297 return 1298 1299 if ( 1300 response.destination_cid != self.destination_cid 1301 or response.source_cid != self.source_cid 1302 ): 1303 logger.warning('unexpected source or destination CID') 1304 return 1305 1306 self._change_state(self.State.DISCONNECTED) 1307 if self.disconnection_result: 1308 self.disconnection_result.set_result(None) 1309 self.disconnection_result = None 1310 1311 def flush_output(self) -> None: 1312 self.out_queue.clear() 1313 self.out_sdu = None 1314 1315 def process_output(self) -> None: 1316 while self.credits > 0: 1317 if self.out_sdu is not None: 1318 # Finish the current SDU 1319 packet = self.out_sdu[: self.peer_mps] 1320 self.send_pdu(packet) 1321 self.credits -= 1 1322 logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left') 1323 if len(packet) == len(self.out_sdu): 1324 # We sent everything 1325 self.out_sdu = None 1326 else: 1327 # Keep what's still left to send 1328 self.out_sdu = self.out_sdu[len(packet) :] 1329 continue 1330 1331 if self.out_queue: 1332 # Create the next SDU (2 bytes header plus up to MTU bytes payload) 1333 logger.debug( 1334 f'assembling SDU from {len(self.out_queue)} packets in output queue' 1335 ) 1336 payload = b'' 1337 while self.out_queue and len(payload) < self.peer_mtu: 1338 # We can add more data to the payload 1339 chunk = self.out_queue[0][: self.peer_mtu - len(payload)] 1340 payload += chunk 1341 self.out_queue[0] = self.out_queue[0][len(chunk) :] 1342 if len(self.out_queue[0]) == 0: 1343 # We consumed the entire buffer, remove it 1344 self.out_queue.popleft() 1345 logger.debug( 1346 f'packet completed, {len(self.out_queue)} left in queue' 1347 ) 1348 1349 # Construct the SDU with its header 1350 assert len(payload) != 0 1351 logger.debug(f'SDU complete: {len(payload)} payload bytes') 1352 self.out_sdu = struct.pack('<H', len(payload)) + payload 1353 else: 1354 # Nothing left to send for now 1355 self.drained.set() 1356 return 1357 1358 def write(self, data: bytes) -> None: 1359 if self.state != self.State.CONNECTED: 1360 logger.warning('not connected, dropping data') 1361 return 1362 1363 # Queue the data 1364 self.out_queue.append(data) 1365 self.drained.clear() 1366 logger.debug( 1367 f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue' 1368 ) 1369 1370 # Send what we can 1371 self.process_output() 1372 1373 async def drain(self) -> None: 1374 await self.drained.wait() 1375 1376 def pause_reading(self) -> None: 1377 # TODO: not implemented yet 1378 pass 1379 1380 def resume_reading(self) -> None: 1381 # TODO: not implemented yet 1382 pass 1383 1384 def __str__(self) -> str: 1385 return ( 1386 f'CoC({self.source_cid}->{self.destination_cid}, ' 1387 f'State={self.state.name}, ' 1388 f'PSM={self.le_psm}, ' 1389 f'MTU={self.mtu}/{self.peer_mtu}, ' 1390 f'MPS={self.mps}/{self.peer_mps}, ' 1391 f'credits={self.credits}/{self.peer_credits})' 1392 ) 1393 1394 1395# ----------------------------------------------------------------------------- 1396class ClassicChannelServer(EventEmitter): 1397 def __init__( 1398 self, 1399 manager: ChannelManager, 1400 psm: int, 1401 handler: Optional[Callable[[ClassicChannel], Any]], 1402 mtu: int, 1403 ) -> None: 1404 super().__init__() 1405 self.manager = manager 1406 self.handler = handler 1407 self.psm = psm 1408 self.mtu = mtu 1409 1410 def on_connection(self, channel: ClassicChannel) -> None: 1411 self.emit('connection', channel) 1412 if self.handler: 1413 self.handler(channel) 1414 1415 def close(self) -> None: 1416 if self.psm in self.manager.servers: 1417 del self.manager.servers[self.psm] 1418 1419 1420# ----------------------------------------------------------------------------- 1421class LeCreditBasedChannelServer(EventEmitter): 1422 def __init__( 1423 self, 1424 manager: ChannelManager, 1425 psm: int, 1426 handler: Optional[Callable[[LeCreditBasedChannel], Any]], 1427 max_credits: int, 1428 mtu: int, 1429 mps: int, 1430 ) -> None: 1431 super().__init__() 1432 self.manager = manager 1433 self.handler = handler 1434 self.psm = psm 1435 self.max_credits = max_credits 1436 self.mtu = mtu 1437 self.mps = mps 1438 1439 def on_connection(self, channel: LeCreditBasedChannel) -> None: 1440 self.emit('connection', channel) 1441 if self.handler: 1442 self.handler(channel) 1443 1444 def close(self) -> None: 1445 if self.psm in self.manager.le_coc_servers: 1446 del self.manager.le_coc_servers[self.psm] 1447 1448 1449# ----------------------------------------------------------------------------- 1450class ChannelManager: 1451 identifiers: Dict[int, int] 1452 channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]] 1453 servers: Dict[int, ClassicChannelServer] 1454 le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]] 1455 le_coc_servers: Dict[int, LeCreditBasedChannelServer] 1456 le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request] 1457 fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]] 1458 _host: Optional[Host] 1459 connection_parameters_update_response: Optional[asyncio.Future[int]] 1460 1461 def __init__( 1462 self, 1463 extended_features: Iterable[int] = (), 1464 connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU, 1465 ) -> None: 1466 self._host = None 1467 self.identifiers = {} # Incrementing identifier values by connection 1468 self.channels = {} # All channels, mapped by connection and source cid 1469 self.fixed_channels = { # Fixed channel handlers, mapped by cid 1470 L2CAP_SIGNALING_CID: None, 1471 L2CAP_LE_SIGNALING_CID: None, 1472 } 1473 self.servers = {} # Servers accepting connections, by PSM 1474 self.le_coc_channels = ( 1475 {} 1476 ) # LE CoC channels, mapped by connection and destination cid 1477 self.le_coc_servers = {} # LE CoC - Servers accepting connections, by PSM 1478 self.le_coc_requests = {} # LE CoC connection requests, by identifier 1479 self.extended_features = extended_features 1480 self.connectionless_mtu = connectionless_mtu 1481 self.connection_parameters_update_response = None 1482 1483 @property 1484 def host(self) -> Host: 1485 assert self._host 1486 return self._host 1487 1488 @host.setter 1489 def host(self, host: Host) -> None: 1490 if self._host is not None: 1491 self._host.remove_listener('disconnection', self.on_disconnection) 1492 self._host = host 1493 if host is not None: 1494 host.on('disconnection', self.on_disconnection) 1495 1496 def find_channel(self, connection_handle: int, cid: int): 1497 if connection_channels := self.channels.get(connection_handle): 1498 return connection_channels.get(cid) 1499 1500 return None 1501 1502 def find_le_coc_channel(self, connection_handle: int, cid: int): 1503 if connection_channels := self.le_coc_channels.get(connection_handle): 1504 return connection_channels.get(cid) 1505 1506 return None 1507 1508 @staticmethod 1509 def find_free_br_edr_cid(channels: Iterable[int]) -> int: 1510 # Pick the smallest valid CID that's not already in the list 1511 # (not necessarily the most efficient algorithm, but the list of CID is 1512 # very small in practice) 1513 for cid in range( 1514 L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1 1515 ): 1516 if cid not in channels: 1517 return cid 1518 1519 raise RuntimeError('no free CID available') 1520 1521 @staticmethod 1522 def find_free_le_cid(channels: Iterable[int]) -> int: 1523 # Pick the smallest valid CID that's not already in the list 1524 # (not necessarily the most efficient algorithm, but the list of CID is 1525 # very small in practice) 1526 for cid in range( 1527 L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1 1528 ): 1529 if cid not in channels: 1530 return cid 1531 1532 raise RuntimeError('no free CID') 1533 1534 def next_identifier(self, connection: Connection) -> int: 1535 identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256 1536 self.identifiers[connection.handle] = identifier 1537 return identifier 1538 1539 def register_fixed_channel( 1540 self, cid: int, handler: Callable[[int, bytes], Any] 1541 ) -> None: 1542 self.fixed_channels[cid] = handler 1543 1544 def deregister_fixed_channel(self, cid: int) -> None: 1545 if cid in self.fixed_channels: 1546 del self.fixed_channels[cid] 1547 1548 @deprecated("Please use create_classic_server") 1549 def register_server( 1550 self, 1551 psm: int, 1552 server: Callable[[ClassicChannel], Any], 1553 ) -> int: 1554 return self.create_classic_server( 1555 handler=server, spec=ClassicChannelSpec(psm=psm) 1556 ).psm 1557 1558 def create_classic_server( 1559 self, 1560 spec: ClassicChannelSpec, 1561 handler: Optional[Callable[[ClassicChannel], Any]] = None, 1562 ) -> ClassicChannelServer: 1563 if not spec.psm: 1564 # Find a free PSM 1565 for candidate in range( 1566 L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2 1567 ): 1568 if (candidate >> 8) % 2 == 1: 1569 continue 1570 if candidate in self.servers: 1571 continue 1572 spec.psm = candidate 1573 break 1574 else: 1575 raise InvalidStateError('no free PSM') 1576 else: 1577 # Check that the PSM isn't already in use 1578 if spec.psm in self.servers: 1579 raise ValueError('PSM already in use') 1580 1581 # Check that the PSM is valid 1582 if spec.psm % 2 == 0: 1583 raise ValueError('invalid PSM (not odd)') 1584 check = spec.psm >> 8 1585 while check: 1586 if check % 2 != 0: 1587 raise ValueError('invalid PSM') 1588 check >>= 8 1589 1590 self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu) 1591 1592 return self.servers[spec.psm] 1593 1594 @deprecated("Please use create_le_credit_based_server()") 1595 def register_le_coc_server( 1596 self, 1597 psm: int, 1598 server: Callable[[LeCreditBasedChannel], Any], 1599 max_credits: int, 1600 mtu: int, 1601 mps: int, 1602 ) -> int: 1603 return self.create_le_credit_based_server( 1604 spec=LeCreditBasedChannelSpec( 1605 psm=None if psm == 0 else psm, mtu=mtu, mps=mps, max_credits=max_credits 1606 ), 1607 handler=server, 1608 ).psm 1609 1610 def create_le_credit_based_server( 1611 self, 1612 spec: LeCreditBasedChannelSpec, 1613 handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None, 1614 ) -> LeCreditBasedChannelServer: 1615 if not spec.psm: 1616 # Find a free PSM 1617 for candidate in range( 1618 L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1 1619 ): 1620 if candidate in self.le_coc_servers: 1621 continue 1622 spec.psm = candidate 1623 break 1624 else: 1625 raise InvalidStateError('no free PSM') 1626 else: 1627 # Check that the PSM isn't already in use 1628 if spec.psm in self.le_coc_servers: 1629 raise ValueError('PSM already in use') 1630 1631 self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer( 1632 self, 1633 spec.psm, 1634 handler, 1635 max_credits=spec.max_credits, 1636 mtu=spec.mtu, 1637 mps=spec.mps, 1638 ) 1639 1640 return self.le_coc_servers[spec.psm] 1641 1642 def on_disconnection(self, connection_handle: int, _reason: int) -> None: 1643 logger.debug(f'disconnection from {connection_handle}, cleaning up channels') 1644 if connection_handle in self.channels: 1645 for _, channel in self.channels[connection_handle].items(): 1646 channel.abort() 1647 del self.channels[connection_handle] 1648 if connection_handle in self.le_coc_channels: 1649 for _, channel in self.le_coc_channels[connection_handle].items(): 1650 channel.abort() 1651 del self.le_coc_channels[connection_handle] 1652 if connection_handle in self.identifiers: 1653 del self.identifiers[connection_handle] 1654 1655 def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None: 1656 pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu) 1657 pdu_bytes = bytes(pdu) 1658 logger.debug( 1659 f'{color(">>> Sending L2CAP PDU", "blue")} ' 1660 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1661 f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}' 1662 ) 1663 self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes) 1664 1665 def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: 1666 if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): 1667 # Parse the L2CAP payload into a Control Frame object 1668 control_frame = L2CAP_Control_Frame.from_bytes(pdu) 1669 1670 self.on_control_frame(connection, cid, control_frame) 1671 elif cid in self.fixed_channels: 1672 handler = self.fixed_channels[cid] 1673 assert handler is not None 1674 handler(connection.handle, pdu) 1675 else: 1676 if (channel := self.find_channel(connection.handle, cid)) is None: 1677 logger.warning( 1678 color( 1679 f'channel not found for 0x{connection.handle:04X}:{cid}', 'red' 1680 ) 1681 ) 1682 return 1683 1684 channel.on_pdu(pdu) 1685 1686 def send_control_frame( 1687 self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame 1688 ) -> None: 1689 logger.debug( 1690 f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} ' 1691 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1692 f'{connection.peer_address}:\n{control_frame}' 1693 ) 1694 self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame)) 1695 1696 def on_control_frame( 1697 self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame 1698 ) -> None: 1699 logger.debug( 1700 f'{color("<<< Received L2CAP Signaling Control Frame", "green")} ' 1701 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1702 f'{connection.peer_address}:\n{control_frame}' 1703 ) 1704 1705 # Find the handler method 1706 handler_name = f'on_{control_frame.name.lower()}' 1707 handler = getattr(self, handler_name, None) 1708 if handler: 1709 try: 1710 handler(connection, cid, control_frame) 1711 except Exception as error: 1712 logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') 1713 self.send_control_frame( 1714 connection, 1715 cid, 1716 L2CAP_Command_Reject( 1717 identifier=control_frame.identifier, 1718 reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON, 1719 data=b'', 1720 ), 1721 ) 1722 raise error 1723 else: 1724 logger.error(color('Channel Manager command not handled???', 'red')) 1725 self.send_control_frame( 1726 connection, 1727 cid, 1728 L2CAP_Command_Reject( 1729 identifier=control_frame.identifier, 1730 reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON, 1731 data=b'', 1732 ), 1733 ) 1734 1735 def on_l2cap_command_reject( 1736 self, _connection: Connection, _cid: int, packet 1737 ) -> None: 1738 logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}') 1739 1740 def on_l2cap_connection_request( 1741 self, connection: Connection, cid: int, request 1742 ) -> None: 1743 # Check if there's a server for this PSM 1744 server = self.servers.get(request.psm) 1745 if server: 1746 # Find a free CID for this new channel 1747 connection_channels = self.channels.setdefault(connection.handle, {}) 1748 source_cid = self.find_free_br_edr_cid(connection_channels) 1749 if source_cid is None: # Should never happen! 1750 self.send_control_frame( 1751 connection, 1752 cid, 1753 L2CAP_Connection_Response( 1754 identifier=request.identifier, 1755 destination_cid=request.source_cid, 1756 source_cid=0, 1757 # pylint: disable=line-too-long 1758 result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, 1759 status=0x0000, 1760 ), 1761 ) 1762 return 1763 1764 # Create a new channel 1765 logger.debug( 1766 f'creating server channel with cid={source_cid} for psm {request.psm}' 1767 ) 1768 channel = ClassicChannel( 1769 self, connection, cid, request.psm, source_cid, server.mtu 1770 ) 1771 connection_channels[source_cid] = channel 1772 1773 # Notify 1774 server.on_connection(channel) 1775 channel.on_connection_request(request) 1776 else: 1777 logger.warning( 1778 f'No server for connection 0x{connection.handle:04X} ' 1779 f'on PSM {request.psm}' 1780 ) 1781 self.send_control_frame( 1782 connection, 1783 cid, 1784 L2CAP_Connection_Response( 1785 identifier=request.identifier, 1786 destination_cid=request.source_cid, 1787 source_cid=0, 1788 # pylint: disable=line-too-long 1789 result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED, 1790 status=0x0000, 1791 ), 1792 ) 1793 1794 def on_l2cap_connection_response( 1795 self, connection: Connection, cid: int, response 1796 ) -> None: 1797 if ( 1798 channel := self.find_channel(connection.handle, response.source_cid) 1799 ) is None: 1800 logger.warning( 1801 color( 1802 f'channel {response.source_cid} not found for ' 1803 f'0x{connection.handle:04X}:{cid}', 1804 'red', 1805 ) 1806 ) 1807 return 1808 1809 channel.on_connection_response(response) 1810 1811 def on_l2cap_configure_request( 1812 self, connection: Connection, cid: int, request 1813 ) -> None: 1814 if ( 1815 channel := self.find_channel(connection.handle, request.destination_cid) 1816 ) is None: 1817 logger.warning( 1818 color( 1819 f'channel {request.destination_cid} not found for ' 1820 f'0x{connection.handle:04X}:{cid}', 1821 'red', 1822 ) 1823 ) 1824 return 1825 1826 channel.on_configure_request(request) 1827 1828 def on_l2cap_configure_response( 1829 self, connection: Connection, cid: int, response 1830 ) -> None: 1831 if ( 1832 channel := self.find_channel(connection.handle, response.source_cid) 1833 ) is None: 1834 logger.warning( 1835 color( 1836 f'channel {response.source_cid} not found for ' 1837 f'0x{connection.handle:04X}:{cid}', 1838 'red', 1839 ) 1840 ) 1841 return 1842 1843 channel.on_configure_response(response) 1844 1845 def on_l2cap_disconnection_request( 1846 self, connection: Connection, cid: int, request 1847 ) -> None: 1848 if ( 1849 channel := self.find_channel(connection.handle, request.destination_cid) 1850 ) is None: 1851 logger.warning( 1852 color( 1853 f'channel {request.destination_cid} not found for ' 1854 f'0x{connection.handle:04X}:{cid}', 1855 'red', 1856 ) 1857 ) 1858 return 1859 1860 channel.on_disconnection_request(request) 1861 1862 def on_l2cap_disconnection_response( 1863 self, connection: Connection, cid: int, response 1864 ) -> None: 1865 if ( 1866 channel := self.find_channel(connection.handle, response.source_cid) 1867 ) is None: 1868 logger.warning( 1869 color( 1870 f'channel {response.source_cid} not found for ' 1871 f'0x{connection.handle:04X}:{cid}', 1872 'red', 1873 ) 1874 ) 1875 return 1876 1877 channel.on_disconnection_response(response) 1878 1879 def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None: 1880 logger.debug(f'<<< Echo request: data={request.data.hex()}') 1881 self.send_control_frame( 1882 connection, 1883 cid, 1884 L2CAP_Echo_Response(identifier=request.identifier, data=request.data), 1885 ) 1886 1887 def on_l2cap_echo_response( 1888 self, _connection: Connection, _cid: int, response 1889 ) -> None: 1890 logger.debug(f'<<< Echo response: data={response.data.hex()}') 1891 # TODO notify listeners 1892 1893 def on_l2cap_information_request( 1894 self, connection: Connection, cid: int, request 1895 ) -> None: 1896 if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU: 1897 result = L2CAP_Information_Response.SUCCESS 1898 data = self.connectionless_mtu.to_bytes(2, 'little') 1899 elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED: 1900 result = L2CAP_Information_Response.SUCCESS 1901 data = sum(self.extended_features).to_bytes(4, 'little') 1902 elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED: 1903 result = L2CAP_Information_Response.SUCCESS 1904 data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little') 1905 else: 1906 result = L2CAP_Information_Response.NOT_SUPPORTED 1907 1908 self.send_control_frame( 1909 connection, 1910 cid, 1911 L2CAP_Information_Response( 1912 identifier=request.identifier, 1913 info_type=request.info_type, 1914 result=result, 1915 data=data, 1916 ), 1917 ) 1918 1919 def on_l2cap_connection_parameter_update_request( 1920 self, connection: Connection, cid: int, request 1921 ): 1922 if connection.role == BT_CENTRAL_ROLE: 1923 self.send_control_frame( 1924 connection, 1925 cid, 1926 L2CAP_Connection_Parameter_Update_Response( 1927 identifier=request.identifier, 1928 result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT, 1929 ), 1930 ) 1931 self.host.send_command_sync( 1932 HCI_LE_Connection_Update_Command( 1933 connection_handle=connection.handle, 1934 connection_interval_min=request.interval_min, 1935 connection_interval_max=request.interval_max, 1936 max_latency=request.latency, 1937 supervision_timeout=request.timeout, 1938 min_ce_length=0, 1939 max_ce_length=0, 1940 ) 1941 ) 1942 else: 1943 self.send_control_frame( 1944 connection, 1945 cid, 1946 L2CAP_Connection_Parameter_Update_Response( 1947 identifier=request.identifier, 1948 result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT, 1949 ), 1950 ) 1951 1952 async def update_connection_parameters( 1953 self, 1954 connection: Connection, 1955 interval_min: int, 1956 interval_max: int, 1957 latency: int, 1958 timeout: int, 1959 ) -> int: 1960 # Check that there isn't already a request pending 1961 if self.connection_parameters_update_response: 1962 raise InvalidStateError('request already pending') 1963 self.connection_parameters_update_response = ( 1964 asyncio.get_running_loop().create_future() 1965 ) 1966 self.send_control_frame( 1967 connection, 1968 L2CAP_LE_SIGNALING_CID, 1969 L2CAP_Connection_Parameter_Update_Request( 1970 interval_min=interval_min, 1971 interval_max=interval_max, 1972 latency=latency, 1973 timeout=timeout, 1974 ), 1975 ) 1976 return await self.connection_parameters_update_response 1977 1978 def on_l2cap_connection_parameter_update_response( 1979 self, connection: Connection, cid: int, response 1980 ) -> None: 1981 if self.connection_parameters_update_response: 1982 self.connection_parameters_update_response.set_result(response.result) 1983 self.connection_parameters_update_response = None 1984 else: 1985 logger.warning( 1986 color( 1987 'received l2cap_connection_parameter_update_response without a pending request', 1988 'red', 1989 ) 1990 ) 1991 1992 def on_l2cap_le_credit_based_connection_request( 1993 self, connection: Connection, cid: int, request 1994 ) -> None: 1995 if request.le_psm in self.le_coc_servers: 1996 server = self.le_coc_servers[request.le_psm] 1997 1998 # Check that the CID isn't already used 1999 le_connection_channels = self.le_coc_channels.setdefault( 2000 connection.handle, {} 2001 ) 2002 if request.source_cid in le_connection_channels: 2003 logger.warning(f'source CID {request.source_cid} already in use') 2004 self.send_control_frame( 2005 connection, 2006 cid, 2007 L2CAP_LE_Credit_Based_Connection_Response( 2008 identifier=request.identifier, 2009 destination_cid=0, 2010 mtu=server.mtu, 2011 mps=server.mps, 2012 initial_credits=0, 2013 # pylint: disable=line-too-long 2014 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED, 2015 ), 2016 ) 2017 return 2018 2019 # Find a free CID for this new channel 2020 connection_channels = self.channels.setdefault(connection.handle, {}) 2021 source_cid = self.find_free_le_cid(connection_channels) 2022 if source_cid is None: # Should never happen! 2023 self.send_control_frame( 2024 connection, 2025 cid, 2026 L2CAP_LE_Credit_Based_Connection_Response( 2027 identifier=request.identifier, 2028 destination_cid=0, 2029 mtu=server.mtu, 2030 mps=server.mps, 2031 initial_credits=0, 2032 # pylint: disable=line-too-long 2033 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, 2034 ), 2035 ) 2036 return 2037 2038 # Create a new channel 2039 logger.debug( 2040 f'creating LE CoC server channel with cid={source_cid} for psm ' 2041 f'{request.le_psm}' 2042 ) 2043 channel = LeCreditBasedChannel( 2044 self, 2045 connection, 2046 request.le_psm, 2047 source_cid, 2048 request.source_cid, 2049 server.mtu, 2050 server.mps, 2051 request.initial_credits, 2052 request.mtu, 2053 request.mps, 2054 server.max_credits, 2055 True, 2056 ) 2057 connection_channels[source_cid] = channel 2058 le_connection_channels[request.source_cid] = channel 2059 2060 # Respond 2061 self.send_control_frame( 2062 connection, 2063 cid, 2064 L2CAP_LE_Credit_Based_Connection_Response( 2065 identifier=request.identifier, 2066 destination_cid=source_cid, 2067 mtu=server.mtu, 2068 mps=server.mps, 2069 initial_credits=server.max_credits, 2070 # pylint: disable=line-too-long 2071 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL, 2072 ), 2073 ) 2074 2075 # Notify 2076 server.on_connection(channel) 2077 else: 2078 logger.info( 2079 f'No LE server for connection 0x{connection.handle:04X} ' 2080 f'on PSM {request.le_psm}' 2081 ) 2082 self.send_control_frame( 2083 connection, 2084 cid, 2085 L2CAP_LE_Credit_Based_Connection_Response( 2086 identifier=request.identifier, 2087 destination_cid=0, 2088 mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU, 2089 mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS, 2090 initial_credits=0, 2091 # pylint: disable=line-too-long 2092 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED, 2093 ), 2094 ) 2095 2096 def on_l2cap_le_credit_based_connection_response( 2097 self, connection: Connection, _cid: int, response 2098 ) -> None: 2099 # Find the pending request by identifier 2100 request = self.le_coc_requests.get(response.identifier) 2101 if request is None: 2102 logger.warning(color('!!! received response for unknown request', 'red')) 2103 return 2104 del self.le_coc_requests[response.identifier] 2105 2106 # Find the channel for this request 2107 channel = self.find_channel(connection.handle, request.source_cid) 2108 if channel is None: 2109 logger.warning( 2110 color( 2111 'received connection response for an unknown channel ' 2112 f'(cid={request.source_cid})', 2113 'red', 2114 ) 2115 ) 2116 return 2117 2118 # Process the response 2119 channel.on_connection_response(response) 2120 2121 def on_l2cap_le_flow_control_credit( 2122 self, connection: Connection, _cid: int, credit 2123 ) -> None: 2124 channel = self.find_le_coc_channel(connection.handle, credit.cid) 2125 if channel is None: 2126 logger.warning(f'received credits for an unknown channel (cid={credit.cid}') 2127 return 2128 2129 channel.on_credits(credit.credits) 2130 2131 def on_channel_closed(self, channel: ClassicChannel) -> None: 2132 connection_channels = self.channels.get(channel.connection.handle) 2133 if connection_channels: 2134 if channel.source_cid in connection_channels: 2135 del connection_channels[channel.source_cid] 2136 2137 @deprecated("Please use create_le_credit_based_channel()") 2138 async def open_le_coc( 2139 self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int 2140 ) -> LeCreditBasedChannel: 2141 return await self.create_le_credit_based_channel( 2142 connection=connection, 2143 spec=LeCreditBasedChannelSpec( 2144 psm=psm, max_credits=max_credits, mtu=mtu, mps=mps 2145 ), 2146 ) 2147 2148 async def create_le_credit_based_channel( 2149 self, 2150 connection: Connection, 2151 spec: LeCreditBasedChannelSpec, 2152 ) -> LeCreditBasedChannel: 2153 # Find a free CID for the new channel 2154 connection_channels = self.channels.setdefault(connection.handle, {}) 2155 source_cid = self.find_free_le_cid(connection_channels) 2156 if source_cid is None: # Should never happen! 2157 raise RuntimeError('all CIDs already in use') 2158 2159 if spec.psm is None: 2160 raise ValueError('PSM cannot be None') 2161 2162 # Create the channel 2163 logger.debug(f'creating coc channel with cid={source_cid} for psm {spec.psm}') 2164 channel = LeCreditBasedChannel( 2165 manager=self, 2166 connection=connection, 2167 le_psm=spec.psm, 2168 source_cid=source_cid, 2169 destination_cid=0, 2170 mtu=spec.mtu, 2171 mps=spec.mps, 2172 credits=0, 2173 peer_mtu=0, 2174 peer_mps=0, 2175 peer_credits=spec.max_credits, 2176 connected=False, 2177 ) 2178 connection_channels[source_cid] = channel 2179 2180 # Connect 2181 try: 2182 await channel.connect() 2183 except Exception as error: 2184 logger.warning(f'connection failed: {error}') 2185 del connection_channels[source_cid] 2186 raise 2187 2188 # Remember the channel by source CID and destination CID 2189 le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {}) 2190 le_connection_channels[channel.destination_cid] = channel 2191 2192 return channel 2193 2194 @deprecated("Please use create_classic_channel()") 2195 async def connect(self, connection: Connection, psm: int) -> ClassicChannel: 2196 return await self.create_classic_channel( 2197 connection=connection, spec=ClassicChannelSpec(psm=psm) 2198 ) 2199 2200 async def create_classic_channel( 2201 self, connection: Connection, spec: ClassicChannelSpec 2202 ) -> ClassicChannel: 2203 # NOTE: this implementation hard-codes BR/EDR 2204 2205 # Find a free CID for a new channel 2206 connection_channels = self.channels.setdefault(connection.handle, {}) 2207 source_cid = self.find_free_br_edr_cid(connection_channels) 2208 if source_cid is None: # Should never happen! 2209 raise RuntimeError('all CIDs already in use') 2210 2211 if spec.psm is None: 2212 raise ValueError('PSM cannot be None') 2213 2214 # Create the channel 2215 logger.debug( 2216 f'creating client channel with cid={source_cid} for psm {spec.psm}' 2217 ) 2218 channel = ClassicChannel( 2219 self, 2220 connection, 2221 L2CAP_SIGNALING_CID, 2222 spec.psm, 2223 source_cid, 2224 spec.mtu, 2225 ) 2226 connection_channels[source_cid] = channel 2227 2228 # Connect 2229 try: 2230 await channel.connect() 2231 except BaseException as e: 2232 del connection_channels[source_cid] 2233 raise e 2234 2235 return channel 2236 2237 2238# ----------------------------------------------------------------------------- 2239# Deprecated Classes 2240# ----------------------------------------------------------------------------- 2241 2242 2243class Channel(ClassicChannel): 2244 @deprecated("Please use ClassicChannel") 2245 def __init__(self, *args, **kwargs) -> None: 2246 super().__init__(*args, **kwargs) 2247 2248 2249class LeConnectionOrientedChannel(LeCreditBasedChannel): 2250 @deprecated("Please use LeCreditBasedChannel") 2251 def __init__(self, *args, **kwargs) -> None: 2252 super().__init__(*args, **kwargs) 2253