• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19from enum import IntEnum
20import copy
21import functools
22import json
23import asyncio
24import logging
25import secrets
26import sys
27from contextlib import (
28    asynccontextmanager,
29    AsyncExitStack,
30    closing,
31    AbstractAsyncContextManager,
32)
33from dataclasses import dataclass, field
34from collections.abc import Iterable
35from typing import (
36    Any,
37    Callable,
38    ClassVar,
39    Dict,
40    List,
41    Optional,
42    Tuple,
43    Type,
44    TypeVar,
45    Union,
46    cast,
47    overload,
48    TYPE_CHECKING,
49)
50from typing_extensions import Self
51
52from pyee import EventEmitter
53
54from .colors import color
55from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
56from .gatt import Characteristic, Descriptor, Service
57from .hci import (
58    HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
59    HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
60    HCI_CENTRAL_ROLE,
61    HCI_PERIPHERAL_ROLE,
62    HCI_COMMAND_STATUS_PENDING,
63    HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
64    HCI_DISPLAY_YES_NO_IO_CAPABILITY,
65    HCI_DISPLAY_ONLY_IO_CAPABILITY,
66    HCI_EXTENDED_INQUIRY_MODE,
67    HCI_GENERAL_INQUIRY_LAP,
68    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
69    HCI_KEYBOARD_ONLY_IO_CAPABILITY,
70    HCI_LE_1M_PHY,
71    HCI_LE_1M_PHY_BIT,
72    HCI_LE_2M_PHY,
73    HCI_LE_CODED_PHY,
74    HCI_LE_CODED_PHY_BIT,
75    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
76    HCI_LE_RAND_COMMAND,
77    HCI_LE_READ_PHY_COMMAND,
78    HCI_LE_SET_PHY_COMMAND,
79    HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
80    HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
81    HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
82    HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
83    HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
84    HCI_R2_PAGE_SCAN_REPETITION_MODE,
85    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
86    HCI_SUCCESS,
87    HCI_WRITE_LE_HOST_SUPPORT_COMMAND,
88    HCI_Accept_Connection_Request_Command,
89    HCI_Authentication_Requested_Command,
90    HCI_Command_Status_Event,
91    HCI_Constant,
92    HCI_Create_Connection_Cancel_Command,
93    HCI_Create_Connection_Command,
94    HCI_Connection_Complete_Event,
95    HCI_Disconnect_Command,
96    HCI_Encryption_Change_Event,
97    HCI_Error,
98    HCI_IO_Capability_Request_Reply_Command,
99    HCI_Inquiry_Cancel_Command,
100    HCI_Inquiry_Command,
101    HCI_IsoDataPacket,
102    HCI_LE_Accept_CIS_Request_Command,
103    HCI_LE_Add_Device_To_Resolving_List_Command,
104    HCI_LE_Advertising_Report_Event,
105    HCI_LE_Clear_Resolving_List_Command,
106    HCI_LE_Connection_Update_Command,
107    HCI_LE_Create_Connection_Cancel_Command,
108    HCI_LE_Create_Connection_Command,
109    HCI_LE_Create_CIS_Command,
110    HCI_LE_Enable_Encryption_Command,
111    HCI_LE_Extended_Advertising_Report_Event,
112    HCI_LE_Extended_Create_Connection_Command,
113    HCI_LE_Rand_Command,
114    HCI_LE_Read_PHY_Command,
115    HCI_LE_Read_Remote_Features_Command,
116    HCI_LE_Reject_CIS_Request_Command,
117    HCI_LE_Remove_Advertising_Set_Command,
118    HCI_LE_Set_Address_Resolution_Enable_Command,
119    HCI_LE_Set_Advertising_Data_Command,
120    HCI_LE_Set_Advertising_Enable_Command,
121    HCI_LE_Set_Advertising_Parameters_Command,
122    HCI_LE_Set_Advertising_Set_Random_Address_Command,
123    HCI_LE_Set_CIG_Parameters_Command,
124    HCI_LE_Set_Data_Length_Command,
125    HCI_LE_Set_Default_PHY_Command,
126    HCI_LE_Set_Extended_Scan_Enable_Command,
127    HCI_LE_Set_Extended_Scan_Parameters_Command,
128    HCI_LE_Set_Extended_Scan_Response_Data_Command,
129    HCI_LE_Set_Extended_Advertising_Data_Command,
130    HCI_LE_Set_Extended_Advertising_Enable_Command,
131    HCI_LE_Set_Extended_Advertising_Parameters_Command,
132    HCI_LE_Set_Host_Feature_Command,
133    HCI_LE_Set_Periodic_Advertising_Enable_Command,
134    HCI_LE_Set_PHY_Command,
135    HCI_LE_Set_Random_Address_Command,
136    HCI_LE_Set_Scan_Enable_Command,
137    HCI_LE_Set_Scan_Parameters_Command,
138    HCI_LE_Set_Scan_Response_Data_Command,
139    HCI_PIN_Code_Request_Reply_Command,
140    HCI_PIN_Code_Request_Negative_Reply_Command,
141    HCI_Read_BD_ADDR_Command,
142    HCI_Read_RSSI_Command,
143    HCI_Reject_Connection_Request_Command,
144    HCI_Remote_Name_Request_Command,
145    HCI_Switch_Role_Command,
146    HCI_Set_Connection_Encryption_Command,
147    HCI_StatusError,
148    HCI_SynchronousDataPacket,
149    HCI_User_Confirmation_Request_Negative_Reply_Command,
150    HCI_User_Confirmation_Request_Reply_Command,
151    HCI_User_Passkey_Request_Negative_Reply_Command,
152    HCI_User_Passkey_Request_Reply_Command,
153    HCI_Write_Class_Of_Device_Command,
154    HCI_Write_Extended_Inquiry_Response_Command,
155    HCI_Write_Inquiry_Mode_Command,
156    HCI_Write_LE_Host_Support_Command,
157    HCI_Write_Local_Name_Command,
158    HCI_Write_Scan_Enable_Command,
159    HCI_Write_Secure_Connections_Host_Support_Command,
160    HCI_Write_Simple_Pairing_Mode_Command,
161    Address,
162    OwnAddressType,
163    LeFeature,
164    LeFeatureMask,
165    Phy,
166    phy_list_to_bits,
167)
168from .host import Host
169from .gap import GenericAccessService
170from .core import (
171    BT_BR_EDR_TRANSPORT,
172    BT_CENTRAL_ROLE,
173    BT_LE_TRANSPORT,
174    BT_PERIPHERAL_ROLE,
175    AdvertisingData,
176    ConnectionParameterUpdateError,
177    CommandTimeoutError,
178    ConnectionPHY,
179    InvalidStateError,
180)
181from .utils import (
182    AsyncRunner,
183    CompositeEventEmitter,
184    EventWatcher,
185    setup_event_forwarding,
186    composite_listener,
187    deprecated,
188    experimental,
189)
190from .keys import (
191    KeyStore,
192    PairingKeys,
193)
194from .pairing import PairingConfig
195from . import gatt_client
196from . import gatt_server
197from . import smp
198from . import sdp
199from . import l2cap
200from . import core
201
202if TYPE_CHECKING:
203    from .transport.common import TransportSource, TransportSink
204
205
206# -----------------------------------------------------------------------------
207# Logging
208# -----------------------------------------------------------------------------
209logger = logging.getLogger(__name__)
210
211# -----------------------------------------------------------------------------
212# Constants
213# -----------------------------------------------------------------------------
214# fmt: off
215# pylint: disable=line-too-long
216
# Allowed ranges.
# NOTE(review): the scan interval/window bounds are presumably in ms, like the
# corresponding defaults below -- confirm against the code that validates them.
DEVICE_MIN_SCAN_INTERVAL                      = 25
DEVICE_MAX_SCAN_INTERVAL                      = 10240
DEVICE_MIN_SCAN_WINDOW                        = 25
DEVICE_MAX_SCAN_WINDOW                        = 10240
DEVICE_MIN_LE_RSSI                            = -127
DEVICE_MAX_LE_RSSI                            = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE    = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE    = 0xEF

# Default values (units, where known, are noted next to each value).
DEVICE_DEFAULT_ADDRESS                        = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL           = 1000  # ms
# NOTE(review): this default is a str while DEVICE_DEFAULT_SCAN_RESPONSE_DATA is
# bytes -- presumably intentional (e.g. a textual encoding); verify with callers.
DEVICE_DEFAULT_ADVERTISING_DATA               = ''
DEVICE_DEFAULT_NAME                           = 'Bumble'
DEVICE_DEFAULT_INQUIRY_LENGTH                 = 8  # 10.24 seconds
DEVICE_DEFAULT_CLASS_OF_DEVICE                = 0
DEVICE_DEFAULT_SCAN_RESPONSE_DATA             = b''
DEVICE_DEFAULT_DATA_LENGTH                    = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL                  = 60  # ms
DEVICE_DEFAULT_SCAN_WINDOW                    = 60  # ms
DEVICE_DEFAULT_CONNECT_TIMEOUT                = None  # No timeout
DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL          = 60  # ms
DEVICE_DEFAULT_CONNECT_SCAN_WINDOW            = 60  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN        = 15  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX        = 30  # ms
DEVICE_DEFAULT_CONNECTION_MAX_LATENCY         = 0
DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720  # ms
DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH       = 0   # ms
DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH       = 0   # ms
# L2CAP LE credit-based channel defaults, taken from the l2cap module
DEVICE_DEFAULT_L2CAP_COC_MTU                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
DEVICE_DEFAULT_L2CAP_COC_MPS                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS          = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
# By default, express no preference for the advertising TX power
DEVICE_DEFAULT_ADVERTISING_TX_POWER           = (
    HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE
)

# fmt: on
# pylint: enable=line-too-long

# As specified in 7.8.56 LE Set Extended Advertising Enable command
# NOTE(review): presumably expressed in seconds -- confirm at the point of use.
DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28
257
258
259# -----------------------------------------------------------------------------
260# Classes
261# -----------------------------------------------------------------------------
262
263
264# -----------------------------------------------------------------------------
@dataclass
class Advertisement:
    """
    Decoded view of a single LE advertising report.

    The raw payload is kept in `data_bytes`; after construction it is also
    parsed into an `AdvertisingData` object, available as the `data` attribute
    (which is not a dataclass field).
    """

    # Constants (sentinel values taken from the extended advertising report event)
    TX_POWER_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    RSSI_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    )

    # Attributes
    address: Address
    rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    is_legacy: bool = False
    is_anonymous: bool = False
    is_connectable: bool = False
    is_directed: bool = False
    is_scannable: bool = False
    is_scan_response: bool = False
    is_complete: bool = True
    is_truncated: bool = False
    primary_phy: int = 0
    secondary_phy: int = 0
    tx_power: int = (
        HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    sid: int = 0
    data_bytes: bytes = b''

    def __post_init__(self) -> None:
        # Parse the raw payload once, right after the fields are set
        self.data = AdvertisingData.from_bytes(self.data_bytes)

    @classmethod
    def from_advertising_report(cls, report) -> Optional[Advertisement]:
        """
        Build the advertisement object matching the type of `report`.

        Returns:
          A `LegacyAdvertisement` for a legacy report, an `ExtendedAdvertisement`
          for an extended report, or None for any other kind of object.
        """
        # Report class -> advertisement class that knows how to decode it
        decoders = (
            (HCI_LE_Advertising_Report_Event.Report, LegacyAdvertisement),
            (HCI_LE_Extended_Advertising_Report_Event.Report, ExtendedAdvertisement),
        )
        for report_class, advertisement_class in decoders:
            if isinstance(report, report_class):
                return advertisement_class.from_advertising_report(report)

        return None
306
307
308# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
    """Advertisement decoded from a legacy advertising report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Create an instance from a legacy report.

        The connectable/directed/scannable/scan-response flags are all derived
        from the report's `event_type`.
        """
        pdu = HCI_LE_Advertising_Report_Event
        event_type = report.event_type

        connectable = event_type in (pdu.ADV_IND, pdu.ADV_DIRECT_IND)
        directed = event_type == pdu.ADV_DIRECT_IND
        scannable = event_type in (pdu.ADV_IND, pdu.ADV_SCAN_IND)
        scan_response = event_type == pdu.SCAN_RSP

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=True,
            is_connectable=connectable,
            is_directed=directed,
            is_scannable=scannable,
            is_scan_response=scan_response,
            data_bytes=report.data,
        )
332
333
334# -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement):
    """Advertisement decoded from an extended advertising report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Create an instance from an extended report.

        Boolean flags are read from individual bits of the report's
        `event_type`; the data status is the 2-bit field starting at bit 5.
        """
        event = HCI_LE_Extended_Advertising_Report_Event
        event_type = report.event_type

        def bit_is_set(bit_position):
            # True when the given bit of the event type is set
            return (event_type & (1 << bit_position)) != 0

        data_status = (event_type >> 5) & 3

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=bit_is_set(event.LEGACY_ADVERTISING_PDU_USED),
            is_anonymous=(
                report.address.address_type == event.ANONYMOUS_ADDRESS_TYPE
            ),
            is_connectable=bit_is_set(event.CONNECTABLE_ADVERTISING),
            is_directed=bit_is_set(event.DIRECTED_ADVERTISING),
            is_scannable=bit_is_set(event.SCANNABLE_ADVERTISING),
            is_scan_response=bit_is_set(event.SCAN_RESPONSE),
            is_complete=(data_status == event.DATA_COMPLETE),
            is_truncated=(
                data_status == event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME
            ),
            primary_phy=report.primary_phy,
            secondary_phy=report.secondary_phy,
            tx_power=report.tx_power,
            sid=report.advertising_sid,
            data_bytes=report.data,
        )
358
359
360# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
    """
    Pairs scannable advertisements with the scan response that follows them.

    The accumulator remembers the last advertisement it saw and the data of the
    last non-scan-response report, so that a scan response can be merged with
    the advertisement that preceded it.
    """

    def __init__(self, passive=False):
        # When passive, advertisements are reported without waiting for a
        # scan response
        self.passive = passive
        self.last_advertisement = None
        self.last_data = b''

    def update(self, report):
        """
        Feed one advertising report into the accumulator.

        Returns:
          An `Advertisement` when there is something to report, or None when
          the report is of an unknown type, is a scan response with no
          preceding advertisement, or is a scannable advertisement for which a
          scan response is still awaited.
        """
        advertisement = Advertisement.from_advertising_report(report)
        if advertisement is None:
            return None

        previous = self.last_advertisement
        # True when the previous report was an advertisement (not a scan response)
        previous_was_advertisement = (
            previous is not None and not previous.is_scan_response
        )

        merged = None
        if advertisement.is_scan_response:
            if previous_was_advertisement:
                # This is the response to a scannable advertisement: report it
                # with the accumulated data of both PDUs
                merged = Advertisement.from_advertising_report(report)
                merged.is_connectable = previous.is_connectable
                merged.is_scannable = True
                merged.data = AdvertisingData.from_bytes(self.last_data + report.data)
            self.last_data = b''
        else:
            no_response_expected = self.passive or not advertisement.is_scannable
            if no_response_expected or previous_was_advertisement:
                # Don't wait for a scan response
                merged = Advertisement.from_advertising_report(report)
            self.last_data = report.data

        self.last_advertisement = advertisement

        return merged
402
403
404# -----------------------------------------------------------------------------
class AdvertisingType(IntEnum):
    """Legacy advertising types, with helpers describing each type's traits."""

    # fmt: off
    # pylint: disable=line-too-long
    UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00  # Undirected, connectable,     scannable
    DIRECTED_CONNECTABLE_HIGH_DUTY   = 0x01  # Directed,   connectable,     non-scannable
    UNDIRECTED_SCANNABLE             = 0x02  # Undirected, non-connectable, scannable
    UNDIRECTED                       = 0x03  # Undirected, non-connectable, non-scannable
    DIRECTED_CONNECTABLE_LOW_DUTY    = 0x04  # Directed,   connectable,     non-scannable
    # fmt: on

    @property
    def has_data(self) -> bool:
        """True for every undirected type (the directed types have no data)."""
        return not self.is_directed

    @property
    def is_connectable(self) -> bool:
        """True for the directed types and the undirected connectable type."""
        return (
            self.is_directed
            or self is AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
        )

    @property
    def is_scannable(self) -> bool:
        """True for the two undirected scannable types."""
        return (
            self is AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
            or self is AdvertisingType.UNDIRECTED_SCANNABLE
        )

    @property
    def is_directed(self) -> bool:
        """True for the high and low duty cycle directed types."""
        return (
            self is AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
            or self is AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
        )

    @property
    def is_high_duty_cycle_directed_connectable(self) -> bool:
        """True only for the high duty cycle directed type."""
        return self is AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
448
449
450# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
    """Drives legacy advertising by sending the legacy HCI commands via `device`."""

    device: Device
    advertising_type: AdvertisingType
    own_address_type: OwnAddressType
    peer_address: Address
    auto_restart: bool

    async def start(self) -> None:
        """
        Configure and enable legacy advertising.

        Commands are sent in this order: advertising data (only if the type
        carries data), scan response data (only if the type is scannable),
        advertising parameters, then the enable command. Each command is sent
        with `check_result=True`.
        """
        device = self.device
        advertising_type = self.advertising_type

        # Set/update the advertising data if the advertising type allows it
        if advertising_type.has_data:
            set_data_command = HCI_LE_Set_Advertising_Data_Command(
                advertising_data=device.advertising_data
            )
            await device.send_command(set_data_command, check_result=True)

        # Set/update the scan response data if the advertising is scannable
        if advertising_type.is_scannable:
            set_scan_response_command = HCI_LE_Set_Scan_Response_Data_Command(
                scan_response_data=device.scan_response_data
            )
            await device.send_command(set_scan_response_command, check_result=True)

        # Set the advertising parameters
        peer_address = self.peer_address
        set_parameters_command = HCI_LE_Set_Advertising_Parameters_Command(
            advertising_interval_min=device.advertising_interval_min,
            advertising_interval_max=device.advertising_interval_max,
            advertising_type=int(advertising_type),
            own_address_type=self.own_address_type,
            peer_address_type=peer_address.address_type,
            peer_address=peer_address,
            advertising_channel_map=7,
            advertising_filter_policy=0,
        )
        await device.send_command(set_parameters_command, check_result=True)

        # Enable advertising
        enable_command = HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1)
        await device.send_command(enable_command, check_result=True)

    async def stop(self) -> None:
        """Disable legacy advertising."""
        disable_command = HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0)
        await self.device.send_command(disable_command, check_result=True)
505
506
507# -----------------------------------------------------------------------------
@dataclass
class AdvertisingEventProperties:
    """
    Boolean view of the extended advertising event properties.

    `int()` of an instance gives the combined `AdvertisingProperties` flags
    value used by the extended advertising parameters command.
    """

    is_connectable: bool = True
    is_scannable: bool = False
    is_directed: bool = False
    is_high_duty_cycle_directed_connectable: bool = False
    is_legacy: bool = False
    is_anonymous: bool = False
    include_tx_power: bool = False

    def __int__(self) -> int:
        flags = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties

        # Each boolean attribute paired with the flag it controls
        flag_table = (
            (self.is_connectable, flags.CONNECTABLE_ADVERTISING),
            (self.is_scannable, flags.SCANNABLE_ADVERTISING),
            (self.is_directed, flags.DIRECTED_ADVERTISING),
            (
                self.is_high_duty_cycle_directed_connectable,
                flags.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING,
            ),
            (self.is_legacy, flags.USE_LEGACY_ADVERTISING_PDUS),
            (self.is_anonymous, flags.ANONYMOUS_ADVERTISING),
            (self.include_tx_power, flags.INCLUDE_TX_POWER),
        )

        combined = flags(0)
        for is_set, flag in flag_table:
            if is_set:
                combined |= flag

        return int(combined)

    @classmethod
    def from_advertising_type(
        cls: Type[AdvertisingEventProperties],
        advertising_type: AdvertisingType,
    ) -> AdvertisingEventProperties:
        """
        Build the properties equivalent to a legacy `AdvertisingType`.

        The result is always marked as legacy, never anonymous, and never
        includes the TX power.
        """
        high_duty_cycle = advertising_type.is_high_duty_cycle_directed_connectable
        return cls(
            is_connectable=advertising_type.is_connectable,
            is_scannable=advertising_type.is_scannable,
            is_directed=advertising_type.is_directed,
            is_high_duty_cycle_directed_connectable=high_duty_cycle,
            is_legacy=True,
            is_anonymous=False,
            include_tx_power=False,
        )
553
554
555# -----------------------------------------------------------------------------
# Short alias for the channel map type of the extended advertising parameters
# command.
# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10
AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
558
559
560# -----------------------------------------------------------------------------
@dataclass
class AdvertisingParameters:
    """
    Parameters for an extended advertising set.

    Every field has a default, so `AdvertisingParameters()` is a valid
    configuration.
    """

    # pylint: disable=line-too-long

    # Event properties (connectable, scannable, legacy, ...)
    advertising_event_properties: AdvertisingEventProperties = field(
        default_factory=AdvertisingEventProperties
    )

    # Primary channel timing (intervals in ms) and channel selection; all three
    # advertising channels are enabled by default
    primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    primary_advertising_channel_map: AdvertisingChannelMap = (
        AdvertisingChannelMap.CHANNEL_37
        | AdvertisingChannelMap.CHANNEL_38
        | AdvertisingChannelMap.CHANNEL_39
    )

    # Addressing and filtering
    own_address_type: OwnAddressType = OwnAddressType.RANDOM
    peer_address: Address = Address.ANY
    advertising_filter_policy: int = 0

    # Radio settings
    advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER
    primary_advertising_phy: Phy = Phy.LE_1M
    secondary_advertising_max_skip: int = 0
    secondary_advertising_phy: Phy = Phy.LE_1M

    # Miscellaneous
    advertising_sid: int = 0
    enable_scan_request_notifications: bool = False
    primary_advertising_phy_options: int = 0
    secondary_advertising_phy_options: int = 0
587
588
589# -----------------------------------------------------------------------------
@dataclass
class PeriodicAdvertisingParameters:
    """Placeholder for periodic advertising parameters (no fields yet)."""

    # TODO implement this class
594
595
596# -----------------------------------------------------------------------------
597@dataclass
598class AdvertisingSet(EventEmitter):
599    device: Device
600    advertising_handle: int
601    auto_restart: bool
602    random_address: Optional[Address]
603    advertising_parameters: AdvertisingParameters
604    advertising_data: bytes
605    scan_response_data: bytes
606    periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters]
607    periodic_advertising_data: bytes
608    selected_tx_power: int = 0
609    enabled: bool = False
610
611    def __post_init__(self) -> None:
612        super().__init__()
613
614    async def set_advertising_parameters(
615        self, advertising_parameters: AdvertisingParameters
616    ) -> None:
617        # Compliance check
618        if (
619            not advertising_parameters.advertising_event_properties.is_legacy
620            and advertising_parameters.advertising_event_properties.is_connectable
621            and advertising_parameters.advertising_event_properties.is_scannable
622        ):
623            logger.warning(
624                "non-legacy extended advertising event properties may not be both "
625                "connectable and scannable"
626            )
627
628        response = await self.device.send_command(
629            HCI_LE_Set_Extended_Advertising_Parameters_Command(
630                advertising_handle=self.advertising_handle,
631                advertising_event_properties=int(
632                    advertising_parameters.advertising_event_properties
633                ),
634                primary_advertising_interval_min=(
635                    int(advertising_parameters.primary_advertising_interval_min / 0.625)
636                ),
637                primary_advertising_interval_max=(
638                    int(advertising_parameters.primary_advertising_interval_min / 0.625)
639                ),
640                primary_advertising_channel_map=int(
641                    advertising_parameters.primary_advertising_channel_map
642                ),
643                own_address_type=advertising_parameters.own_address_type,
644                peer_address_type=advertising_parameters.peer_address.address_type,
645                peer_address=advertising_parameters.peer_address,
646                advertising_tx_power=advertising_parameters.advertising_tx_power,
647                advertising_filter_policy=(
648                    advertising_parameters.advertising_filter_policy
649                ),
650                primary_advertising_phy=advertising_parameters.primary_advertising_phy,
651                secondary_advertising_max_skip=(
652                    advertising_parameters.secondary_advertising_max_skip
653                ),
654                secondary_advertising_phy=(
655                    advertising_parameters.secondary_advertising_phy
656                ),
657                advertising_sid=advertising_parameters.advertising_sid,
658                scan_request_notification_enable=(
659                    1 if advertising_parameters.enable_scan_request_notifications else 0
660                ),
661            ),
662            check_result=True,
663        )
664        self.selected_tx_power = response.return_parameters.selected_tx_power
665        self.advertising_parameters = advertising_parameters
666
667    async def set_advertising_data(self, advertising_data: bytes) -> None:
668        # pylint: disable=line-too-long
669        await self.device.send_command(
670            HCI_LE_Set_Extended_Advertising_Data_Command(
671                advertising_handle=self.advertising_handle,
672                operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
673                fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
674                advertising_data=advertising_data,
675            ),
676            check_result=True,
677        )
678        self.advertising_data = advertising_data
679
680    async def set_scan_response_data(self, scan_response_data: bytes) -> None:
681        # pylint: disable=line-too-long
682        if (
683            scan_response_data
684            and not self.advertising_parameters.advertising_event_properties.is_scannable
685        ):
686            logger.warning(
687                "ignoring attempt to set non-empty scan response data on non-scannable "
688                "advertising set"
689            )
690            return
691
692        await self.device.send_command(
693            HCI_LE_Set_Extended_Scan_Response_Data_Command(
694                advertising_handle=self.advertising_handle,
695                operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
696                fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
697                scan_response_data=scan_response_data,
698            ),
699            check_result=True,
700        )
701        self.scan_response_data = scan_response_data
702
703    async def set_periodic_advertising_parameters(
704        self, advertising_parameters: PeriodicAdvertisingParameters
705    ) -> None:
706        # TODO: send command
707        self.periodic_advertising_parameters = advertising_parameters
708
709    async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
710        # TODO: send command
711        self.periodic_advertising_data = advertising_data
712
713    async def set_random_address(self, random_address: Address) -> None:
714        await self.device.send_command(
715            HCI_LE_Set_Advertising_Set_Random_Address_Command(
716                advertising_handle=self.advertising_handle,
717                random_address=(random_address or self.device.random_address),
718            ),
719            check_result=True,
720        )
721
722    async def start(
723        self, duration: float = 0.0, max_advertising_events: int = 0
724    ) -> None:
725        """
726        Start advertising.
727
728        Args:
729          duration: How long to advertise for, in seconds. Use 0 (the default) for
730          an unlimited duration, unless this advertising set is a High Duty Cycle
731          Directed Advertisement type.
732          max_advertising_events: Maximum number of events to advertise for. Use 0
733          (the default) for an unlimited number of advertisements.
734        """
735        await self.device.send_command(
736            HCI_LE_Set_Extended_Advertising_Enable_Command(
737                enable=1,
738                advertising_handles=[self.advertising_handle],
739                durations=[round(duration * 100)],
740                max_extended_advertising_events=[max_advertising_events],
741            ),
742            check_result=True,
743        )
744        self.enabled = True
745
746        self.emit('start')
747
748    async def start_periodic(self, include_adi: bool = False) -> None:
749        await self.device.send_command(
750            HCI_LE_Set_Periodic_Advertising_Enable_Command(
751                enable=1 | (2 if include_adi else 0),
752                advertising_handles=self.advertising_handle,
753            ),
754            check_result=True,
755        )
756
757        self.emit('start_periodic')
758
759    async def stop(self) -> None:
760        await self.device.send_command(
761            HCI_LE_Set_Extended_Advertising_Enable_Command(
762                enable=0,
763                advertising_handles=[self.advertising_handle],
764                durations=[0],
765                max_extended_advertising_events=[0],
766            ),
767            check_result=True,
768        )
769        self.enabled = False
770
771        self.emit('stop')
772
773    async def stop_periodic(self) -> None:
774        await self.device.send_command(
775            HCI_LE_Set_Periodic_Advertising_Enable_Command(
776                enable=0,
777                advertising_handles=self.advertising_handle,
778            ),
779            check_result=True,
780        )
781
782        self.emit('stop_periodic')
783
784    async def remove(self) -> None:
785        await self.device.send_command(
786            HCI_LE_Remove_Advertising_Set_Command(
787                advertising_handle=self.advertising_handle
788            ),
789            check_result=True,
790        )
791        del self.device.extended_advertising_sets[self.advertising_handle]
792
    def on_termination(self, status: int) -> None:
        """
        Mark this set as not enabled and emit a 'termination' event carrying
        `status`.
        """
        self.enabled = False
        self.emit('termination', status)
796
797
798# -----------------------------------------------------------------------------
class LePhyOptions:
    """
    Holder for a coded PHY preference, convertible to an integer.

    `int(options)` yields the two least significant bits of the stored preference.
    """

    # Coded PHY preference
    ANY_CODED_PHY = 0
    PREFER_S_2_CODED_PHY = 1
    PREFER_S_8_CODED_PHY = 2

    def __init__(self, coded_phy_preference=ANY_CODED_PHY):
        self.coded_phy_preference = coded_phy_preference

    def __int__(self):
        # Keep bits 0 and 1 only
        preference_mask = 0b11
        return self.coded_phy_preference & preference_mask
810
811
812# -----------------------------------------------------------------------------
# Type variable bound to gatt_client.ProfileServiceProxy, used to type methods that
# take a proxy class and return an instance of that same class
_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
814
815
class Peer:
    """
    Facade over the GATT client of a connection.

    Creating a `Peer` creates a new GATT client for the connection and installs it
    as the connection's `gatt_client`. Most methods simply forward to that client.

    When used as an async context manager, entering discovers the services, then
    the characteristics of each discovered service. Exiting does nothing.
    """

    def __init__(self, connection: Connection) -> None:
        self.connection = connection

        # Create a GATT client for the connection, and attach it to the connection
        client = gatt_client.Client(connection)
        self.gatt_client = client
        connection.gatt_client = client

    @property
    def services(self) -> List[gatt_client.ServiceProxy]:
        """The `services` attribute of the GATT client."""
        return self.gatt_client.services

    async def request_mtu(self, mtu: int) -> int:
        """
        Request an MTU through the GATT client, then emit a
        'connection_att_mtu_update' event on the connection.

        Returns:
          The value returned by the GATT client.
        """
        result = await self.gatt_client.request_mtu(mtu)
        self.connection.emit('connection_att_mtu_update')
        return result

    async def discover_service(
        self, uuid: Union[core.UUID, str]
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to the GATT client's `discover_service`."""
        return await self.gatt_client.discover_service(uuid)

    async def discover_services(
        self, uuids: Iterable[core.UUID] = ()
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to the GATT client's `discover_services`."""
        return await self.gatt_client.discover_services(uuids)

    async def discover_included_services(
        self, service: gatt_client.ServiceProxy
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to the GATT client's `discover_included_services`."""
        return await self.gatt_client.discover_included_services(service)

    async def discover_characteristics(
        self,
        uuids: Iterable[Union[core.UUID, str]] = (),
        service: Optional[gatt_client.ServiceProxy] = None,
    ) -> List[gatt_client.CharacteristicProxy]:
        """Forward to the GATT client's `discover_characteristics`."""
        return await self.gatt_client.discover_characteristics(
            uuids=uuids, service=service
        )

    async def discover_descriptors(
        self,
        characteristic: Optional[gatt_client.CharacteristicProxy] = None,
        start_handle: Optional[int] = None,
        end_handle: Optional[int] = None,
    ):
        """Forward to the GATT client's `discover_descriptors`."""
        return await self.gatt_client.discover_descriptors(
            characteristic, start_handle, end_handle
        )

    async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
        """Forward to the GATT client's `discover_attributes`."""
        return await self.gatt_client.discover_attributes()

    async def subscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
        prefer_notify: bool = True,
    ) -> None:
        """Forward to the GATT client's `subscribe`."""
        return await self.gatt_client.subscribe(
            characteristic, subscriber, prefer_notify
        )

    async def unsubscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
    ) -> None:
        """Forward to the GATT client's `unsubscribe`."""
        return await self.gatt_client.unsubscribe(characteristic, subscriber)

    async def read_value(
        self, attribute: Union[int, gatt_client.AttributeProxy]
    ) -> bytes:
        """Forward to the GATT client's `read_value`."""
        return await self.gatt_client.read_value(attribute)

    async def write_value(
        self,
        attribute: Union[int, gatt_client.AttributeProxy],
        value: bytes,
        with_response: bool = False,
    ) -> None:
        """Forward to the GATT client's `write_value`."""
        return await self.gatt_client.write_value(attribute, value, with_response)

    async def read_characteristics_by_uuid(
        self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
    ) -> List[bytes]:
        """Forward to the GATT client's `read_characteristics_by_uuid`."""
        return await self.gatt_client.read_characteristics_by_uuid(uuid, service)

    def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
        """Forward to the GATT client's `get_services_by_uuid`."""
        return self.gatt_client.get_services_by_uuid(uuid)

    def get_characteristics_by_uuid(
        self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
    ) -> List[gatt_client.CharacteristicProxy]:
        """Forward to the GATT client's `get_characteristics_by_uuid`."""
        return self.gatt_client.get_characteristics_by_uuid(uuid, service)

    def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
        """Build a proxy with `proxy_class.from_client`, given the GATT client."""
        proxy = proxy_class.from_client(self.gatt_client)
        return cast(_PROXY_CLASS, proxy)

    async def discover_service_and_create_proxy(
        self, proxy_class: Type[_PROXY_CLASS]
    ) -> Optional[_PROXY_CLASS]:
        """
        Discover the service identified by `proxy_class.SERVICE_CLASS.UUID` and
        create a proxy for it.

        Returns:
          The proxy, or None when the discovery yields no service.
        """
        services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
        if not services:
            return None

        # Only the first matching service has its characteristics discovered
        await services[0].discover_characteristics()
        return self.create_service_proxy(proxy_class)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """Forward to the connection's `sustain`."""
        await self.connection.sustain(timeout)

    # [Classic only]
    async def request_name(self) -> str:
        """Forward to the connection's `request_remote_name`."""
        return await self.connection.request_remote_name()

    async def __aenter__(self):
        # Discover all the services first, then the characteristics of each one
        await self.discover_services()
        for discovered_service in self.services:
            await discovered_service.discover_characteristics()

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing to release
        pass

    def __str__(self) -> str:
        connection = self.connection
        return f'{connection.peer_address} as {connection.role_name}'
946
947
948# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
    """
    Preferred connection parameters.

    Each field defaults to the corresponding DEVICE_DEFAULT_CONNECTION_* constant.
    """

    # Shared instance built with all the default values (assigned right after the
    # class body, since the class must exist before it can be instantiated)
    default: ClassVar[ConnectionParametersPreferences]
    connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
    connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
    max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
    supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
    min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
    max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH


# Instance with all-default values, exposed as a class attribute
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
961
962
963# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
    """
    A SCO link and the objects it is associated with: the owning device and an ACL
    connection.
    """

    device: Device
    acl_connection: Connection
    handle: int
    link_type: int
    # Optional callable that takes an HCI_SynchronousDataPacket
    sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class initializer,
        # so it is done here
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the device to disconnect this link, passing `reason` along."""
        await self.device.disconnect(self, reason)
979
980
981# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
    """
    A CIS link and the objects it is associated with: the owning device and an ACL
    connection. A new link starts in the PENDING state.
    """

    class State(IntEnum):
        PENDING = 0
        ESTABLISHED = 1

    device: Device
    acl_connection: Connection  # ACL connection this CIS is based on
    handle: int  # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
    cis_id: int  # CIS ID assigned by Central device
    cig_id: int  # CIG ID assigned by Central device
    state: State = State.PENDING
    # Optional callable that takes an HCI_IsoDataPacket
    sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class initializer,
        # so it is done here
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the device to disconnect this link, passing `reason` along."""
        await self.device.disconnect(self, reason)
1003
1004
1005# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
    """
    A connection to a peer, owned by a `Device`.

    Most operations are thin wrappers that delegate to the owning device, passing
    this connection as an argument.

    When used as an async context manager, leaving the `async with` block without
    an exception disconnects the connection.
    """

    device: Device
    handle: int
    transport: int
    self_address: Address
    peer_address: Address
    peer_resolvable_address: Optional[Address]
    peer_le_features: Optional[LeFeatureMask]
    role: int
    encryption: int
    authenticated: bool
    sc: bool
    link_key_type: int
    gatt_client: gatt_client.Client
    pairing_peer_io_capability: Optional[int]
    pairing_peer_authentication_requirements: Optional[int]

    @composite_listener
    class Listener:
        # Base listener with a no-op handler for each `on_<event>` callback

        def on_disconnection(self, reason):
            pass

        def on_connection_parameters_update(self):
            pass

        def on_connection_parameters_update_failure(self, error):
            pass

        def on_connection_data_length_change(self):
            pass

        def on_connection_phy_update(self):
            pass

        def on_connection_phy_update_failure(self, error):
            pass

        def on_connection_att_mtu_update(self):
            pass

        def on_connection_encryption_change(self):
            pass

        def on_connection_encryption_key_refresh(self):
            pass

    def __init__(
        self,
        device,
        handle,
        transport,
        self_address,
        peer_address,
        peer_resolvable_address,
        role,
        parameters,
        phy,
    ):
        """
        Store the given values and initialize the rest of the state: no encryption,
        not authenticated, default ATT MTU and data length, no per-connection GATT
        client, and the device's GATT server.
        """
        super().__init__()
        self.device = device
        self.handle = handle
        self.transport = transport
        self.self_address = self_address
        self.peer_address = peer_address
        self.peer_resolvable_address = peer_resolvable_address
        self.peer_name = None  # Classic only
        self.role = role
        self.parameters = parameters
        self.encryption = 0
        self.authenticated = False
        self.sc = False
        self.link_key_type = None
        self.phy = phy
        self.att_mtu = ATT_DEFAULT_MTU
        self.data_length = DEVICE_DEFAULT_DATA_LENGTH
        self.gatt_client = None  # Per-connection client
        self.gatt_server = (
            device.gatt_server
        )  # By default, use the device's shared server
        self.pairing_peer_io_capability = None
        self.pairing_peer_authentication_requirements = None
        self.peer_le_features = None

    # [Classic only]
    @classmethod
    def incomplete(cls, device, peer_address, role):
        """
        Instantiate an incomplete connection (ie. one waiting for a HCI Connection
        Complete event).
        Once received it shall be completed using the `.complete` method.

        The handle, peer resolvable address, parameters and PHY are all None, the
        transport is BR/EDR and the self address is the device's public address.
        """
        return cls(
            device,
            None,
            BT_BR_EDR_TRANSPORT,
            device.public_address,
            peer_address,
            None,
            role,
            None,
            None,
        )

    # [Classic only]
    def complete(self, handle, parameters):
        """
        Finish an incomplete connection upon completion, by setting its handle and
        parameters.
        """
        assert self.handle is None
        assert self.transport == BT_BR_EDR_TRANSPORT
        self.handle = handle
        self.parameters = parameters

    @property
    def role_name(self):
        """
        Name of the role: 'NOT-SET' when the role is None, 'CENTRAL' or
        'PERIPHERAL' for the known roles, 'UNKNOWN[<role>]' otherwise.
        """
        if self.role is None:
            return 'NOT-SET'
        if self.role == BT_CENTRAL_ROLE:
            return 'CENTRAL'
        if self.role == BT_PERIPHERAL_ROLE:
            return 'PERIPHERAL'
        return f'UNKNOWN[{self.role}]'

    @property
    def is_encrypted(self):
        """True when `encryption` is not 0."""
        return self.encryption != 0

    @property
    def is_incomplete(self) -> bool:
        """True when no handle has been set yet."""
        return self.handle is None

    def send_l2cap_pdu(self, cid: int, pdu: bytes) -> None:
        """Send an L2CAP PDU through the device, using this connection's handle."""
        self.device.send_l2cap_pdu(self.handle, cid, pdu)

    @deprecated("Please use create_l2cap_channel()")
    async def open_l2cap_channel(
        self,
        psm,
        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
    ):
        """Deprecated. Delegates to `Device.open_l2cap_channel`."""
        return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.ClassicChannelSpec
    ) -> l2cap.ClassicChannel: ...

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.LeCreditBasedChannelSpec
    ) -> l2cap.LeCreditBasedChannel: ...

    async def create_l2cap_channel(
        self, spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec]
    ) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
        """Delegates to `Device.create_l2cap_channel` for this connection."""
        return await self.device.create_l2cap_channel(connection=self, spec=spec)

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Delegates to `Device.disconnect`, passing `reason` along."""
        await self.device.disconnect(self, reason)

    async def pair(self) -> None:
        """Delegates to `Device.pair`."""
        return await self.device.pair(self)

    def request_pairing(self) -> None:
        """Delegates to `Device.request_pairing`."""
        return self.device.request_pairing(self)

    # [Classic only]
    async def authenticate(self) -> None:
        """Delegates to `Device.authenticate`."""
        return await self.device.authenticate(self)

    async def encrypt(self, enable: bool = True) -> None:
        """Delegates to `Device.encrypt`."""
        return await self.device.encrypt(self, enable)

    async def switch_role(self, role: int) -> None:
        """Delegates to `Device.switch_role`."""
        return await self.device.switch_role(self, role)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """
        Idles the current task waiting for a disconnect or timeout.

        A future is completed by the 'disconnection' event, or given the error
        carried by the 'disconnection_failure' event. That future is awaited
        through `Device.abort_on('flush', ...)`, for at most `timeout` seconds
        (no limit when None). Reaching the timeout is not an error.

        The two listeners installed here are removed however the wait ends,
        including when it ends with an exception.
        """
        abort = asyncio.get_running_loop().create_future()
        on_disconnection = abort.set_result
        on_disconnection_failure = abort.set_exception
        self.on('disconnection', on_disconnection)
        self.on('disconnection_failure', on_disconnection_failure)

        try:
            await asyncio.wait_for(self.device.abort_on('flush', abort), timeout)
        except asyncio.TimeoutError:
            pass
        finally:
            # Always unregister, so that no listener bound to a finished future is
            # left behind when the wait fails or is cancelled
            self.remove_listener('disconnection', on_disconnection)
            self.remove_listener('disconnection_failure', on_disconnection_failure)

    async def set_data_length(self, tx_octets, tx_time) -> None:
        """Delegates to `Device.set_data_length`."""
        return await self.device.set_data_length(self, tx_octets, tx_time)

    async def update_parameters(
        self,
        connection_interval_min,
        connection_interval_max,
        max_latency,
        supervision_timeout,
        use_l2cap=False,
    ):
        """Delegates to `Device.update_connection_parameters`."""
        return await self.device.update_connection_parameters(
            self,
            connection_interval_min,
            connection_interval_max,
            max_latency,
            supervision_timeout,
            use_l2cap=use_l2cap,
        )

    async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
        """Delegates to `Device.set_connection_phy`."""
        return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)

    async def get_rssi(self):
        """Delegates to `Device.get_connection_rssi`."""
        return await self.device.get_connection_rssi(self)

    async def get_phy(self):
        """Delegates to `Device.get_connection_phy`."""
        return await self.device.get_connection_phy(self)

    # [Classic only]
    async def request_remote_name(self):
        """Delegates to `Device.request_remote_name`."""
        return await self.device.request_remote_name(self)

    async def get_remote_le_features(self) -> LeFeatureMask:
        """[LE Only] Reads remote LE supported features.

        The result is also stored in `peer_le_features`.

        Returns:
            LE features supported by the remote device.
        """
        self.peer_le_features = await self.device.get_remote_le_features(self)
        return self.peer_le_features

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Disconnect only when the `async with` body completed without an exception
        if exc_type is None:
            try:
                await self.disconnect()
            except HCI_StatusError as error:
                # Invalid parameter means the connection is no longer valid
                if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR:
                    raise

    def __str__(self):
        # An incomplete connection has no handle yet, and None does not support the
        # hexadecimal format spec
        if self.handle is None:
            handle = 'None'
        else:
            handle = f'0x{self.handle:04X}'
        return (
            f'Connection(handle={handle}, '
            f'role={self.role_name}, '
            f'self_address={self.self_address}, '
            f'peer_address={self.peer_address})'
        )
1262
1263
1264# -----------------------------------------------------------------------------
@dataclass
class DeviceConfiguration:
    """
    Settings used to configure a device.

    An instance starts with the defaults declared below, and can then be updated
    from a dictionary (`load_from_dict`) or from a JSON file (`load_from_file`).
    """

    # Setup defaults
    name: str = DEVICE_DEFAULT_NAME
    address: Address = Address(DEVICE_DEFAULT_ADDRESS)
    class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE
    scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
    advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    le_enabled: bool = True
    # LE host enable 2nd parameter
    le_simultaneous_enabled: bool = False
    classic_enabled: bool = False
    classic_sc_enabled: bool = True
    classic_ssp_enabled: bool = True
    classic_smp_enabled: bool = True
    classic_accept_any: bool = True
    connectable: bool = True
    discoverable: bool = True
    advertising_data: bytes = bytes(
        AdvertisingData(
            [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(DEVICE_DEFAULT_NAME, 'utf-8'))]
        )
    )
    irk: bytes = bytes(16)  # This really must be changed for any level of security
    keystore: Optional[str] = None
    address_resolution_offload: bool = False
    cis_enabled: bool = False

    def __post_init__(self) -> None:
        # Not a dataclass field: set here so that each instance gets its own list
        self.gatt_services: List[Dict[str, Any]] = []

    def load_from_dict(self, config: Dict[str, Any]) -> None:
        """
        Update this configuration from a dictionary.

        The keys 'address', 'irk', 'name', 'advertising_data' and
        'advertising_interval' get special treatment (see the comments below).
        Every other key is set as an attribute of the same name, as is.

        The dictionary passed by the caller is not modified.

        Args:
          config: dictionary of configuration values. 'irk' and 'advertising_data'
          are hex strings.
        """
        # Work on a copy, since the keys that are handled explicitly get popped
        config = copy.deepcopy(config)

        # Load simple properties
        if address := config.pop('address', None):
            self.address = Address(address)

        # Load or synthesize an IRK
        if irk := config.pop('irk', None):
            self.irk = bytes.fromhex(irk)
        elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
            # Construct an IRK from the address bytes
            # NOTE: this is not secure, but will always give the same IRK for the same
            # address
            address_bytes = bytes(self.address)
            self.irk = (address_bytes * 3)[:16]
        else:
            # Fallback - when both IRK and address are not set, randomly generate an IRK.
            self.irk = secrets.token_bytes(16)

        if (name := config.pop('name', None)) is not None:
            self.name = name

        # Load advertising data
        if advertising_data := config.pop('advertising_data', None):
            self.advertising_data = bytes.fromhex(advertising_data)
        elif name is not None:
            # No explicit advertising data: advertise the configured name
            self.advertising_data = bytes(
                AdvertisingData(
                    [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
                )
            )

        # Load advertising interval (for backward compatibility)
        if advertising_interval := config.pop('advertising_interval', None):
            self.advertising_interval_min = advertising_interval
            self.advertising_interval_max = advertising_interval
            if (
                'advertising_interval_max' in config
                or 'advertising_interval_min' in config
            ):
                # The min/max keys still in `config` are applied by the loop below,
                # overriding the values that were just set
                logger.warning(
                    'Trying to set both advertising_interval and '
                    'advertising_interval_min/max, advertising_interval will be '
                    'ignored.'
                )

        # Load data in primitive types.
        for key, value in config.items():
            setattr(self, key, value)

    def load_from_file(self, filename: str) -> None:
        """Update this configuration from a JSON file (read as UTF-8)."""
        with open(filename, 'r', encoding='utf-8') as file:
            self.load_from_dict(json.load(file))

    @classmethod
    def from_file(cls: Type[Self], filename: str) -> Self:
        """Create a configuration with the defaults, updated from a JSON file."""
        config = cls()
        config.load_from_file(filename)
        return config

    @classmethod
    def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
        """Create a configuration with the defaults, updated from a dictionary."""
        device_config = cls()
        device_config.load_from_dict(config)
        return device_config
1363
1364
1365# -----------------------------------------------------------------------------
1366# Decorators used with the following Device class
1367# (we define them outside of the Device class, because defining decorators
1368#  within a class requires unnecessarily complicated acrobatics)
1369# -----------------------------------------------------------------------------
1370
1371
1372# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
    """
    Decorate a method so that its first argument, passed by callers as a connection
    handle, is replaced with the connection returned by `self.lookup_connection`.

    The wrapper raises ValueError when the lookup returns None.
    """

    @functools.wraps(function)
    def wrapper(self, connection_handle, *args, **kwargs):
        connection = self.lookup_connection(connection_handle)
        if connection is None:
            raise ValueError(f'no connection for handle: 0x{connection_handle:04x}')
        return function(self, connection, *args, **kwargs)

    return wrapper
1381
1382
1383# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
    """
    Decorate a method so that its first argument, passed by callers as an address,
    is replaced with a connection for that address.

    The connection is taken from `self.pending_connections` when there is one for
    the address, else it is the first entry of `self.connections` whose
    `peer_address` equals the address. The wrapper raises ValueError when neither
    lookup finds a connection.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        # Pending connections are looked up first
        connection = self.pending_connections.get(address)
        if not connection:
            established = (
                candidate
                for candidate in self.connections.values()
                if candidate.peer_address == address
            )
            connection = next(established, None)
            if connection is None:
                raise ValueError('no connection for address')
        return function(self, connection, *args, **kwargs)

    return wrapper
1395
1396
1397# Decorator that tries to convert the first argument from a bluetooth address to a
1398# connection
def try_with_connection_from_address(function):
    """
    Decorate a method that takes a connection followed by an address, so that
    callers only pass the address.

    The connection is taken from `self.pending_connections` when there is one for
    the address, else it is the first entry of `self.connections` whose
    `peer_address` equals the address, else it is None. The address is always
    passed along, after the connection.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        # Pending connections are looked up first
        connection = self.pending_connections.get(address)
        if not connection:
            established = (
                candidate
                for candidate in self.connections.values()
                if candidate.peer_address == address
            )
            connection = next(established, None)
        return function(self, connection, address, *args, **kwargs)

    return wrapper
1410
1411
1412# Decorator that adds a method to the list of event handlers for host events.
1413# This assumes that the method name starts with `on_`
def host_event_handler(function):
    """
    Record `function` as a handler for a host event, and return it unchanged.

    The name appended to `device_host_event_handlers` is the function name without
    its first three characters (the expected `on_` prefix).
    """
    prefix_length = len('on_')
    event_name = function.__name__[prefix_length:]
    device_host_event_handlers.append(event_name)
    return function
1417
1418
# List of host event handlers for the Device class, filled by the
# `host_event_handler` decorator with one event name per decorated method.
# (we define this list outside the class, because referencing a class in method
#  decorators is not straightforward)
device_host_event_handlers: List[str] = []
1423
1424
1425# -----------------------------------------------------------------------------
1426class Device(CompositeEventEmitter):
1427    # Incomplete list of fields.
1428    random_address: Address
1429    public_address: Address
1430    classic_enabled: bool
1431    name: str
1432    class_of_device: int
1433    gatt_server: gatt_server.Server
1434    advertising_data: bytes
1435    scan_response_data: bytes
1436    connections: Dict[int, Connection]
1437    pending_connections: Dict[Address, Connection]
1438    classic_pending_accepts: Dict[
1439        Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
1440    ]
1441    advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
1442    config: DeviceConfiguration
1443    legacy_advertiser: Optional[LegacyAdvertiser]
1444    sco_links: Dict[int, ScoLink]
1445    cis_links: Dict[int, CisLink]
1446    _pending_cis: Dict[int, Tuple[int, int]]
1447
    @composite_listener
    class Listener:
        # Base listener with a no-op handler for each `on_<event>` callback

        def on_advertisement(self, advertisement):
            pass

        def on_inquiry_result(self, address, class_of_device, data, rssi):
            pass

        def on_connection(self, connection):
            pass

        def on_connection_failure(self, error):
            pass

        def on_connection_request(self, bd_addr, class_of_device, link_type):
            pass

        def on_characteristic_subscription(
            self, connection, characteristic, notify_enabled, indicate_enabled
        ):
            pass
1469
1470    @classmethod
1471    def with_hci(
1472        cls,
1473        name: str,
1474        address: Address,
1475        hci_source: TransportSource,
1476        hci_sink: TransportSink,
1477    ) -> Device:
1478        '''
1479        Create a Device instance with a Host configured to communicate with a controller
1480        through an HCI source/sink
1481        '''
1482        host = Host(controller_source=hci_source, controller_sink=hci_sink)
1483        return cls(name=name, address=address, host=host)
1484
1485    @classmethod
1486    def from_config_file(cls, filename: str) -> Device:
1487        config = DeviceConfiguration.from_file(filename)
1488        return cls(config=config)
1489
1490    @classmethod
1491    def from_config_with_hci(
1492        cls,
1493        config: DeviceConfiguration,
1494        hci_source: TransportSource,
1495        hci_sink: TransportSink,
1496    ) -> Device:
1497        host = Host(controller_source=hci_source, controller_sink=hci_sink)
1498        return cls(config=config, host=host)
1499
1500    @classmethod
1501    def from_config_file_with_hci(
1502        cls, filename: str, hci_source: TransportSource, hci_sink: TransportSink
1503    ) -> Device:
1504        config = DeviceConfiguration.from_file(filename)
1505        return cls.from_config_with_hci(config, hci_source, hci_sink)
1506
1507    def __init__(
1508        self,
1509        name: Optional[str] = None,
1510        address: Optional[Address] = None,
1511        config: Optional[DeviceConfiguration] = None,
1512        host: Optional[Host] = None,
1513        generic_access_service: bool = True,
1514    ) -> None:
1515        super().__init__()
1516
1517        self._host = None
1518        self.powered_on = False
1519        self.auto_restart_inquiry = True
1520        self.command_timeout = 10  # seconds
1521        self.gatt_server = gatt_server.Server(self)
1522        self.sdp_server = sdp.Server(self)
1523        self.l2cap_channel_manager = l2cap.ChannelManager(
1524            [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
1525        )
1526        self.advertisement_accumulators = {}  # Accumulators, by address
1527        self.scanning = False
1528        self.scanning_is_passive = False
1529        self.discovering = False
1530        self.le_connecting = False
1531        self.disconnecting = False
1532        self.connections = {}  # Connections, by connection handle
1533        self.pending_connections = {}  # Connections, by BD address (BR/EDR only)
1534        self.sco_links = {}  # ScoLinks, by connection handle (BR/EDR only)
1535        self.cis_links = {}  # CisLinks, by connection handle (LE only)
1536        self._pending_cis = {}  # (CIS_ID, CIG_ID), by CIS_handle
1537        self.classic_enabled = False
1538        self.inquiry_response = None
1539        self.address_resolver = None
1540        self.classic_pending_accepts = {
1541            Address.ANY: []
1542        }  # Futures, by BD address OR [Futures] for Address.ANY
1543
1544        # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated.
1545        if sys.version_info >= (3, 10):
1546            self._cis_lock = asyncio.Lock()
1547        else:
1548            self._cis_lock = AsyncExitStack()
1549
1550        # Own address type cache
1551        self.connect_own_address_type = None
1552
1553        # Use the initial config or a default
1554        config = config or DeviceConfiguration()
1555        self.config = config
1556
1557        self.public_address = Address('00:00:00:00:00:00')
1558        self.name = config.name
1559        self.random_address = config.address
1560        self.class_of_device = config.class_of_device
1561        self.keystore = None
1562        self.irk = config.irk
1563        self.le_enabled = config.le_enabled
1564        self.classic_enabled = config.classic_enabled
1565        self.le_simultaneous_enabled = config.le_simultaneous_enabled
1566        self.cis_enabled = config.cis_enabled
1567        self.classic_sc_enabled = config.classic_sc_enabled
1568        self.classic_ssp_enabled = config.classic_ssp_enabled
1569        self.classic_smp_enabled = config.classic_smp_enabled
1570        self.discoverable = config.discoverable
1571        self.connectable = config.connectable
1572        self.classic_accept_any = config.classic_accept_any
1573        self.address_resolution_offload = config.address_resolution_offload
1574
1575        # Extended advertising.
1576        self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}
1577
1578        # Legacy advertising.
1579        # The advertising and scan response data, as well as the advertising interval
1580        # values are stored as properties of this object for convenience so that they
1581        # can be initialized from a config object, and for backward compatibility for
1582        # client code that may set those values directly before calling
1583        # start_advertising().
1584        self.legacy_advertising_set: Optional[AdvertisingSet] = None
1585        self.legacy_advertiser: Optional[LegacyAdvertiser] = None
1586        self.advertising_data = config.advertising_data
1587        self.scan_response_data = config.scan_response_data
1588        self.advertising_interval_min = config.advertising_interval_min
1589        self.advertising_interval_max = config.advertising_interval_max
1590
1591        for service in config.gatt_services:
1592            characteristics = []
1593            for characteristic in service.get("characteristics", []):
1594                descriptors = []
1595                for descriptor in characteristic.get("descriptors", []):
1596                    # Leave this check until 5/25/2023
1597                    if descriptor.get("permission", False):
1598                        raise Exception(
1599                            "Error parsing Device Config's GATT Services. "
1600                            "The key 'permission' must be renamed to 'permissions'"
1601                        )
1602                    new_descriptor = Descriptor(
1603                        attribute_type=descriptor["descriptor_type"],
1604                        permissions=descriptor["permissions"],
1605                    )
1606                    descriptors.append(new_descriptor)
1607                new_characteristic = Characteristic(
1608                    uuid=characteristic["uuid"],
1609                    properties=Characteristic.Properties.from_string(
1610                        characteristic["properties"]
1611                    ),
1612                    permissions=characteristic["permissions"],
1613                    descriptors=descriptors,
1614                )
1615                characteristics.append(new_characteristic)
1616            new_service = Service(uuid=service["uuid"], characteristics=characteristics)
1617            self.gatt_server.add_service(new_service)
1618
1619        # If a name is passed, override the name from the config
1620        if name:
1621            self.name = name
1622
1623        # If an address is passed, override the address from the config
1624        if address:
1625            if isinstance(address, str):
1626                address = Address(address)
1627            self.random_address = address
1628
1629        # Setup SMP
1630        self.smp_manager = smp.Manager(
1631            self, pairing_config_factory=lambda connection: PairingConfig()
1632        )
1633
1634        self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
1635
1636        # Register the SDP server with the L2CAP Channel Manager
1637        self.sdp_server.register(self.l2cap_channel_manager)
1638
1639        self.add_default_services(generic_access_service)
1640        self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)
1641
1642        # Forward some events
1643        setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
1644
1645        # Set the initial host
1646        if host:
1647            self.host = host
1648
1649    @property
1650    def host(self) -> Host:
1651        assert self._host
1652        return self._host
1653
1654    @host.setter
1655    def host(self, host: Host) -> None:
1656        # Unsubscribe from events from the current host
1657        if self._host:
1658            for event_name in device_host_event_handlers:
1659                self._host.remove_listener(
1660                    event_name, getattr(self, f'on_{event_name}')
1661                )
1662
1663        # Subscribe to events from the new host
1664        if host:
1665            for event_name in device_host_event_handlers:
1666                host.on(event_name, getattr(self, f'on_{event_name}'))
1667
1668        # Update the references to the new host
1669        self._host = host
1670        self.l2cap_channel_manager.host = host
1671
1672        # Set providers for the new host
1673        if host:
1674            host.long_term_key_provider = self.get_long_term_key
1675            host.link_key_provider = self.get_link_key
1676
1677    @property
1678    def sdp_service_records(self):
1679        return self.sdp_server.service_records
1680
1681    @sdp_service_records.setter
1682    def sdp_service_records(self, service_records):
1683        self.sdp_server.service_records = service_records
1684
1685    def lookup_connection(self, connection_handle: int) -> Optional[Connection]:
1686        if connection := self.connections.get(connection_handle):
1687            return connection
1688
1689        return None
1690
1691    def find_connection_by_bd_addr(
1692        self,
1693        bd_addr: Address,
1694        transport: Optional[int] = None,
1695        check_address_type: bool = False,
1696    ) -> Optional[Connection]:
1697        for connection in self.connections.values():
1698            if connection.peer_address.to_bytes() == bd_addr.to_bytes():
1699                if (
1700                    check_address_type
1701                    and connection.peer_address.address_type != bd_addr.address_type
1702                ):
1703                    continue
1704                if transport is None or connection.transport == transport:
1705                    return connection
1706
1707        return None
1708
1709    @deprecated("Please use create_l2cap_server()")
1710    def register_l2cap_server(self, psm, server) -> int:
1711        return self.l2cap_channel_manager.register_server(psm, server)
1712
1713    @deprecated("Please use create_l2cap_server()")
1714    def register_l2cap_channel_server(
1715        self,
1716        psm,
1717        server,
1718        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
1719        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
1720        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
1721    ):
1722        return self.l2cap_channel_manager.register_le_coc_server(
1723            psm, server, max_credits, mtu, mps
1724        )
1725
1726    @deprecated("Please use create_l2cap_channel()")
1727    async def open_l2cap_channel(
1728        self,
1729        connection,
1730        psm,
1731        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
1732        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
1733        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
1734    ):
1735        return await self.l2cap_channel_manager.open_le_coc(
1736            connection, psm, max_credits, mtu, mps
1737        )
1738
1739    @overload
1740    async def create_l2cap_channel(
1741        self,
1742        connection: Connection,
1743        spec: l2cap.ClassicChannelSpec,
1744    ) -> l2cap.ClassicChannel: ...
1745
1746    @overload
1747    async def create_l2cap_channel(
1748        self,
1749        connection: Connection,
1750        spec: l2cap.LeCreditBasedChannelSpec,
1751    ) -> l2cap.LeCreditBasedChannel: ...
1752
1753    async def create_l2cap_channel(
1754        self,
1755        connection: Connection,
1756        spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
1757    ) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
1758        if isinstance(spec, l2cap.ClassicChannelSpec):
1759            return await self.l2cap_channel_manager.create_classic_channel(
1760                connection=connection, spec=spec
1761            )
1762        if isinstance(spec, l2cap.LeCreditBasedChannelSpec):
1763            return await self.l2cap_channel_manager.create_le_credit_based_channel(
1764                connection=connection, spec=spec
1765            )
1766
1767    @overload
1768    def create_l2cap_server(
1769        self,
1770        spec: l2cap.ClassicChannelSpec,
1771        handler: Optional[Callable[[l2cap.ClassicChannel], Any]] = None,
1772    ) -> l2cap.ClassicChannelServer: ...
1773
1774    @overload
1775    def create_l2cap_server(
1776        self,
1777        spec: l2cap.LeCreditBasedChannelSpec,
1778        handler: Optional[Callable[[l2cap.LeCreditBasedChannel], Any]] = None,
1779    ) -> l2cap.LeCreditBasedChannelServer: ...
1780
1781    def create_l2cap_server(
1782        self,
1783        spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
1784        handler: Union[
1785            Callable[[l2cap.ClassicChannel], Any],
1786            Callable[[l2cap.LeCreditBasedChannel], Any],
1787            None,
1788        ] = None,
1789    ) -> Union[l2cap.ClassicChannelServer, l2cap.LeCreditBasedChannelServer]:
1790        if isinstance(spec, l2cap.ClassicChannelSpec):
1791            return self.l2cap_channel_manager.create_classic_server(
1792                spec=spec,
1793                handler=cast(Callable[[l2cap.ClassicChannel], Any], handler),
1794            )
1795        elif isinstance(spec, l2cap.LeCreditBasedChannelSpec):
1796            return self.l2cap_channel_manager.create_le_credit_based_server(
1797                handler=cast(Callable[[l2cap.LeCreditBasedChannel], Any], handler),
1798                spec=spec,
1799            )
1800        else:
1801            raise ValueError(f'Unexpected mode {spec}')
1802
1803    def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
1804        self.host.send_l2cap_pdu(connection_handle, cid, pdu)
1805
1806    async def send_command(self, command, check_result=False):
1807        try:
1808            return await asyncio.wait_for(
1809                self.host.send_command(command, check_result), self.command_timeout
1810            )
1811        except asyncio.TimeoutError as error:
1812            logger.warning(f'!!! Command {command.name} timed out')
1813            raise CommandTimeoutError() from error
1814
    async def power_on(self) -> None:
        """Reset the controller and configure it from this device's settings.

        In order, this: resets the host, reads the controller's public address,
        creates the key store if none is set, registers the SMP fixed channel for
        BR/EDR when `classic_smp_enabled` is set, writes the LE host support
        settings (when the command is supported), applies the LE settings when
        `le_enabled` is set (random address, resolving list, address resolution,
        CIS host feature), applies the classic settings when `classic_enabled` is
        set (name, class of device, SSP, Secure Connections, connectable and
        discoverable modes), and finally sets `powered_on` to True.

        Commands are sent with `send_command()`, so a command that times out
        raises from there.
        """
        # Reset the controller
        await self.host.reset()

        # Try to get the public address from the controller
        response = await self.send_command(HCI_Read_BD_ADDR_Command())
        if response.return_parameters.status == HCI_SUCCESS:
            logger.debug(
                color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
            )
            self.public_address = response.return_parameters.bd_addr

        # Instantiate the Key Store (we do this here rather than at __init__ time
        # because some Key Store implementations use the public address as a namespace)
        if self.keystore is None:
            self.keystore = KeyStore.create_for_device(self)

        # Finish setting up SMP based on post-init configurable options
        if self.classic_smp_enabled:
            self.l2cap_channel_manager.register_fixed_channel(
                smp.SMP_BR_CID, self.on_smp_pdu
            )

        # Tell the controller which LE host features are enabled, if it supports
        # being told.
        if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
            await self.send_command(
                HCI_Write_LE_Host_Support_Command(
                    le_supported_host=int(self.le_enabled),
                    simultaneous_le_host=int(self.le_simultaneous_enabled),
                )
            )

        if self.le_enabled:
            # Set the controller address
            if self.random_address == Address.ANY_RANDOM:
                # Try to use an address generated at random by the controller
                if self.host.supports_command(HCI_LE_RAND_COMMAND):
                    # Get 8 random bytes
                    response = await self.send_command(
                        HCI_LE_Rand_Command(), check_result=True
                    )

                    # Ensure the address bytes can be a static random address
                    # (the first 5 random bytes are used as-is, and the 6th has
                    # its two most significant bits forced to 1)
                    address_bytes = response.return_parameters.random_number[
                        :5
                    ] + bytes([response.return_parameters.random_number[5] | 0xC0])

                    # Create a static random address from the random bytes
                    self.random_address = Address(address_bytes)

            # Only program the address if there is a concrete one (either
            # configured, or generated just above).
            if self.random_address != Address.ANY_RANDOM:
                logger.debug(
                    color(
                        f'LE Random Address: {self.random_address}',
                        'yellow',
                    )
                )
                await self.send_command(
                    HCI_LE_Set_Random_Address_Command(
                        random_address=self.random_address
                    ),
                    check_result=True,
                )

            # Load the address resolving list
            if self.keystore:
                await self.refresh_resolving_list()

            # Enable address resolution
            if self.address_resolution_offload:
                await self.send_command(
                    HCI_LE_Set_Address_Resolution_Enable_Command(
                        address_resolution_enable=1
                    )
                )

            # Set the Connected Isochronous Stream host feature bit
            if self.cis_enabled:
                await self.send_command(
                    HCI_LE_Set_Host_Feature_Command(
                        bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM,
                        bit_value=1,
                    )
                )

        if self.classic_enabled:
            # Push the classic (BR/EDR) settings to the controller
            await self.send_command(
                HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
            )
            await self.send_command(
                HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
            )
            await self.send_command(
                HCI_Write_Simple_Pairing_Mode_Command(
                    simple_pairing_mode=int(self.classic_ssp_enabled)
                )
            )
            await self.send_command(
                HCI_Write_Secure_Connections_Host_Support_Command(
                    secure_connections_host_support=int(self.classic_sc_enabled)
                )
            )
            await self.set_connectable(self.connectable)
            await self.set_discoverable(self.discoverable)

        # Done
        self.powered_on = True
1920
1921    async def reset(self) -> None:
1922        await self.host.reset()
1923
1924    async def power_off(self) -> None:
1925        if self.powered_on:
1926            await self.host.flush()
1927            self.powered_on = False
1928
1929    async def refresh_resolving_list(self) -> None:
1930        assert self.keystore is not None
1931
1932        resolving_keys = await self.keystore.get_resolving_keys()
1933        # Create a host-side address resolver
1934        self.address_resolver = smp.AddressResolver(resolving_keys)
1935
1936        if self.address_resolution_offload:
1937            await self.send_command(HCI_LE_Clear_Resolving_List_Command())
1938
1939            # Add an empty entry for non-directed address generation.
1940            await self.send_command(
1941                HCI_LE_Add_Device_To_Resolving_List_Command(
1942                    peer_identity_address_type=Address.ANY.address_type,
1943                    peer_identity_address=Address.ANY,
1944                    peer_irk=bytes(16),
1945                    local_irk=self.irk,
1946                )
1947            )
1948
1949            for irk, address in resolving_keys:
1950                await self.send_command(
1951                    HCI_LE_Add_Device_To_Resolving_List_Command(
1952                        peer_identity_address_type=address.address_type,
1953                        peer_identity_address=address,
1954                        peer_irk=irk,
1955                        local_irk=self.irk,
1956                    )
1957                )
1958
1959    def supports_le_features(self, feature: LeFeatureMask) -> bool:
1960        return self.host.supports_le_features(feature)
1961
1962    def supports_le_phy(self, phy):
1963        if phy == HCI_LE_1M_PHY:
1964            return True
1965
1966        feature_map = {
1967            HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
1968            HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
1969        }
1970        if phy not in feature_map:
1971            raise ValueError('invalid PHY')
1972
1973        return self.supports_le_features(feature_map[phy])
1974
1975    @property
1976    def supports_le_extended_advertising(self):
1977        return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING)
1978
1979    async def start_advertising(
1980        self,
1981        advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
1982        target: Optional[Address] = None,
1983        own_address_type: int = OwnAddressType.RANDOM,
1984        auto_restart: bool = False,
1985        advertising_data: Optional[bytes] = None,
1986        scan_response_data: Optional[bytes] = None,
1987        advertising_interval_min: Optional[int] = None,
1988        advertising_interval_max: Optional[int] = None,
1989    ) -> None:
1990        """Start legacy advertising.
1991
1992        If the controller supports it, extended advertising commands with legacy PDUs
1993        will be used to advertise. If not, legacy advertising commands will be used.
1994
1995        Args:
1996          advertising_type:
1997            Type of advertising events.
1998          target:
1999            Peer address for directed advertising target.
2000            (Ignored if `advertising_type` is not directed)
2001          own_address_type:
2002            Own address type to use in the advertising.
2003          auto_restart:
2004            Whether the advertisement will be restarted after disconnection.
2005          advertising_data:
2006            Raw advertising data. If None, the value of the property
2007            self.advertising_data will be used.
2008          scan_response_data:
2009            Raw scan response. If None, the value of the property
2010            self.scan_response_data will be used.
2011          advertising_interval_min:
2012            Minimum advertising interval, in milliseconds. If None, the value of the
2013            property self.advertising_interval_min will be used.
2014          advertising_interval_max:
2015            Maximum advertising interval, in milliseconds. If None, the value of the
2016            property self.advertising_interval_max will be used.
2017        """
2018        # Update backing properties.
2019        if advertising_data is not None:
2020            self.advertising_data = advertising_data
2021        if scan_response_data is not None:
2022            self.scan_response_data = scan_response_data
2023        if advertising_interval_min is not None:
2024            self.advertising_interval_min = advertising_interval_min
2025        if advertising_interval_max is not None:
2026            self.advertising_interval_max = advertising_interval_max
2027
2028        # Decide what peer address to use
2029        if advertising_type.is_directed:
2030            if target is None:
2031                raise ValueError('directed advertising requires a target')
2032            peer_address = target
2033        else:
2034            peer_address = Address.ANY
2035
2036        # If we're already advertising, stop now because we'll be re-creating
2037        # a new advertiser or advertising set.
2038        await self.stop_advertising()
2039        assert self.legacy_advertiser is None
2040        assert self.legacy_advertising_set is None
2041
2042        if self.supports_le_extended_advertising:
2043            # Use extended advertising commands with legacy PDUs.
2044            self.legacy_advertising_set = await self.create_advertising_set(
2045                auto_start=True,
2046                auto_restart=auto_restart,
2047                random_address=self.random_address,
2048                advertising_parameters=AdvertisingParameters(
2049                    advertising_event_properties=(
2050                        AdvertisingEventProperties.from_advertising_type(
2051                            advertising_type
2052                        )
2053                    ),
2054                    primary_advertising_interval_min=self.advertising_interval_min,
2055                    primary_advertising_interval_max=self.advertising_interval_max,
2056                    own_address_type=OwnAddressType(own_address_type),
2057                    peer_address=peer_address,
2058                ),
2059                advertising_data=(
2060                    self.advertising_data if advertising_type.has_data else b''
2061                ),
2062                scan_response_data=(
2063                    self.scan_response_data if advertising_type.is_scannable else b''
2064                ),
2065            )
2066        else:
2067            # Use legacy commands.
2068            self.legacy_advertiser = LegacyAdvertiser(
2069                device=self,
2070                advertising_type=advertising_type,
2071                own_address_type=OwnAddressType(own_address_type),
2072                peer_address=peer_address,
2073                auto_restart=auto_restart,
2074            )
2075
2076            await self.legacy_advertiser.start()
2077
2078    async def stop_advertising(self) -> None:
2079        """Stop legacy advertising."""
2080        # Disable advertising
2081        if self.legacy_advertising_set:
2082            if self.legacy_advertising_set.enabled:
2083                await self.legacy_advertising_set.stop()
2084            await self.legacy_advertising_set.remove()
2085            self.legacy_advertising_set = None
2086        elif self.legacy_advertiser:
2087            await self.legacy_advertiser.stop()
2088            self.legacy_advertiser = None
2089
2090    async def create_advertising_set(
2091        self,
2092        advertising_parameters: Optional[AdvertisingParameters] = None,
2093        random_address: Optional[Address] = None,
2094        advertising_data: bytes = b'',
2095        scan_response_data: bytes = b'',
2096        periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None,
2097        periodic_advertising_data: bytes = b'',
2098        auto_start: bool = True,
2099        auto_restart: bool = False,
2100    ) -> AdvertisingSet:
2101        """
2102        Create an advertising set.
2103
2104        This method allows the creation of advertising sets for controllers that
2105        support extended advertising.
2106
2107        Args:
2108          advertising_parameters:
2109            The parameters to use for this set. If None, default parameters are used.
2110          random_address:
2111            The random address to use (only relevant when the parameters specify that
2112            own_address_type is random).
2113          advertising_data:
2114            Initial value for the set's advertising data.
2115          scan_response_data:
2116            Initial value for the set's scan response data.
2117          periodic_advertising_parameters:
2118            The parameters to use for periodic advertising (if needed).
2119          periodic_advertising_data:
2120            Initial value for the set's periodic advertising data.
2121          auto_start:
2122            True if the set should be automatically started upon creation.
2123          auto_restart:
2124            True if the set should be automatically restated after a disconnection.
2125
2126        Returns:
2127          An AdvertisingSet instance.
2128        """
2129        # Instantiate default values
2130        if advertising_parameters is None:
2131            advertising_parameters = AdvertisingParameters()
2132
2133        if (
2134            not advertising_parameters.advertising_event_properties.is_legacy
2135            and advertising_data
2136            and scan_response_data
2137        ):
2138            raise ValueError(
2139                "Extended advertisements can't have both data and scan \
2140                              response data"
2141            )
2142
2143        # Allocate a new handle
2144        try:
2145            advertising_handle = next(
2146                handle
2147                for handle in range(
2148                    DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
2149                    DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
2150                )
2151                if handle not in self.extended_advertising_sets
2152            )
2153        except StopIteration as exc:
2154            raise RuntimeError("all valid advertising handles already in use") from exc
2155
2156        # Use the device's random address if a random address is needed but none was
2157        # provided.
2158        if (
2159            advertising_parameters.own_address_type
2160            in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
2161            and random_address is None
2162        ):
2163            random_address = self.random_address
2164
2165        # Create the object that represents the set.
2166        advertising_set = AdvertisingSet(
2167            device=self,
2168            advertising_handle=advertising_handle,
2169            auto_restart=auto_restart,
2170            random_address=random_address,
2171            advertising_parameters=advertising_parameters,
2172            advertising_data=advertising_data,
2173            scan_response_data=scan_response_data,
2174            periodic_advertising_parameters=periodic_advertising_parameters,
2175            periodic_advertising_data=periodic_advertising_data,
2176        )
2177
2178        # Create the set in the controller.
2179        await advertising_set.set_advertising_parameters(advertising_parameters)
2180
2181        # Update the set in the controller.
2182        try:
2183            if random_address:
2184                await advertising_set.set_random_address(random_address)
2185
2186            if advertising_data:
2187                await advertising_set.set_advertising_data(advertising_data)
2188
2189            if scan_response_data:
2190                await advertising_set.set_scan_response_data(scan_response_data)
2191
2192            if periodic_advertising_parameters:
2193                # TODO: call LE Set Periodic Advertising Parameters command
2194                raise NotImplementedError('periodic advertising not yet supported')
2195
2196            if periodic_advertising_data:
2197                # TODO: call LE Set Periodic Advertising Data command
2198                raise NotImplementedError('periodic advertising not yet supported')
2199
2200        except HCI_Error as error:
2201            # Remove the advertising set so that it doesn't stay dangling in the
2202            # controller.
2203            await self.send_command(
2204                HCI_LE_Remove_Advertising_Set_Command(
2205                    advertising_handle=advertising_handle
2206                ),
2207                check_result=False,
2208            )
2209            raise error
2210
2211        # Remember the set.
2212        self.extended_advertising_sets[advertising_handle] = advertising_set
2213
2214        # Try to start the set if requested.
2215        if auto_start:
2216            try:
2217                # pylint: disable=line-too-long
2218                duration = (
2219                    DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION
2220                    if advertising_parameters.advertising_event_properties.is_high_duty_cycle_directed_connectable
2221                    else 0
2222                )
2223                await advertising_set.start(duration=duration)
2224            except Exception as error:
2225                logger.exception(f'failed to start advertising set: {error}')
2226                await advertising_set.remove()
2227                raise
2228
2229        return advertising_set
2230
2231    @property
2232    def is_advertising(self):
2233        if self.legacy_advertiser:
2234            return True
2235
2236        return any(
2237            advertising_set.enabled
2238            for advertising_set in self.extended_advertising_sets.values()
2239        )
2240
2241    async def start_scanning(
2242        self,
2243        legacy: bool = False,
2244        active: bool = True,
2245        scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
2246        scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW,  # Scan window in ms
2247        own_address_type: int = OwnAddressType.RANDOM,
2248        filter_duplicates: bool = False,
2249        scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
2250    ) -> None:
2251        # Check that the arguments are legal
2252        if scan_interval < scan_window:
2253            raise ValueError('scan_interval must be >= scan_window')
2254        if (
2255            scan_interval < DEVICE_MIN_SCAN_INTERVAL
2256            or scan_interval > DEVICE_MAX_SCAN_INTERVAL
2257        ):
2258            raise ValueError('scan_interval out of range')
2259        if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
2260            raise ValueError('scan_interval out of range')
2261
2262        # Reset the accumulators
2263        self.advertisement_accumulators = {}
2264
2265        # Enable scanning
2266        if not legacy and self.supports_le_extended_advertising:
2267            # Set the scanning parameters
2268            scan_type = (
2269                HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
2270                if active
2271                else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
2272            )
2273            scanning_filter_policy = (
2274                HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
2275            )  # TODO: support other types
2276
2277            scanning_phy_count = 0
2278            scanning_phys_bits = 0
2279            if HCI_LE_1M_PHY in scanning_phys:
2280                scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
2281                scanning_phy_count += 1
2282            if HCI_LE_CODED_PHY in scanning_phys:
2283                if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
2284                    scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
2285                    scanning_phy_count += 1
2286
2287            if scanning_phy_count == 0:
2288                raise ValueError('at least one scanning PHY must be enabled')
2289
2290            await self.send_command(
2291                HCI_LE_Set_Extended_Scan_Parameters_Command(
2292                    own_address_type=own_address_type,
2293                    scanning_filter_policy=scanning_filter_policy,
2294                    scanning_phys=scanning_phys_bits,
2295                    scan_types=[scan_type] * scanning_phy_count,
2296                    scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
2297                    scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
2298                ),
2299                check_result=True,
2300            )
2301
2302            # Enable scanning
2303            await self.send_command(
2304                HCI_LE_Set_Extended_Scan_Enable_Command(
2305                    enable=1,
2306                    filter_duplicates=1 if filter_duplicates else 0,
2307                    duration=0,  # TODO allow other values
2308                    period=0,  # TODO allow other values
2309                ),
2310                check_result=True,
2311            )
2312        else:
2313            # Set the scanning parameters
2314            scan_type = (
2315                HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING
2316                if active
2317                else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
2318            )
2319            await self.send_command(
2320                # pylint: disable=line-too-long
2321                HCI_LE_Set_Scan_Parameters_Command(
2322                    le_scan_type=scan_type,
2323                    le_scan_interval=int(scan_window / 0.625),
2324                    le_scan_window=int(scan_window / 0.625),
2325                    own_address_type=own_address_type,
2326                    scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
2327                ),
2328                check_result=True,
2329            )
2330
2331            # Enable scanning
2332            await self.send_command(
2333                HCI_LE_Set_Scan_Enable_Command(
2334                    le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
2335                ),
2336                check_result=True,
2337            )
2338
2339        self.scanning_is_passive = not active
2340        self.scanning = True
2341
2342    async def stop_scanning(self, legacy: bool = False) -> None:
2343        # Disable scanning
2344        if not legacy and self.supports_le_extended_advertising:
2345            await self.send_command(
2346                HCI_LE_Set_Extended_Scan_Enable_Command(
2347                    enable=0, filter_duplicates=0, duration=0, period=0
2348                ),
2349                check_result=True,
2350            )
2351        else:
2352            await self.send_command(
2353                HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
2354                check_result=True,
2355            )
2356
2357        self.scanning = False
2358
    @property
    def is_scanning(self) -> bool:
        '''Current value of the `self.scanning` flag.'''
        return self.scanning
2362
2363    @host_event_handler
2364    def on_advertising_report(self, report):
2365        if not (accumulator := self.advertisement_accumulators.get(report.address)):
2366            accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
2367            self.advertisement_accumulators[report.address] = accumulator
2368        if advertisement := accumulator.update(report):
2369            self.emit('advertisement', advertisement)
2370
2371    async def start_discovery(self, auto_restart: bool = True) -> None:
2372        await self.send_command(
2373            HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
2374            check_result=True,
2375        )
2376
2377        response = await self.send_command(
2378            HCI_Inquiry_Command(
2379                lap=HCI_GENERAL_INQUIRY_LAP,
2380                inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
2381                num_responses=0,  # Unlimited number of responses.
2382            )
2383        )
2384        if response.status != HCI_Command_Status_Event.PENDING:
2385            self.discovering = False
2386            raise HCI_StatusError(response)
2387
2388        self.auto_restart_inquiry = auto_restart
2389        self.discovering = True
2390
2391    async def stop_discovery(self) -> None:
2392        if self.discovering:
2393            await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
2394        self.auto_restart_inquiry = True
2395        self.discovering = False
2396
2397    @host_event_handler
2398    def on_inquiry_result(self, address, class_of_device, data, rssi):
2399        self.emit(
2400            'inquiry_result',
2401            address,
2402            class_of_device,
2403            AdvertisingData.from_bytes(data),
2404            rssi,
2405        )
2406
2407    async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
2408        if inquiry_scan_enabled and page_scan_enabled:
2409            scan_enable = 0x03
2410        elif page_scan_enabled:
2411            scan_enable = 0x02
2412        elif inquiry_scan_enabled:
2413            scan_enable = 0x01
2414        else:
2415            scan_enable = 0x00
2416
2417        return await self.send_command(
2418            HCI_Write_Scan_Enable_Command(scan_enable=scan_enable)
2419        )
2420
2421    async def set_discoverable(self, discoverable: bool = True) -> None:
2422        self.discoverable = discoverable
2423        if self.classic_enabled:
2424            # Synthesize an inquiry response if none is set already
2425            if self.inquiry_response is None:
2426                self.inquiry_response = bytes(
2427                    AdvertisingData(
2428                        [
2429                            (
2430                                AdvertisingData.COMPLETE_LOCAL_NAME,
2431                                bytes(self.name, 'utf-8'),
2432                            )
2433                        ]
2434                    )
2435                )
2436
2437            # Update the controller
2438            await self.send_command(
2439                HCI_Write_Extended_Inquiry_Response_Command(
2440                    fec_required=0, extended_inquiry_response=self.inquiry_response
2441                ),
2442                check_result=True,
2443            )
2444            await self.set_scan_enable(
2445                inquiry_scan_enabled=self.discoverable,
2446                page_scan_enabled=self.connectable,
2447            )
2448
2449    async def set_connectable(self, connectable: bool = True) -> None:
2450        self.connectable = connectable
2451        if self.classic_enabled:
2452            await self.set_scan_enable(
2453                inquiry_scan_enabled=self.discoverable,
2454                page_scan_enabled=self.connectable,
2455            )
2456
2457    async def connect(
2458        self,
2459        peer_address: Union[Address, str],
2460        transport: int = BT_LE_TRANSPORT,
2461        connection_parameters_preferences: Optional[
2462            Dict[int, ConnectionParametersPreferences]
2463        ] = None,
2464        own_address_type: int = OwnAddressType.RANDOM,
2465        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
2466    ) -> Connection:
2467        '''
2468        Request a connection to a peer.
2469        When transport is BLE, this method cannot be called if there is already a
2470        pending connection.
2471
2472        connection_parameters_preferences: (BLE only, ignored for BR/EDR)
2473          * None: use the 1M PHY with default parameters
2474          * map: each entry has a PHY as key and a ConnectionParametersPreferences
2475            object as value
2476
2477        own_address_type: (BLE only)
2478        '''
2479
2480        # Check parameters
2481        if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
2482            raise ValueError('invalid transport')
2483
2484        # Adjust the transport automatically if we need to
2485        if transport == BT_LE_TRANSPORT and not self.le_enabled:
2486            transport = BT_BR_EDR_TRANSPORT
2487        elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
2488            transport = BT_LE_TRANSPORT
2489
2490        # Check that there isn't already a pending connection
2491        if transport == BT_LE_TRANSPORT and self.is_le_connecting:
2492            raise InvalidStateError('connection already pending')
2493
2494        if isinstance(peer_address, str):
2495            try:
2496                peer_address = Address.from_string_for_transport(
2497                    peer_address, transport
2498                )
2499            except ValueError:
2500                # If the address is not parsable, assume it is a name instead
2501                logger.debug('looking for peer by name')
2502                peer_address = await self.find_peer_by_name(
2503                    peer_address, transport
2504                )  # TODO: timeout
2505        else:
2506            # All BR/EDR addresses should be public addresses
2507            if (
2508                transport == BT_BR_EDR_TRANSPORT
2509                and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS
2510            ):
2511                raise ValueError('BR/EDR addresses must be PUBLIC')
2512
2513        assert isinstance(peer_address, Address)
2514
2515        def on_connection(connection):
2516            if transport == BT_LE_TRANSPORT or (
2517                # match BR/EDR connection event against peer address
2518                connection.transport == transport
2519                and connection.peer_address == peer_address
2520            ):
2521                pending_connection.set_result(connection)
2522
2523        def on_connection_failure(error):
2524            if transport == BT_LE_TRANSPORT or (
2525                # match BR/EDR connection failure event against peer address
2526                error.transport == transport
2527                and error.peer_address == peer_address
2528            ):
2529                pending_connection.set_exception(error)
2530
2531        # Create a future so that we can wait for the connection's result
2532        pending_connection = asyncio.get_running_loop().create_future()
2533        self.on('connection', on_connection)
2534        self.on('connection_failure', on_connection_failure)
2535
2536        try:
2537            # Tell the controller to connect
2538            if transport == BT_LE_TRANSPORT:
2539                if connection_parameters_preferences is None:
2540                    if connection_parameters_preferences is None:
2541                        connection_parameters_preferences = {
2542                            HCI_LE_1M_PHY: ConnectionParametersPreferences.default
2543                        }
2544
2545                self.connect_own_address_type = own_address_type
2546
2547                if self.host.supports_command(
2548                    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND
2549                ):
2550                    # Only keep supported PHYs
2551                    phys = sorted(
2552                        list(
2553                            set(
2554                                filter(
2555                                    self.supports_le_phy,
2556                                    connection_parameters_preferences.keys(),
2557                                )
2558                            )
2559                        )
2560                    )
2561                    if not phys:
2562                        raise ValueError('at least one supported PHY needed')
2563
2564                    phy_count = len(phys)
2565                    initiating_phys = phy_list_to_bits(phys)
2566
2567                    connection_interval_mins = [
2568                        int(
2569                            connection_parameters_preferences[
2570                                phy
2571                            ].connection_interval_min
2572                            / 1.25
2573                        )
2574                        for phy in phys
2575                    ]
2576                    connection_interval_maxs = [
2577                        int(
2578                            connection_parameters_preferences[
2579                                phy
2580                            ].connection_interval_max
2581                            / 1.25
2582                        )
2583                        for phy in phys
2584                    ]
2585                    max_latencies = [
2586                        connection_parameters_preferences[phy].max_latency
2587                        for phy in phys
2588                    ]
2589                    supervision_timeouts = [
2590                        int(
2591                            connection_parameters_preferences[phy].supervision_timeout
2592                            / 10
2593                        )
2594                        for phy in phys
2595                    ]
2596                    min_ce_lengths = [
2597                        int(
2598                            connection_parameters_preferences[phy].min_ce_length / 0.625
2599                        )
2600                        for phy in phys
2601                    ]
2602                    max_ce_lengths = [
2603                        int(
2604                            connection_parameters_preferences[phy].max_ce_length / 0.625
2605                        )
2606                        for phy in phys
2607                    ]
2608
2609                    result = await self.send_command(
2610                        HCI_LE_Extended_Create_Connection_Command(
2611                            initiator_filter_policy=0,
2612                            own_address_type=own_address_type,
2613                            peer_address_type=peer_address.address_type,
2614                            peer_address=peer_address,
2615                            initiating_phys=initiating_phys,
2616                            scan_intervals=(
2617                                int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
2618                            )
2619                            * phy_count,
2620                            scan_windows=(
2621                                int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
2622                            )
2623                            * phy_count,
2624                            connection_interval_mins=connection_interval_mins,
2625                            connection_interval_maxs=connection_interval_maxs,
2626                            max_latencies=max_latencies,
2627                            supervision_timeouts=supervision_timeouts,
2628                            min_ce_lengths=min_ce_lengths,
2629                            max_ce_lengths=max_ce_lengths,
2630                        )
2631                    )
2632                else:
2633                    if HCI_LE_1M_PHY not in connection_parameters_preferences:
2634                        raise ValueError('1M PHY preferences required')
2635
2636                    prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
2637                    result = await self.send_command(
2638                        HCI_LE_Create_Connection_Command(
2639                            le_scan_interval=int(
2640                                DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625
2641                            ),
2642                            le_scan_window=int(
2643                                DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625
2644                            ),
2645                            initiator_filter_policy=0,
2646                            peer_address_type=peer_address.address_type,
2647                            peer_address=peer_address,
2648                            own_address_type=own_address_type,
2649                            connection_interval_min=int(
2650                                prefs.connection_interval_min / 1.25
2651                            ),
2652                            connection_interval_max=int(
2653                                prefs.connection_interval_max / 1.25
2654                            ),
2655                            max_latency=prefs.max_latency,
2656                            supervision_timeout=int(prefs.supervision_timeout / 10),
2657                            min_ce_length=int(prefs.min_ce_length / 0.625),
2658                            max_ce_length=int(prefs.max_ce_length / 0.625),
2659                        )
2660                    )
2661            else:
2662                # Save pending connection
2663                self.pending_connections[peer_address] = Connection.incomplete(
2664                    self, peer_address, BT_CENTRAL_ROLE
2665                )
2666
2667                # TODO: allow passing other settings
2668                result = await self.send_command(
2669                    HCI_Create_Connection_Command(
2670                        bd_addr=peer_address,
2671                        packet_type=0xCC18,  # FIXME: change
2672                        page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE,
2673                        clock_offset=0x0000,
2674                        allow_role_switch=0x01,
2675                        reserved=0,
2676                    )
2677                )
2678
2679            if result.status != HCI_Command_Status_Event.PENDING:
2680                raise HCI_StatusError(result)
2681
2682            # Wait for the connection process to complete
2683            if transport == BT_LE_TRANSPORT:
2684                self.le_connecting = True
2685
2686            if timeout is None:
2687                return await self.abort_on('flush', pending_connection)
2688
2689            try:
2690                return await asyncio.wait_for(
2691                    asyncio.shield(pending_connection), timeout
2692                )
2693            except asyncio.TimeoutError:
2694                if transport == BT_LE_TRANSPORT:
2695                    await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
2696                else:
2697                    await self.send_command(
2698                        HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
2699                    )
2700
2701                try:
2702                    return await self.abort_on('flush', pending_connection)
2703                except core.ConnectionError as error:
2704                    raise core.TimeoutError() from error
2705        finally:
2706            self.remove_listener('connection', on_connection)
2707            self.remove_listener('connection_failure', on_connection_failure)
2708            if transport == BT_LE_TRANSPORT:
2709                self.le_connecting = False
2710                self.connect_own_address_type = None
2711            else:
2712                self.pending_connections.pop(peer_address, None)
2713
2714    async def accept(
2715        self,
2716        peer_address: Union[Address, str] = Address.ANY,
2717        role: int = BT_PERIPHERAL_ROLE,
2718        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
2719    ) -> Connection:
2720        '''
2721        Wait and accept any incoming connection or a connection from `peer_address` when
2722        set.
2723
2724        Notes:
2725          * A `connect` to the same peer will not complete this call.
2726          * The `timeout` parameter is only handled while waiting for the connection
2727            request, once received and accepted, the controller shall issue a connection
2728            complete event.
2729        '''
2730
2731        if isinstance(peer_address, str):
2732            try:
2733                peer_address = Address(peer_address)
2734            except ValueError:
2735                # If the address is not parsable, assume it is a name instead
2736                logger.debug('looking for peer by name')
2737                peer_address = await self.find_peer_by_name(
2738                    peer_address, BT_BR_EDR_TRANSPORT
2739                )  # TODO: timeout
2740
2741        assert isinstance(peer_address, Address)
2742
2743        if peer_address == Address.NIL:
2744            raise ValueError('accept on nil address')
2745
2746        # Create a future so that we can wait for the request
2747        pending_request_fut = asyncio.get_running_loop().create_future()
2748
2749        if peer_address == Address.ANY:
2750            self.classic_pending_accepts[Address.ANY].append(pending_request_fut)
2751        elif peer_address in self.classic_pending_accepts:
2752            raise InvalidStateError('accept connection already pending')
2753        else:
2754            self.classic_pending_accepts[peer_address] = [pending_request_fut]
2755
2756        try:
2757            # Wait for a request or a completed connection
2758            pending_request = self.abort_on('flush', pending_request_fut)
2759            result = await (
2760                asyncio.wait_for(pending_request, timeout)
2761                if timeout
2762                else pending_request
2763            )
2764        except Exception:
2765            # Remove future from device context
2766            if peer_address == Address.ANY:
2767                self.classic_pending_accepts[Address.ANY].remove(pending_request_fut)
2768            else:
2769                self.classic_pending_accepts.pop(peer_address)
2770            raise
2771
2772        # Result may already be a completed connection,
2773        # see `on_connection` for details
2774        if isinstance(result, Connection):
2775            return result
2776
2777        # Otherwise, result came from `on_connection_request`
2778        peer_address, _class_of_device, _link_type = result
2779        assert isinstance(peer_address, Address)
2780
2781        # Create a future so that we can wait for the connection's result
2782        pending_connection = asyncio.get_running_loop().create_future()
2783
2784        def on_connection(connection):
2785            if (
2786                connection.transport == BT_BR_EDR_TRANSPORT
2787                and connection.peer_address == peer_address
2788            ):
2789                pending_connection.set_result(connection)
2790
2791        def on_connection_failure(error):
2792            if (
2793                error.transport == BT_BR_EDR_TRANSPORT
2794                and error.peer_address == peer_address
2795            ):
2796                pending_connection.set_exception(error)
2797
2798        self.on('connection', on_connection)
2799        self.on('connection_failure', on_connection_failure)
2800
2801        # Save pending connection, with the Peripheral role.
2802        # Even if we requested a role switch in the HCI_Accept_Connection_Request
2803        # command, this connection is still considered Peripheral until an eventual
2804        # role change event.
2805        self.pending_connections[peer_address] = Connection.incomplete(
2806            self, peer_address, BT_PERIPHERAL_ROLE
2807        )
2808
2809        try:
2810            # Accept connection request
2811            await self.send_command(
2812                HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
2813            )
2814
2815            # Wait for connection complete
2816            return await self.abort_on('flush', pending_connection)
2817
2818        finally:
2819            self.remove_listener('connection', on_connection)
2820            self.remove_listener('connection_failure', on_connection_failure)
2821            self.pending_connections.pop(peer_address, None)
2822
    @asynccontextmanager
    async def connect_as_gatt(self, peer_address):
        '''
        Async context manager that connects to `peer_address` and yields a `Peer`
        created from the resulting connection.

        `connect` is called with its default arguments. Both the connection and the
        `Peer` are entered on the same `AsyncExitStack`, so they are exited when the
        `async with` body finishes.
        '''
        async with AsyncExitStack() as stack:
            connection = await stack.enter_async_context(
                await self.connect(peer_address)
            )
            peer = await stack.enter_async_context(Peer(connection))

            yield peer
2832
    @property
    def is_le_connecting(self) -> bool:
        '''Current value of the `self.le_connecting` flag.'''
        return self.le_connecting
2836
    @property
    def is_disconnecting(self) -> bool:
        '''Current value of the `self.disconnecting` flag.'''
        return self.disconnecting
2840
2841    async def cancel_connection(self, peer_address=None):
2842        # Low-energy: cancel ongoing connection
2843        if peer_address is None:
2844            if not self.is_le_connecting:
2845                return
2846            await self.send_command(
2847                HCI_LE_Create_Connection_Cancel_Command(), check_result=True
2848            )
2849
2850        # BR/EDR: try to cancel to ongoing connection
2851        # NOTE: This API does not prevent from trying to cancel a connection which is
2852        # not currently being created
2853        else:
2854            if isinstance(peer_address, str):
2855                try:
2856                    peer_address = Address(peer_address)
2857                except ValueError:
2858                    # If the address is not parsable, assume it is a name instead
2859                    logger.debug('looking for peer by name')
2860                    peer_address = await self.find_peer_by_name(
2861                        peer_address, BT_BR_EDR_TRANSPORT
2862                    )  # TODO: timeout
2863
2864            await self.send_command(
2865                HCI_Create_Connection_Cancel_Command(bd_addr=peer_address),
2866                check_result=True,
2867            )
2868
2869    async def disconnect(
2870        self, connection: Union[Connection, ScoLink, CisLink], reason: int
2871    ) -> None:
2872        # Create a future so that we can wait for the disconnection's result
2873        pending_disconnection = asyncio.get_running_loop().create_future()
2874        connection.on('disconnection', pending_disconnection.set_result)
2875        connection.on('disconnection_failure', pending_disconnection.set_exception)
2876
2877        # Request a disconnection
2878        result = await self.send_command(
2879            HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
2880        )
2881
2882        try:
2883            if result.status != HCI_Command_Status_Event.PENDING:
2884                raise HCI_StatusError(result)
2885
2886            # Wait for the disconnection process to complete
2887            self.disconnecting = True
2888            return await self.abort_on('flush', pending_disconnection)
2889        finally:
2890            connection.remove_listener(
2891                'disconnection', pending_disconnection.set_result
2892            )
2893            connection.remove_listener(
2894                'disconnection_failure', pending_disconnection.set_exception
2895            )
2896            self.disconnecting = False
2897
2898    async def set_data_length(self, connection, tx_octets, tx_time) -> None:
2899        if tx_octets < 0x001B or tx_octets > 0x00FB:
2900            raise ValueError('tx_octets must be between 0x001B and 0x00FB')
2901
2902        if tx_time < 0x0148 or tx_time > 0x4290:
2903            raise ValueError('tx_time must be between 0x0148 and 0x4290')
2904
2905        return await self.send_command(
2906            HCI_LE_Set_Data_Length_Command(
2907                connection_handle=connection.handle,
2908                tx_octets=tx_octets,
2909                tx_time=tx_time,
2910            ),
2911            check_result=True,
2912        )
2913
2914    async def update_connection_parameters(
2915        self,
2916        connection,
2917        connection_interval_min,
2918        connection_interval_max,
2919        max_latency,
2920        supervision_timeout,
2921        min_ce_length=0,
2922        max_ce_length=0,
2923        use_l2cap=False,
2924    ) -> None:
2925        '''
2926        NOTE: the name of the parameters may look odd, but it just follows the names
2927        used in the Bluetooth spec.
2928        '''
2929
2930        if use_l2cap:
2931            if connection.role != BT_PERIPHERAL_ROLE:
2932                raise InvalidStateError(
2933                    'only peripheral can update connection parameters with l2cap'
2934                )
2935            l2cap_result = (
2936                await self.l2cap_channel_manager.update_connection_parameters(
2937                    connection,
2938                    connection_interval_min,
2939                    connection_interval_max,
2940                    max_latency,
2941                    supervision_timeout,
2942                )
2943            )
2944            if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT:
2945                raise ConnectionParameterUpdateError(l2cap_result)
2946
2947        result = await self.send_command(
2948            HCI_LE_Connection_Update_Command(
2949                connection_handle=connection.handle,
2950                connection_interval_min=connection_interval_min,
2951                connection_interval_max=connection_interval_max,
2952                max_latency=max_latency,
2953                supervision_timeout=supervision_timeout,
2954                min_ce_length=min_ce_length,
2955                max_ce_length=max_ce_length,
2956            )
2957        )
2958        if result.status != HCI_Command_Status_Event.PENDING:
2959            raise HCI_StatusError(result)
2960
2961    async def get_connection_rssi(self, connection):
2962        result = await self.send_command(
2963            HCI_Read_RSSI_Command(handle=connection.handle), check_result=True
2964        )
2965        return result.return_parameters.rssi
2966
2967    async def get_connection_phy(self, connection):
2968        result = await self.send_command(
2969            HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
2970            check_result=True,
2971        )
2972        return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
2973
2974    async def set_connection_phy(
2975        self, connection, tx_phys=None, rx_phys=None, phy_options=None
2976    ):
2977        if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND):
2978            logger.warning('ignoring request, command not supported')
2979            return
2980
2981        all_phys_bits = (1 if tx_phys is None else 0) | (
2982            (1 if rx_phys is None else 0) << 1
2983        )
2984
2985        result = await self.send_command(
2986            HCI_LE_Set_PHY_Command(
2987                connection_handle=connection.handle,
2988                all_phys=all_phys_bits,
2989                tx_phys=phy_list_to_bits(tx_phys),
2990                rx_phys=phy_list_to_bits(rx_phys),
2991                phy_options=0 if phy_options is None else int(phy_options),
2992            )
2993        )
2994
2995        if result.status != HCI_COMMAND_STATUS_PENDING:
2996            logger.warning(
2997                'HCI_LE_Set_PHY_Command failed: '
2998                f'{HCI_Constant.error_name(result.status)}'
2999            )
3000            raise HCI_StatusError(result)
3001
3002    async def set_default_phy(self, tx_phys=None, rx_phys=None):
3003        all_phys_bits = (1 if tx_phys is None else 0) | (
3004            (1 if rx_phys is None else 0) << 1
3005        )
3006
3007        return await self.send_command(
3008            HCI_LE_Set_Default_PHY_Command(
3009                all_phys=all_phys_bits,
3010                tx_phys=phy_list_to_bits(tx_phys),
3011                rx_phys=phy_list_to_bits(rx_phys),
3012            ),
3013            check_result=True,
3014        )
3015
3016    async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
3017        """
3018        Scan for a peer with a give name and return its address and transport
3019        """
3020
3021        # Create a future to wait for an address to be found
3022        peer_address = asyncio.get_running_loop().create_future()
3023
3024        # Scan/inquire with event handlers to handle scan/inquiry results
3025        def on_peer_found(address, ad_data):
3026            local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
3027            if local_name is None:
3028                local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
3029            if local_name is not None:
3030                if local_name.decode('utf-8') == name:
3031                    peer_address.set_result(address)
3032
3033        handler = None
3034        was_scanning = self.scanning
3035        was_discovering = self.discovering
3036        try:
3037            if transport == BT_LE_TRANSPORT:
3038                event_name = 'advertisement'
3039                handler = self.on(
3040                    event_name,
3041                    lambda advertisement: on_peer_found(
3042                        advertisement.address, advertisement.data
3043                    ),
3044                )
3045
3046                if not self.scanning:
3047                    await self.start_scanning(filter_duplicates=True)
3048
3049            elif transport == BT_BR_EDR_TRANSPORT:
3050                event_name = 'inquiry_result'
3051                handler = self.on(
3052                    event_name,
3053                    lambda address, class_of_device, eir_data, rssi: on_peer_found(
3054                        address, eir_data
3055                    ),
3056                )
3057
3058                if not self.discovering:
3059                    await self.start_discovery()
3060            else:
3061                return None
3062
3063            return await self.abort_on('flush', peer_address)
3064        finally:
3065            if handler is not None:
3066                self.remove_listener(event_name, handler)
3067
3068            if transport == BT_LE_TRANSPORT and not was_scanning:
3069                await self.stop_scanning()
3070            elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
3071                await self.stop_discovery()
3072
3073    @property
3074    def pairing_config_factory(self) -> Callable[[Connection], PairingConfig]:
3075        return self.smp_manager.pairing_config_factory
3076
3077    @pairing_config_factory.setter
3078    def pairing_config_factory(
3079        self, pairing_config_factory: Callable[[Connection], PairingConfig]
3080    ) -> None:
3081        self.smp_manager.pairing_config_factory = pairing_config_factory
3082
3083    @property
3084    def smp_session_proxy(self) -> Type[smp.Session]:
3085        return self.smp_manager.session_proxy
3086
3087    @smp_session_proxy.setter
3088    def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None:
3089        self.smp_manager.session_proxy = session_proxy
3090
3091    async def pair(self, connection):
3092        return await self.smp_manager.pair(connection)
3093
3094    def request_pairing(self, connection):
3095        return self.smp_manager.request_pairing(connection)
3096
3097    async def get_long_term_key(
3098        self, connection_handle: int, rand: bytes, ediv: int
3099    ) -> Optional[bytes]:
3100        if (connection := self.lookup_connection(connection_handle)) is None:
3101            return None
3102
3103        # Start by looking for the key in an SMP session
3104        ltk = self.smp_manager.get_long_term_key(connection, rand, ediv)
3105        if ltk is not None:
3106            return ltk
3107
3108        # Then look for the key in the keystore
3109        if self.keystore is not None:
3110            keys = await self.keystore.get(str(connection.peer_address))
3111            if keys is not None:
3112                logger.debug('found keys in the key store')
3113                if keys.ltk:
3114                    return keys.ltk.value
3115
3116                if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
3117                    return keys.ltk_central.value
3118
3119                if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
3120                    return keys.ltk_peripheral.value
3121        return None
3122
3123    async def get_link_key(self, address: Address) -> Optional[bytes]:
3124        if self.keystore is None:
3125            return None
3126
3127        # Look for the key in the keystore
3128        keys = await self.keystore.get(str(address))
3129        if keys is None:
3130            logger.debug(f'no keys found for {address}')
3131            return None
3132
3133        logger.debug('found keys in the key store')
3134        if keys.link_key is None:
3135            logger.warning('no link key')
3136            return None
3137
3138        return keys.link_key.value
3139
3140    # [Classic only]
3141    async def authenticate(self, connection):
3142        # Set up event handlers
3143        pending_authentication = asyncio.get_running_loop().create_future()
3144
3145        def on_authentication():
3146            pending_authentication.set_result(None)
3147
3148        def on_authentication_failure(error_code):
3149            pending_authentication.set_exception(HCI_Error(error_code))
3150
3151        connection.on('connection_authentication', on_authentication)
3152        connection.on('connection_authentication_failure', on_authentication_failure)
3153
3154        # Request the authentication
3155        try:
3156            result = await self.send_command(
3157                HCI_Authentication_Requested_Command(
3158                    connection_handle=connection.handle
3159                )
3160            )
3161            if result.status != HCI_COMMAND_STATUS_PENDING:
3162                logger.warning(
3163                    'HCI_Authentication_Requested_Command failed: '
3164                    f'{HCI_Constant.error_name(result.status)}'
3165                )
3166                raise HCI_StatusError(result)
3167
3168            # Wait for the authentication to complete
3169            await connection.abort_on('disconnection', pending_authentication)
3170        finally:
3171            connection.remove_listener('connection_authentication', on_authentication)
3172            connection.remove_listener(
3173                'connection_authentication_failure', on_authentication_failure
3174            )
3175
3176    async def encrypt(self, connection, enable=True):
3177        if not enable and connection.transport == BT_LE_TRANSPORT:
3178            raise ValueError('`enable` parameter is classic only.')
3179
3180        # Set up event handlers
3181        pending_encryption = asyncio.get_running_loop().create_future()
3182
3183        def on_encryption_change():
3184            pending_encryption.set_result(None)
3185
3186        def on_encryption_failure(error_code):
3187            pending_encryption.set_exception(HCI_Error(error_code))
3188
3189        connection.on('connection_encryption_change', on_encryption_change)
3190        connection.on('connection_encryption_failure', on_encryption_failure)
3191
3192        # Request the encryption
3193        try:
3194            if connection.transport == BT_LE_TRANSPORT:
3195                # Look for a key in the key store
3196                if self.keystore is None:
3197                    raise RuntimeError('no key store')
3198
3199                keys = await self.keystore.get(str(connection.peer_address))
3200                if keys is None:
3201                    raise RuntimeError('keys not found in key store')
3202
3203                if keys.ltk is not None:
3204                    ltk = keys.ltk.value
3205                    rand = bytes(8)
3206                    ediv = 0
3207                elif keys.ltk_central is not None:
3208                    ltk = keys.ltk_central.value
3209                    rand = keys.ltk_central.rand
3210                    ediv = keys.ltk_central.ediv
3211                else:
3212                    raise RuntimeError('no LTK found for peer')
3213
3214                if connection.role != HCI_CENTRAL_ROLE:
3215                    raise InvalidStateError('only centrals can start encryption')
3216
3217                result = await self.send_command(
3218                    HCI_LE_Enable_Encryption_Command(
3219                        connection_handle=connection.handle,
3220                        random_number=rand,
3221                        encrypted_diversifier=ediv,
3222                        long_term_key=ltk,
3223                    )
3224                )
3225
3226                if result.status != HCI_COMMAND_STATUS_PENDING:
3227                    logger.warning(
3228                        'HCI_LE_Enable_Encryption_Command failed: '
3229                        f'{HCI_Constant.error_name(result.status)}'
3230                    )
3231                    raise HCI_StatusError(result)
3232            else:
3233                result = await self.send_command(
3234                    HCI_Set_Connection_Encryption_Command(
3235                        connection_handle=connection.handle,
3236                        encryption_enable=0x01 if enable else 0x00,
3237                    )
3238                )
3239
3240                if result.status != HCI_COMMAND_STATUS_PENDING:
3241                    logger.warning(
3242                        'HCI_Set_Connection_Encryption_Command failed: '
3243                        f'{HCI_Constant.error_name(result.status)}'
3244                    )
3245                    raise HCI_StatusError(result)
3246
3247            # Wait for the result
3248            await connection.abort_on('disconnection', pending_encryption)
3249        finally:
3250            connection.remove_listener(
3251                'connection_encryption_change', on_encryption_change
3252            )
3253            connection.remove_listener(
3254                'connection_encryption_failure', on_encryption_failure
3255            )
3256
3257    async def update_keys(self, address: str, keys: PairingKeys) -> None:
3258        if self.keystore is None:
3259            return
3260
3261        try:
3262            await self.keystore.update(address, keys)
3263            await self.refresh_resolving_list()
3264        except Exception as error:
3265            logger.warning(f'!!! error while storing keys: {error}')
3266        else:
3267            self.emit('key_store_update')
3268
3269    # [Classic only]
3270    async def switch_role(self, connection: Connection, role: int):
3271        pending_role_change = asyncio.get_running_loop().create_future()
3272
3273        def on_role_change(new_role):
3274            pending_role_change.set_result(new_role)
3275
3276        def on_role_change_failure(error_code):
3277            pending_role_change.set_exception(HCI_Error(error_code))
3278
3279        connection.on('role_change', on_role_change)
3280        connection.on('role_change_failure', on_role_change_failure)
3281
3282        try:
3283            result = await self.send_command(
3284                HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
3285            )
3286            if result.status != HCI_COMMAND_STATUS_PENDING:
3287                logger.warning(
3288                    'HCI_Switch_Role_Command failed: '
3289                    f'{HCI_Constant.error_name(result.status)}'
3290                )
3291                raise HCI_StatusError(result)
3292            await connection.abort_on('disconnection', pending_role_change)
3293        finally:
3294            connection.remove_listener('role_change', on_role_change)
3295            connection.remove_listener('role_change_failure', on_role_change_failure)
3296
3297    # [Classic only]
3298    async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
3299        # Set up event handlers
3300        pending_name = asyncio.get_running_loop().create_future()
3301
3302        peer_address = remote if isinstance(remote, Address) else remote.peer_address
3303
3304        handler = self.on(
3305            'remote_name',
3306            lambda address, remote_name: (
3307                pending_name.set_result(remote_name)
3308                if address == peer_address
3309                else None
3310            ),
3311        )
3312        failure_handler = self.on(
3313            'remote_name_failure',
3314            lambda address, error_code: (
3315                pending_name.set_exception(HCI_Error(error_code))
3316                if address == peer_address
3317                else None
3318            ),
3319        )
3320
3321        try:
3322            result = await self.send_command(
3323                HCI_Remote_Name_Request_Command(
3324                    bd_addr=peer_address,
3325                    page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
3326                    reserved=0,
3327                    clock_offset=0,  # TODO investigate non-0 values
3328                )
3329            )
3330
3331            if result.status != HCI_COMMAND_STATUS_PENDING:
3332                logger.warning(
3333                    'HCI_Remote_Name_Request_Command failed: '
3334                    f'{HCI_Constant.error_name(result.status)}'
3335                )
3336                raise HCI_StatusError(result)
3337
3338            # Wait for the result
3339            return await self.abort_on('flush', pending_name)
3340        finally:
3341            self.remove_listener('remote_name', handler)
3342            self.remove_listener('remote_name_failure', failure_handler)
3343
3344    # [LE only]
3345    @experimental('Only for testing.')
3346    async def setup_cig(
3347        self,
3348        cig_id: int,
3349        cis_id: List[int],
3350        sdu_interval: Tuple[int, int],
3351        framing: int,
3352        max_sdu: Tuple[int, int],
3353        retransmission_number: int,
3354        max_transport_latency: Tuple[int, int],
3355    ) -> List[int]:
3356        """Sends HCI_LE_Set_CIG_Parameters_Command.
3357
3358        Args:
3359            cig_id: CIG_ID.
3360            cis_id: CID ID list.
3361            sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
3362            framing: Un-framing(0) or Framing(1).
3363            max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
3364            retransmission_number: retransmission_number.
3365            max_transport_latency: Max transport latencies of
3366                                   (Central->Peripheral, Peripheral->Cental).
3367
3368        Returns:
3369            List of created CIS handles corresponding to the same order of [cid_id].
3370        """
3371        num_cis = len(cis_id)
3372
3373        response = await self.send_command(
3374            HCI_LE_Set_CIG_Parameters_Command(
3375                cig_id=cig_id,
3376                sdu_interval_c_to_p=sdu_interval[0],
3377                sdu_interval_p_to_c=sdu_interval[1],
3378                worst_case_sca=0x00,  # 251-500 ppm
3379                packing=0x00,  # Sequential
3380                framing=framing,
3381                max_transport_latency_c_to_p=max_transport_latency[0],
3382                max_transport_latency_p_to_c=max_transport_latency[1],
3383                cis_id=cis_id,
3384                max_sdu_c_to_p=[max_sdu[0]] * num_cis,
3385                max_sdu_p_to_c=[max_sdu[1]] * num_cis,
3386                phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
3387                phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
3388                rtn_c_to_p=[retransmission_number] * num_cis,
3389                rtn_p_to_c=[retransmission_number] * num_cis,
3390            ),
3391            check_result=True,
3392        )
3393
3394        # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
3395        # Server, so here it only provides a basic functionality for testing.
3396        cis_handles = response.return_parameters.connection_handle[:]
3397        for id, cis_handle in zip(cis_id, cis_handles):
3398            self._pending_cis[cis_handle] = (id, cig_id)
3399
3400        return cis_handles
3401
3402    # [LE only]
3403    @experimental('Only for testing.')
3404    async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
3405        for cis_handle, acl_handle in cis_acl_pairs:
3406            acl_connection = self.lookup_connection(acl_handle)
3407            assert acl_connection
3408            cis_id, cig_id = self._pending_cis.pop(cis_handle)
3409            self.cis_links[cis_handle] = CisLink(
3410                device=self,
3411                acl_connection=acl_connection,
3412                handle=cis_handle,
3413                cis_id=cis_id,
3414                cig_id=cig_id,
3415            )
3416
3417        with closing(EventWatcher()) as watcher:
3418            pending_cis_establishments = {
3419                cis_handle: asyncio.get_running_loop().create_future()
3420                for cis_handle, _ in cis_acl_pairs
3421            }
3422
3423            def on_cis_establishment(cis_link: CisLink) -> None:
3424                if pending_future := pending_cis_establishments.get(cis_link.handle):
3425                    pending_future.set_result(cis_link)
3426
3427            def on_cis_establishment_failure(cis_handle: int, status: int) -> None:
3428                if pending_future := pending_cis_establishments.get(cis_handle):
3429                    pending_future.set_exception(HCI_Error(status))
3430
3431            watcher.on(self, 'cis_establishment', on_cis_establishment)
3432            watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure)
3433            await self.send_command(
3434                HCI_LE_Create_CIS_Command(
3435                    cis_connection_handle=[p[0] for p in cis_acl_pairs],
3436                    acl_connection_handle=[p[1] for p in cis_acl_pairs],
3437                ),
3438                check_result=True,
3439            )
3440
3441            return await asyncio.gather(*pending_cis_establishments.values())
3442
3443    # [LE only]
3444    @experimental('Only for testing.')
3445    async def accept_cis_request(self, handle: int) -> CisLink:
3446        """[LE Only] Accepts an incoming CIS request.
3447
3448        When the specified CIS handle is already created, this method returns the
3449        existed CIS link object immediately.
3450
3451        Args:
3452            handle: CIS handle to accept.
3453
3454        Returns:
3455            CIS link object on the given handle.
3456        """
3457        if not (cis_link := self.cis_links.get(handle)):
3458            raise InvalidStateError(f'No pending CIS request of handle {handle}')
3459
3460        # There might be multiple ASE sharing a CIS channel.
3461        # If one of them has accepted the request, the others should just leverage it.
3462        async with self._cis_lock:
3463            if cis_link.state == CisLink.State.ESTABLISHED:
3464                return cis_link
3465
3466            with closing(EventWatcher()) as watcher:
3467                pending_establishment = asyncio.get_running_loop().create_future()
3468
3469                def on_establishment() -> None:
3470                    pending_establishment.set_result(None)
3471
3472                def on_establishment_failure(status: int) -> None:
3473                    pending_establishment.set_exception(HCI_Error(status))
3474
3475                watcher.on(cis_link, 'establishment', on_establishment)
3476                watcher.on(cis_link, 'establishment_failure', on_establishment_failure)
3477
3478                await self.send_command(
3479                    HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
3480                    check_result=True,
3481                )
3482
3483                await pending_establishment
3484                return cis_link
3485
3486        # Mypy believes this is reachable when context is an ExitStack.
3487        raise InvalidStateError('Unreachable')
3488
3489    # [LE only]
3490    @experimental('Only for testing.')
3491    async def reject_cis_request(
3492        self,
3493        handle: int,
3494        reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
3495    ) -> None:
3496        await self.send_command(
3497            HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
3498            check_result=True,
3499        )
3500
3501    async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
3502        """[LE Only] Reads remote LE supported features.
3503
3504        Args:
3505            handle: connection handle to read LE features.
3506
3507        Returns:
3508            LE features supported by the remote device.
3509        """
3510        with closing(EventWatcher()) as watcher:
3511            read_feature_future: asyncio.Future[LeFeatureMask] = (
3512                asyncio.get_running_loop().create_future()
3513            )
3514
3515            def on_le_remote_features(handle: int, features: int):
3516                if handle == connection.handle:
3517                    read_feature_future.set_result(LeFeatureMask(features))
3518
3519            def on_failure(handle: int, status: int):
3520                if handle == connection.handle:
3521                    read_feature_future.set_exception(HCI_Error(status))
3522
3523            watcher.on(self.host, 'le_remote_features', on_le_remote_features)
3524            watcher.on(self.host, 'le_remote_features_failure', on_failure)
3525            await self.send_command(
3526                HCI_LE_Read_Remote_Features_Command(
3527                    connection_handle=connection.handle
3528                ),
3529                check_result=True,
3530            )
3531            return await read_feature_future
3532
3533    @host_event_handler
3534    def on_flush(self):
3535        self.emit('flush')
3536        for _, connection in self.connections.items():
3537            connection.emit('disconnection', 0)
3538        self.connections = {}
3539
3540    # [Classic only]
3541    @host_event_handler
3542    def on_link_key(self, bd_addr, link_key, key_type):
3543        # Store the keys in the key store
3544        if self.keystore:
3545            authenticated = key_type in (
3546                HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
3547                HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
3548            )
3549            pairing_keys = PairingKeys()
3550            pairing_keys.link_key = PairingKeys.Key(
3551                value=link_key, authenticated=authenticated
3552            )
3553
3554            self.abort_on('flush', self.update_keys(str(bd_addr), pairing_keys))
3555
3556        if connection := self.find_connection_by_bd_addr(
3557            bd_addr, transport=BT_BR_EDR_TRANSPORT
3558        ):
3559            connection.link_key_type = key_type
3560
3561    def add_service(self, service):
3562        self.gatt_server.add_service(service)
3563
3564    def add_services(self, services):
3565        self.gatt_server.add_services(services)
3566
3567    def add_default_services(self, generic_access_service=True):
3568        # Add a GAP Service if requested
3569        if generic_access_service:
3570            self.gatt_server.add_service(GenericAccessService(self.name))
3571
3572    async def notify_subscriber(self, connection, attribute, value=None, force=False):
3573        await self.gatt_server.notify_subscriber(connection, attribute, value, force)
3574
3575    async def notify_subscribers(self, attribute, value=None, force=False):
3576        await self.gatt_server.notify_subscribers(attribute, value, force)
3577
3578    async def indicate_subscriber(self, connection, attribute, value=None, force=False):
3579        await self.gatt_server.indicate_subscriber(connection, attribute, value, force)
3580
3581    async def indicate_subscribers(self, attribute, value=None, force=False):
3582        await self.gatt_server.indicate_subscribers(attribute, value, force)
3583
3584    @host_event_handler
3585    def on_advertising_set_termination(
3586        self,
3587        status,
3588        advertising_handle,
3589        connection_handle,
3590        number_of_completed_extended_advertising_events,
3591    ):
3592        # Legacy advertising set is also one of extended advertising sets.
3593        if not (
3594            advertising_set := self.extended_advertising_sets.get(advertising_handle)
3595        ):
3596            logger.warning(f'advertising set {advertising_handle} not found')
3597            return
3598
3599        advertising_set.on_termination(status)
3600
3601        if status != HCI_SUCCESS:
3602            logger.debug(
3603                f'advertising set {advertising_handle} '
3604                f'terminated with status {status}'
3605            )
3606            return
3607
3608        if not (connection := self.lookup_connection(connection_handle)):
3609            logger.warning(f'no connection for handle 0x{connection_handle:04x}')
3610            return
3611
3612        # Update the connection address.
3613        connection.self_address = (
3614            advertising_set.random_address
3615            if advertising_set.advertising_parameters.own_address_type
3616            in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
3617            else self.public_address
3618        )
3619
3620        # Setup auto-restart of the advertising set if needed.
3621        if advertising_set.auto_restart:
3622            connection.once(
3623                'disconnection',
3624                lambda _: self.abort_on('flush', advertising_set.start()),
3625            )
3626
3627        self._emit_le_connection(connection)
3628
3629    def _emit_le_connection(self, connection: Connection) -> None:
3630        # If supported, read which PHY we're connected with before
3631        # notifying listeners of the new connection.
3632        if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
3633
3634            async def read_phy():
3635                result = await self.send_command(
3636                    HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
3637                    check_result=True,
3638                )
3639                connection.phy = ConnectionPHY(
3640                    result.return_parameters.tx_phy, result.return_parameters.rx_phy
3641                )
3642                # Emit an event to notify listeners of the new connection
3643                self.emit('connection', connection)
3644
3645            # Do so asynchronously to not block the current event handler
3646            connection.abort_on('disconnection', read_phy())
3647
3648            return
3649
3650        self.emit('connection', connection)
3651
3652    @host_event_handler
3653    def on_connection(
3654        self,
3655        connection_handle,
3656        transport,
3657        peer_address,
3658        role,
3659        connection_parameters,
3660    ):
3661        logger.debug(
3662            f'*** Connection: [0x{connection_handle:04X}] '
3663            f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}'
3664        )
3665        if connection_handle in self.connections:
3666            logger.warning(
3667                'new connection reuses the same handle as a previous connection'
3668            )
3669
3670        if transport == BT_BR_EDR_TRANSPORT:
3671            # Create a new connection
3672            connection = self.pending_connections.pop(peer_address)
3673            connection.complete(connection_handle, connection_parameters)
3674            self.connections[connection_handle] = connection
3675
3676            # Emit an event to notify listeners of the new connection
3677            self.emit('connection', connection)
3678
3679            return
3680
3681        # Resolve the peer address if we can
3682        peer_resolvable_address = None
3683        if self.address_resolver:
3684            if peer_address.is_resolvable:
3685                resolved_address = self.address_resolver.resolve(peer_address)
3686                if resolved_address is not None:
3687                    logger.debug(f'*** Address resolved as {resolved_address}')
3688                    peer_resolvable_address = peer_address
3689                    peer_address = resolved_address
3690
3691        self_address = None
3692        if role == HCI_CENTRAL_ROLE:
3693            own_address_type = self.connect_own_address_type
3694            assert own_address_type is not None
3695        else:
3696            if self.supports_le_extended_advertising:
3697                # We'll know the address when the advertising set terminates,
3698                # Use a temporary placeholder value for self_address.
3699                self_address = Address.ANY_RANDOM
3700            else:
3701                # We were connected via a legacy advertisement.
3702                if self.legacy_advertiser:
3703                    own_address_type = self.legacy_advertiser.own_address_type
3704                else:
3705                    # This should not happen, but just in case, pick a default.
3706                    logger.warning("connection without an advertiser")
3707                    self_address = self.random_address
3708
3709        if self_address is None:
3710            self_address = (
3711                self.public_address
3712                if own_address_type
3713                in (
3714                    OwnAddressType.PUBLIC,
3715                    OwnAddressType.RESOLVABLE_OR_PUBLIC,
3716                )
3717                else self.random_address
3718            )
3719
3720        # Create a connection.
3721        connection = Connection(
3722            self,
3723            connection_handle,
3724            transport,
3725            self_address,
3726            peer_address,
3727            peer_resolvable_address,
3728            role,
3729            connection_parameters,
3730            ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
3731        )
3732        self.connections[connection_handle] = connection
3733
3734        if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser:
3735            if self.legacy_advertiser.auto_restart:
3736                connection.once(
3737                    'disconnection',
3738                    lambda _: self.abort_on('flush', self.legacy_advertiser.start()),
3739                )
3740            else:
3741                self.legacy_advertiser = None
3742
3743        if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
3744            # We can emit now, we have all the info we need
3745            self._emit_le_connection(connection)
3746
3747    @host_event_handler
3748    def on_connection_failure(self, transport, peer_address, error_code):
3749        logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
3750
3751        # For directed advertising, this means a timeout
3752        if (
3753            transport == BT_LE_TRANSPORT
3754            and self.legacy_advertiser
3755            and self.legacy_advertiser.advertising_type.is_directed
3756        ):
3757            self.legacy_advertiser = None
3758
3759        # Notify listeners
3760        error = core.ConnectionError(
3761            error_code,
3762            transport,
3763            peer_address,
3764            'hci',
3765            HCI_Constant.error_name(error_code),
3766        )
3767        self.emit('connection_failure', error)
3768
3769    # FIXME: Explore a delegate-model for BR/EDR wait connection #56.
3770    @host_event_handler
3771    def on_connection_request(self, bd_addr, class_of_device, link_type):
3772        logger.debug(f'*** Connection request: {bd_addr}')
3773
3774        # Handle SCO request.
3775        if link_type in (
3776            HCI_Connection_Complete_Event.SCO_LINK_TYPE,
3777            HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
3778        ):
3779            if connection := self.find_connection_by_bd_addr(
3780                bd_addr, transport=BT_BR_EDR_TRANSPORT
3781            ):
3782                self.emit('sco_request', connection, link_type)
3783            else:
3784                logger.error(f'SCO request from a non-connected device {bd_addr}')
3785            return
3786
3787        # match a pending future using `bd_addr`
3788        elif bd_addr in self.classic_pending_accepts:
3789            future, *_ = self.classic_pending_accepts.pop(bd_addr)
3790            future.set_result((bd_addr, class_of_device, link_type))
3791
3792        # match first pending future for ANY address
3793        elif len(self.classic_pending_accepts[Address.ANY]) > 0:
3794            future = self.classic_pending_accepts[Address.ANY].pop(0)
3795            future.set_result((bd_addr, class_of_device, link_type))
3796
3797        # device configuration is set to accept any incoming connection
3798        elif self.classic_accept_any:
3799            # Save pending connection
3800            self.pending_connections[bd_addr] = Connection.incomplete(
3801                self, bd_addr, BT_PERIPHERAL_ROLE
3802            )
3803
3804            self.host.send_command_sync(
3805                HCI_Accept_Connection_Request_Command(
3806                    bd_addr=bd_addr, role=0x01  # Remain the peripheral
3807                )
3808            )
3809
3810        # reject incoming connection
3811        else:
3812            self.host.send_command_sync(
3813                HCI_Reject_Connection_Request_Command(
3814                    bd_addr=bd_addr,
3815                    reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
3816                )
3817            )
3818
3819    @host_event_handler
3820    def on_disconnection(self, connection_handle: int, reason: int) -> None:
3821        if connection := self.connections.pop(connection_handle, None):
3822            logger.debug(
3823                f'*** Disconnection: [0x{connection.handle:04X}] '
3824                f'{connection.peer_address} as {connection.role_name}, reason={reason}'
3825            )
3826            connection.emit('disconnection', reason)
3827
3828            # Cleanup subsystems that maintain per-connection state
3829            self.gatt_server.on_disconnection(connection)
3830        elif sco_link := self.sco_links.pop(connection_handle, None):
3831            sco_link.emit('disconnection', reason)
3832        elif cis_link := self.cis_links.pop(connection_handle, None):
3833            cis_link.emit('disconnection', reason)
3834        else:
3835            logger.error(
3836                f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
3837            )
3838
3839    @host_event_handler
3840    @with_connection_from_handle
3841    def on_disconnection_failure(self, connection, error_code):
3842        logger.debug(f'*** Disconnection failed: {error_code}')
3843        error = core.ConnectionError(
3844            error_code,
3845            connection.transport,
3846            connection.peer_address,
3847            'hci',
3848            HCI_Constant.error_name(error_code),
3849        )
3850        connection.emit('disconnection_failure', error)
3851
3852    @host_event_handler
3853    @AsyncRunner.run_in_task()
3854    async def on_inquiry_complete(self):
3855        if self.auto_restart_inquiry:
3856            # Inquire again
3857            await self.start_discovery(auto_restart=True)
3858        else:
3859            self.auto_restart_inquiry = True
3860            self.discovering = False
3861            self.emit('inquiry_complete')
3862
3863    @host_event_handler
3864    @with_connection_from_handle
3865    def on_connection_authentication(self, connection):
3866        logger.debug(
3867            f'*** Connection Authentication: [0x{connection.handle:04X}] '
3868            f'{connection.peer_address} as {connection.role_name}'
3869        )
3870        connection.authenticated = True
3871        connection.emit('connection_authentication')
3872
3873    @host_event_handler
3874    @with_connection_from_handle
3875    def on_connection_authentication_failure(self, connection, error):
3876        logger.debug(
3877            f'*** Connection Authentication Failure: [0x{connection.handle:04X}] '
3878            f'{connection.peer_address} as {connection.role_name}, error={error}'
3879        )
3880        connection.emit('connection_authentication_failure', error)
3881
3882    # [Classic only]
3883    @host_event_handler
3884    @with_connection_from_address
3885    def on_authentication_io_capability_request(self, connection):
3886        # Ask what the pairing config should be for this connection
3887        pairing_config = self.pairing_config_factory(connection)
3888
3889        # Compute the authentication requirements
3890        authentication_requirements = (
3891            # No Bonding
3892            (
3893                HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
3894                HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
3895            ),
3896            # General Bonding
3897            (
3898                HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
3899                HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
3900            ),
3901        )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
3902
3903        # Respond
3904        self.host.send_command_sync(
3905            HCI_IO_Capability_Request_Reply_Command(
3906                bd_addr=connection.peer_address,
3907                io_capability=pairing_config.delegate.classic_io_capability,
3908                oob_data_present=0x00,  # Not present
3909                authentication_requirements=authentication_requirements,
3910            )
3911        )
3912
3913    # [Classic only]
3914    @host_event_handler
3915    @with_connection_from_address
3916    def on_authentication_io_capability_response(
3917        self, connection, io_capability, authentication_requirements
3918    ):
3919        connection.peer_pairing_io_capability = io_capability
3920        connection.peer_pairing_authentication_requirements = (
3921            authentication_requirements
3922        )
3923
3924    # [Classic only]
3925    @host_event_handler
3926    @with_connection_from_address
3927    def on_authentication_user_confirmation_request(self, connection, code) -> None:
3928        # Ask what the pairing config should be for this connection
3929        pairing_config = self.pairing_config_factory(connection)
3930        io_capability = pairing_config.delegate.classic_io_capability
3931        peer_io_capability = connection.peer_pairing_io_capability
3932
3933        async def confirm() -> bool:
3934            # Ask the user to confirm the pairing, without display
3935            return await pairing_config.delegate.confirm()
3936
3937        async def auto_confirm() -> bool:
3938            # Ask the user to auto-confirm the pairing, without display
3939            return await pairing_config.delegate.confirm(auto=True)
3940
3941        async def display_confirm() -> bool:
3942            # Display the code and ask the user to compare
3943            return await pairing_config.delegate.compare_numbers(code, digits=6)
3944
3945        async def display_auto_confirm() -> bool:
3946            # Display the code to the user and ask the delegate to auto-confirm
3947            await pairing_config.delegate.display_number(code, digits=6)
3948            return await pairing_config.delegate.confirm(auto=True)
3949
3950        async def na() -> bool:
3951            assert False, "N/A: unreachable"
3952
3953        # See Bluetooth spec @ Vol 3, Part C 5.2.2.6
3954        methods = {
3955            HCI_DISPLAY_ONLY_IO_CAPABILITY: {
3956                HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
3957                HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
3958                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
3959                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
3960            },
3961            HCI_DISPLAY_YES_NO_IO_CAPABILITY: {
3962                HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
3963                HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
3964                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
3965                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
3966            },
3967            HCI_KEYBOARD_ONLY_IO_CAPABILITY: {
3968                HCI_DISPLAY_ONLY_IO_CAPABILITY: na,
3969                HCI_DISPLAY_YES_NO_IO_CAPABILITY: na,
3970                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
3971                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
3972            },
3973            HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
3974                HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm,
3975                HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm,
3976                HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm,
3977                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
3978            },
3979        }
3980
3981        method = methods[peer_io_capability][io_capability]
3982
3983        async def reply() -> None:
3984            try:
3985                if await connection.abort_on('disconnection', method()):
3986                    await self.host.send_command(
3987                        HCI_User_Confirmation_Request_Reply_Command(
3988                            bd_addr=connection.peer_address
3989                        )
3990                    )
3991                    return
3992            except Exception as error:
3993                logger.warning(f'exception while confirming: {error}')
3994
3995            await self.host.send_command(
3996                HCI_User_Confirmation_Request_Negative_Reply_Command(
3997                    bd_addr=connection.peer_address
3998                )
3999            )
4000
4001        AsyncRunner.spawn(reply())
4002
4003    # [Classic only]
4004    @host_event_handler
4005    @with_connection_from_address
4006    def on_authentication_user_passkey_request(self, connection) -> None:
4007        # Ask what the pairing config should be for this connection
4008        pairing_config = self.pairing_config_factory(connection)
4009
4010        async def reply() -> None:
4011            try:
4012                number = await connection.abort_on(
4013                    'disconnection', pairing_config.delegate.get_number()
4014                )
4015                if number is not None:
4016                    await self.host.send_command(
4017                        HCI_User_Passkey_Request_Reply_Command(
4018                            bd_addr=connection.peer_address, numeric_value=number
4019                        )
4020                    )
4021                    return
4022            except Exception as error:
4023                logger.warning(f'exception while asking for pass-key: {error}')
4024
4025            await self.host.send_command(
4026                HCI_User_Passkey_Request_Negative_Reply_Command(
4027                    bd_addr=connection.peer_address
4028                )
4029            )
4030
4031        AsyncRunner.spawn(reply())
4032
4033    # [Classic only]
4034    @host_event_handler
4035    @with_connection_from_address
4036    def on_pin_code_request(self, connection):
4037        # Classic legacy pairing
4038        # Ask what the pairing config should be for this connection
4039        pairing_config = self.pairing_config_factory(connection)
4040        io_capability = pairing_config.delegate.classic_io_capability
4041
4042        # Respond
4043        if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
4044            # Ask the user to enter a string
4045            async def get_pin_code():
4046                pin_code = await connection.abort_on(
4047                    'disconnection', pairing_config.delegate.get_string(16)
4048                )
4049
4050                if pin_code is not None:
4051                    pin_code = bytes(pin_code, encoding='utf-8')
4052                    pin_code_len = len(pin_code)
4053                    assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes"
4054                    await self.host.send_command(
4055                        HCI_PIN_Code_Request_Reply_Command(
4056                            bd_addr=connection.peer_address,
4057                            pin_code_length=pin_code_len,
4058                            pin_code=pin_code,
4059                        )
4060                    )
4061                else:
4062                    logger.debug("delegate.get_string() returned None")
4063                    await self.host.send_command(
4064                        HCI_PIN_Code_Request_Negative_Reply_Command(
4065                            bd_addr=connection.peer_address
4066                        )
4067                    )
4068
4069            asyncio.create_task(get_pin_code())
4070        else:
4071            self.host.send_command_sync(
4072                HCI_PIN_Code_Request_Negative_Reply_Command(
4073                    bd_addr=connection.peer_address
4074                )
4075            )
4076
4077    # [Classic only]
4078    @host_event_handler
4079    @with_connection_from_address
4080    def on_authentication_user_passkey_notification(self, connection, passkey):
4081        # Ask what the pairing config should be for this connection
4082        pairing_config = self.pairing_config_factory(connection)
4083
4084        # Show the passkey to the user
4085        connection.abort_on(
4086            'disconnection', pairing_config.delegate.display_number(passkey)
4087        )
4088
4089    # [Classic only]
4090    @host_event_handler
4091    @try_with_connection_from_address
4092    def on_remote_name(self, connection: Connection, address, remote_name):
4093        # Try to decode the name
4094        try:
4095            remote_name = remote_name.decode('utf-8')
4096            if connection:
4097                connection.peer_name = remote_name
4098                connection.emit('remote_name')
4099            self.emit('remote_name', address, remote_name)
4100        except UnicodeDecodeError as error:
4101            logger.warning('peer name is not valid UTF-8')
4102            if connection:
4103                connection.emit('remote_name_failure', error)
4104            else:
4105                self.emit('remote_name_failure', address, error)
4106
4107    # [Classic only]
4108    @host_event_handler
4109    @try_with_connection_from_address
4110    def on_remote_name_failure(self, connection: Connection, address, error):
4111        if connection:
4112            connection.emit('remote_name_failure', error)
4113        self.emit('remote_name_failure', address, error)
4114
4115    # [Classic only]
4116    @host_event_handler
4117    @with_connection_from_address
4118    @experimental('Only for testing.')
4119    def on_sco_connection(
4120        self, acl_connection: Connection, sco_handle: int, link_type: int
4121    ) -> None:
4122        logger.debug(
4123            f'*** SCO connected: {acl_connection.peer_address}, '
4124            f'sco_handle=[0x{sco_handle:04X}], '
4125            f'link_type=[0x{link_type:02X}] ***'
4126        )
4127        sco_link = self.sco_links[sco_handle] = ScoLink(
4128            device=self,
4129            acl_connection=acl_connection,
4130            handle=sco_handle,
4131            link_type=link_type,
4132        )
4133        self.emit('sco_connection', sco_link)
4134
4135    # [Classic only]
4136    @host_event_handler
4137    @with_connection_from_address
4138    @experimental('Only for testing.')
4139    def on_sco_connection_failure(
4140        self, acl_connection: Connection, status: int
4141    ) -> None:
4142        logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
4143        self.emit('sco_connection_failure')
4144
4145    # [Classic only]
4146    @host_event_handler
4147    @experimental('Only for testing')
4148    def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
4149        if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink:
4150            sco_link.sink(packet)
4151
4152    # [LE only]
4153    @host_event_handler
4154    @with_connection_from_handle
4155    @experimental('Only for testing')
4156    def on_cis_request(
4157        self,
4158        acl_connection: Connection,
4159        cis_handle: int,
4160        cig_id: int,
4161        cis_id: int,
4162    ) -> None:
4163        logger.debug(
4164            f'*** CIS Request '
4165            f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
4166            f'cis_handle=[0x{cis_handle:04X}], '
4167            f'cig_id=[0x{cig_id:02X}], '
4168            f'cis_id=[0x{cis_id:02X}] ***'
4169        )
4170        # LE_CIS_Established event doesn't provide info, so we must store them here.
4171        self.cis_links[cis_handle] = CisLink(
4172            device=self,
4173            acl_connection=acl_connection,
4174            handle=cis_handle,
4175            cig_id=cig_id,
4176            cis_id=cis_id,
4177        )
4178        self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
4179
4180    # [LE only]
4181    @host_event_handler
4182    @experimental('Only for testing')
4183    def on_cis_establishment(self, cis_handle: int) -> None:
4184        cis_link = self.cis_links[cis_handle]
4185        cis_link.state = CisLink.State.ESTABLISHED
4186
4187        assert cis_link.acl_connection
4188
4189        logger.debug(
4190            f'*** CIS Establishment '
4191            f'{cis_link.acl_connection.peer_address}, '
4192            f'cis_handle=[0x{cis_handle:04X}], '
4193            f'cig_id=[0x{cis_link.cig_id:02X}], '
4194            f'cis_id=[0x{cis_link.cis_id:02X}] ***'
4195        )
4196
4197        cis_link.emit('establishment')
4198        self.emit('cis_establishment', cis_link)
4199
4200    # [LE only]
4201    @host_event_handler
4202    @experimental('Only for testing')
4203    def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
4204        logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
4205        if cis_link := self.cis_links.pop(cis_handle):
4206            cis_link.emit('establishment_failure', status)
4207        self.emit('cis_establishment_failure', cis_handle, status)
4208
4209    # [LE only]
4210    @host_event_handler
4211    @experimental('Only for testing')
4212    def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
4213        if (cis_link := self.cis_links.get(handle)) and cis_link.sink:
4214            cis_link.sink(packet)
4215
4216    @host_event_handler
4217    @with_connection_from_handle
4218    def on_connection_encryption_change(self, connection, encryption):
4219        logger.debug(
4220            f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
4221            f'{connection.peer_address} as {connection.role_name}, '
4222            f'encryption={encryption}'
4223        )
4224        connection.encryption = encryption
4225        if (
4226            not connection.authenticated
4227            and connection.transport == BT_BR_EDR_TRANSPORT
4228            and encryption == HCI_Encryption_Change_Event.AES_CCM
4229        ):
4230            connection.authenticated = True
4231            connection.sc = True
4232        if (
4233            not connection.authenticated
4234            and connection.transport == BT_LE_TRANSPORT
4235            and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
4236        ):
4237            connection.authenticated = True
4238            connection.sc = True
4239        connection.emit('connection_encryption_change')
4240
4241    @host_event_handler
4242    @with_connection_from_handle
4243    def on_connection_encryption_failure(self, connection, error):
4244        logger.debug(
4245            f'*** Connection Encryption Failure: [0x{connection.handle:04X}] '
4246            f'{connection.peer_address} as {connection.role_name}, '
4247            f'error={error}'
4248        )
4249        connection.emit('connection_encryption_failure', error)
4250
4251    @host_event_handler
4252    @with_connection_from_handle
4253    def on_connection_encryption_key_refresh(self, connection):
4254        logger.debug(
4255            f'*** Connection Key Refresh: [0x{connection.handle:04X}] '
4256            f'{connection.peer_address} as {connection.role_name}'
4257        )
4258        connection.emit('connection_encryption_key_refresh')
4259
4260    @host_event_handler
4261    @with_connection_from_handle
4262    def on_connection_parameters_update(self, connection, connection_parameters):
4263        logger.debug(
4264            f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
4265            f'{connection.peer_address} as {connection.role_name}, '
4266            f'{connection_parameters}'
4267        )
4268        connection.parameters = connection_parameters
4269        connection.emit('connection_parameters_update')
4270
4271    @host_event_handler
4272    @with_connection_from_handle
4273    def on_connection_parameters_update_failure(self, connection, error):
4274        logger.debug(
4275            f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] '
4276            f'{connection.peer_address} as {connection.role_name}, '
4277            f'error={error}'
4278        )
4279        connection.emit('connection_parameters_update_failure', error)
4280
4281    @host_event_handler
4282    @with_connection_from_handle
4283    def on_connection_phy_update(self, connection, connection_phy):
4284        logger.debug(
4285            f'*** Connection PHY Update: [0x{connection.handle:04X}] '
4286            f'{connection.peer_address} as {connection.role_name}, '
4287            f'{connection_phy}'
4288        )
4289        connection.phy = connection_phy
4290        connection.emit('connection_phy_update')
4291
4292    @host_event_handler
4293    @with_connection_from_handle
4294    def on_connection_phy_update_failure(self, connection, error):
4295        logger.debug(
4296            f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] '
4297            f'{connection.peer_address} as {connection.role_name}, '
4298            f'error={error}'
4299        )
4300        connection.emit('connection_phy_update_failure', error)
4301
4302    @host_event_handler
4303    @with_connection_from_handle
4304    def on_connection_att_mtu_update(self, connection, att_mtu):
4305        logger.debug(
4306            f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] '
4307            f'{connection.peer_address} as {connection.role_name}, '
4308            f'{att_mtu}'
4309        )
4310        connection.att_mtu = att_mtu
4311        connection.emit('connection_att_mtu_update')
4312
4313    @host_event_handler
4314    @with_connection_from_handle
4315    def on_connection_data_length_change(
4316        self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time
4317    ):
4318        logger.debug(
4319            f'*** Connection Data Length Change: [0x{connection.handle:04X}] '
4320            f'{connection.peer_address} as {connection.role_name}'
4321        )
4322        connection.data_length = (
4323            max_tx_octets,
4324            max_tx_time,
4325            max_rx_octets,
4326            max_rx_time,
4327        )
4328        connection.emit('connection_data_length_change')
4329
4330    # [Classic only]
4331    @host_event_handler
4332    @with_connection_from_address
4333    def on_role_change(self, connection, new_role):
4334        connection.role = new_role
4335        connection.emit('role_change', new_role)
4336
4337    # [Classic only]
4338    @host_event_handler
4339    @try_with_connection_from_address
4340    def on_role_change_failure(self, connection, address, error):
4341        if connection:
4342            connection.emit('role_change_failure', error)
4343        self.emit('role_change_failure', address, error)
4344
4345    # [Classic only]
4346    @host_event_handler
4347    @with_connection_from_address
4348    def on_classic_pairing(self, connection: Connection) -> None:
4349        connection.emit('classic_pairing')
4350
4351    # [Classic only]
4352    @host_event_handler
4353    @with_connection_from_address
4354    def on_classic_pairing_failure(self, connection: Connection, status) -> None:
4355        connection.emit('classic_pairing_failure', status)
4356
4357    def on_pairing_start(self, connection: Connection) -> None:
4358        connection.emit('pairing_start')
4359
4360    def on_pairing(
4361        self,
4362        connection: Connection,
4363        identity_address: Optional[Address],
4364        keys: PairingKeys,
4365        sc: bool,
4366    ) -> None:
4367        if identity_address is not None:
4368            connection.peer_resolvable_address = connection.peer_address
4369            connection.peer_address = identity_address
4370        connection.sc = sc
4371        connection.authenticated = True
4372        connection.emit('pairing', keys)
4373
4374    def on_pairing_failure(self, connection: Connection, reason: int) -> None:
4375        connection.emit('pairing_failure', reason)
4376
4377    @with_connection_from_handle
4378    def on_gatt_pdu(self, connection, pdu):
4379        # Parse the L2CAP payload into an ATT PDU object
4380        att_pdu = ATT_PDU.from_bytes(pdu)
4381
4382        # Conveniently, even-numbered op codes are client->server and
4383        # odd-numbered ones are server->client
4384        if att_pdu.op_code & 1:
4385            if connection.gatt_client is None:
4386                logger.warning(
4387                    color('no GATT client for connection 0x{connection_handle:04X}')
4388                )
4389                return
4390            connection.gatt_client.on_gatt_pdu(att_pdu)
4391        else:
4392            if connection.gatt_server is None:
4393                logger.warning(
4394                    color('no GATT server for connection 0x{connection_handle:04X}')
4395                )
4396                return
4397            connection.gatt_server.on_gatt_pdu(connection, att_pdu)
4398
4399    @with_connection_from_handle
4400    def on_smp_pdu(self, connection, pdu):
4401        self.smp_manager.on_smp_pdu(connection, pdu)
4402
4403    @host_event_handler
4404    @with_connection_from_handle
4405    def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes):
4406        self.l2cap_channel_manager.on_pdu(connection, cid, pdu)
4407
4408    def __str__(self):
4409        return (
4410            f'Device(name="{self.name}", '
4411            f'random_address="{self.random_address}", '
4412            f'public_address="{self.public_address}")'
4413        )
4414