• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19from enum import IntEnum
20import functools
21import json
22import asyncio
23import logging
24from contextlib import asynccontextmanager, AsyncExitStack
25from dataclasses import dataclass
26from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union
27
28from .colors import color
29from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
30from .gatt import Characteristic, Descriptor, Service
31from .hci import (
32    HCI_CENTRAL_ROLE,
33    HCI_COMMAND_STATUS_PENDING,
34    HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
35    HCI_DISPLAY_ONLY_IO_CAPABILITY,
36    HCI_DISPLAY_YES_NO_IO_CAPABILITY,
37    HCI_EXTENDED_INQUIRY_MODE,
38    HCI_GENERAL_INQUIRY_LAP,
39    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
40    HCI_KEYBOARD_ONLY_IO_CAPABILITY,
41    HCI_LE_1M_PHY,
42    HCI_LE_1M_PHY_BIT,
43    HCI_LE_2M_PHY,
44    HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
45    HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
46    HCI_LE_CODED_PHY,
47    HCI_LE_CODED_PHY_BIT,
48    HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
49    HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE,
50    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
51    HCI_LE_RAND_COMMAND,
52    HCI_LE_READ_PHY_COMMAND,
53    HCI_LE_SET_PHY_COMMAND,
54    HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
55    HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
56    HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
57    HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
58    HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
59    HCI_R2_PAGE_SCAN_REPETITION_MODE,
60    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
61    HCI_SUCCESS,
62    HCI_WRITE_LE_HOST_SUPPORT_COMMAND,
63    Address,
64    HCI_Accept_Connection_Request_Command,
65    HCI_Authentication_Requested_Command,
66    HCI_Command_Status_Event,
67    HCI_Constant,
68    HCI_Create_Connection_Cancel_Command,
69    HCI_Create_Connection_Command,
70    HCI_Disconnect_Command,
71    HCI_Encryption_Change_Event,
72    HCI_Error,
73    HCI_IO_Capability_Request_Reply_Command,
74    HCI_Inquiry_Cancel_Command,
75    HCI_Inquiry_Command,
76    HCI_LE_Add_Device_To_Resolving_List_Command,
77    HCI_LE_Advertising_Report_Event,
78    HCI_LE_Clear_Resolving_List_Command,
79    HCI_LE_Connection_Update_Command,
80    HCI_LE_Create_Connection_Cancel_Command,
81    HCI_LE_Create_Connection_Command,
82    HCI_LE_Enable_Encryption_Command,
83    HCI_LE_Extended_Advertising_Report_Event,
84    HCI_LE_Extended_Create_Connection_Command,
85    HCI_LE_Rand_Command,
86    HCI_LE_Read_PHY_Command,
87    HCI_LE_Set_Advertising_Data_Command,
88    HCI_LE_Set_Advertising_Enable_Command,
89    HCI_LE_Set_Advertising_Parameters_Command,
90    HCI_LE_Set_Default_PHY_Command,
91    HCI_LE_Set_Extended_Scan_Enable_Command,
92    HCI_LE_Set_Extended_Scan_Parameters_Command,
93    HCI_LE_Set_PHY_Command,
94    HCI_LE_Set_Random_Address_Command,
95    HCI_LE_Set_Scan_Enable_Command,
96    HCI_LE_Set_Scan_Parameters_Command,
97    HCI_LE_Set_Scan_Response_Data_Command,
98    HCI_PIN_Code_Request_Reply_Command,
99    HCI_PIN_Code_Request_Negative_Reply_Command,
100    HCI_Read_BD_ADDR_Command,
101    HCI_Read_RSSI_Command,
102    HCI_Reject_Connection_Request_Command,
103    HCI_Remote_Name_Request_Command,
104    HCI_Switch_Role_Command,
105    HCI_Set_Connection_Encryption_Command,
106    HCI_StatusError,
107    HCI_User_Confirmation_Request_Negative_Reply_Command,
108    HCI_User_Confirmation_Request_Reply_Command,
109    HCI_User_Passkey_Request_Negative_Reply_Command,
110    HCI_User_Passkey_Request_Reply_Command,
111    HCI_Write_Class_Of_Device_Command,
112    HCI_Write_Extended_Inquiry_Response_Command,
113    HCI_Write_Inquiry_Mode_Command,
114    HCI_Write_LE_Host_Support_Command,
115    HCI_Write_Local_Name_Command,
116    HCI_Write_Scan_Enable_Command,
117    HCI_Write_Secure_Connections_Host_Support_Command,
118    HCI_Write_Simple_Pairing_Mode_Command,
119    OwnAddressType,
120    phy_list_to_bits,
121)
122from .host import Host
123from .gap import GenericAccessService
124from .core import (
125    BT_BR_EDR_TRANSPORT,
126    BT_CENTRAL_ROLE,
127    BT_LE_TRANSPORT,
128    BT_PERIPHERAL_ROLE,
129    AdvertisingData,
130    CommandTimeoutError,
131    ConnectionPHY,
132    InvalidStateError,
133)
134from .utils import (
135    AsyncRunner,
136    CompositeEventEmitter,
137    setup_event_forwarding,
138    composite_listener,
139)
140from .keys import (
141    KeyStore,
142    PairingKeys,
143)
144from . import gatt_client
145from . import gatt_server
146from . import smp
147from . import sdp
148from . import l2cap
149from . import core
150
151
152# -----------------------------------------------------------------------------
153# Logging
154# -----------------------------------------------------------------------------
155logger = logging.getLogger(__name__)
156
157# -----------------------------------------------------------------------------
158# Constants
159# -----------------------------------------------------------------------------
160# fmt: off
161# pylint: disable=line-too-long
162
# Bounds for scan parameters and LE RSSI values.
# NOTE(review): units are not stated here; presumably ms for the scan
# interval/window and dBm for RSSI - confirm against the code that uses them.
DEVICE_MIN_SCAN_INTERVAL                      = 25
DEVICE_MAX_SCAN_INTERVAL                      = 10240
DEVICE_MIN_SCAN_WINDOW                        = 25
DEVICE_MAX_SCAN_WINDOW                        = 10240
DEVICE_MIN_LE_RSSI                            = -127
DEVICE_MAX_LE_RSSI                            = 20

# Default values, used when nothing more specific is supplied.
DEVICE_DEFAULT_ADDRESS                        = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL           = 1000  # ms
DEVICE_DEFAULT_ADVERTISING_DATA               = ''
DEVICE_DEFAULT_NAME                           = 'Bumble'
DEVICE_DEFAULT_INQUIRY_LENGTH                 = 8  # 10.24 seconds
DEVICE_DEFAULT_CLASS_OF_DEVICE                = 0
DEVICE_DEFAULT_SCAN_RESPONSE_DATA             = b''
# Initial value of `Connection.data_length`.
# NOTE(review): presumably (max TX octets, max TX time, max RX octets,
# max RX time) - confirm against the data length change handling.
DEVICE_DEFAULT_DATA_LENGTH                    = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL                  = 60  # ms
DEVICE_DEFAULT_SCAN_WINDOW                    = 60  # ms
DEVICE_DEFAULT_CONNECT_TIMEOUT                = None  # No timeout
DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL          = 60  # ms
DEVICE_DEFAULT_CONNECT_SCAN_WINDOW            = 60  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN        = 15  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX        = 30  # ms
DEVICE_DEFAULT_CONNECTION_MAX_LATENCY         = 0
DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720  # ms
DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH       = 0   # ms
DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH       = 0   # ms
# The L2CAP connection-oriented channel defaults mirror the l2cap module's own
DEVICE_DEFAULT_L2CAP_COC_MTU                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
DEVICE_DEFAULT_L2CAP_COC_MPS                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS          = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
192
193# fmt: on
194# pylint: enable=line-too-long
195
196
197# -----------------------------------------------------------------------------
198# Classes
199# -----------------------------------------------------------------------------
200
201# -----------------------------------------------------------------------------
class Advertisement:
    """
    Description of one received advertising report.

    Use `from_advertising_report` to build the legacy or extended flavor that
    matches the type of an HCI report; the constructor itself only stores the
    values it is given and parses `data` into an `AdvertisingData` object.
    """

    address: Address

    # Values meaning "no measurement", borrowed from the extended report event
    RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    TX_POWER_NOT_AVAILABLE = (
        HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )

    @classmethod
    def from_advertising_report(cls, report):
        """
        Convert an HCI advertising report into an advertisement object.

        Returns a `LegacyAdvertisement` or an `ExtendedAdvertisement` depending on
        the report class, or None when the report is of neither kind.
        """
        # Checked in order: legacy reports first, then extended ones
        converters = (
            (HCI_LE_Advertising_Report_Event.Report, LegacyAdvertisement),
            (HCI_LE_Extended_Advertising_Report_Event.Report, ExtendedAdvertisement),
        )
        for report_class, advertisement_class in converters:
            if isinstance(report, report_class):
                return advertisement_class.from_advertising_report(report)

        return None

    def __init__(
        self,
        address,
        rssi=RSSI_NOT_AVAILABLE,
        is_legacy=False,
        is_anonymous=False,
        is_connectable=False,
        is_directed=False,
        is_scannable=False,
        is_scan_response=False,
        is_complete=True,
        is_truncated=False,
        primary_phy=0,
        secondary_phy=0,
        tx_power=TX_POWER_NOT_AVAILABLE,
        sid=0,
        data=b'',
    ):
        """
        Store the properties of an advertisement.

        `data` is the raw advertising payload (bytes); it is parsed with
        `AdvertisingData.from_bytes` and kept in parsed form in `self.data`.
        """
        # Who sent it, and the reported signal information
        self.address = address
        self.rssi = rssi
        self.tx_power = tx_power

        # Kind of advertising event
        self.is_legacy = is_legacy
        self.is_anonymous = is_anonymous
        self.is_connectable = is_connectable
        self.is_directed = is_directed
        self.is_scannable = is_scannable
        self.is_scan_response = is_scan_response

        # Completeness of the payload
        self.is_complete = is_complete
        self.is_truncated = is_truncated

        # PHYs and advertising set
        self.primary_phy = primary_phy
        self.secondary_phy = secondary_phy
        self.sid = sid

        # Parsed payload
        self.data = AdvertisingData.from_bytes(data)
254
255
256# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
    """Advertisement built from an `HCI_LE_Advertising_Report_Event` report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Build an advertisement from a legacy report.

        The connectable/directed/scannable/scan-response properties are all
        derived from the report's single `event_type` value.
        """
        legacy = HCI_LE_Advertising_Report_Event
        event_type = report.event_type

        connectable = event_type in (legacy.ADV_IND, legacy.ADV_DIRECT_IND)
        directed = event_type == legacy.ADV_DIRECT_IND
        scannable = event_type in (legacy.ADV_IND, legacy.ADV_SCAN_IND)
        scan_response = event_type == legacy.SCAN_RSP

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=True,
            is_connectable=connectable,
            is_directed=directed,
            is_scannable=scannable,
            is_scan_response=scan_response,
            data=report.data,
        )
280
281
282# -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement):
    """Advertisement built from an `HCI_LE_Extended_Advertising_Report_Event` report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Build an advertisement from an extended report.

        `report.event_type` is a bit field: individual bits carry the boolean
        properties, and the 2-bit field starting at bit 5 carries the data status.
        """
        extended = HCI_LE_Extended_Advertising_Report_Event
        event_type = report.event_type

        def has_flag(bit_position):
            # True when the bit at `bit_position` is set in the event type
            return event_type & (1 << bit_position) != 0

        data_status = (event_type >> 5) & 3
        anonymous = report.address.address_type == extended.ANONYMOUS_ADDRESS_TYPE

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=has_flag(extended.LEGACY_ADVERTISING_PDU_USED),
            is_anonymous=anonymous,
            is_connectable=has_flag(extended.CONNECTABLE_ADVERTISING),
            is_directed=has_flag(extended.DIRECTED_ADVERTISING),
            is_scannable=has_flag(extended.SCANNABLE_ADVERTISING),
            is_scan_response=has_flag(extended.SCAN_RESPONSE),
            is_complete=data_status == extended.DATA_COMPLETE,
            is_truncated=(
                data_status == extended.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME
            ),
            primary_phy=report.primary_phy,
            secondary_phy=report.secondary_phy,
            tx_power=report.tx_power,
            sid=report.advertising_sid,
            data=report.data,
        )
306
307
308# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
    """
    Pairs advertisements with their scan responses.

    Reports are fed one at a time to `update`, which returns an advertisement
    when one is ready to be delivered, or None when the report was not usable or
    when a scan response is still being waited for.
    """

    def __init__(self, passive=False):
        """
        `passive`: when true, advertisements are returned right away, without
        waiting for a scan response.
        """
        self.passive = passive
        self.last_advertisement = None
        self.last_data = b''

    def update(self, report):
        """
        Process one advertising report.

        Returns the advertisement to deliver, or None if there is nothing to
        deliver for this report.
        """
        advertisement = Advertisement.from_advertising_report(report)
        if advertisement is None:
            return None

        previous = self.last_advertisement
        # True when the previous report was an advertisement (not a scan response)
        previous_was_advertisement = (
            previous is not None and not previous.is_scan_response
        )

        result = None
        if advertisement.is_scan_response:
            if previous_was_advertisement:
                # This is the response to a scannable advertisement: merge it with
                # the data remembered from that advertisement
                result = Advertisement.from_advertising_report(report)
                result.is_connectable = previous.is_connectable
                result.is_scannable = True
                result.data = AdvertisingData.from_bytes(self.last_data + report.data)
            remembered_data = b''
        else:
            deliver_now = (
                self.passive
                or not advertisement.is_scannable
                or previous_was_advertisement
            )
            if deliver_now:
                # Don't wait for a scan response
                result = Advertisement.from_advertising_report(report)
            remembered_data = report.data

        self.last_data = remembered_data
        self.last_advertisement = advertisement

        return result
350
351
352# -----------------------------------------------------------------------------
class AdvertisingType(IntEnum):
    """Advertising types, with helper properties describing each of them."""

    # fmt: off
    # pylint: disable=line-too-long
    UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00  # Undirected, connectable,     scannable
    DIRECTED_CONNECTABLE_HIGH_DUTY   = 0x01  # Directed,   connectable,     non-scannable
    UNDIRECTED_SCANNABLE             = 0x02  # Undirected, non-connectable, scannable
    UNDIRECTED                       = 0x03  # Undirected, non-connectable, non-scannable
    DIRECTED_CONNECTABLE_LOW_DUTY    = 0x04  # Directed,   connectable,     non-scannable
    # fmt: on

    @property
    def is_directed(self):
        """True for the two directed types."""
        return self in {
            AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
            AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
        }

    @property
    def is_scannable(self):
        """True for the types that can be scanned."""
        return self in {
            AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
            AdvertisingType.UNDIRECTED_SCANNABLE,
        }

    @property
    def is_connectable(self):
        """True for the types that accept connections."""
        return self in {
            AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
            AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
            AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
        }

    @property
    def has_data(self):
        """True for every undirected type (the directed ones have no data)."""
        return not self.is_directed
392
393
394# -----------------------------------------------------------------------------
class LePhyOptions:
    """PHY options holder; converts to an integer with `int()`."""

    # Coded PHY preference
    ANY_CODED_PHY = 0
    PREFER_S_2_CODED_PHY = 1
    PREFER_S_8_CODED_PHY = 2

    def __init__(self, coded_phy_preference=0):
        """`coded_phy_preference`: one of the coded PHY preference constants."""
        self.coded_phy_preference = coded_phy_preference

    def __int__(self):
        # Only the two low-order bits of the preference are kept
        return self.coded_phy_preference & 0b11
406
407
408# -----------------------------------------------------------------------------
class Peer:
    """
    GATT client view of the remote side of a connection.

    Creating a `Peer` creates a GATT client for the connection and attaches it
    to the connection. Most methods simply forward to that client.

    Used as an async context manager, a `Peer` discovers all the services and
    their characteristics on entry, and does nothing on exit.
    """

    def __init__(self, connection):
        self.connection = connection

        # Create a GATT client for the connection, and let the connection know
        # about it
        client = gatt_client.Client(connection)
        self.gatt_client = client
        connection.gatt_client = client

    @property
    def services(self):
        """The services held by the GATT client."""
        return self.gatt_client.services

    async def request_mtu(self, mtu):
        """
        Request an MTU through the GATT client, then emit
        'connection_att_mtu_update' on the connection.

        Returns the value returned by the GATT client's MTU request.
        """
        resulting_mtu = await self.gatt_client.request_mtu(mtu)
        self.connection.emit('connection_att_mtu_update')
        return resulting_mtu

    async def discover_service(self, uuid):
        """Forward to the GATT client's `discover_service`."""
        found = await self.gatt_client.discover_service(uuid)
        return found

    async def discover_services(self, uuids=()):
        """Forward to the GATT client's `discover_services`."""
        found = await self.gatt_client.discover_services(uuids)
        return found

    async def discover_included_services(self, service):
        """Forward to the GATT client's `discover_included_services`."""
        found = await self.gatt_client.discover_included_services(service)
        return found

    async def discover_characteristics(self, uuids=(), service=None):
        """Forward to the GATT client's `discover_characteristics`."""
        found = await self.gatt_client.discover_characteristics(
            uuids=uuids, service=service
        )
        return found

    async def discover_descriptors(
        self, characteristic=None, start_handle=None, end_handle=None
    ):
        """Forward to the GATT client's `discover_descriptors`."""
        found = await self.gatt_client.discover_descriptors(
            characteristic, start_handle, end_handle
        )
        return found

    async def discover_attributes(self):
        """Forward to the GATT client's `discover_attributes`."""
        found = await self.gatt_client.discover_attributes()
        return found

    async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
        """Forward to the GATT client's `subscribe`."""
        outcome = await self.gatt_client.subscribe(
            characteristic, subscriber, prefer_notify
        )
        return outcome

    async def unsubscribe(self, characteristic, subscriber=None):
        """Forward to the GATT client's `unsubscribe`."""
        outcome = await self.gatt_client.unsubscribe(characteristic, subscriber)
        return outcome

    async def read_value(self, attribute):
        """Forward to the GATT client's `read_value`."""
        value = await self.gatt_client.read_value(attribute)
        return value

    async def write_value(self, attribute, value, with_response=False):
        """Forward to the GATT client's `write_value`."""
        outcome = await self.gatt_client.write_value(attribute, value, with_response)
        return outcome

    async def read_characteristics_by_uuid(self, uuid, service=None):
        """Forward to the GATT client's `read_characteristics_by_uuid`."""
        values = await self.gatt_client.read_characteristics_by_uuid(uuid, service)
        return values

    def get_services_by_uuid(self, uuid):
        """Forward to the GATT client's `get_services_by_uuid`."""
        return self.gatt_client.get_services_by_uuid(uuid)

    def get_characteristics_by_uuid(self, uuid, service=None):
        """Forward to the GATT client's `get_characteristics_by_uuid`."""
        return self.gatt_client.get_characteristics_by_uuid(uuid, service)

    def create_service_proxy(self, proxy_class):
        """Instantiate `proxy_class` from the GATT client."""
        return proxy_class.from_client(self.gatt_client)

    async def discover_service_and_create_proxy(self, proxy_class):
        """
        Discover the service that `proxy_class` stands for, then its
        characteristics, and create the proxy.

        Returns None when the service is not found.
        """
        services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
        if not services:
            return None

        # Only the first matching service is used
        first_match = services[0]
        await first_match.discover_characteristics()
        return self.create_service_proxy(proxy_class)

    async def sustain(self, timeout=None):
        """Forward to the connection's `sustain`."""
        await self.connection.sustain(timeout)

    # [Classic only]
    async def request_name(self):
        """Request the remote name through the connection."""
        name = await self.connection.request_remote_name()
        return name

    async def __aenter__(self):
        # Discover all the services, then the characteristics of each of them
        await self.discover_services()
        for service in self.services:
            await service.discover_characteristics()

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing to release
        pass

    def __str__(self):
        connection = self.connection
        return f'{connection.peer_address} as {connection.role_name}'
503
504
505# -----------------------------------------------------------------------------
# Preferred connection parameters.
# Each field defaults to the matching DEVICE_DEFAULT_CONNECTION_* constant.
@dataclass
class ConnectionParametersPreferences:
    # Shared instance with all-default values. It is declared here and assigned
    # right after the class definition, because it needs the class to exist.
    default: ClassVar[ConnectionParametersPreferences]
    connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN  # ms
    connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX  # ms
    max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
    supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT  # ms
    min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH  # ms
    max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH  # ms


# Instantiate the shared all-default preferences
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
518
519
520# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
    """
    A connection between the local device and a remote peer.

    Most operations are thin wrappers that delegate to the owning `Device`,
    passing this connection as the target.

    A Classic connection may exist in an "incomplete" state (see `incomplete`),
    in which `handle` is None until `complete` is called.

    Used as an async context manager, a connection is disconnected on exit when
    the `async with` body finished without raising.
    """

    device: Device
    handle: int
    transport: int
    self_address: Address
    peer_address: Address
    role: int
    encryption: int
    authenticated: bool
    sc: bool
    link_key_type: int

    @composite_listener
    class Listener:
        """Listener interface: one no-op `on_<event>` method per connection event."""

        def on_disconnection(self, reason):
            pass

        def on_connection_parameters_update(self):
            pass

        def on_connection_parameters_update_failure(self, error):
            pass

        def on_connection_data_length_change(self):
            pass

        def on_connection_phy_update(self):
            pass

        def on_connection_phy_update_failure(self, error):
            pass

        def on_connection_att_mtu_update(self):
            pass

        def on_connection_encryption_change(self):
            pass

        def on_connection_encryption_key_refresh(self):
            pass

    def __init__(
        self,
        device,
        handle,
        transport,
        self_address,
        peer_address,
        peer_resolvable_address,
        role,
        parameters,
        phy,
    ):
        """
        Store the connection properties and set the initial state: not
        encrypted, not authenticated, default ATT MTU and data length, no
        per-connection GATT client, and the device's GATT server.
        """
        super().__init__()
        self.device = device
        self.handle = handle
        self.transport = transport
        self.self_address = self_address
        self.peer_address = peer_address
        self.peer_resolvable_address = peer_resolvable_address
        self.peer_name = None  # Classic only
        self.role = role
        self.parameters = parameters
        self.encryption = 0
        self.authenticated = False
        self.sc = False
        self.link_key_type = None
        self.phy = phy
        self.att_mtu = ATT_DEFAULT_MTU
        self.data_length = DEVICE_DEFAULT_DATA_LENGTH
        self.gatt_client = None  # Per-connection client
        self.gatt_server = (
            device.gatt_server
        )  # By default, use the device's shared server

    # [Classic only]
    @classmethod
    def incomplete(cls, device, peer_address):
        """
        Instantiate an incomplete connection (ie. one waiting for a HCI Connection
        Complete event).
        Once received it shall be completed using the `.complete` method.
        """
        return cls(
            device,
            None,
            BT_BR_EDR_TRANSPORT,
            device.public_address,
            peer_address,
            None,
            None,
            None,
            None,
        )

    # [Classic only]
    def complete(self, handle, peer_resolvable_address, role, parameters):
        """
        Finish an incomplete connection upon completion.
        """
        assert self.handle is None
        assert self.transport == BT_BR_EDR_TRANSPORT
        self.handle = handle
        self.peer_resolvable_address = peer_resolvable_address
        # Quirk: role might be known before complete
        if self.role is None:
            self.role = role
        self.parameters = parameters

    @property
    def role_name(self):
        """'CENTRAL' when the role is the central role, 'PERIPHERAL' otherwise."""
        return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'

    @property
    def is_encrypted(self):
        """True when the encryption state is anything other than 0."""
        return self.encryption != 0

    @property
    def is_incomplete(self) -> bool:
        """True while the connection has no handle yet (see `incomplete`)."""
        # Identity test: None is a singleton, and `==` could be overridden
        return self.handle is None

    def send_l2cap_pdu(self, cid, pdu):
        """Send an L2CAP PDU on channel `cid` of this connection, via the device."""
        self.device.send_l2cap_pdu(self.handle, cid, pdu)

    def create_l2cap_connector(self, psm):
        """Delegate to the device's `create_l2cap_connector` for this connection."""
        return self.device.create_l2cap_connector(self, psm)

    async def open_l2cap_channel(
        self,
        psm,
        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
    ):
        """Delegate to the device's `open_l2cap_channel` for this connection."""
        return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the device to disconnect this connection, giving `reason`."""
        await self.device.disconnect(self, reason)

    async def pair(self) -> None:
        """Delegate to the device's `pair` for this connection."""
        return await self.device.pair(self)

    def request_pairing(self) -> None:
        """Delegate to the device's `request_pairing` for this connection."""
        return self.device.request_pairing(self)

    # [Classic only]
    async def authenticate(self) -> None:
        """Delegate to the device's `authenticate` for this connection."""
        return await self.device.authenticate(self)

    async def encrypt(self, enable: bool = True) -> None:
        """Delegate to the device's `encrypt` for this connection."""
        return await self.device.encrypt(self, enable)

    async def switch_role(self, role: int) -> None:
        """Delegate to the device's `switch_role` for this connection."""
        return await self.device.switch_role(self, role)

    async def sustain(self, timeout=None):
        """
        Idles the current task waiting for a disconnect or timeout.

        Returns normally on a 'disconnection' event or when `timeout` (seconds,
        None meaning no timeout) elapses. An exception carried by a
        'disconnection_failure' event is raised to the caller.
        """

        abort = asyncio.get_running_loop().create_future()
        self.on('disconnection', abort.set_result)
        self.on('disconnection_failure', abort.set_exception)

        try:
            await asyncio.wait_for(self.device.abort_on('flush', abort), timeout)
        except asyncio.TimeoutError:
            pass
        finally:
            # Always unregister, including when the wait ends with an exception
            # or a cancellation, so that no stale listener is left behind on a
            # future that is already done.
            self.remove_listener('disconnection', abort.set_result)
            self.remove_listener('disconnection_failure', abort.set_exception)

    async def update_parameters(
        self,
        connection_interval_min,
        connection_interval_max,
        max_latency,
        supervision_timeout,
    ):
        """Delegate to the device's `update_connection_parameters`."""
        return await self.device.update_connection_parameters(
            self,
            connection_interval_min,
            connection_interval_max,
            max_latency,
            supervision_timeout,
        )

    async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
        """Delegate to the device's `set_connection_phy`."""
        return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)

    async def get_rssi(self):
        """Delegate to the device's `get_connection_rssi`."""
        return await self.device.get_connection_rssi(self)

    async def get_phy(self):
        """Delegate to the device's `get_connection_phy`."""
        return await self.device.get_connection_phy(self)

    # [Classic only]
    async def request_remote_name(self):
        """Delegate to the device's `request_remote_name`."""
        return await self.device.request_remote_name(self)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Only disconnect when the `async with` body did not raise
        if exc_type is None:
            try:
                await self.disconnect()
            except HCI_StatusError as error:
                # Invalid parameter means the connection is no longer valid
                if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR:
                    raise

    def __str__(self):
        # An incomplete connection has no handle yet, and None cannot be
        # formatted as hexadecimal.
        if self.handle is None:
            handle = 'None'
        else:
            handle = f'0x{self.handle:04X}'
        return (
            f'Connection(handle={handle}, '
            f'role={self.role_name}, '
            f'address={self.peer_address})'
        )
739
740
741# -----------------------------------------------------------------------------
class DeviceConfiguration:
    """
    Settings used to set up a device.

    A new instance holds default values; `load_from_dict` and `load_from_file`
    override them from a dictionary or a JSON file.
    """

    def __init__(self) -> None:
        # Setup defaults
        self.name = DEVICE_DEFAULT_NAME
        self.address = Address(DEVICE_DEFAULT_ADDRESS)
        self.class_of_device = DEVICE_DEFAULT_CLASS_OF_DEVICE
        self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
        self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
        self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
        self.le_enabled = True
        # LE host enable 2nd parameter
        self.le_simultaneous_enabled = True
        self.classic_enabled = False
        self.classic_sc_enabled = True
        self.classic_ssp_enabled = True
        self.classic_accept_any = True
        self.connectable = True
        self.discoverable = True
        self.advertising_data = self._advertising_data_for_name(self.name)
        self.irk = bytes(16)  # This really must be changed for any level of security
        self.keystore = None
        self.gatt_services: List[Dict[str, Any]] = []

    @staticmethod
    def _advertising_data_for_name(name: str) -> bytes:
        """Build advertising data containing only `name` as the complete local name."""
        return bytes(
            AdvertisingData(
                [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(name, 'utf-8'))]
            )
        )

    def load_from_dict(self, config: Dict[str, Any]) -> None:
        """
        Update the configuration from a dictionary.

        Keys that are absent keep their current value, except:
        - 'keystore', which is reset to None when absent
        - 'irk', which is derived from the address when absent or empty
        - 'advertising_data', which is rebuilt from the name when absent or
          empty and a 'name' is given

        'irk' and 'advertising_data' are hexadecimal strings.
        """
        # Load simple properties
        self.name = config.get('name', self.name)
        if address := config.get('address', None):
            self.address = Address(address)
        self.class_of_device = config.get('class_of_device', self.class_of_device)
        # A single 'advertising_interval' value is used for both min and max
        self.advertising_interval_min = config.get(
            'advertising_interval', self.advertising_interval_min
        )
        self.advertising_interval_max = self.advertising_interval_min
        self.keystore = config.get('keystore')
        self.le_enabled = config.get('le_enabled', self.le_enabled)
        self.le_simultaneous_enabled = config.get(
            'le_simultaneous_enabled', self.le_simultaneous_enabled
        )
        self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
        self.classic_sc_enabled = config.get(
            'classic_sc_enabled', self.classic_sc_enabled
        )
        self.classic_ssp_enabled = config.get(
            'classic_ssp_enabled', self.classic_ssp_enabled
        )
        self.classic_accept_any = config.get(
            'classic_accept_any', self.classic_accept_any
        )
        self.connectable = config.get('connectable', self.connectable)
        self.discoverable = config.get('discoverable', self.discoverable)
        self.gatt_services = config.get('gatt_services', self.gatt_services)

        # Load or synthesize an IRK
        irk = config.get('irk')
        if irk:
            self.irk = bytes.fromhex(irk)
        else:
            # Construct an IRK from the address bytes
            # NOTE: this is not secure, but will always give the same IRK for the same
            # address
            address_bytes = bytes(self.address)
            self.irk = (address_bytes * 3)[:16]

        # Load advertising data
        advertising_data = config.get('advertising_data')
        if advertising_data:
            self.advertising_data = bytes.fromhex(advertising_data)
        elif config.get('name') is not None:
            # No explicit advertising data: keep the advertised local name in
            # sync with the configured name, rather than leaving the data that
            # was built from the previous name.
            self.advertising_data = self._advertising_data_for_name(self.name)

    def load_from_file(self, filename) -> None:
        """Update the configuration from the JSON file at `filename` (UTF-8)."""
        with open(filename, 'r', encoding='utf-8') as file:
            self.load_from_dict(json.load(file))
817
818
819# -----------------------------------------------------------------------------
820# Decorators used with the following Device class
821# (we define them outside of the Device class, because defining decorators
822#  within a class requires unnecessarily complicated acrobatics)
823# -----------------------------------------------------------------------------
824
825
826# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
    '''
    Wrap a method so that callers pass a connection handle while the wrapped
    method receives the connection returned by `self.lookup_connection`.

    The wrapper raises ValueError when the lookup returns None.
    '''

    @functools.wraps(function)
    def wrapper(self, connection_handle, *args, **kwargs):
        connection = self.lookup_connection(connection_handle)
        if connection is not None:
            return function(self, connection, *args, **kwargs)
        raise ValueError(f"no connection for handle: 0x{connection_handle:04x}")

    return wrapper
835
836
837# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
    '''
    Wrap a method so that callers pass a bluetooth address while the wrapped
    method receives the matching connection.

    `self.pending_connections` is consulted first, then the peer address of each
    entry in `self.connections`. The wrapper raises ValueError when nothing matches.
    '''

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        # A pending connection for this address takes precedence
        pending = self.pending_connections.get(address, False)
        if pending:
            return function(self, pending, *args, **kwargs)

        # Otherwise, use the first established connection with that peer address
        established = next(
            (
                candidate
                for candidate in self.connections.values()
                if candidate.peer_address == address
            ),
            None,
        )
        if established is None:
            raise ValueError('no connection for address')
        return function(self, established, *args, **kwargs)

    return wrapper
849
850
851# Decorator that tries to convert the first argument from a bluetooth address to a
852# connection
def try_with_connection_from_address(function):
    '''
    Wrap a method so that it receives both the connection matching a bluetooth
    address (or None when there is no match) and the address itself.

    `self.pending_connections` is consulted first, then the peer address of each
    entry in `self.connections`.
    '''

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        connection = self.pending_connections.get(address, False)
        if not connection:
            # No pending connection: fall back to the established connections,
            # or to None if none of them has this peer address
            connection = next(
                (
                    candidate
                    for candidate in self.connections.values()
                    if candidate.peer_address == address
                ),
                None,
            )
        return function(self, connection, address, *args, **kwargs)

    return wrapper
864
865
866# Decorator that adds a method to the list of event handlers for host events.
867# This assumes that the method name starts with `on_`
def host_event_handler(function):
    '''
    Record `function` as a host event handler and return it unchanged.

    The event name is the function name with its first three characters (the
    `on_` prefix) removed; it is appended to `device_host_event_handlers`.
    '''
    event_name = function.__name__[3:]
    device_host_event_handlers.append(event_name)
    return function
871
872
# List of host event handlers for the Device class.
# Each entry is an event name, i.e. a handler method name without its `on_`
# prefix; entries are appended by the `host_event_handler` decorator.
# (we define this list outside the class, because referencing a class in method
#  decorators is not straightforward)
device_host_event_handlers: list[str] = []
877
878
879# -----------------------------------------------------------------------------
880class Device(CompositeEventEmitter):
    # incomplete list of fields.
    # Taken from the configuration's address, or from the `address` constructor
    # argument when one is passed
    random_address: Address
    # Starts as 00:00:00:00:00:00; updated in `power_on` from the BD_ADDR read
    # from the controller
    public_address: Address
    classic_enabled: bool
    name: str
    class_of_device: int
    gatt_server: gatt_server.Server
    advertising_data: bytes
    scan_response_data: bytes
    # Connections, by connection handle
    connections: Dict[int, Connection]
    # Connections, by BD address (BR/EDR only)
    pending_connections: Dict[Address, Connection]
    # Futures, by BD address OR [Futures] for Address.ANY
    classic_pending_accepts: Dict[
        Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
    ]
    # Accumulators, by address
    advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
896
897    @composite_listener
898    class Listener:
899        def on_advertisement(self, advertisement):
900            pass
901
902        def on_inquiry_result(self, address, class_of_device, data, rssi):
903            pass
904
905        def on_connection(self, connection):
906            pass
907
908        def on_connection_failure(self, error):
909            pass
910
911        def on_connection_request(self, bd_addr, class_of_device, link_type):
912            pass
913
914        def on_characteristic_subscription(
915            self, connection, characteristic, notify_enabled, indicate_enabled
916        ):
917            pass
918
919    @classmethod
920    def with_hci(cls, name, address, hci_source, hci_sink):
921        '''
922        Create a Device instance with a Host configured to communicate with a controller
923        through an HCI source/sink
924        '''
925        host = Host(controller_source=hci_source, controller_sink=hci_sink)
926        return cls(name=name, address=address, host=host)
927
928    @classmethod
929    def from_config_file(cls, filename):
930        config = DeviceConfiguration()
931        config.load_from_file(filename)
932        return cls(config=config)
933
934    @classmethod
935    def from_config_file_with_hci(cls, filename, hci_source, hci_sink):
936        config = DeviceConfiguration()
937        config.load_from_file(filename)
938        host = Host(controller_source=hci_source, controller_sink=hci_sink)
939        return cls(config=config, host=host)
940
    def __init__(
        self,
        name: Optional[str] = None,
        address: Optional[Address] = None,
        config: Optional[DeviceConfiguration] = None,
        host: Optional[Host] = None,
        generic_access_service: bool = True,
    ) -> None:
        '''
        Initialize the device state, the GATT/SDP/L2CAP/SMP components, and
        optionally attach a host.

        name: if set (truthy), overrides the name from the configuration.
        address: if set (truthy), overrides the address from the configuration;
          a string is converted to an Address.
        config: configuration to copy settings from; a default
          DeviceConfiguration is used when None.
        host: if set (truthy), assigned to the `host` property at the end of the
          initialization.
        generic_access_service: passed as is to `add_default_services`.

        Raises an Exception if a GATT descriptor in the configuration uses the
        'permission' key instead of 'permissions'.
        '''
        super().__init__()

        self._host = None
        self.powered_on = False
        self.advertising = False
        self.advertising_type = None
        self.auto_restart_inquiry = True
        self.auto_restart_advertising = False
        self.command_timeout = 10  # seconds
        self.gatt_server = gatt_server.Server(self)
        self.sdp_server = sdp.Server(self)
        self.l2cap_channel_manager = l2cap.ChannelManager(
            [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
        )
        self.advertisement_accumulators = {}  # Accumulators, by address
        self.scanning = False
        self.scanning_is_passive = False
        self.discovering = False
        self.le_connecting = False
        self.disconnecting = False
        self.connections = {}  # Connections, by connection handle
        self.pending_connections = {}  # Connections, by BD address (BR/EDR only)
        # NOTE: this initial value is overwritten below with the value from the
        # configuration
        self.classic_enabled = False
        self.inquiry_response = None
        self.address_resolver = None
        self.classic_pending_accepts = {
            Address.ANY: []
        }  # Futures, by BD address OR [Futures] for Address.ANY

        # Own address type cache
        self.advertising_own_address_type = None
        self.connect_own_address_type = None

        # Use the initial config or a default
        self.public_address = Address('00:00:00:00:00:00')
        if config is None:
            config = DeviceConfiguration()
        self.name = config.name
        self.random_address = config.address
        self.class_of_device = config.class_of_device
        self.scan_response_data = config.scan_response_data
        self.advertising_data = config.advertising_data
        self.advertising_interval_min = config.advertising_interval_min
        self.advertising_interval_max = config.advertising_interval_max
        self.keystore = KeyStore.create_for_device(config)
        self.irk = config.irk
        self.le_enabled = config.le_enabled
        self.classic_enabled = config.classic_enabled
        self.le_simultaneous_enabled = config.le_simultaneous_enabled
        self.classic_ssp_enabled = config.classic_ssp_enabled
        self.classic_sc_enabled = config.classic_sc_enabled
        self.discoverable = config.discoverable
        self.connectable = config.connectable
        self.classic_accept_any = config.classic_accept_any

        # Build the GATT services described in the configuration (each service is
        # a dict with a "uuid" and an optional list of "characteristics", each of
        # which may have an optional list of "descriptors") and add them to the
        # GATT server
        for service in config.gatt_services:
            characteristics = []
            for characteristic in service.get("characteristics", []):
                descriptors = []
                for descriptor in characteristic.get("descriptors", []):
                    # Leave this check until 5/25/2023
                    if descriptor.get("permission", False):
                        raise Exception(
                            "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'"
                        )
                    new_descriptor = Descriptor(
                        attribute_type=descriptor["descriptor_type"],
                        permissions=descriptor["permissions"],
                    )
                    descriptors.append(new_descriptor)
                new_characteristic = Characteristic(
                    uuid=characteristic["uuid"],
                    properties=characteristic["properties"],
                    permissions=characteristic["permissions"],
                    descriptors=descriptors,
                )
                characteristics.append(new_characteristic)
            new_service = Service(uuid=service["uuid"], characteristics=characteristics)
            self.gatt_server.add_service(new_service)

        # If a name is passed, override the name from the config
        if name:
            self.name = name

        # If an address is passed, override the address from the config
        if address:
            if isinstance(address, str):
                address = Address(address)
            self.random_address = address

        # Setup SMP
        self.smp_manager = smp.Manager(self)
        self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
        self.l2cap_channel_manager.register_fixed_channel(
            smp.SMP_BR_CID, self.on_smp_pdu
        )

        # Register the SDP server with the L2CAP Channel Manager
        self.sdp_server.register(self.l2cap_channel_manager)

        # Add the default services and route the ATT fixed channel to the GATT
        # PDU handler
        self.add_default_services(generic_access_service)
        self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)

        # Forward some events
        setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')

        # Set the initial host
        if host:
            self.host = host
1058
1059    @property
1060    def host(self) -> Host:
1061        assert self._host
1062        return self._host
1063
1064    @host.setter
1065    def host(self, host):
1066        # Unsubscribe from events from the current host
1067        if self._host:
1068            for event_name in device_host_event_handlers:
1069                self._host.remove_listener(
1070                    event_name, getattr(self, f'on_{event_name}')
1071                )
1072
1073        # Subscribe to events from the new host
1074        if host:
1075            for event_name in device_host_event_handlers:
1076                host.on(event_name, getattr(self, f'on_{event_name}'))
1077
1078        # Update the references to the new host
1079        self._host = host
1080        self.l2cap_channel_manager.host = host
1081
1082        # Set providers for the new host
1083        if host:
1084            host.long_term_key_provider = self.get_long_term_key
1085            host.link_key_provider = self.get_link_key
1086
    @property
    def sdp_service_records(self):
        '''The service records held by the SDP server.'''
        return self.sdp_server.service_records
1090
    @sdp_service_records.setter
    def sdp_service_records(self, service_records):
        '''Replace the service records held by the SDP server.'''
        self.sdp_server.service_records = service_records
1094
1095    def lookup_connection(self, connection_handle: int) -> Optional[Connection]:
1096        if connection := self.connections.get(connection_handle):
1097            return connection
1098
1099        return None
1100
1101    def find_connection_by_bd_addr(
1102        self,
1103        bd_addr: Address,
1104        transport: Optional[int] = None,
1105        check_address_type: bool = False,
1106    ) -> Optional[Connection]:
1107        for connection in self.connections.values():
1108            if connection.peer_address.to_bytes() == bd_addr.to_bytes():
1109                if (
1110                    check_address_type
1111                    and connection.peer_address.address_type != bd_addr.address_type
1112                ):
1113                    continue
1114                if transport is None or connection.transport == transport:
1115                    return connection
1116
1117        return None
1118
    def create_l2cap_connector(self, connection, psm):
        '''
        Return a zero-argument callable that, when called, asks the L2CAP channel
        manager to connect to `psm` over `connection`.
        '''
        return lambda: self.l2cap_channel_manager.connect(connection, psm)
1121
    def create_l2cap_registrar(self, psm):
        '''
        Return a one-argument callable that registers the handler it is given as the
        L2CAP server for `psm` (see `register_l2cap_server`).
        '''
        return lambda handler: self.register_l2cap_server(psm, handler)
1124
    def register_l2cap_server(self, psm, server):
        '''Register `server` for `psm` with the L2CAP channel manager.'''
        self.l2cap_channel_manager.register_server(psm, server)
1127
1128    def register_l2cap_channel_server(
1129        self,
1130        psm,
1131        server,
1132        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
1133        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
1134        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
1135    ):
1136        return self.l2cap_channel_manager.register_le_coc_server(
1137            psm, server, max_credits, mtu, mps
1138        )
1139
1140    async def open_l2cap_channel(
1141        self,
1142        connection,
1143        psm,
1144        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
1145        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
1146        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
1147    ):
1148        return await self.l2cap_channel_manager.open_le_coc(
1149            connection, psm, max_credits, mtu, mps
1150        )
1151
    def send_l2cap_pdu(self, connection_handle, cid, pdu):
        '''Send an L2CAP PDU through the host, for the given handle and CID.'''
        self.host.send_l2cap_pdu(connection_handle, cid, pdu)
1154
1155    async def send_command(self, command, check_result=False):
1156        try:
1157            return await asyncio.wait_for(
1158                self.host.send_command(command, check_result), self.command_timeout
1159            )
1160        except asyncio.TimeoutError as error:
1161            logger.warning('!!! Command timed out')
1162            raise CommandTimeoutError() from error
1163
    async def power_on(self) -> None:
        '''
        Reset the controller and configure it from the device settings.

        In order: read the public address, write the LE host support flags (if the
        command is supported), then, if LE is enabled, set the random address and
        load the resolving list from the keystore, and, if Classic is enabled,
        write the name, class of device, pairing modes and scan modes.
        `powered_on` is set to True at the end.
        '''
        # Reset the controller
        await self.host.reset()

        # Read the public address; the current value is kept if the command fails
        response = await self.send_command(HCI_Read_BD_ADDR_Command())  # type: ignore[call-arg]
        if response.return_parameters.status == HCI_SUCCESS:
            logger.debug(
                color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
            )
            self.public_address = response.return_parameters.bd_addr

        if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
            await self.send_command(
                HCI_Write_LE_Host_Support_Command(
                    le_supported_host=int(self.le_enabled),
                    simultaneous_le_host=int(self.le_simultaneous_enabled),
                )  # type: ignore[call-arg]
            )

        if self.le_enabled:
            # Set the controller address
            if self.random_address == Address.ANY_RANDOM:
                # Try to use an address generated at random by the controller
                if self.host.supports_command(HCI_LE_RAND_COMMAND):
                    # Get 8 random bytes
                    response = await self.send_command(
                        HCI_LE_Rand_Command(), check_result=True  # type: ignore[call-arg]
                    )

                    # Ensure the address bytes can be a static random address
                    # (keep the first 5 random bytes, and set the two most
                    # significant bits of the 6th byte)
                    address_bytes = response.return_parameters.random_number[
                        :5
                    ] + bytes([response.return_parameters.random_number[5] | 0xC0])

                    # Create a static random address from the random bytes
                    self.random_address = Address(address_bytes)

            # If the address is still ANY_RANDOM at this point (no address was
            # configured and none could be generated), no random address is set
            if self.random_address != Address.ANY_RANDOM:
                logger.debug(
                    color(
                        f'LE Random Address: {self.random_address}',
                        'yellow',
                    )
                )
                await self.send_command(
                    HCI_LE_Set_Random_Address_Command(
                        random_address=self.random_address
                    ),  # type: ignore[call-arg]
                    check_result=True,
                )

            # Load the address resolving list
            if self.keystore and self.host.supports_command(
                HCI_LE_CLEAR_RESOLVING_LIST_COMMAND
            ):
                await self.send_command(HCI_LE_Clear_Resolving_List_Command())  # type: ignore[call-arg]

                # Each resolving key is an (IRK, identity address) pair
                resolving_keys = await self.keystore.get_resolving_keys()
                for (irk, address) in resolving_keys:
                    await self.send_command(
                        HCI_LE_Add_Device_To_Resolving_List_Command(
                            peer_identity_address_type=address.address_type,
                            peer_identity_address=address,
                            peer_irk=irk,
                            local_irk=self.irk,
                        )  # type: ignore[call-arg]
                    )

                # Enable address resolution
                # await self.send_command(
                #     HCI_LE_Set_Address_Resolution_Enable_Command(
                #         address_resolution_enable=1)
                #     )
                # )

                # Create a host-side address resolver
                self.address_resolver = smp.AddressResolver(resolving_keys)

        if self.classic_enabled:
            await self.send_command(
                HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))  # type: ignore[call-arg]
            )
            await self.send_command(
                HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)  # type: ignore[call-arg]
            )
            await self.send_command(
                HCI_Write_Simple_Pairing_Mode_Command(
                    simple_pairing_mode=int(self.classic_ssp_enabled)
                )  # type: ignore[call-arg]
            )
            await self.send_command(
                HCI_Write_Secure_Connections_Host_Support_Command(
                    secure_connections_host_support=int(self.classic_sc_enabled)
                )  # type: ignore[call-arg]
            )
            # Apply the configured connectable/discoverable settings
            await self.set_connectable(self.connectable)
            await self.set_discoverable(self.discoverable)

        # Done
        self.powered_on = True
1264
1265    async def power_off(self) -> None:
1266        if self.powered_on:
1267            await self.host.flush()
1268            self.powered_on = False
1269
    def supports_le_feature(self, feature):
        '''Return the host's answer to whether the LE feature is supported.'''
        return self.host.supports_le_feature(feature)
1272
1273    def supports_le_phy(self, phy):
1274        if phy == HCI_LE_1M_PHY:
1275            return True
1276
1277        feature_map = {
1278            HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
1279            HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
1280        }
1281        if phy not in feature_map:
1282            raise ValueError('invalid PHY')
1283
1284        return self.host.supports_le_feature(feature_map[phy])
1285
    async def start_advertising(
        self,
        advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
        target: Optional[Address] = None,
        own_address_type: int = OwnAddressType.RANDOM,
        auto_restart: bool = False,
    ) -> None:
        '''
        Start advertising, stopping any advertising already in progress first.

        advertising_type: type of advertising; determines whether the advertising
          data and the scan response data are sent to the controller, and whether
          a target address is required.
        target: peer address, required when the advertising type is directed.
        own_address_type: own address type passed to the controller.
        auto_restart: stored in `auto_restart_advertising`.

        Raises ValueError if the advertising type is directed and no target is
        given. NOTE: this is checked after the advertising data and scan response
        data commands have been sent.
        '''
        # If we're advertising, stop first
        if self.advertising:
            await self.stop_advertising()

        # Set/update the advertising data if the advertising type allows it
        if advertising_type.has_data:
            await self.send_command(
                HCI_LE_Set_Advertising_Data_Command(
                    advertising_data=self.advertising_data
                ),  # type: ignore[call-arg]
                check_result=True,
            )

        # Set/update the scan response data if the advertising is scannable
        if advertising_type.is_scannable:
            await self.send_command(
                HCI_LE_Set_Scan_Response_Data_Command(
                    scan_response_data=self.scan_response_data
                ),  # type: ignore[call-arg]
                check_result=True,
            )

        # Decide what peer address to use
        if advertising_type.is_directed:
            if target is None:
                raise ValueError('directed advertising requires a target address')

            peer_address = target
            peer_address_type = target.address_type
        else:
            # Not directed: use an all-zero public address as the peer address
            peer_address = Address('00:00:00:00:00:00')
            peer_address_type = Address.PUBLIC_DEVICE_ADDRESS

        # Set the advertising parameters
        await self.send_command(
            HCI_LE_Set_Advertising_Parameters_Command(
                advertising_interval_min=self.advertising_interval_min,
                advertising_interval_max=self.advertising_interval_max,
                advertising_type=int(advertising_type),
                own_address_type=own_address_type,
                peer_address_type=peer_address_type,
                peer_address=peer_address,
                advertising_channel_map=7,
                advertising_filter_policy=0,
            ),  # type: ignore[call-arg]
            check_result=True,
        )

        # Enable advertising
        await self.send_command(
            HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1),  # type: ignore[call-arg]
            check_result=True,
        )

        # Remember the advertising state (only reached if all the commands above
        # succeeded)
        self.advertising_own_address_type = own_address_type
        self.auto_restart_advertising = auto_restart
        self.advertising_type = advertising_type
        self.advertising = True
1351
1352    async def stop_advertising(self) -> None:
1353        # Disable advertising
1354        if self.advertising:
1355            await self.send_command(
1356                HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),  # type: ignore[call-arg]
1357                check_result=True,
1358            )
1359
1360            self.advertising_own_address_type = None
1361            self.advertising = False
1362            self.advertising_type = None
1363            self.auto_restart_advertising = False
1364
    @property
    def is_advertising(self):
        '''The value of the `advertising` flag.'''
        return self.advertising
1368
1369    async def start_scanning(
1370        self,
1371        legacy: bool = False,
1372        active: bool = True,
1373        scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
1374        scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW,  # Scan window in ms
1375        own_address_type: int = OwnAddressType.RANDOM,
1376        filter_duplicates: bool = False,
1377        scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
1378    ) -> None:
1379        # Check that the arguments are legal
1380        if scan_interval < scan_window:
1381            raise ValueError('scan_interval must be >= scan_window')
1382        if (
1383            scan_interval < DEVICE_MIN_SCAN_INTERVAL
1384            or scan_interval > DEVICE_MAX_SCAN_INTERVAL
1385        ):
1386            raise ValueError('scan_interval out of range')
1387        if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
1388            raise ValueError('scan_interval out of range')
1389
1390        # Reset the accumulators
1391        self.advertisement_accumulators = {}
1392
1393        # Enable scanning
1394        if not legacy and self.supports_le_feature(
1395            HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE
1396        ):
1397            # Set the scanning parameters
1398            scan_type = (
1399                HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
1400                if active
1401                else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
1402            )
1403            scanning_filter_policy = (
1404                HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
1405            )  # TODO: support other types
1406
1407            scanning_phy_count = 0
1408            scanning_phys_bits = 0
1409            if HCI_LE_1M_PHY in scanning_phys:
1410                scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
1411                scanning_phy_count += 1
1412            if HCI_LE_CODED_PHY in scanning_phys:
1413                if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
1414                    scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
1415                    scanning_phy_count += 1
1416
1417            if scanning_phy_count == 0:
1418                raise ValueError('at least one scanning PHY must be enabled')
1419
1420            await self.send_command(
1421                HCI_LE_Set_Extended_Scan_Parameters_Command(
1422                    own_address_type=own_address_type,
1423                    scanning_filter_policy=scanning_filter_policy,
1424                    scanning_phys=scanning_phys_bits,
1425                    scan_types=[scan_type] * scanning_phy_count,
1426                    scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
1427                    scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
1428                ),  # type: ignore[call-arg]
1429                check_result=True,
1430            )
1431
1432            # Enable scanning
1433            await self.send_command(
1434                HCI_LE_Set_Extended_Scan_Enable_Command(
1435                    enable=1,
1436                    filter_duplicates=1 if filter_duplicates else 0,
1437                    duration=0,  # TODO allow other values
1438                    period=0,  # TODO allow other values
1439                ),  # type: ignore[call-arg]
1440                check_result=True,
1441            )
1442        else:
1443            # Set the scanning parameters
1444            scan_type = (
1445                HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING
1446                if active
1447                else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
1448            )
1449            await self.send_command(
1450                # pylint: disable=line-too-long
1451                HCI_LE_Set_Scan_Parameters_Command(
1452                    le_scan_type=scan_type,
1453                    le_scan_interval=int(scan_window / 0.625),
1454                    le_scan_window=int(scan_window / 0.625),
1455                    own_address_type=own_address_type,
1456                    scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
1457                ),  # type: ignore[call-arg]
1458                check_result=True,
1459            )
1460
1461            # Enable scanning
1462            await self.send_command(
1463                HCI_LE_Set_Scan_Enable_Command(
1464                    le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
1465                ),  # type: ignore[call-arg]
1466                check_result=True,
1467            )
1468
1469        self.scanning_is_passive = not active
1470        self.scanning = True
1471
1472    async def stop_scanning(self) -> None:
1473        # Disable scanning
1474        if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
1475            await self.send_command(
1476                HCI_LE_Set_Extended_Scan_Enable_Command(
1477                    enable=0, filter_duplicates=0, duration=0, period=0
1478                ),  # type: ignore[call-arg]
1479                check_result=True,
1480            )
1481        else:
1482            await self.send_command(
1483                HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),  # type: ignore[call-arg]
1484                check_result=True,
1485            )
1486
1487        self.scanning = False
1488
    @property
    def is_scanning(self):
        '''The value of the `scanning` flag.'''
        return self.scanning
1492
1493    @host_event_handler
1494    def on_advertising_report(self, report):
1495        if not (accumulator := self.advertisement_accumulators.get(report.address)):
1496            accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
1497            self.advertisement_accumulators[report.address] = accumulator
1498        if advertisement := accumulator.update(report):
1499            self.emit('advertisement', advertisement)
1500
1501    async def start_discovery(self, auto_restart: bool = True) -> None:
1502        await self.send_command(
1503            HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),  # type: ignore[call-arg]
1504            check_result=True,
1505        )
1506
1507        response = await self.send_command(
1508            HCI_Inquiry_Command(
1509                lap=HCI_GENERAL_INQUIRY_LAP,
1510                inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
1511                num_responses=0,  # Unlimited number of responses.
1512            )  # type: ignore[call-arg]
1513        )
1514        if response.status != HCI_Command_Status_Event.PENDING:
1515            self.discovering = False
1516            raise HCI_StatusError(response)
1517
1518        self.auto_restart_inquiry = auto_restart
1519        self.discovering = True
1520
1521    async def stop_discovery(self) -> None:
1522        if self.discovering:
1523            await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)  # type: ignore[call-arg]
1524        self.auto_restart_inquiry = True
1525        self.discovering = False
1526
1527    @host_event_handler
1528    def on_inquiry_result(self, address, class_of_device, data, rssi):
1529        self.emit(
1530            'inquiry_result',
1531            address,
1532            class_of_device,
1533            AdvertisingData.from_bytes(data),
1534            rssi,
1535        )
1536
1537    async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
1538        if inquiry_scan_enabled and page_scan_enabled:
1539            scan_enable = 0x03
1540        elif page_scan_enabled:
1541            scan_enable = 0x02
1542        elif inquiry_scan_enabled:
1543            scan_enable = 0x01
1544        else:
1545            scan_enable = 0x00
1546
1547        return await self.send_command(
1548            HCI_Write_Scan_Enable_Command(scan_enable=scan_enable)
1549        )
1550
1551    async def set_discoverable(self, discoverable: bool = True) -> None:
1552        self.discoverable = discoverable
1553        if self.classic_enabled:
1554            # Synthesize an inquiry response if none is set already
1555            if self.inquiry_response is None:
1556                self.inquiry_response = bytes(
1557                    AdvertisingData(
1558                        [
1559                            (
1560                                AdvertisingData.COMPLETE_LOCAL_NAME,
1561                                bytes(self.name, 'utf-8'),
1562                            )
1563                        ]
1564                    )
1565                )
1566
1567            # Update the controller
1568            await self.send_command(
1569                HCI_Write_Extended_Inquiry_Response_Command(
1570                    fec_required=0, extended_inquiry_response=self.inquiry_response
1571                ),  # type: ignore[call-arg]
1572                check_result=True,
1573            )
1574            await self.set_scan_enable(
1575                inquiry_scan_enabled=self.discoverable,
1576                page_scan_enabled=self.connectable,
1577            )
1578
1579    async def set_connectable(self, connectable: bool = True) -> None:
1580        self.connectable = connectable
1581        if self.classic_enabled:
1582            await self.set_scan_enable(
1583                inquiry_scan_enabled=self.discoverable,
1584                page_scan_enabled=self.connectable,
1585            )
1586
1587    async def connect(
1588        self,
1589        peer_address: Union[Address, str],
1590        transport: int = BT_LE_TRANSPORT,
1591        connection_parameters_preferences: Optional[
1592            Dict[int, ConnectionParametersPreferences]
1593        ] = None,
1594        own_address_type: int = OwnAddressType.RANDOM,
1595        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
1596    ) -> Connection:
1597        '''
1598        Request a connection to a peer.
1599        When transport is BLE, this method cannot be called if there is already a
1600        pending connection.
1601
1602        connection_parameters_preferences: (BLE only, ignored for BR/EDR)
1603          * None: use all PHYs with default parameters
1604          * map: each entry has a PHY as key and a ConnectionParametersPreferences
1605            object as value
1606
1607        own_address_type: (BLE only)
1608        '''
1609
1610        # Check parameters
1611        if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
1612            raise ValueError('invalid transport')
1613
1614        # Adjust the transport automatically if we need to
1615        if transport == BT_LE_TRANSPORT and not self.le_enabled:
1616            transport = BT_BR_EDR_TRANSPORT
1617        elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
1618            transport = BT_LE_TRANSPORT
1619
1620        # Check that there isn't already a pending connection
1621        if transport == BT_LE_TRANSPORT and self.is_le_connecting:
1622            raise InvalidStateError('connection already pending')
1623
1624        if isinstance(peer_address, str):
1625            try:
1626                peer_address = Address.from_string_for_transport(
1627                    peer_address, transport
1628                )
1629            except ValueError:
1630                # If the address is not parsable, assume it is a name instead
1631                logger.debug('looking for peer by name')
1632                peer_address = await self.find_peer_by_name(
1633                    peer_address, transport
1634                )  # TODO: timeout
1635        else:
1636            # All BR/EDR addresses should be public addresses
1637            if (
1638                transport == BT_BR_EDR_TRANSPORT
1639                and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS
1640            ):
1641                raise ValueError('BR/EDR addresses must be PUBLIC')
1642
1643        assert isinstance(peer_address, Address)
1644
1645        def on_connection(connection):
1646            if transport == BT_LE_TRANSPORT or (
1647                # match BR/EDR connection event against peer address
1648                connection.transport == transport
1649                and connection.peer_address == peer_address
1650            ):
1651                pending_connection.set_result(connection)
1652
1653        def on_connection_failure(error):
1654            if transport == BT_LE_TRANSPORT or (
1655                # match BR/EDR connection failure event against peer address
1656                error.transport == transport
1657                and error.peer_address == peer_address
1658            ):
1659                pending_connection.set_exception(error)
1660
1661        # Create a future so that we can wait for the connection's result
1662        pending_connection = asyncio.get_running_loop().create_future()
1663        self.on('connection', on_connection)
1664        self.on('connection_failure', on_connection_failure)
1665
1666        try:
1667            # Tell the controller to connect
1668            if transport == BT_LE_TRANSPORT:
1669                if connection_parameters_preferences is None:
1670                    if connection_parameters_preferences is None:
1671                        connection_parameters_preferences = {
1672                            HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
1673                            HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
1674                            HCI_LE_CODED_PHY: ConnectionParametersPreferences.default,
1675                        }
1676
1677                self.connect_own_address_type = own_address_type
1678
1679                if self.host.supports_command(
1680                    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND
1681                ):
1682                    # Only keep supported PHYs
1683                    phys = sorted(
1684                        list(
1685                            set(
1686                                filter(
1687                                    self.supports_le_phy,
1688                                    connection_parameters_preferences.keys(),
1689                                )
1690                            )
1691                        )
1692                    )
1693                    if not phys:
1694                        raise ValueError('at least one supported PHY needed')
1695
1696                    phy_count = len(phys)
1697                    initiating_phys = phy_list_to_bits(phys)
1698
1699                    connection_interval_mins = [
1700                        int(
1701                            connection_parameters_preferences[
1702                                phy
1703                            ].connection_interval_min
1704                            / 1.25
1705                        )
1706                        for phy in phys
1707                    ]
1708                    connection_interval_maxs = [
1709                        int(
1710                            connection_parameters_preferences[
1711                                phy
1712                            ].connection_interval_max
1713                            / 1.25
1714                        )
1715                        for phy in phys
1716                    ]
1717                    max_latencies = [
1718                        connection_parameters_preferences[phy].max_latency
1719                        for phy in phys
1720                    ]
1721                    supervision_timeouts = [
1722                        int(
1723                            connection_parameters_preferences[phy].supervision_timeout
1724                            / 10
1725                        )
1726                        for phy in phys
1727                    ]
1728                    min_ce_lengths = [
1729                        int(
1730                            connection_parameters_preferences[phy].min_ce_length / 0.625
1731                        )
1732                        for phy in phys
1733                    ]
1734                    max_ce_lengths = [
1735                        int(
1736                            connection_parameters_preferences[phy].max_ce_length / 0.625
1737                        )
1738                        for phy in phys
1739                    ]
1740
1741                    result = await self.send_command(
1742                        HCI_LE_Extended_Create_Connection_Command(
1743                            initiator_filter_policy=0,
1744                            own_address_type=own_address_type,
1745                            peer_address_type=peer_address.address_type,
1746                            peer_address=peer_address,
1747                            initiating_phys=initiating_phys,
1748                            scan_intervals=(
1749                                int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
1750                            )
1751                            * phy_count,
1752                            scan_windows=(
1753                                int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
1754                            )
1755                            * phy_count,
1756                            connection_interval_mins=connection_interval_mins,
1757                            connection_interval_maxs=connection_interval_maxs,
1758                            max_latencies=max_latencies,
1759                            supervision_timeouts=supervision_timeouts,
1760                            min_ce_lengths=min_ce_lengths,
1761                            max_ce_lengths=max_ce_lengths,
1762                        )  # type: ignore[call-arg]
1763                    )
1764                else:
1765                    if HCI_LE_1M_PHY not in connection_parameters_preferences:
1766                        raise ValueError('1M PHY preferences required')
1767
1768                    prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
1769                    result = await self.send_command(
1770                        HCI_LE_Create_Connection_Command(
1771                            le_scan_interval=int(
1772                                DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625
1773                            ),
1774                            le_scan_window=int(
1775                                DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625
1776                            ),
1777                            initiator_filter_policy=0,
1778                            peer_address_type=peer_address.address_type,
1779                            peer_address=peer_address,
1780                            own_address_type=own_address_type,
1781                            connection_interval_min=int(
1782                                prefs.connection_interval_min / 1.25
1783                            ),
1784                            connection_interval_max=int(
1785                                prefs.connection_interval_max / 1.25
1786                            ),
1787                            max_latency=prefs.max_latency,
1788                            supervision_timeout=int(prefs.supervision_timeout / 10),
1789                            min_ce_length=int(prefs.min_ce_length / 0.625),
1790                            max_ce_length=int(prefs.max_ce_length / 0.625),
1791                        )  # type: ignore[call-arg]
1792                    )
1793            else:
1794                # Save pending connection
1795                self.pending_connections[peer_address] = Connection.incomplete(
1796                    self, peer_address
1797                )
1798
1799                # TODO: allow passing other settings
1800                result = await self.send_command(
1801                    HCI_Create_Connection_Command(
1802                        bd_addr=peer_address,
1803                        packet_type=0xCC18,  # FIXME: change
1804                        page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE,
1805                        clock_offset=0x0000,
1806                        allow_role_switch=0x01,
1807                        reserved=0,
1808                    )  # type: ignore[call-arg]
1809                )
1810
1811            if result.status != HCI_Command_Status_Event.PENDING:
1812                raise HCI_StatusError(result)
1813
1814            # Wait for the connection process to complete
1815            if transport == BT_LE_TRANSPORT:
1816                self.le_connecting = True
1817
1818            if timeout is None:
1819                return await self.abort_on('flush', pending_connection)
1820
1821            try:
1822                return await asyncio.wait_for(
1823                    asyncio.shield(pending_connection), timeout
1824                )
1825            except asyncio.TimeoutError:
1826                if transport == BT_LE_TRANSPORT:
1827                    await self.send_command(HCI_LE_Create_Connection_Cancel_Command())  # type: ignore[call-arg]
1828                else:
1829                    await self.send_command(
1830                        HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)  # type: ignore[call-arg]
1831                    )
1832
1833                try:
1834                    return await self.abort_on('flush', pending_connection)
1835                except core.ConnectionError as error:
1836                    raise core.TimeoutError() from error
1837        finally:
1838            self.remove_listener('connection', on_connection)
1839            self.remove_listener('connection_failure', on_connection_failure)
1840            if transport == BT_LE_TRANSPORT:
1841                self.le_connecting = False
1842                self.connect_own_address_type = None
1843            else:
1844                self.pending_connections.pop(peer_address, None)
1845
1846    async def accept(
1847        self,
1848        peer_address: Union[Address, str] = Address.ANY,
1849        role: int = BT_PERIPHERAL_ROLE,
1850        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
1851    ) -> Connection:
1852        '''
1853        Wait and accept any incoming connection or a connection from `peer_address` when
1854        set.
1855
1856        Notes:
1857          * A `connect` to the same peer will not complete this call.
1858          * The `timeout` parameter is only handled while waiting for the connection
1859            request, once received and accepted, the controller shall issue a connection
1860            complete event.
1861        '''
1862
1863        if isinstance(peer_address, str):
1864            try:
1865                peer_address = Address(peer_address)
1866            except ValueError:
1867                # If the address is not parsable, assume it is a name instead
1868                logger.debug('looking for peer by name')
1869                peer_address = await self.find_peer_by_name(
1870                    peer_address, BT_BR_EDR_TRANSPORT
1871                )  # TODO: timeout
1872
1873        assert isinstance(peer_address, Address)
1874
1875        if peer_address == Address.NIL:
1876            raise ValueError('accept on nil address')
1877
1878        # Create a future so that we can wait for the request
1879        pending_request_fut = asyncio.get_running_loop().create_future()
1880
1881        if peer_address == Address.ANY:
1882            self.classic_pending_accepts[Address.ANY].append(pending_request_fut)
1883        elif peer_address in self.classic_pending_accepts:
1884            raise InvalidStateError('accept connection already pending')
1885        else:
1886            self.classic_pending_accepts[peer_address] = [pending_request_fut]
1887
1888        try:
1889            # Wait for a request or a completed connection
1890            pending_request = self.abort_on('flush', pending_request_fut)
1891            result = await (
1892                asyncio.wait_for(pending_request, timeout)
1893                if timeout
1894                else pending_request
1895            )
1896        except Exception:
1897            # Remove future from device context
1898            if peer_address == Address.ANY:
1899                self.classic_pending_accepts[Address.ANY].remove(pending_request_fut)
1900            else:
1901                self.classic_pending_accepts.pop(peer_address)
1902            raise
1903
1904        # Result may already be a completed connection,
1905        # see `on_connection` for details
1906        if isinstance(result, Connection):
1907            return result
1908
1909        # Otherwise, result came from `on_connection_request`
1910        peer_address, _class_of_device, _link_type = result
1911        assert isinstance(peer_address, Address)
1912
1913        # Create a future so that we can wait for the connection's result
1914        pending_connection = asyncio.get_running_loop().create_future()
1915
1916        def on_connection(connection):
1917            if (
1918                connection.transport == BT_BR_EDR_TRANSPORT
1919                and connection.peer_address == peer_address
1920            ):
1921                pending_connection.set_result(connection)
1922
1923        def on_connection_failure(error):
1924            if (
1925                error.transport == BT_BR_EDR_TRANSPORT
1926                and error.peer_address == peer_address
1927            ):
1928                pending_connection.set_exception(error)
1929
1930        self.on('connection', on_connection)
1931        self.on('connection_failure', on_connection_failure)
1932
1933        # Save pending connection
1934        self.pending_connections[peer_address] = Connection.incomplete(
1935            self, peer_address
1936        )
1937
1938        try:
1939            # Accept connection request
1940            await self.send_command(
1941                HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)  # type: ignore[call-arg]
1942            )
1943
1944            # Wait for connection complete
1945            return await self.abort_on('flush', pending_connection)
1946
1947        finally:
1948            self.remove_listener('connection', on_connection)
1949            self.remove_listener('connection_failure', on_connection_failure)
1950            self.pending_connections.pop(peer_address, None)
1951
1952    @asynccontextmanager
1953    async def connect_as_gatt(self, peer_address):
1954        async with AsyncExitStack() as stack:
1955            connection = await stack.enter_async_context(
1956                await self.connect(peer_address)
1957            )
1958            peer = await stack.enter_async_context(Peer(connection))
1959
1960            yield peer
1961
1962    @property
1963    def is_le_connecting(self):
1964        return self.le_connecting
1965
1966    @property
1967    def is_disconnecting(self):
1968        return self.disconnecting
1969
1970    async def cancel_connection(self, peer_address=None):
1971        # Low-energy: cancel ongoing connection
1972        if peer_address is None:
1973            if not self.is_le_connecting:
1974                return
1975            await self.send_command(
1976                HCI_LE_Create_Connection_Cancel_Command(), check_result=True
1977            )
1978
1979        # BR/EDR: try to cancel to ongoing connection
1980        # NOTE: This API does not prevent from trying to cancel a connection which is
1981        # not currently being created
1982        else:
1983            if isinstance(peer_address, str):
1984                try:
1985                    peer_address = Address(peer_address)
1986                except ValueError:
1987                    # If the address is not parsable, assume it is a name instead
1988                    logger.debug('looking for peer by name')
1989                    peer_address = await self.find_peer_by_name(
1990                        peer_address, BT_BR_EDR_TRANSPORT
1991                    )  # TODO: timeout
1992
1993            await self.send_command(
1994                HCI_Create_Connection_Cancel_Command(bd_addr=peer_address),
1995                check_result=True,
1996            )
1997
1998    async def disconnect(self, connection, reason):
1999        # Create a future so that we can wait for the disconnection's result
2000        pending_disconnection = asyncio.get_running_loop().create_future()
2001        connection.on('disconnection', pending_disconnection.set_result)
2002        connection.on('disconnection_failure', pending_disconnection.set_exception)
2003
2004        # Request a disconnection
2005        result = await self.send_command(
2006            HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
2007        )
2008
2009        try:
2010            if result.status != HCI_Command_Status_Event.PENDING:
2011                raise HCI_StatusError(result)
2012
2013            # Wait for the disconnection process to complete
2014            self.disconnecting = True
2015            return await self.abort_on('flush', pending_disconnection)
2016        finally:
2017            connection.remove_listener(
2018                'disconnection', pending_disconnection.set_result
2019            )
2020            connection.remove_listener(
2021                'disconnection_failure', pending_disconnection.set_exception
2022            )
2023            self.disconnecting = False
2024
2025    async def update_connection_parameters(
2026        self,
2027        connection,
2028        connection_interval_min,
2029        connection_interval_max,
2030        max_latency,
2031        supervision_timeout,
2032        min_ce_length=0,
2033        max_ce_length=0,
2034    ):
2035        '''
2036        NOTE: the name of the parameters may look odd, but it just follows the names
2037        used in the Bluetooth spec.
2038        '''
2039        result = await self.send_command(
2040            HCI_LE_Connection_Update_Command(
2041                connection_handle=connection.handle,
2042                connection_interval_min=connection_interval_min,
2043                connection_interval_max=connection_interval_max,
2044                max_latency=max_latency,
2045                supervision_timeout=supervision_timeout,
2046                min_ce_length=min_ce_length,
2047                max_ce_length=max_ce_length,
2048            )
2049        )
2050        if result.status != HCI_Command_Status_Event.PENDING:
2051            raise HCI_StatusError(result)
2052
2053    async def get_connection_rssi(self, connection):
2054        result = await self.send_command(
2055            HCI_Read_RSSI_Command(handle=connection.handle), check_result=True
2056        )
2057        return result.return_parameters.rssi
2058
2059    async def get_connection_phy(self, connection):
2060        result = await self.send_command(
2061            HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
2062            check_result=True,
2063        )
2064        return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
2065
2066    async def set_connection_phy(
2067        self, connection, tx_phys=None, rx_phys=None, phy_options=None
2068    ):
2069        if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND):
2070            logger.warning('ignoring request, command not supported')
2071            return
2072
2073        all_phys_bits = (1 if tx_phys is None else 0) | (
2074            (1 if rx_phys is None else 0) << 1
2075        )
2076
2077        result = await self.send_command(
2078            HCI_LE_Set_PHY_Command(
2079                connection_handle=connection.handle,
2080                all_phys=all_phys_bits,
2081                tx_phys=phy_list_to_bits(tx_phys),
2082                rx_phys=phy_list_to_bits(rx_phys),
2083                phy_options=0 if phy_options is None else int(phy_options),
2084            )
2085        )
2086
2087        if result.status != HCI_COMMAND_STATUS_PENDING:
2088            logger.warning(
2089                'HCI_LE_Set_PHY_Command failed: '
2090                f'{HCI_Constant.error_name(result.status)}'
2091            )
2092            raise HCI_StatusError(result)
2093
2094    async def set_default_phy(self, tx_phys=None, rx_phys=None):
2095        all_phys_bits = (1 if tx_phys is None else 0) | (
2096            (1 if rx_phys is None else 0) << 1
2097        )
2098
2099        return await self.send_command(
2100            HCI_LE_Set_Default_PHY_Command(
2101                all_phys=all_phys_bits,
2102                tx_phys=phy_list_to_bits(tx_phys),
2103                rx_phys=phy_list_to_bits(rx_phys),
2104            ),
2105            check_result=True,
2106        )
2107
2108    async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
2109        """
2110        Scan for a peer with a give name and return its address and transport
2111        """
2112
2113        # Create a future to wait for an address to be found
2114        peer_address = asyncio.get_running_loop().create_future()
2115
2116        # Scan/inquire with event handlers to handle scan/inquiry results
2117        def on_peer_found(address, ad_data):
2118            local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
2119            if local_name is None:
2120                local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
2121            if local_name is not None:
2122                if local_name.decode('utf-8') == name:
2123                    peer_address.set_result(address)
2124
2125        handler = None
2126        was_scanning = self.scanning
2127        was_discovering = self.discovering
2128        try:
2129            if transport == BT_LE_TRANSPORT:
2130                event_name = 'advertisement'
2131                handler = self.on(
2132                    event_name,
2133                    lambda advertisement: on_peer_found(
2134                        advertisement.address, advertisement.data
2135                    ),
2136                )
2137
2138                if not self.scanning:
2139                    await self.start_scanning(filter_duplicates=True)
2140
2141            elif transport == BT_BR_EDR_TRANSPORT:
2142                event_name = 'inquiry_result'
2143                handler = self.on(
2144                    event_name,
2145                    lambda address, class_of_device, eir_data, rssi: on_peer_found(
2146                        address, eir_data
2147                    ),
2148                )
2149
2150                if not self.discovering:
2151                    await self.start_discovery()
2152            else:
2153                return None
2154
2155            return await self.abort_on('flush', peer_address)
2156        finally:
2157            if handler is not None:
2158                self.remove_listener(event_name, handler)
2159
2160            if transport == BT_LE_TRANSPORT and not was_scanning:
2161                await self.stop_scanning()
2162            elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
2163                await self.stop_discovery()
2164
2165    @property
2166    def pairing_config_factory(self):
2167        return self.smp_manager.pairing_config_factory
2168
2169    @pairing_config_factory.setter
2170    def pairing_config_factory(self, pairing_config_factory):
2171        self.smp_manager.pairing_config_factory = pairing_config_factory
2172
2173    async def pair(self, connection):
2174        return await self.smp_manager.pair(connection)
2175
2176    def request_pairing(self, connection):
2177        return self.smp_manager.request_pairing(connection)
2178
2179    async def get_long_term_key(self, connection_handle, rand, ediv):
2180        if (connection := self.lookup_connection(connection_handle)) is None:
2181            return
2182
2183        # Start by looking for the key in an SMP session
2184        ltk = self.smp_manager.get_long_term_key(connection, rand, ediv)
2185        if ltk is not None:
2186            return ltk
2187
2188        # Then look for the key in the keystore
2189        if self.keystore is not None:
2190            keys = await self.keystore.get(str(connection.peer_address))
2191            if keys is not None:
2192                logger.debug('found keys in the key store')
2193                if keys.ltk:
2194                    return keys.ltk.value
2195
2196                if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
2197                    return keys.ltk_central.value
2198
2199                if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
2200                    return keys.ltk_peripheral.value
2201
2202    async def get_link_key(self, address):
2203        # Look for the key in the keystore
2204        if self.keystore is not None:
2205            keys = await self.keystore.get(str(address))
2206            if keys is not None:
2207                logger.debug('found keys in the key store')
2208                return keys.link_key.value
2209
2210    # [Classic only]
2211    async def authenticate(self, connection):
2212        # Set up event handlers
2213        pending_authentication = asyncio.get_running_loop().create_future()
2214
2215        def on_authentication():
2216            pending_authentication.set_result(None)
2217
2218        def on_authentication_failure(error_code):
2219            pending_authentication.set_exception(HCI_Error(error_code))
2220
2221        connection.on('connection_authentication', on_authentication)
2222        connection.on('connection_authentication_failure', on_authentication_failure)
2223
2224        # Request the authentication
2225        try:
2226            result = await self.send_command(
2227                HCI_Authentication_Requested_Command(
2228                    connection_handle=connection.handle
2229                )
2230            )
2231            if result.status != HCI_COMMAND_STATUS_PENDING:
2232                logger.warning(
2233                    'HCI_Authentication_Requested_Command failed: '
2234                    f'{HCI_Constant.error_name(result.status)}'
2235                )
2236                raise HCI_StatusError(result)
2237
2238            # Wait for the authentication to complete
2239            await connection.abort_on('disconnection', pending_authentication)
2240        finally:
2241            connection.remove_listener('connection_authentication', on_authentication)
2242            connection.remove_listener(
2243                'connection_authentication_failure', on_authentication_failure
2244            )
2245
2246    async def encrypt(self, connection, enable=True):
2247        if not enable and connection.transport == BT_LE_TRANSPORT:
2248            raise ValueError('`enable` parameter is classic only.')
2249
2250        # Set up event handlers
2251        pending_encryption = asyncio.get_running_loop().create_future()
2252
2253        def on_encryption_change():
2254            pending_encryption.set_result(None)
2255
2256        def on_encryption_failure(error_code):
2257            pending_encryption.set_exception(HCI_Error(error_code))
2258
2259        connection.on('connection_encryption_change', on_encryption_change)
2260        connection.on('connection_encryption_failure', on_encryption_failure)
2261
2262        # Request the encryption
2263        try:
2264            if connection.transport == BT_LE_TRANSPORT:
2265                # Look for a key in the key store
2266                if self.keystore is None:
2267                    raise RuntimeError('no key store')
2268
2269                keys = await self.keystore.get(str(connection.peer_address))
2270                if keys is None:
2271                    raise RuntimeError('keys not found in key store')
2272
2273                if keys.ltk is not None:
2274                    ltk = keys.ltk.value
2275                    rand = bytes(8)
2276                    ediv = 0
2277                elif keys.ltk_central is not None:
2278                    ltk = keys.ltk_central.value
2279                    rand = keys.ltk_central.rand
2280                    ediv = keys.ltk_central.ediv
2281                else:
2282                    raise RuntimeError('no LTK found for peer')
2283
2284                if connection.role != HCI_CENTRAL_ROLE:
2285                    raise InvalidStateError('only centrals can start encryption')
2286
2287                result = await self.send_command(
2288                    HCI_LE_Enable_Encryption_Command(
2289                        connection_handle=connection.handle,
2290                        random_number=rand,
2291                        encrypted_diversifier=ediv,
2292                        long_term_key=ltk,
2293                    )
2294                )
2295
2296                if result.status != HCI_COMMAND_STATUS_PENDING:
2297                    logger.warning(
2298                        'HCI_LE_Enable_Encryption_Command failed: '
2299                        f'{HCI_Constant.error_name(result.status)}'
2300                    )
2301                    raise HCI_StatusError(result)
2302            else:
2303                result = await self.send_command(
2304                    HCI_Set_Connection_Encryption_Command(
2305                        connection_handle=connection.handle,
2306                        encryption_enable=0x01 if enable else 0x00,
2307                    )
2308                )
2309
2310                if result.status != HCI_COMMAND_STATUS_PENDING:
2311                    logger.warning(
2312                        'HCI_Set_Connection_Encryption_Command failed: '
2313                        f'{HCI_Constant.error_name(result.status)}'
2314                    )
2315                    raise HCI_StatusError(result)
2316
2317            # Wait for the result
2318            await connection.abort_on('disconnection', pending_encryption)
2319        finally:
2320            connection.remove_listener(
2321                'connection_encryption_change', on_encryption_change
2322            )
2323            connection.remove_listener(
2324                'connection_encryption_failure', on_encryption_failure
2325            )
2326
2327    # [Classic only]
2328    async def switch_role(self, connection: Connection, role: int):
2329        pending_role_change = asyncio.get_running_loop().create_future()
2330
2331        def on_role_change(new_role):
2332            pending_role_change.set_result(new_role)
2333
2334        def on_role_change_failure(error_code):
2335            pending_role_change.set_exception(HCI_Error(error_code))
2336
2337        connection.on('role_change', on_role_change)
2338        connection.on('role_change_failure', on_role_change_failure)
2339
2340        try:
2341            result = await self.send_command(
2342                HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)  # type: ignore[call-arg]
2343            )
2344            if result.status != HCI_COMMAND_STATUS_PENDING:
2345                logger.warning(
2346                    'HCI_Switch_Role_Command failed: '
2347                    f'{HCI_Constant.error_name(result.status)}'
2348                )
2349                raise HCI_StatusError(result)
2350            await connection.abort_on('disconnection', pending_role_change)
2351        finally:
2352            connection.remove_listener('role_change', on_role_change)
2353            connection.remove_listener('role_change_failure', on_role_change_failure)
2354
2355    # [Classic only]
2356    async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
2357        # Set up event handlers
2358        pending_name = asyncio.get_running_loop().create_future()
2359
2360        peer_address = remote if isinstance(remote, Address) else remote.peer_address
2361
2362        handler = self.on(
2363            'remote_name',
2364            lambda address, remote_name: pending_name.set_result(remote_name)
2365            if address == peer_address
2366            else None,
2367        )
2368        failure_handler = self.on(
2369            'remote_name_failure',
2370            lambda address, error_code: pending_name.set_exception(
2371                HCI_Error(error_code)
2372            )
2373            if address == peer_address
2374            else None,
2375        )
2376
2377        try:
2378            result = await self.send_command(
2379                HCI_Remote_Name_Request_Command(
2380                    bd_addr=peer_address,
2381                    page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
2382                    reserved=0,
2383                    clock_offset=0,  # TODO investigate non-0 values
2384                )  # type: ignore[call-arg]
2385            )
2386
2387            if result.status != HCI_COMMAND_STATUS_PENDING:
2388                logger.warning(
2389                    'HCI_Set_Connection_Encryption_Command failed: '
2390                    f'{HCI_Constant.error_name(result.status)}'
2391                )
2392                raise HCI_StatusError(result)
2393
2394            # Wait for the result
2395            return await self.abort_on('flush', pending_name)
2396        finally:
2397            self.remove_listener('remote_name', handler)
2398            self.remove_listener('remote_name_failure', failure_handler)
2399
2400    @host_event_handler
2401    def on_flush(self):
2402        self.emit('flush')
2403        for _, connection in self.connections.items():
2404            connection.emit('disconnection', 0)
2405        self.connections = {}
2406
2407    # [Classic only]
2408    @host_event_handler
2409    def on_link_key(self, bd_addr, link_key, key_type):
2410        # Store the keys in the key store
2411        if self.keystore:
2412            pairing_keys = PairingKeys()
2413            pairing_keys.link_key = PairingKeys.Key(value=link_key)
2414
2415            async def store_keys():
2416                try:
2417                    await self.keystore.update(str(bd_addr), pairing_keys)
2418                except Exception as error:
2419                    logger.warning(f'!!! error while storing keys: {error}')
2420
2421            self.abort_on('flush', store_keys())
2422
2423        if connection := self.find_connection_by_bd_addr(
2424            bd_addr, transport=BT_BR_EDR_TRANSPORT
2425        ):
2426            connection.link_key_type = key_type
2427
2428    def add_service(self, service):
2429        self.gatt_server.add_service(service)
2430
2431    def add_services(self, services):
2432        self.gatt_server.add_services(services)
2433
2434    def add_default_services(self, generic_access_service=True):
2435        # Add a GAP Service if requested
2436        if generic_access_service:
2437            self.gatt_server.add_service(GenericAccessService(self.name))
2438
2439    async def notify_subscriber(self, connection, attribute, value=None, force=False):
2440        await self.gatt_server.notify_subscriber(connection, attribute, value, force)
2441
2442    async def notify_subscribers(self, attribute, value=None, force=False):
2443        await self.gatt_server.notify_subscribers(attribute, value, force)
2444
2445    async def indicate_subscriber(self, connection, attribute, value=None, force=False):
2446        await self.gatt_server.indicate_subscriber(connection, attribute, value, force)
2447
2448    async def indicate_subscribers(self, attribute, value=None, force=False):
2449        await self.gatt_server.indicate_subscribers(attribute, value, force)
2450
2451    @host_event_handler
    def on_connection(
        self,
        connection_handle,
        transport,
        peer_address,
        peer_resolvable_address,
        role,
        connection_parameters,
    ):
        """Register a newly established connection and announce it.

        For BR/EDR, the pending connection stored under `peer_address` is
        completed, added to `self.connections`, and 'connection' is emitted
        right away. For any other transport, a new `Connection` is created
        (after trying to resolve the peer address), the advertising state is
        reset, and 'connection' is emitted either immediately or, when the
        host supports the LE Read PHY command, after the PHY has been read.
        """
        logger.debug(
            f'*** Connection: [0x{connection_handle:04X}] '
            f'{peer_address} as {HCI_Constant.role_name(role)}'
        )
        # A reused handle is only reported; the map entry is overwritten below
        if connection_handle in self.connections:
            logger.warning(
                'new connection reuses the same handle as a previous connection'
            )

        if transport == BT_BR_EDR_TRANSPORT:
            # Create a new connection
            # NOTE(review): assumes an entry for `peer_address` is always
            # present in `pending_connections` at this point - confirm
            connection = self.pending_connections.pop(peer_address)
            connection.complete(
                connection_handle, peer_resolvable_address, role, connection_parameters
            )
            self.connections[connection_handle] = connection

            # Emit an event to notify listeners of the new connection
            self.emit('connection', connection)
        else:
            # Resolve the peer address if we can
            if self.address_resolver:
                if peer_address.is_resolvable:
                    resolved_address = self.address_resolver.resolve(peer_address)
                    if resolved_address is not None:
                        logger.debug(f'*** Address resolved as {resolved_address}')
                        # Keep the on-air address as the resolvable address
                        # and use the resolved one as the peer address
                        peer_resolvable_address = peer_address
                        peer_address = resolved_address

            # Guess which own address type is used for this connection.
            # This logic is somewhat correct but may need to be improved
            # when multiple advertising are run simultaneously.
            if self.connect_own_address_type is not None:
                own_address_type = self.connect_own_address_type
            else:
                own_address_type = self.advertising_own_address_type

            # We are no longer advertising
            self.advertising_own_address_type = None
            self.advertising = False

            # Pick the local address that matches the own address type
            if own_address_type in (
                OwnAddressType.PUBLIC,
                OwnAddressType.RESOLVABLE_OR_PUBLIC,
            ):
                self_address = self.public_address
            else:
                self_address = self.random_address

            # Create a new connection
            connection = Connection(
                self,
                connection_handle,
                transport,
                self_address,
                peer_address,
                peer_resolvable_address,
                role,
                connection_parameters,
                # Initial PHY value; replaced below if the PHY can be read
                ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
            )
            self.connections[connection_handle] = connection

            # If supported, read which PHY we're connected with before
            # notifying listeners of the new connection.
            if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):

                async def read_phy():
                    result = await self.send_command(
                        HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
                        check_result=True,
                    )
                    connection.phy = ConnectionPHY(
                        result.return_parameters.tx_phy, result.return_parameters.rx_phy
                    )
                    # Emit an event to notify listeners of the new connection
                    self.emit('connection', connection)

                # Do so asynchronously to not block the current event handler
                connection.abort_on('disconnection', read_phy())

            else:
                # Emit an event to notify listeners of the new connection
                self.emit('connection', connection)
2545
2546    @host_event_handler
2547    def on_connection_failure(self, transport, peer_address, error_code):
2548        logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
2549
2550        # For directed advertising, this means a timeout
2551        if (
2552            transport == BT_LE_TRANSPORT
2553            and self.advertising
2554            and self.advertising_type.is_directed
2555        ):
2556            self.advertising_own_address_type = None
2557            self.advertising = False
2558
2559        # Notify listeners
2560        error = core.ConnectionError(
2561            error_code,
2562            transport,
2563            peer_address,
2564            'hci',
2565            HCI_Constant.error_name(error_code),
2566        )
2567        self.emit('connection_failure', error)
2568
2569    # FIXME: Explore a delegate-model for BR/EDR wait connection #56.
2570    @host_event_handler
2571    def on_connection_request(self, bd_addr, class_of_device, link_type):
2572        logger.debug(f'*** Connection request: {bd_addr}')
2573
2574        # match a pending future using `bd_addr`
2575        if bd_addr in self.classic_pending_accepts:
2576            future, *_ = self.classic_pending_accepts.pop(bd_addr)
2577            future.set_result((bd_addr, class_of_device, link_type))
2578
2579        # match first pending future for ANY address
2580        elif len(self.classic_pending_accepts[Address.ANY]) > 0:
2581            future = self.classic_pending_accepts[Address.ANY].pop(0)
2582            future.set_result((bd_addr, class_of_device, link_type))
2583
2584        # device configuration is set to accept any incoming connection
2585        elif self.classic_accept_any:
2586            # Save pending connection
2587            self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr)
2588
2589            self.host.send_command_sync(
2590                HCI_Accept_Connection_Request_Command(
2591                    bd_addr=bd_addr, role=0x01  # Remain the peripheral
2592                )
2593            )
2594
2595        # reject incoming connection
2596        else:
2597            self.host.send_command_sync(
2598                HCI_Reject_Connection_Request_Command(
2599                    bd_addr=bd_addr,
2600                    reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
2601                )
2602            )
2603
2604    @host_event_handler
2605    @with_connection_from_handle
2606    def on_disconnection(self, connection, reason):
2607        logger.debug(
2608            f'*** Disconnection: [0x{connection.handle:04X}] '
2609            f'{connection.peer_address} as {connection.role_name}, reason={reason}'
2610        )
2611        connection.emit('disconnection', reason)
2612
2613        # Remove the connection from the map
2614        del self.connections[connection.handle]
2615
2616        # Cleanup subsystems that maintain per-connection state
2617        self.gatt_server.on_disconnection(connection)
2618
2619        # Restart advertising if auto-restart is enabled
2620        if self.auto_restart_advertising:
2621            logger.debug('restarting advertising')
2622            self.abort_on(
2623                'flush',
2624                self.start_advertising(
2625                    advertising_type=self.advertising_type, auto_restart=True
2626                ),
2627            )
2628
2629    @host_event_handler
2630    @with_connection_from_handle
2631    def on_disconnection_failure(self, connection, error_code):
2632        logger.debug(f'*** Disconnection failed: {error_code}')
2633        error = core.ConnectionError(
2634            error_code,
2635            connection.transport,
2636            connection.peer_address,
2637            'hci',
2638            HCI_Constant.error_name(error_code),
2639        )
2640        connection.emit('disconnection_failure', error)
2641
2642    @host_event_handler
2643    @AsyncRunner.run_in_task()
2644    async def on_inquiry_complete(self):
2645        if self.auto_restart_inquiry:
2646            # Inquire again
2647            await self.start_discovery(auto_restart=True)
2648        else:
2649            self.auto_restart_inquiry = True
2650            self.discovering = False
2651            self.emit('inquiry_complete')
2652
2653    @host_event_handler
2654    @with_connection_from_handle
2655    def on_connection_authentication(self, connection):
2656        logger.debug(
2657            f'*** Connection Authentication: [0x{connection.handle:04X}] '
2658            f'{connection.peer_address} as {connection.role_name}'
2659        )
2660        connection.authenticated = True
2661        connection.emit('connection_authentication')
2662
2663    @host_event_handler
2664    @with_connection_from_handle
2665    def on_connection_authentication_failure(self, connection, error):
2666        logger.debug(
2667            f'*** Connection Authentication Failure: [0x{connection.handle:04X}] '
2668            f'{connection.peer_address} as {connection.role_name}, error={error}'
2669        )
2670        connection.emit('connection_authentication_failure', error)
2671
2672    @host_event_handler
2673    @with_connection_from_address
2674    def on_ssp_complete(self, connection):
2675        # On Secure Simple Pairing complete, in case:
2676        # - Connection isn't already authenticated
2677        # - AND we are not the initiator of the authentication
2678        # We must trigger authentication to known if we are truly authenticated
2679        if not connection.authenticating and not connection.authenticated:
2680            logger.debug(
2681                f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
2682                f'{connection.peer_address}'
2683            )
2684            asyncio.create_task(connection.authenticate())
2685
2686    # [Classic only]
2687    @host_event_handler
2688    @with_connection_from_address
2689    def on_authentication_io_capability_request(self, connection):
2690        # Ask what the pairing config should be for this connection
2691        pairing_config = self.pairing_config_factory(connection)
2692
2693        # Map the SMP IO capability to a Classic IO capability
2694        # pylint: disable=line-too-long
2695        io_capability = {
2696            smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
2697            smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
2698            smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
2699            smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
2700            smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
2701        }.get(pairing_config.delegate.io_capability)
2702
2703        if io_capability is None:
2704            logger.warning(
2705                f'cannot map IO capability ({pairing_config.delegate.io_capability}'
2706            )
2707            io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
2708
2709        # Compute the authentication requirements
2710        authentication_requirements = (
2711            # No Bonding
2712            (
2713                HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
2714                HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
2715            ),
2716            # General Bonding
2717            (
2718                HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
2719                HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
2720            ),
2721        )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
2722
2723        # Respond
2724        self.host.send_command_sync(
2725            HCI_IO_Capability_Request_Reply_Command(
2726                bd_addr=connection.peer_address,
2727                io_capability=io_capability,
2728                oob_data_present=0x00,  # Not present
2729                authentication_requirements=authentication_requirements,
2730            )
2731        )
2732
2733    # [Classic only]
2734    @host_event_handler
2735    @with_connection_from_address
2736    def on_authentication_user_confirmation_request(self, connection, code):
2737        # Ask what the pairing config should be for this connection
2738        pairing_config = self.pairing_config_factory(connection)
2739
2740        can_compare = pairing_config.delegate.io_capability not in (
2741            smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
2742            smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
2743        )
2744
2745        # Respond
2746        if can_compare:
2747
2748            async def compare_numbers():
2749                numbers_match = await connection.abort_on(
2750                    'disconnection',
2751                    pairing_config.delegate.compare_numbers(code, digits=6),
2752                )
2753                if numbers_match:
2754                    await self.host.send_command(
2755                        HCI_User_Confirmation_Request_Reply_Command(
2756                            bd_addr=connection.peer_address
2757                        )
2758                    )
2759                else:
2760                    await self.host.send_command(
2761                        HCI_User_Confirmation_Request_Negative_Reply_Command(
2762                            bd_addr=connection.peer_address
2763                        )
2764                    )
2765
2766            asyncio.create_task(compare_numbers())
2767        else:
2768
2769            async def confirm():
2770                confirm = await connection.abort_on(
2771                    'disconnection', pairing_config.delegate.confirm()
2772                )
2773                if confirm:
2774                    await self.host.send_command(
2775                        HCI_User_Confirmation_Request_Reply_Command(
2776                            bd_addr=connection.peer_address
2777                        )
2778                    )
2779                else:
2780                    await self.host.send_command(
2781                        HCI_User_Confirmation_Request_Negative_Reply_Command(
2782                            bd_addr=connection.peer_address
2783                        )
2784                    )
2785
2786            asyncio.create_task(confirm())
2787
2788    # [Classic only]
2789    @host_event_handler
2790    @with_connection_from_address
2791    def on_authentication_user_passkey_request(self, connection):
2792        # Ask what the pairing config should be for this connection
2793        pairing_config = self.pairing_config_factory(connection)
2794
2795        can_input = pairing_config.delegate.io_capability in (
2796            smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
2797            smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
2798        )
2799
2800        # Respond
2801        if can_input:
2802
2803            async def get_number():
2804                number = await connection.abort_on(
2805                    'disconnection', pairing_config.delegate.get_number()
2806                )
2807                if number is not None:
2808                    await self.host.send_command(
2809                        HCI_User_Passkey_Request_Reply_Command(
2810                            bd_addr=connection.peer_address, numeric_value=number
2811                        )
2812                    )
2813                else:
2814                    await self.host.send_command(
2815                        HCI_User_Passkey_Request_Negative_Reply_Command(
2816                            bd_addr=connection.peer_address
2817                        )
2818                    )
2819
2820            asyncio.create_task(get_number())
2821        else:
2822            self.host.send_command_sync(
2823                HCI_User_Passkey_Request_Negative_Reply_Command(
2824                    bd_addr=connection.peer_address
2825                )
2826            )
2827
2828    # [Classic only]
2829    @host_event_handler
2830    @with_connection_from_address
2831    def on_pin_code_request(self, connection):
2832        # classic legacy pairing
2833        # Ask what the pairing config should be for this connection
2834        pairing_config = self.pairing_config_factory(connection)
2835
2836        can_input = pairing_config.delegate.io_capability in (
2837            smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
2838            smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
2839        )
2840
2841        # respond the pin code
2842        if can_input:
2843
2844            async def get_pin_code():
2845                pin_code = await connection.abort_on(
2846                    'disconnection', pairing_config.delegate.get_string(16)
2847                )
2848
2849                if pin_code is not None:
2850                    pin_code = bytes(pin_code, encoding='utf-8')
2851                    pin_code_len = len(pin_code)
2852                    assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes"
2853                    await self.host.send_command(
2854                        HCI_PIN_Code_Request_Reply_Command(
2855                            bd_addr=connection.peer_address,
2856                            pin_code_length=pin_code_len,
2857                            pin_code=pin_code,
2858                        )
2859                    )
2860                else:
2861                    logger.debug("delegate.get_string() returned None")
2862                    await self.host.send_command(
2863                        HCI_PIN_Code_Request_Negative_Reply_Command(
2864                            bd_addr=connection.peer_address
2865                        )
2866                    )
2867
2868            asyncio.create_task(get_pin_code())
2869        else:
2870            self.host.send_command_sync(
2871                HCI_PIN_Code_Request_Negative_Reply_Command(
2872                    bd_addr=connection.peer_address
2873                )
2874            )
2875
2876    # [Classic only]
2877    @host_event_handler
2878    @with_connection_from_address
2879    def on_authentication_user_passkey_notification(self, connection, passkey):
2880        # Ask what the pairing config should be for this connection
2881        pairing_config = self.pairing_config_factory(connection)
2882
2883        connection.abort_on(
2884            'disconnection', pairing_config.delegate.display_number(passkey)
2885        )
2886
2887    # [Classic only]
2888    @host_event_handler
2889    @try_with_connection_from_address
2890    def on_remote_name(self, connection: Connection, address, remote_name):
2891        # Try to decode the name
2892        try:
2893            remote_name = remote_name.decode('utf-8')
2894            if connection:
2895                connection.peer_name = remote_name
2896                connection.emit('remote_name')
2897            self.emit('remote_name', address, remote_name)
2898        except UnicodeDecodeError as error:
2899            logger.warning('peer name is not valid UTF-8')
2900            if connection:
2901                connection.emit('remote_name_failure', error)
2902            else:
2903                self.emit('remote_name_failure', address, error)
2904
2905    # [Classic only]
2906    @host_event_handler
2907    @try_with_connection_from_address
2908    def on_remote_name_failure(self, connection: Connection, address, error):
2909        if connection:
2910            connection.emit('remote_name_failure', error)
2911        self.emit('remote_name_failure', address, error)
2912
2913    @host_event_handler
2914    @with_connection_from_handle
2915    def on_connection_encryption_change(self, connection, encryption):
2916        logger.debug(
2917            f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
2918            f'{connection.peer_address} as {connection.role_name}, '
2919            f'encryption={encryption}'
2920        )
2921        connection.encryption = encryption
2922        if (
2923            not connection.authenticated
2924            and encryption == HCI_Encryption_Change_Event.AES_CCM
2925        ):
2926            connection.authenticated = True
2927            connection.sc = True
2928        connection.emit('connection_encryption_change')
2929
2930    @host_event_handler
2931    @with_connection_from_handle
2932    def on_connection_encryption_failure(self, connection, error):
2933        logger.debug(
2934            f'*** Connection Encryption Failure: [0x{connection.handle:04X}] '
2935            f'{connection.peer_address} as {connection.role_name}, '
2936            f'error={error}'
2937        )
2938        connection.emit('connection_encryption_failure', error)
2939
2940    @host_event_handler
2941    @with_connection_from_handle
2942    def on_connection_encryption_key_refresh(self, connection):
2943        logger.debug(
2944            f'*** Connection Key Refresh: [0x{connection.handle:04X}] '
2945            f'{connection.peer_address} as {connection.role_name}'
2946        )
2947        connection.emit('connection_encryption_key_refresh')
2948
2949    @host_event_handler
2950    @with_connection_from_handle
2951    def on_connection_parameters_update(self, connection, connection_parameters):
2952        logger.debug(
2953            f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
2954            f'{connection.peer_address} as {connection.role_name}, '
2955            f'{connection_parameters}'
2956        )
2957        connection.parameters = connection_parameters
2958        connection.emit('connection_parameters_update')
2959
2960    @host_event_handler
2961    @with_connection_from_handle
2962    def on_connection_parameters_update_failure(self, connection, error):
2963        logger.debug(
2964            f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] '
2965            f'{connection.peer_address} as {connection.role_name}, '
2966            f'error={error}'
2967        )
2968        connection.emit('connection_parameters_update_failure', error)
2969
2970    @host_event_handler
2971    @with_connection_from_handle
2972    def on_connection_phy_update(self, connection, connection_phy):
2973        logger.debug(
2974            f'*** Connection PHY Update: [0x{connection.handle:04X}] '
2975            f'{connection.peer_address} as {connection.role_name}, '
2976            f'{connection_phy}'
2977        )
2978        connection.phy = connection_phy
2979        connection.emit('connection_phy_update')
2980
2981    @host_event_handler
2982    @with_connection_from_handle
2983    def on_connection_phy_update_failure(self, connection, error):
2984        logger.debug(
2985            f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] '
2986            f'{connection.peer_address} as {connection.role_name}, '
2987            f'error={error}'
2988        )
2989        connection.emit('connection_phy_update_failure', error)
2990
2991    @host_event_handler
2992    @with_connection_from_handle
2993    def on_connection_att_mtu_update(self, connection, att_mtu):
2994        logger.debug(
2995            f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] '
2996            f'{connection.peer_address} as {connection.role_name}, '
2997            f'{att_mtu}'
2998        )
2999        connection.att_mtu = att_mtu
3000        connection.emit('connection_att_mtu_update')
3001
3002    @host_event_handler
3003    @with_connection_from_handle
3004    def on_connection_data_length_change(
3005        self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time
3006    ):
3007        logger.debug(
3008            f'*** Connection Data Length Change: [0x{connection.handle:04X}] '
3009            f'{connection.peer_address} as {connection.role_name}'
3010        )
3011        connection.data_length = (
3012            max_tx_octets,
3013            max_tx_time,
3014            max_rx_octets,
3015            max_rx_time,
3016        )
3017        connection.emit('connection_data_length_change')
3018
3019    # [Classic only]
3020    @host_event_handler
3021    @with_connection_from_address
3022    def on_role_change(self, connection, new_role):
3023        connection.role = new_role
3024        connection.emit('role_change', new_role)
3025
3026    # [Classic only]
3027    @host_event_handler
3028    @try_with_connection_from_address
3029    def on_role_change_failure(self, connection, address, error):
3030        if connection:
3031            connection.emit('role_change_failure', error)
3032        self.emit('role_change_failure', address, error)
3033
3034    @with_connection_from_handle
3035    def on_pairing_start(self, connection):
3036        connection.emit('pairing_start')
3037
3038    @with_connection_from_handle
3039    def on_pairing(self, connection, keys, sc):
3040        connection.sc = sc
3041        connection.authenticated = True
3042        connection.emit('pairing', keys)
3043
3044    @with_connection_from_handle
3045    def on_pairing_failure(self, connection, reason):
3046        connection.emit('pairing_failure', reason)
3047
3048    @with_connection_from_handle
3049    def on_gatt_pdu(self, connection, pdu):
3050        # Parse the L2CAP payload into an ATT PDU object
3051        att_pdu = ATT_PDU.from_bytes(pdu)
3052
3053        # Conveniently, even-numbered op codes are client->server and
3054        # odd-numbered ones are server->client
3055        if att_pdu.op_code & 1:
3056            if connection.gatt_client is None:
3057                logger.warning(
3058                    color('no GATT client for connection 0x{connection_handle:04X}')
3059                )
3060                return
3061            connection.gatt_client.on_gatt_pdu(att_pdu)
3062        else:
3063            if connection.gatt_server is None:
3064                logger.warning(
3065                    color('no GATT server for connection 0x{connection_handle:04X}')
3066                )
3067                return
3068            connection.gatt_server.on_gatt_pdu(connection, att_pdu)
3069
3070    @with_connection_from_handle
3071    def on_smp_pdu(self, connection, pdu):
3072        self.smp_manager.on_smp_pdu(connection, pdu)
3073
3074    @host_event_handler
3075    @with_connection_from_handle
3076    def on_l2cap_pdu(self, connection, cid, pdu):
3077        self.l2cap_channel_manager.on_pdu(connection, cid, pdu)
3078
3079    def __str__(self):
3080        return (
3081            f'Device(name="{self.name}", '
3082            f'random_address="{self.random_address}", '
3083            f'public_address="{self.public_address}")'
3084        )
3085