• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import collections
21import dataclasses
22import logging
23import struct
24
25from typing import (
26    Any,
27    Awaitable,
28    Callable,
29    Deque,
30    Dict,
31    Optional,
32    Set,
33    cast,
34    TYPE_CHECKING,
35)
36
37from bumble.colors import color
38from bumble.l2cap import L2CAP_PDU
39from bumble.snoop import Snooper
40from bumble import drivers
41from bumble import hci
42from bumble.core import (
43    BT_BR_EDR_TRANSPORT,
44    BT_LE_TRANSPORT,
45    ConnectionPHY,
46    ConnectionParameters,
47)
48from bumble.utils import AbortableEventEmitter
49from bumble.transport.common import TransportLostError
50
51if TYPE_CHECKING:
52    from .transport.common import TransportSink, TransportSource
53
54
55# -----------------------------------------------------------------------------
56# Logging
57# -----------------------------------------------------------------------------
58logger = logging.getLogger(__name__)
59
60
61# -----------------------------------------------------------------------------
class AclPacketQueue:
    """FIFO queue of outgoing ACL packets with a cap on packets in flight.

    Packets are handed to ``send`` in the order they were enqueued, but never
    more than ``max_in_flight`` at a time. Each call to
    :meth:`on_packets_completed` frees capacity and lets queued packets go out.
    """

    max_packet_size: int

    def __init__(
        self,
        max_packet_size: int,
        max_in_flight: int,
        send: Callable[[hci.HCI_Packet], None],
    ) -> None:
        """Create an empty queue.

        Args:
            max_packet_size: stored as-is; the queue itself does not enforce it.
            max_in_flight: maximum number of sent-but-not-completed packets.
            send: callable invoked with each packet when it leaves the queue.
        """
        self.max_packet_size = max_packet_size
        self.max_in_flight = max_in_flight
        self.in_flight = 0
        self.send = send
        self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()

    def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
        """Queue ``packet`` and send as much of the queue as capacity allows."""
        # New packets enter on the left and leave from the right (FIFO).
        self.packets.appendleft(packet)
        self.check_queue()

        if self.packets:
            logger.debug(
                f'{self.in_flight} ACL packets in flight, '
                f'{len(self.packets)} in queue'
            )

    def check_queue(self) -> None:
        """Send queued packets, oldest first, until the in-flight cap is hit."""
        while self.packets and self.in_flight < self.max_in_flight:
            packet = self.packets.pop()
            self.send(packet)
            self.in_flight += 1

    def on_packets_completed(self, packet_count: int) -> None:
        """Account for ``packet_count`` completed packets and resume sending.

        A count larger than the number of packets in flight is logged and
        clamped, so ``in_flight`` never goes negative.
        """
        if packet_count > self.in_flight:
            logger.warning(
                color(
                    # Both literals must be f-strings so that both values
                    # are interpolated into the message.
                    f'!!! {packet_count} completed but only '
                    f'{self.in_flight} in flight'
                )
            )
            packet_count = self.in_flight

        self.in_flight -= packet_count
        self.check_queue()
105
106
107# -----------------------------------------------------------------------------
class Connection:
    """Host-side state for one ACL connection.

    Holds the connection handle, peer address and transport, reassembles
    incoming ACL fragments into L2CAP PDUs, and references the packet queue
    used for outgoing ACL data on this transport.
    """

    def __init__(
        self, host: Host, handle: int, peer_address: hci.Address, transport: int
    ):
        self.host = host
        self.handle = handle
        self.peer_address = peer_address
        # Complete PDUs produced by the assembler are delivered to on_acl_pdu
        self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
        self.transport = transport

        # LE connections use the host's LE queue, all others the classic one
        queue: Optional[AclPacketQueue]
        if transport == BT_LE_TRANSPORT:
            queue = host.le_acl_packet_queue
        else:
            queue = host.acl_packet_queue
        assert queue
        self.acl_packet_queue = queue

    def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
        """Feed one incoming ACL packet to the reassembler."""
        self.assembler.feed_packet(packet)

    def on_acl_pdu(self, pdu: bytes) -> None:
        """Parse a reassembled PDU as L2CAP and pass it on to the host."""
        parsed = L2CAP_PDU.from_bytes(pdu)
        self.host.on_l2cap_pdu(self, parsed.cid, parsed.payload)
131
132
133# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
    """Record describing one SCO link: the peer's address and the link handle."""

    peer_address: hci.Address
    handle: int
138
139
140# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
    """Record describing one CIS link: the peer's address and the link handle."""

    peer_address: hci.Address
    handle: int
145
146
147# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
    """Host side of an HCI link to a controller.

    Sends HCI commands and ACL data to the controller through ``hci_sink`` and
    dispatches packets received from the controller to ``on_*`` handlers.
    """

    # Link state, keyed by connection handle
    connections: Dict[int, Connection]
    cis_links: Dict[int, CisLink]
    sco_links: Dict[int, ScoLink]
    # Outgoing ACL queues; populated by reset() from the controller's buffer sizes
    acl_packet_queue: Optional[AclPacketQueue] = None
    le_acl_packet_queue: Optional[AclPacketQueue] = None
    # Where outgoing HCI packets are written
    hci_sink: Optional[TransportSink] = None
    # Copied from the packet source's `metadata` attribute when it has one
    hci_metadata: Dict[str, Any]
    # Optional async key-lookup callbacks; both are None until assigned
    long_term_key_provider: Optional[
        Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
    ]
    link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]]
160
    def __init__(
        self,
        controller_source: Optional[TransportSource] = None,
        controller_sink: Optional[TransportSink] = None,
    ) -> None:
        """Initialize host state and optionally attach to a controller.

        Args:
            controller_source: if truthy, this host registers itself as the
                source's packet sink.
            controller_sink: if truthy, used as the sink for outgoing packets.
        """
        super().__init__()

        self.hci_metadata = {}
        self.ready = False  # True when we can accept incoming packets
        self.connections = {}  # Connections, by connection handle
        self.cis_links = {}  # CIS links, by connection handle
        self.sco_links = {}  # SCO links, by connection handle
        # Command currently awaiting a response, and the future for that response
        self.pending_command = None
        self.pending_response = None
        # Defaults used until reset() reads the actual values from the controller
        self.number_of_supported_advertising_sets = 0
        self.maximum_advertising_data_length = 31
        self.local_version = None
        self.local_supported_commands = 0
        self.local_le_features = 0
        self.local_lmp_features = hci.LmpFeatureMask(0)  # Classic LMP features
        self.suggested_max_tx_octets = 251  # Max allowed
        self.suggested_max_tx_time = 2120  # Max allowed
        # Only one command may be pending at a time
        self.command_semaphore = asyncio.Semaphore(1)
        self.long_term_key_provider = None
        self.link_key_provider = None
        self.pairing_io_capability_provider = None  # Classic only
        self.snooper: Optional[Snooper] = None

        # Connect to the source and sink if specified
        if controller_source:
            self.set_packet_source(controller_source)
        if controller_sink:
            self.set_packet_sink(controller_sink)
194
195    def find_connection_by_bd_addr(
196        self,
197        bd_addr: hci.Address,
198        transport: Optional[int] = None,
199        check_address_type: bool = False,
200    ) -> Optional[Connection]:
201        for connection in self.connections.values():
202            if connection.peer_address.to_bytes() == bd_addr.to_bytes():
203                if (
204                    check_address_type
205                    and connection.peer_address.address_type != bd_addr.address_type
206                ):
207                    continue
208                if transport is None or connection.transport == transport:
209                    return connection
210
211        return None
212
213    async def flush(self) -> None:
214        # Make sure no command is pending
215        await self.command_semaphore.acquire()
216
217        # Flush current host state, then release command semaphore
218        self.emit('flush')
219        self.command_semaphore.release()
220
    async def reset(self, driver_factory=drivers.get_driver_for_host):
        """Reset the controller and (re)load the host's view of its capabilities.

        Steps, in order: flush if the host was ready, let an optional driver
        initialize the controller (otherwise send an HCI Reset), read the
        supported commands / features / version, program the event masks, read
        the ACL buffer sizes to build the packet queues, then read the
        LE data-length and advertising limits.

        Args:
            driver_factory: awaited with this host; if it returns a truthy
                driver, the driver's ``init_controller()`` is awaited and no
                HCI Reset is sent here. Pass ``None`` to skip driver lookup.

        Raises:
            hci.HCI_Error: from commands sent with ``check_result=True`` that
                complete with a non-success status.
        """
        if self.ready:
            self.ready = False
            await self.flush()

        # Instantiate and init a driver for the host if needed.
        # NOTE: we don't keep a reference to the driver here, because we don't
        # currently have a need for the driver later on. But if the driver interface
        # evolves, it may be required, then, to store a reference to the driver in
        # an object property.
        reset_needed = True
        if driver_factory is not None:
            if driver := await driver_factory(self):
                await driver.init_controller()
                reset_needed = False

        # Send a reset command unless a driver has already done so.
        if reset_needed:
            await self.send_command(hci.HCI_Reset_Command(), check_result=True)
            # NOTE(review): `ready` is only set here when this method sends the
            # reset itself; presumably a driver that initializes the controller
            # is expected to set it -- confirm against the driver implementations.
            self.ready = True

        # The supported-commands bitmap gates every optional command below
        response = await self.send_command(
            hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
        )
        self.local_supported_commands = int.from_bytes(
            response.return_parameters.supported_commands, 'little'
        )

        if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
            response = await self.send_command(
                hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
            )
            # The LE feature mask is unpacked as one little-endian 64-bit integer
            self.local_le_features = struct.unpack(
                '<Q', response.return_parameters.le_features
            )[0]

        if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
            response = await self.send_command(
                hci.HCI_Read_Local_Version_Information_Command(), check_result=True
            )
            self.local_version = response.return_parameters

        if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
            # Read every feature page; each page contributes 64 bits, placed at
            # bit offset 64 * page_number. The controller reports the last page
            # number in each response.
            max_page_number = 0
            page_number = 0
            lmp_features = 0
            while page_number <= max_page_number:
                response = await self.send_command(
                    hci.HCI_Read_Local_Extended_Features_Command(
                        page_number=page_number
                    ),
                    check_result=True,
                )
                lmp_features |= int.from_bytes(
                    response.return_parameters.extended_lmp_features, 'little'
                ) << (64 * page_number)
                max_page_number = response.return_parameters.maximum_page_number
                page_number += 1
            self.local_lmp_features = hci.LmpFeatureMask(lmp_features)

        elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
            # Fallback when extended features can't be read: single feature mask
            response = await self.send_command(
                hci.HCI_Read_Local_Supported_Features_Command(), check_result=True
            )
            self.local_lmp_features = hci.LmpFeatureMask(
                int.from_bytes(response.return_parameters.lmp_features, 'little')
            )

        # Select the events the controller should report to the host
        await self.send_command(
            hci.HCI_Set_Event_Mask_Command(
                event_mask=hci.HCI_Set_Event_Mask_Command.mask(
                    [
                        hci.HCI_INQUIRY_COMPLETE_EVENT,
                        hci.HCI_INQUIRY_RESULT_EVENT,
                        hci.HCI_CONNECTION_COMPLETE_EVENT,
                        hci.HCI_CONNECTION_REQUEST_EVENT,
                        hci.HCI_DISCONNECTION_COMPLETE_EVENT,
                        hci.HCI_AUTHENTICATION_COMPLETE_EVENT,
                        hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
                        hci.HCI_ENCRYPTION_CHANGE_EVENT,
                        hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
                        hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT,
                        hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
                        hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
                        hci.HCI_QOS_SETUP_COMPLETE_EVENT,
                        hci.HCI_HARDWARE_ERROR_EVENT,
                        hci.HCI_FLUSH_OCCURRED_EVENT,
                        hci.HCI_ROLE_CHANGE_EVENT,
                        hci.HCI_MODE_CHANGE_EVENT,
                        hci.HCI_RETURN_LINK_KEYS_EVENT,
                        hci.HCI_PIN_CODE_REQUEST_EVENT,
                        hci.HCI_LINK_KEY_REQUEST_EVENT,
                        hci.HCI_LINK_KEY_NOTIFICATION_EVENT,
                        hci.HCI_LOOPBACK_COMMAND_EVENT,
                        hci.HCI_DATA_BUFFER_OVERFLOW_EVENT,
                        hci.HCI_MAX_SLOTS_CHANGE_EVENT,
                        hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
                        hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
                        hci.HCI_QOS_VIOLATION_EVENT,
                        hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
                        hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
                        hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
                        hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
                        hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
                        hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
                        hci.HCI_SNIFF_SUBRATING_EVENT,
                        hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT,
                        hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
                        hci.HCI_IO_CAPABILITY_REQUEST_EVENT,
                        hci.HCI_IO_CAPABILITY_RESPONSE_EVENT,
                        hci.HCI_USER_CONFIRMATION_REQUEST_EVENT,
                        hci.HCI_USER_PASSKEY_REQUEST_EVENT,
                        hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
                        hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
                        hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
                        hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
                        hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT,
                        hci.HCI_KEYPRESS_NOTIFICATION_EVENT,
                        hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
                        hci.HCI_LE_META_EVENT,
                    ]
                )
            )
        )

        if (
            self.local_version is not None
            and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0
        ):
            # Some older controllers don't like event masks with bits they don't
            # understand
            le_event_mask = bytes.fromhex('1F00000000000000')
        else:
            le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask(
                [
                    hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
                    hci.HCI_LE_ADVERTISING_REPORT_EVENT,
                    hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
                    hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
                    hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
                    hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
                    hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT,
                    hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
                    hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
                    hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
                    hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
                    hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
                    hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
                    hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
                    hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
                    hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
                    hci.HCI_LE_SCAN_TIMEOUT_EVENT,
                    hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
                    hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
                    hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
                    hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT,
                    hci.HCI_LE_CTE_REQUEST_FAILED_EVENT,
                    hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
                    hci.HCI_LE_CIS_ESTABLISHED_EVENT,
                    hci.HCI_LE_CIS_REQUEST_EVENT,
                    hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT,
                    hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
                    hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
                    hci.HCI_LE_BIG_SYNC_LOST_EVENT,
                    hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
                    hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
                    hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
                    hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
                    hci.HCI_LE_SUBRATE_CHANGE_EVENT,
                ]
            )

        await self.send_command(
            hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
        )

        # Classic ACL buffer sizes determine the flow-control queue parameters
        if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
            response = await self.send_command(
                hci.HCI_Read_Buffer_Size_Command(), check_result=True
            )
            hc_acl_data_packet_length = (
                response.return_parameters.hc_acl_data_packet_length
            )
            hc_total_num_acl_data_packets = (
                response.return_parameters.hc_total_num_acl_data_packets
            )

            logger.debug(
                'HCI ACL flow control: '
                f'hc_acl_data_packet_length={hc_acl_data_packet_length},'
                f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}'
            )

            self.acl_packet_queue = AclPacketQueue(
                max_packet_size=hc_acl_data_packet_length,
                max_in_flight=hc_total_num_acl_data_packets,
                send=self.send_hci_packet,
            )

        # Zero means "no dedicated LE buffers" (also the value used when the
        # LE buffer size command is not supported)
        hc_le_acl_data_packet_length = 0
        hc_total_num_le_acl_data_packets = 0
        if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
            response = await self.send_command(
                hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
            )
            hc_le_acl_data_packet_length = (
                response.return_parameters.hc_le_acl_data_packet_length
            )
            hc_total_num_le_acl_data_packets = (
                response.return_parameters.hc_total_num_le_acl_data_packets
            )

            logger.debug(
                'HCI LE ACL flow control: '
                f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},'
                f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}'
            )

        if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0:
            # LE and Classic share the same queue
            self.le_acl_packet_queue = self.acl_packet_queue
        else:
            # Create a separate queue for LE
            self.le_acl_packet_queue = AclPacketQueue(
                max_packet_size=hc_le_acl_data_packet_length,
                max_in_flight=hc_total_num_le_acl_data_packets,
                send=self.send_hci_packet,
            )

        # Make the controller's suggested default data length match the values
        # stored on this host, writing only when they differ
        if self.supports_command(
            hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
        ) and self.supports_command(
            hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
        ):
            response = await self.send_command(
                hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
            )
            suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
            suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
            if (
                suggested_max_tx_octets != self.suggested_max_tx_octets
                or suggested_max_tx_time != self.suggested_max_tx_time
            ):
                await self.send_command(
                    hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
                        suggested_max_tx_octets=self.suggested_max_tx_octets,
                        suggested_max_tx_time=self.suggested_max_tx_time,
                    )
                )

        if self.supports_command(
            hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
        ):
            response = await self.send_command(
                hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
                check_result=True,
            )
            self.number_of_supported_advertising_sets = (
                response.return_parameters.num_supported_advertising_sets
            )

        if self.supports_command(
            hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
        ):
            response = await self.send_command(
                hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
                check_result=True,
            )
            self.maximum_advertising_data_length = (
                response.return_parameters.max_advertising_data_length
            )
    @property
    def controller(self) -> Optional[TransportSink]:
        """The sink that outgoing HCI packets are written to (``hci_sink``)."""
        return self.hci_sink

    @controller.setter
    def controller(self, controller) -> None:
        """Use ``controller`` as the packet sink and, if truthy, as the source."""
        self.set_packet_sink(controller)
        # A falsy controller only clears the sink; there is no source to attach
        if controller:
            self.set_packet_source(controller)
502
    def set_packet_sink(self, sink: Optional[TransportSink]) -> None:
        """Set (or clear, with ``None``) the sink that outgoing packets go to."""
        self.hci_sink = sink
505
506    def set_packet_source(self, source: TransportSource) -> None:
507        source.set_packet_sink(self)
508        self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
509
510    def send_hci_packet(self, packet: hci.HCI_Packet) -> None:
511        logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}')
512        if self.snooper:
513            self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
514        if self.hci_sink:
515            self.hci_sink.on_packet(bytes(packet))
516
517    async def send_command(self, command, check_result=False):
518        # Wait until we can send (only one pending command at a time)
519        async with self.command_semaphore:
520            assert self.pending_command is None
521            assert self.pending_response is None
522
523            # Create a future value to hold the eventual response
524            self.pending_response = asyncio.get_running_loop().create_future()
525            self.pending_command = command
526
527            try:
528                self.send_hci_packet(command)
529                response = await self.pending_response
530
531                # Check the return parameters if required
532                if check_result:
533                    if isinstance(response, hci.HCI_Command_Status_Event):
534                        status = response.status
535                    elif isinstance(response.return_parameters, int):
536                        status = response.return_parameters
537                    elif isinstance(response.return_parameters, bytes):
538                        # return parameters first field is a one byte status code
539                        status = response.return_parameters[0]
540                    else:
541                        status = response.return_parameters.status
542
543                    if status != hci.HCI_SUCCESS:
544                        logger.warning(
545                            f'{command.name} failed '
546                            f'({hci.HCI_Constant.error_name(status)})'
547                        )
548                        raise hci.HCI_Error(status)
549
550                return response
551            except Exception as error:
552                logger.warning(
553                    f'{color("!!! Exception while sending command:", "red")} {error}'
554                )
555                raise error
556            finally:
557                self.pending_command = None
558                self.pending_response = None
559
560    # Use this method to send a command from a task
561    def send_command_sync(self, command: hci.HCI_Command) -> None:
562        async def send_command(command: hci.HCI_Command) -> None:
563            await self.send_command(command)
564
565        asyncio.create_task(send_command(command))
566
567    def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
568        if not (connection := self.connections.get(connection_handle)):
569            logger.warning(f'connection 0x{connection_handle:04X} not found')
570            return
571        packet_queue = connection.acl_packet_queue
572        if packet_queue is None:
573            logger.warning(
574                f'no ACL packet queue for connection 0x{connection_handle:04X}'
575            )
576            return
577
578        # Create a PDU
579        l2cap_pdu = bytes(L2CAP_PDU(cid, pdu))
580
581        # Send the data to the controller via ACL packets
582        bytes_remaining = len(l2cap_pdu)
583        offset = 0
584        pb_flag = 0
585        while bytes_remaining:
586            data_total_length = min(bytes_remaining, packet_queue.max_packet_size)
587            acl_packet = hci.HCI_AclDataPacket(
588                connection_handle=connection_handle,
589                pb_flag=pb_flag,
590                bc_flag=0,
591                data_total_length=data_total_length,
592                data=l2cap_pdu[offset : offset + data_total_length],
593            )
594            logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}')
595            packet_queue.enqueue(acl_packet)
596            pb_flag = 1
597            offset += data_total_length
598            bytes_remaining -= data_total_length
599
600    def supports_command(self, op_code: int) -> bool:
601        return (
602            self.local_supported_commands
603            & hci.HCI_SUPPORTED_COMMANDS_MASKS.get(op_code, 0)
604        ) != 0
605
606    @property
607    def supported_commands(self) -> Set[int]:
608        return set(
609            op_code
610            for op_code, mask in hci.HCI_SUPPORTED_COMMANDS_MASKS.items()
611            if self.local_supported_commands & mask
612        )
613
614    def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
615        return (self.local_le_features & feature) == feature
616
617    def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool:
618        return self.local_lmp_features & (feature) == feature
619
620    @property
621    def supported_le_features(self):
622        return [
623            feature for feature in range(64) if self.local_le_features & (1 << feature)
624        ]
625
626    # Packet Sink protocol (packets coming from the controller via HCI)
627    def on_packet(self, packet: bytes) -> None:
628        hci_packet = hci.HCI_Packet.from_bytes(packet)
629        if self.ready or (
630            isinstance(hci_packet, hci.HCI_Command_Complete_Event)
631            and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
632        ):
633            self.on_hci_packet(hci_packet)
634        else:
635            logger.debug('reset not done, ignoring packet from controller')
636
637    def on_transport_lost(self):
638        # Called by the source when the transport has been lost.
639        if self.pending_response:
640            self.pending_response.set_exception(TransportLostError('transport lost'))
641
642        self.emit('flush')
643
644    def on_hci_packet(self, packet: hci.HCI_Packet) -> None:
645        logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
646
647        if self.snooper:
648            self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
649
650        # If the packet is a command, invoke the handler for this packet
651        if packet.hci_packet_type == hci.HCI_COMMAND_PACKET:
652            self.on_hci_command_packet(cast(hci.HCI_Command, packet))
653        elif packet.hci_packet_type == hci.HCI_EVENT_PACKET:
654            self.on_hci_event_packet(cast(hci.HCI_Event, packet))
655        elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET:
656            self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet))
657        elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
658            self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet))
659        elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET:
660            self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet))
661        else:
662            logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
663
    def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
        """Log a warning: the host does not expect command packets."""
        logger.warning(f'!!! unexpected command packet: {command}')
666
667    def on_hci_event_packet(self, event: hci.HCI_Event) -> None:
668        handler_name = f'on_{event.name.lower()}'
669        handler = getattr(self, handler_name, self.on_hci_event)
670        handler(event)
671
672    def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
673        # Look for the connection to which this data belongs
674        if connection := self.connections.get(packet.connection_handle):
675            connection.on_hci_acl_data_packet(packet)
676
677    def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
678        # Experimental
679        self.emit('sco_packet', packet.connection_handle, packet)
680
681    def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None:
682        # Experimental
683        self.emit('iso_packet', packet.connection_handle, packet)
684
685    def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
686        self.emit('l2cap_pdu', connection.handle, cid, pdu)
687
688    def on_command_processed(self, event):
689        if self.pending_response:
690            # Check that it is what we were expecting
691            if self.pending_command.op_code != event.command_opcode:
692                logger.warning(
693                    '!!! command result mismatch, expected '
694                    f'0x{self.pending_command.op_code:X} but got '
695                    f'0x{event.command_opcode:X}'
696                )
697
698            self.pending_response.set_result(event)
699        else:
700            logger.warning('!!! no pending response future to set')
701
702    ############################################################
703    # HCI handlers
704    ############################################################
    def on_hci_event(self, event):
        """Fallback handler: log (and otherwise ignore) an unhandled event."""
        logger.warning(f'{color(f"--- Ignoring event {event}", "red")}')
707
708    def on_hci_command_complete_event(self, event):
709        if event.command_opcode == 0:
710            # This is used just for the Num_HCI_Command_Packets field, not related to
711            # an actual command
712            logger.debug('no-command event')
713            return
714
715        return self.on_command_processed(event)
716
    def on_hci_command_status_event(self, event):
        """Handle Command Status by resolving the pending command with ``event``."""
        return self.on_command_processed(event)
719
720    def on_hci_number_of_completed_packets_event(self, event):
721        for connection_handle, num_completed_packets in zip(
722            event.connection_handles, event.num_completed_packets
723        ):
724            if connection := self.connections.get(connection_handle):
725                connection.acl_packet_queue.on_packets_completed(num_completed_packets)
726            elif not (
727                self.cis_links.get(connection_handle)
728                or self.sco_links.get(connection_handle)
729            ):
730                logger.warning(
731                    'received packet completion event for unknown handle '
732                    f'0x{connection_handle:04X}'
733                )
734
735    # Classic only
736    def on_hci_connection_request_event(self, event):
737        # Notify the listeners
738        self.emit(
739            'connection_request',
740            event.bd_addr,
741            event.class_of_device,
742            event.link_type,
743        )
744
745    def on_hci_le_connection_complete_event(self, event):
746        # Check if this is a cancellation
747        if event.status == hci.HCI_SUCCESS:
748            # Create/update the connection
749            logger.debug(
750                f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
751                f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}'
752            )
753
754            connection = self.connections.get(event.connection_handle)
755            if connection is None:
756                connection = Connection(
757                    self,
758                    event.connection_handle,
759                    event.peer_address,
760                    BT_LE_TRANSPORT,
761                )
762                self.connections[event.connection_handle] = connection
763
764            # Notify the client
765            connection_parameters = ConnectionParameters(
766                event.connection_interval,
767                event.peripheral_latency,
768                event.supervision_timeout,
769            )
770            self.emit(
771                'connection',
772                event.connection_handle,
773                BT_LE_TRANSPORT,
774                event.peer_address,
775                event.role,
776                connection_parameters,
777            )
778        else:
779            logger.debug(f'### CONNECTION FAILED: {event.status}')
780
781            # Notify the listeners
782            self.emit(
783                'connection_failure', BT_LE_TRANSPORT, event.peer_address, event.status
784            )
785
786    def on_hci_le_enhanced_connection_complete_event(self, event):
787        # Just use the same implementation as for the non-enhanced event for now
788        self.on_hci_le_connection_complete_event(event)
789
790    def on_hci_connection_complete_event(self, event):
791        if event.status == hci.HCI_SUCCESS:
792            # Create/update the connection
793            logger.debug(
794                f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
795                f'{event.bd_addr}'
796            )
797
798            connection = self.connections.get(event.connection_handle)
799            if connection is None:
800                connection = Connection(
801                    self,
802                    event.connection_handle,
803                    event.bd_addr,
804                    BT_BR_EDR_TRANSPORT,
805                )
806                self.connections[event.connection_handle] = connection
807
808            # Notify the client
809            self.emit(
810                'connection',
811                event.connection_handle,
812                BT_BR_EDR_TRANSPORT,
813                event.bd_addr,
814                None,
815                None,
816            )
817        else:
818            logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
819
820            # Notify the client
821            self.emit(
822                'connection_failure', BT_BR_EDR_TRANSPORT, event.bd_addr, event.status
823            )
824
825    def on_hci_disconnection_complete_event(self, event):
826        # Find the connection
827        handle = event.connection_handle
828        if (
829            connection := (
830                self.connections.get(handle)
831                or self.cis_links.get(handle)
832                or self.sco_links.get(handle)
833            )
834        ) is None:
835            logger.warning('!!! DISCONNECTION COMPLETE: unknown handle')
836            return
837
838        if event.status == hci.HCI_SUCCESS:
839            logger.debug(
840                f'### DISCONNECTION: [0x{handle:04X}] '
841                f'{connection.peer_address} '
842                f'reason={event.reason}'
843            )
844
845            # Notify the listeners
846            self.emit('disconnection', handle, event.reason)
847
848            # Remove the handle reference
849            _ = (
850                self.connections.pop(handle, 0)
851                or self.cis_links.pop(handle, 0)
852                or self.sco_links.pop(handle, 0)
853            )
854        else:
855            logger.debug(f'### DISCONNECTION FAILED: {event.status}')
856
857            # Notify the listeners
858            self.emit('disconnection_failure', handle, event.status)
859
860    def on_hci_le_connection_update_complete_event(self, event):
861        if (connection := self.connections.get(event.connection_handle)) is None:
862            logger.warning('!!! CONNECTION PARAMETERS UPDATE COMPLETE: unknown handle')
863            return
864
865        # Notify the client
866        if event.status == hci.HCI_SUCCESS:
867            connection_parameters = ConnectionParameters(
868                event.connection_interval,
869                event.peripheral_latency,
870                event.supervision_timeout,
871            )
872            self.emit(
873                'connection_parameters_update', connection.handle, connection_parameters
874            )
875        else:
876            self.emit(
877                'connection_parameters_update_failure', connection.handle, event.status
878            )
879
880    def on_hci_le_phy_update_complete_event(self, event):
881        if (connection := self.connections.get(event.connection_handle)) is None:
882            logger.warning('!!! CONNECTION PHY UPDATE COMPLETE: unknown handle')
883            return
884
885        # Notify the client
886        if event.status == hci.HCI_SUCCESS:
887            connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
888            self.emit('connection_phy_update', connection.handle, connection_phy)
889        else:
890            self.emit('connection_phy_update_failure', connection.handle, event.status)
891
892    def on_hci_le_advertising_report_event(self, event):
893        for report in event.reports:
894            self.emit('advertising_report', report)
895
896    def on_hci_le_extended_advertising_report_event(self, event):
897        self.on_hci_le_advertising_report_event(event)
898
899    def on_hci_le_advertising_set_terminated_event(self, event):
900        self.emit(
901            'advertising_set_termination',
902            event.status,
903            event.advertising_handle,
904            event.connection_handle,
905            event.num_completed_extended_advertising_events,
906        )
907
908    def on_hci_le_cis_request_event(self, event):
909        self.emit(
910            'cis_request',
911            event.acl_connection_handle,
912            event.cis_connection_handle,
913            event.cig_id,
914            event.cis_id,
915        )
916
917    def on_hci_le_cis_established_event(self, event):
918        # The remaining parameters are unused for now.
919        if event.status == hci.HCI_SUCCESS:
920            self.cis_links[event.connection_handle] = CisLink(
921                handle=event.connection_handle,
922                peer_address=hci.Address.ANY,
923            )
924            self.emit('cis_establishment', event.connection_handle)
925        else:
926            self.emit(
927                'cis_establishment_failure', event.connection_handle, event.status
928            )
929
930    def on_hci_le_remote_connection_parameter_request_event(self, event):
931        if event.connection_handle not in self.connections:
932            logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')
933            return
934
935        # For now, just accept everything
936        # TODO: delegate the decision
937        self.send_command_sync(
938            hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
939                connection_handle=event.connection_handle,
940                interval_min=event.interval_min,
941                interval_max=event.interval_max,
942                max_latency=event.max_latency,
943                timeout=event.timeout,
944                min_ce_length=0,
945                max_ce_length=0,
946            )
947        )
948
949    def on_hci_le_long_term_key_request_event(self, event):
950        if (connection := self.connections.get(event.connection_handle)) is None:
951            logger.warning('!!! LE LONG TERM KEY REQUEST: unknown handle')
952            return
953
954        async def send_long_term_key():
955            if self.long_term_key_provider is None:
956                logger.debug('no long term key provider')
957                long_term_key = None
958            else:
959                long_term_key = await self.abort_on(
960                    'flush',
961                    # pylint: disable-next=not-callable
962                    self.long_term_key_provider(
963                        connection.handle,
964                        event.random_number,
965                        event.encryption_diversifier,
966                    ),
967                )
968            if long_term_key:
969                response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command(
970                    connection_handle=event.connection_handle,
971                    long_term_key=long_term_key,
972                )
973            else:
974                response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
975                    connection_handle=event.connection_handle
976                )
977
978            await self.send_command(response)
979
980        asyncio.create_task(send_long_term_key())
981
982    def on_hci_synchronous_connection_complete_event(self, event):
983        if event.status == hci.HCI_SUCCESS:
984            # Create/update the connection
985            logger.debug(
986                f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
987                f'{event.bd_addr}'
988            )
989
990            self.sco_links[event.connection_handle] = ScoLink(
991                peer_address=event.bd_addr,
992                handle=event.connection_handle,
993            )
994
995            # Notify the client
996            self.emit(
997                'sco_connection',
998                event.bd_addr,
999                event.connection_handle,
1000                event.link_type,
1001            )
1002        else:
1003            logger.debug(f'### SCO CONNECTION FAILED: {event.status}')
1004
1005            # Notify the client
1006            self.emit('sco_connection_failure', event.bd_addr, event.status)
1007
1008    def on_hci_synchronous_connection_changed_event(self, event):
1009        pass
1010
1011    def on_hci_role_change_event(self, event):
1012        if event.status == hci.HCI_SUCCESS:
1013            logger.debug(
1014                f'role change for {event.bd_addr}: '
1015                f'{hci.HCI_Constant.role_name(event.new_role)}'
1016            )
1017            self.emit('role_change', event.bd_addr, event.new_role)
1018        else:
1019            logger.debug(
1020                f'role change for {event.bd_addr} failed: '
1021                f'{hci.HCI_Constant.error_name(event.status)}'
1022            )
1023            self.emit('role_change_failure', event.bd_addr, event.status)
1024
1025    def on_hci_le_data_length_change_event(self, event):
1026        self.emit(
1027            'connection_data_length_change',
1028            event.connection_handle,
1029            event.max_tx_octets,
1030            event.max_tx_time,
1031            event.max_rx_octets,
1032            event.max_rx_time,
1033        )
1034
1035    def on_hci_authentication_complete_event(self, event):
1036        # Notify the client
1037        if event.status == hci.HCI_SUCCESS:
1038            self.emit('connection_authentication', event.connection_handle)
1039        else:
1040            self.emit(
1041                'connection_authentication_failure',
1042                event.connection_handle,
1043                event.status,
1044            )
1045
1046    def on_hci_encryption_change_event(self, event):
1047        # Notify the client
1048        if event.status == hci.HCI_SUCCESS:
1049            self.emit(
1050                'connection_encryption_change',
1051                event.connection_handle,
1052                event.encryption_enabled,
1053            )
1054        else:
1055            self.emit(
1056                'connection_encryption_failure', event.connection_handle, event.status
1057            )
1058
1059    def on_hci_encryption_key_refresh_complete_event(self, event):
1060        # Notify the client
1061        if event.status == hci.HCI_SUCCESS:
1062            self.emit('connection_encryption_key_refresh', event.connection_handle)
1063        else:
1064            self.emit(
1065                'connection_encryption_key_refresh_failure',
1066                event.connection_handle,
1067                event.status,
1068            )
1069
1070    def on_hci_link_supervision_timeout_changed_event(self, event):
1071        pass
1072
1073    def on_hci_max_slots_change_event(self, event):
1074        pass
1075
1076    def on_hci_page_scan_repetition_mode_change_event(self, event):
1077        pass
1078
1079    def on_hci_link_key_notification_event(self, event):
1080        logger.debug(
1081            f'link key for {event.bd_addr}: {event.link_key.hex()}, '
1082            f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}'
1083        )
1084        self.emit('link_key', event.bd_addr, event.link_key, event.key_type)
1085
1086    def on_hci_simple_pairing_complete_event(self, event):
1087        logger.debug(
1088            f'simple pairing complete for {event.bd_addr}: '
1089            f'status={hci.HCI_Constant.status_name(event.status)}'
1090        )
1091        if event.status == hci.HCI_SUCCESS:
1092            self.emit('classic_pairing', event.bd_addr)
1093        else:
1094            self.emit('classic_pairing_failure', event.bd_addr, event.status)
1095
1096    def on_hci_pin_code_request_event(self, event):
1097        self.emit('pin_code_request', event.bd_addr)
1098
1099    def on_hci_link_key_request_event(self, event):
1100        async def send_link_key():
1101            if self.link_key_provider is None:
1102                logger.debug('no link key provider')
1103                link_key = None
1104            else:
1105                link_key = await self.abort_on(
1106                    'flush',
1107                    # pylint: disable-next=not-callable
1108                    self.link_key_provider(event.bd_addr),
1109                )
1110            if link_key:
1111                response = hci.HCI_Link_Key_Request_Reply_Command(
1112                    bd_addr=event.bd_addr, link_key=link_key
1113                )
1114            else:
1115                response = hci.HCI_Link_Key_Request_Negative_Reply_Command(
1116                    bd_addr=event.bd_addr
1117                )
1118
1119            await self.send_command(response)
1120
1121        asyncio.create_task(send_link_key())
1122
1123    def on_hci_io_capability_request_event(self, event):
1124        self.emit('authentication_io_capability_request', event.bd_addr)
1125
1126    def on_hci_io_capability_response_event(self, event):
1127        self.emit(
1128            'authentication_io_capability_response',
1129            event.bd_addr,
1130            event.io_capability,
1131            event.authentication_requirements,
1132        )
1133
1134    def on_hci_user_confirmation_request_event(self, event):
1135        self.emit(
1136            'authentication_user_confirmation_request',
1137            event.bd_addr,
1138            event.numeric_value,
1139        )
1140
1141    def on_hci_user_passkey_request_event(self, event):
1142        self.emit('authentication_user_passkey_request', event.bd_addr)
1143
1144    def on_hci_user_passkey_notification_event(self, event):
1145        self.emit(
1146            'authentication_user_passkey_notification', event.bd_addr, event.passkey
1147        )
1148
1149    def on_hci_inquiry_complete_event(self, _event):
1150        self.emit('inquiry_complete')
1151
1152    def on_hci_inquiry_result_with_rssi_event(self, event):
1153        for response in event.responses:
1154            self.emit(
1155                'inquiry_result',
1156                response.bd_addr,
1157                response.class_of_device,
1158                b'',
1159                response.rssi,
1160            )
1161
1162    def on_hci_extended_inquiry_result_event(self, event):
1163        self.emit(
1164            'inquiry_result',
1165            event.bd_addr,
1166            event.class_of_device,
1167            event.extended_inquiry_response,
1168            event.rssi,
1169        )
1170
1171    def on_hci_remote_name_request_complete_event(self, event):
1172        if event.status != hci.HCI_SUCCESS:
1173            self.emit('remote_name_failure', event.bd_addr, event.status)
1174        else:
1175            utf8_name = event.remote_name
1176            terminator = utf8_name.find(0)
1177            if terminator >= 0:
1178                utf8_name = utf8_name[0:terminator]
1179
1180            self.emit('remote_name', event.bd_addr, utf8_name)
1181
1182    def on_hci_remote_host_supported_features_notification_event(self, event):
1183        self.emit(
1184            'remote_host_supported_features',
1185            event.bd_addr,
1186            event.host_supported_features,
1187        )
1188
1189    def on_hci_le_read_remote_features_complete_event(self, event):
1190        if event.status != hci.HCI_SUCCESS:
1191            self.emit(
1192                'le_remote_features_failure', event.connection_handle, event.status
1193            )
1194        else:
1195            self.emit(
1196                'le_remote_features',
1197                event.connection_handle,
1198                int.from_bytes(event.le_features, 'little'),
1199            )
1200