• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import json
19import asyncio
20import logging
21
22from .hci import *
23from .host import Host
24from .gatt import *
25from .gap import GenericAccessService
26from .core import AdvertisingData, BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE
27from .utils import AsyncRunner, CompositeEventEmitter, setup_event_forwarding, composite_listener
28from . import gatt_client
29from . import gatt_server
30from . import smp
31from . import sdp
32from . import l2cap
33from . import keys
34
35# -----------------------------------------------------------------------------
36# Logging
37# -----------------------------------------------------------------------------
38logger = logging.getLogger(__name__)
39
40# -----------------------------------------------------------------------------
41# Constants
42# -----------------------------------------------------------------------------
43DEVICE_DEFAULT_ADDRESS              = '00:00:00:00:00:00'
44DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000  # ms
45DEVICE_DEFAULT_ADVERTISING_DATA     = ''
46DEVICE_DEFAULT_NAME                 = 'Bumble'
47DEVICE_DEFAULT_INQUIRY_LENGTH       = 8  # 10.24 seconds
48DEVICE_DEFAULT_CLASS_OF_DEVICE      = 0
49DEVICE_DEFAULT_SCAN_RESPONSE_DATA   = b''
50DEVICE_DEFAULT_DATA_LENGTH          = (27, 328, 27, 328)
51DEVICE_DEFAULT_SCAN_INTERVAL        = 60  # ms
52DEVICE_DEFAULT_SCAN_WINDOW          = 60  # ms
53DEVICE_MIN_SCAN_INTERVAL            = 25
54DEVICE_MAX_SCAN_INTERVAL            = 10240
55DEVICE_MIN_SCAN_WINDOW              = 25
56DEVICE_MAX_SCAN_WINDOW              = 10240
57
58# -----------------------------------------------------------------------------
59# Classes
60# -----------------------------------------------------------------------------
61
62
63# -----------------------------------------------------------------------------
64class AdvertisementDataAccumulator:
65    def __init__(self):
66        self.advertising_data = AdvertisingData()
67        self.last_advertisement_type = None
68        self.connectable = False
69        self.flushable = False
70
71    def update(self, data, advertisement_type):
72        if advertisement_type == HCI_LE_Advertising_Report_Event.SCAN_RSP:
73            if self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP:
74                self.advertising_data.append(data)
75                self.flushable = True
76        else:
77            self.advertising_data = AdvertisingData.from_bytes(data)
78            self.flushable = self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP
79
80        if advertisement_type == HCI_LE_Advertising_Report_Event.ADV_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND:
81            self.connectable = True
82        elif advertisement_type == HCI_LE_Advertising_Report_Event.ADV_SCAN_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND:
83            self.connectable = False
84
85        self.last_advertisement_type = advertisement_type
86
87
88# -----------------------------------------------------------------------------
89class Peer:
90    def __init__(self, connection):
91        self.connection = connection
92
93        # Create a GATT client for the connection
94        self.gatt_client = gatt_client.Client(connection)
95        connection.gatt_client = self.gatt_client
96
97    @property
98    def services(self):
99        return self.gatt_client.services
100
101    async def request_mtu(self, mtu):
102        return await self.gatt_client.request_mtu(mtu)
103
104    async def discover_service(self, uuid):
105        return await self.gatt_client.discover_service(uuid)
106
107    async def discover_services(self, uuids = []):
108        return await self.gatt_client.discover_services(uuids)
109
110    async def discover_included_services(self, service):
111        return await self.gatt_client.discover_included_services(service)
112
113    async def discover_characteristics(self, uuids = [], service = None):
114        return await self.gatt_client.discover_characteristics(uuids = uuids, service = service)
115
116    async def discover_descriptors(self, characteristic = None, start_handle = None, end_handle = None):
117        return await self.gatt_client.discover_descriptors(characteristic, start_handle, end_handle)
118
119    async def discover_attributes(self):
120        return await self.gatt_client.discover_attributes()
121
122    async def subscribe(self, characteristic, subscriber=None):
123        return await self.gatt_client.subscribe(characteristic, subscriber)
124
125    async def read_value(self, attribute):
126        return await self.gatt_client.read_value(attribute)
127
128    async def write_value(self, attribute, value, with_response=False):
129        return await self.gatt_client.write_value(attribute, value, with_response)
130
131    async def read_characteristics_by_uuid(self, uuid, service=None):
132        return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
133
134    def get_services_by_uuid(self, uuid):
135        return self.gatt_client.get_services_by_uuid(uuid)
136
137    def get_characteristics_by_uuid(self, uuid, service = None):
138        return self.gatt_client.get_characteristics_by_uuid(uuid, service)
139
140    def create_service_proxy(self, proxy_class):
141        return proxy_class.from_client(self.gatt_client)
142
143    async def discover_service_and_create_proxy(self, proxy_class):
144        # Discover the first matching service and its characteristics
145        services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
146        if services:
147            service = services[0]
148            await service.discover_characteristics()
149            return self.create_service_proxy(proxy_class)
150
151    # [Classic only]
152    async def request_name(self):
153        return await self.connection.request_remote_name()
154
155    def __str__(self):
156        return f'{self.connection.peer_address} as {self.connection.role_name}'
157
158
159# -----------------------------------------------------------------------------
160class Connection(CompositeEventEmitter):
161    @composite_listener
162    class Listener:
163        def on_disconnection(self, reason):
164            pass
165
166        def on_connection_parameters_update(self):
167            pass
168
169        def on_connection_parameters_update_failure(self, error):
170            pass
171
172        def on_connection_phy_update(self):
173            pass
174
175        def on_connection_phy_update_failure(self, error):
176            pass
177
178        def on_connection_att_mtu_update(self):
179            pass
180
181        def on_connection_encryption_change(self):
182            pass
183
184        def on_connection_encryption_key_refresh(self):
185            pass
186
187    def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters):
188        super().__init__()
189        self.device                  = device
190        self.handle                  = handle
191        self.transport               = transport
192        self.peer_address            = peer_address
193        self.peer_resolvable_address = peer_resolvable_address
194        self.peer_name               = None  # Classic only
195        self.role                    = role
196        self.parameters              = parameters
197        self.encryption              = 0
198        self.authenticated           = False
199        self.phy                     = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
200        self.att_mtu                 = ATT_DEFAULT_MTU
201        self.data_length             = DEVICE_DEFAULT_DATA_LENGTH
202        self.gatt_client             = None  # Per-connection client
203        self.gatt_server             = device.gatt_server  # By default, use the device's shared server
204
205    @property
206    def role_name(self):
207        return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'
208
209    @property
210    def is_encrypted(self):
211        return self.encryption != 0
212
213    def send_l2cap_pdu(self, cid, pdu):
214        self.device.send_l2cap_pdu(self.handle, cid, pdu)
215
216    def create_l2cap_connector(self, psm):
217        return self.device.create_l2cap_connector(self, psm)
218
219    async def disconnect(self, reason = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR):
220        return await self.device.disconnect(self, reason)
221
222    async def pair(self):
223        return await self.device.pair(self)
224
225    def request_pairing(self):
226        return self.device.request_pairing(self)
227
228    # [Classic only]
229    async def authenticate(self):
230        return await self.device.authenticate(self)
231
232    async def encrypt(self):
233        return await self.device.encrypt(self)
234
235    async def update_parameters(
236        self,
237        conn_interval_min,
238        conn_interval_max,
239        conn_latency,
240        supervision_timeout
241    ):
242        return await self.device.update_connection_parameters(
243            self,
244            conn_interval_min,
245            conn_interval_max,
246            conn_latency,
247            supervision_timeout
248        )
249
250    # [Classic only]
251    async def request_remote_name(self):
252        return await self.device.request_remote_name(self)
253
254    def __str__(self):
255        return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
256
257
258# -----------------------------------------------------------------------------
259class DeviceConfiguration:
260    def __init__(self):
261        # Setup defaults
262        self.name                     = DEVICE_DEFAULT_NAME
263        self.address                  = DEVICE_DEFAULT_ADDRESS
264        self.class_of_device          = DEVICE_DEFAULT_CLASS_OF_DEVICE
265        self.scan_response_data       = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
266        self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
267        self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
268        self.le_enabled               = True
269        # LE host enable 2nd parameter
270        self.le_simultaneous_enabled  = True
271        self.classic_sc_enabled       = True
272        self.classic_ssp_enabled      = True
273        self.advertising_data = bytes(
274            AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
275        )
276        self.irk      = bytes(16)  # This really must be changed for any level of security
277        self.keystore = None
278
279    def load_from_dict(self, config):
280        # Load simple properties
281        self.name = config.get('name', self.name)
282        self.address = Address(config.get('address', self.address))
283        self.class_of_device = config.get('class_of_device', self.class_of_device)
284        self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
285        self.advertising_interval_max = self.advertising_interval_min
286        self.keystore                 = config.get('keystore')
287        self.le_enabled               = config.get('le_enabled', self.le_enabled)
288        self.le_simultaneous_enabled  = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
289        self.classic_sc_enabled       = config.get('classic_sc_enabled', self.classic_sc_enabled)
290        self.classic_ssp_enabled      = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
291
292        # Load or synthesize an IRK
293        irk = config.get('irk')
294        if irk:
295            self.irk = bytes.fromhex(irk)
296        else:
297            # Construct an IRK from the address bytes
298            # NOTE: this is not secure, but will always give the same IRK for the same address
299            address_bytes = bytes(self.address)
300            self.irk = (address_bytes * 3)[:16]
301
302        # Load advertising data
303        advertising_data = config.get('advertising_data')
304        if advertising_data:
305            self.advertising_data = bytes.fromhex(advertising_data)
306
307    def load_from_file(self, filename):
308        with open(filename, 'r') as file:
309            self.load_from_dict(json.load(file))
310
311# -----------------------------------------------------------------------------
312# Decorators used with the following Device class
313# (we define them outside of the Device class, because defining decorators
314#  within a class requires unnecessarily complicated acrobatics)
315# -----------------------------------------------------------------------------
316
317# Decorator that converts the first argument from a connection handle to a connection
318def with_connection_from_handle(function):
319    @functools.wraps(function)
320    def wrapper(self, connection_handle, *args, **kwargs):
321        if (connection := self.lookup_connection(connection_handle)) is None:
322            raise ValueError('no connection for handle')
323        return function(self, connection, *args, **kwargs)
324    return wrapper
325
326
327# Decorator that converts the first argument from a bluetooth address to a connection
328def with_connection_from_address(function):
329    @functools.wraps(function)
330    def wrapper(self, address, *args, **kwargs):
331        for connection in self.connections.values():
332            if connection.peer_address == address:
333                return function(self, connection, *args, **kwargs)
334        raise ValueError('no connection for address')
335    return wrapper
336
337
338# Decorator that adds a method to the list of event handlers for host events.
339# This assumes that the method name starts with `on_`
340def host_event_handler(function):
341    device_host_event_handlers.append(function.__name__[3:])
342    return function
343
344
345# List of host event handlers for the Device class.
346# (we define this list outside the class, because referencing a class in method
347#  decorators is not straightforward)
348device_host_event_handlers = []
349
350
351# -----------------------------------------------------------------------------
352class Device(CompositeEventEmitter):
353
354    @composite_listener
355    class Listener:
356        def on_advertisement(self, address, data, rssi, advertisement_type):
357            pass
358
359        def on_inquiry_result(self, address, class_of_device, data, rssi):
360            pass
361
362        def on_connection(self, connection):
363            pass
364
365        def on_connection_failure(self, error):
366            pass
367
368        def on_characteristic_subscription(self, connection, characteristic, notify_enabled, indicate_enabled):
369            pass
370
371    @classmethod
372    def with_hci(cls, name, address, hci_source, hci_sink):
373        '''
374        Create a Device instance with a Host configured to communicate with a controller
375        through an HCI source/sink
376        '''
377        host = Host(controller_source = hci_source, controller_sink = hci_sink)
378        return cls(name = name, address = address, host = host)
379
380    @classmethod
381    def from_config_file(cls, filename):
382        config = DeviceConfiguration()
383        config.load_from_file(filename)
384        return cls(config=config)
385
386    @classmethod
387    def from_config_file_with_hci(cls, filename, hci_source, hci_sink):
388        config = DeviceConfiguration()
389        config.load_from_file(filename)
390        host = Host(controller_source = hci_source, controller_sink = hci_sink)
391        return cls(config = config, host = host)
392
393    def __init__(self, name = None, address = None, config = None, host = None, generic_access_service = True):
394        super().__init__()
395
396        self._host                    = None
397        self.powered_on               = False
398        self.advertising              = False
399        self.auto_restart_advertising = False
400        self.command_timeout          = 10  # seconds
401        self.gatt_server              = gatt_server.Server(self)
402        self.sdp_server               = sdp.Server(self)
403        self.l2cap_channel_manager    = l2cap.ChannelManager()
404        self.advertisement_data       = {}
405        self.scanning                 = False
406        self.discovering              = False
407        self.connecting               = False
408        self.disconnecting            = False
409        self.connections              = {}  # Connections, by connection handle
410        self.classic_enabled          = False
411        self.discoverable             = False
412        self.connectable              = False
413        self.inquiry_response         = None
414        self.address_resolver         = None
415
416        # Use the initial config or a default
417        self.public_address = Address('00:00:00:00:00:00')
418        if config is None:
419            config = DeviceConfiguration()
420        self.name                     = config.name
421        self.random_address           = config.address
422        self.class_of_device          = config.class_of_device
423        self.scan_response_data       = config.scan_response_data
424        self.advertising_data         = config.advertising_data
425        self.advertising_interval_min = config.advertising_interval_min
426        self.advertising_interval_max = config.advertising_interval_max
427        self.keystore                 = keys.KeyStore.create_for_device(config)
428        self.irk                      = config.irk
429        self.le_enabled               = config.le_enabled
430        self.le_simultaneous_enabled  = config.le_simultaneous_enabled
431        self.classic_ssp_enabled      = config.classic_ssp_enabled
432        self.classic_sc_enabled       = config.classic_sc_enabled
433
434        # If a name is passed, override the name from the config
435        if name:
436            self.name = name
437
438        # If an address is passed, override the address from the config
439        if address:
440            if type(address) is str:
441                address = Address(address)
442            self.random_address = address
443
444        # Setup SMP
445        # TODO: allow using a public address
446        self.smp_manager = smp.Manager(self, self.random_address)
447
448        # Register the SDP server with the L2CAP Channel Manager
449        self.sdp_server.register(self.l2cap_channel_manager)
450
451        # Add a GAP Service if requested
452        if generic_access_service:
453            self.gatt_server.add_service(GenericAccessService(self.name))
454
455        # Forward some events
456        setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
457
458        # Set the initial host
459        self.host = host
460
461    @property
462    def host(self):
463        return self._host
464
465    @host.setter
466    def host(self, host):
467        # Unsubscribe from events from the current host
468        if self._host:
469            for event_name in device_host_event_handlers:
470                self._host.remove_listener(event_name, getattr(self, f'on_{event_name}'))
471
472        # Subscribe to events from the new host
473        if host:
474            for event_name in device_host_event_handlers:
475                host.on(event_name, getattr(self, f'on_{event_name}'))
476
477        # Update the references to the new host
478        self._host                      = host
479        self.l2cap_channel_manager.host = host
480
481        # Set providers for the new host
482        if host:
483            host.long_term_key_provider = self.get_long_term_key
484            host.link_key_provider      = self.get_link_key
485
486    @property
487    def sdp_service_records(self):
488        return self.sdp_server.service_records
489
490    @sdp_service_records.setter
491    def sdp_service_records(self, service_records):
492        self.sdp_server.service_records = service_records
493
494    def lookup_connection(self, connection_handle):
495        if connection := self.connections.get(connection_handle):
496            return connection
497
498    def find_connection_by_bd_addr(self, bd_addr, transport=None):
499        for connection in self.connections.values():
500            if connection.peer_address.get_bytes() == bd_addr.get_bytes():
501                if transport is None or connection.transport == transport:
502                    return connection
503
504    def register_l2cap_server(self, psm, server):
505        self.l2cap_channel_manager.register_server(psm, server)
506
507    def create_l2cap_connector(self, connection, psm):
508        return lambda: self.l2cap_channel_manager.connect(connection, psm)
509
510    def create_l2cap_registrar(self, psm):
511        return lambda handler: self.register_l2cap_server(psm, handler)
512
513    def send_l2cap_pdu(self, connection_handle, cid, pdu):
514        self.host.send_l2cap_pdu(connection_handle, cid, pdu)
515
516    async def send_command(self, command):
517        try:
518            return await asyncio.wait_for(self.host.send_command(command), self.command_timeout)
519        except asyncio.TimeoutError:
520            logger.warning('!!! Command timed out')
521
522    async def power_on(self):
523        # Reset the controller
524        await self.host.reset()
525
526        response = await self.send_command(HCI_Read_BD_ADDR_Command())
527        if response.return_parameters.status == HCI_SUCCESS:
528            logger.debug(color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow'))
529            self.public_address = response.return_parameters.bd_addr
530
531
532        await self.send_command(HCI_Write_LE_Host_Support_Command(
533            le_supported_host = int(self.le_enabled),
534            simultaneous_le_host = int(self.le_simultaneous_enabled),
535        ))
536        if self.le_enabled:
537            # Set the controller address
538            await self.send_command(HCI_LE_Set_Random_Address_Command(
539                random_address = self.random_address
540            ))
541
542            # Load the address resolving list
543            if self.keystore:
544                await self.send_command(HCI_LE_Clear_Resolving_List_Command())
545
546                resolving_keys = await self.keystore.get_resolving_keys()
547                for (irk, address) in resolving_keys:
548                    await self.send_command(
549                        HCI_LE_Add_Device_To_Resolving_List_Command(
550                            peer_identity_address_type = address.address_type,
551                            peer_identity_address      = address,
552                            peer_irk                   = irk,
553                            local_irk                  = self.irk
554                        )
555                    )
556
557                # Enable address resolution
558                # await self.send_command(
559                #     HCI_LE_Set_Address_Resolution_Enable_Command(address_resolution_enable=1)
560                # )
561
562                # Create a host-side address resolver
563                self.address_resolver = smp.AddressResolver(resolving_keys)
564
565        if self.classic_enabled:
566            await self.send_command(
567                HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
568            )
569            await self.send_command(
570                HCI_Write_Class_Of_Device_Command(class_of_device = self.class_of_device)
571            )
572            await self.send_command(
573                HCI_Write_Simple_Pairing_Mode_Command(
574                    simple_pairing_mode=int(self.classic_ssp_enabled))
575            )
576            await self.send_command(
577                HCI_Write_Secure_Connections_Host_Support_Command(
578                    secure_connections_host_support=int(self.classic_sc_enabled))
579            )
580
581        # Let the SMP manager know about the address
582        # TODO: allow using a public address
583        self.smp_manager.address = self.random_address
584
585        # Done
586        self.powered_on = True
587
588    async def start_advertising(self, auto_restart=False):
589        self.auto_restart_advertising = auto_restart
590
591        # If we're advertising, stop first
592        if self.advertising:
593            await self.stop_advertising()
594
595        # Set/update the advertising data
596        await self.send_command(HCI_LE_Set_Advertising_Data_Command(
597            advertising_data = self.advertising_data
598        ))
599
600        # Set/update the scan response data
601        await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
602            scan_response_data = self.scan_response_data
603        ))
604
605        # Set the advertising parameters
606        await self.send_command(HCI_LE_Set_Advertising_Parameters_Command(
607            # TODO: use real values, not fixed ones
608            advertising_interval_min  = self.advertising_interval_min,
609            advertising_interval_max  = self.advertising_interval_max,
610            advertising_type          = HCI_LE_Set_Advertising_Parameters_Command.ADV_IND,
611            own_address_type          = Address.RANDOM_DEVICE_ADDRESS,  # TODO: allow using the public address
612            peer_address_type         = Address.PUBLIC_DEVICE_ADDRESS,
613            peer_address              = Address('00:00:00:00:00:00'),
614            advertising_channel_map   = 7,
615            advertising_filter_policy = 0
616        ))
617
618        # Enable advertising
619        await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
620            advertising_enable = 1
621        ))
622
623        self.advertising = True
624
625    async def stop_advertising(self):
626        # Disable advertising
627        if self.advertising:
628            await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
629                advertising_enable = 0
630            ))
631
632            self.advertising = False
633
634    @property
635    def is_advertising(self):
636        return self.advertising
637
638    async def start_scanning(
639        self,
640        active=True,
641        scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
642        scan_window=DEVICE_DEFAULT_SCAN_WINDOW,      # Scan window in ms
643        own_address_type=Address.RANDOM_DEVICE_ADDRESS,
644        filter_duplicates=False
645    ):
646        # Check that the arguments are legal
647        if scan_interval < scan_window:
648            raise ValueError('scan_interval must be >= scan_window')
649        if scan_interval < DEVICE_MIN_SCAN_INTERVAL or scan_interval > DEVICE_MAX_SCAN_INTERVAL:
650            raise ValueError('scan_interval out of range')
651        if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
652            raise ValueError('scan_interval out of range')
653
654        # Set the scanning parameters
655        scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
656        await self.send_command(HCI_LE_Set_Scan_Parameters_Command(
657            le_scan_type           = scan_type,
658            le_scan_interval       = int(scan_window / 0.625),
659            le_scan_window         = int(scan_window / 0.625),
660            own_address_type       = own_address_type,
661            scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
662        ))
663
664        # Enable scanning
665        await self.send_command(HCI_LE_Set_Scan_Enable_Command(
666            le_scan_enable    = 1,
667            filter_duplicates = 1 if filter_duplicates else 0
668        ))
669        self.scanning = True
670
671    async def stop_scanning(self):
672        await self.send_command(HCI_LE_Set_Scan_Enable_Command(
673            le_scan_enable    = 0,
674            filter_duplicates = 0
675        ))
676        self.scanning = False
677
678    @property
679    def is_scanning(self):
680        return self.scanning
681
682    @host_event_handler
683    def on_advertising_report(self, address, data, rssi, advertisement_type):
684        if not (accumulator := self.advertisement_data.get(address)):
685            accumulator = AdvertisementDataAccumulator()
686            self.advertisement_data[address] = accumulator
687        accumulator.update(data, advertisement_type)
688        if accumulator.flushable:
689            self.emit(
690                'advertisement',
691                address,
692                accumulator.advertising_data,
693                rssi,
694                accumulator.connectable
695            )
696
697    async def start_discovery(self):
698        await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE))
699
700        response = await self.send_command(HCI_Inquiry_Command(
701            lap            = HCI_GENERAL_INQUIRY_LAP,
702            inquiry_length = DEVICE_DEFAULT_INQUIRY_LENGTH,
703            num_responses  = 0  # Unlimited number of responses.
704        ))
705        if response.status != HCI_Command_Status_Event.PENDING:
706            self.discovering = False
707            raise RuntimeError(f'HCI_Inquiry command failed: {HCI_Constant.status_name(response.status)} ({response.status})')
708
709        self.discovering = True
710
711    async def stop_discovery(self):
712        await self.send_command(HCI_Inquiry_Cancel_Command())
713        self.discovering = False
714
715    @host_event_handler
716    def on_inquiry_result(self, address, class_of_device, data, rssi):
717        self.emit(
718            'inquiry_result',
719            address,
720            class_of_device,
721            AdvertisingData.from_bytes(data),
722            rssi
723        )
724
725    async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
726        if inquiry_scan_enabled and page_scan_enabled:
727            scan_enable = 0x03
728        elif page_scan_enabled:
729            scan_enable = 0x02
730        elif inquiry_scan_enabled:
731            scan_enable = 0x01
732        else:
733            scan_enable = 0x00
734
735        return await self.send_command(HCI_Write_Scan_Enable_Command(scan_enable = scan_enable))
736
737    async def set_discoverable(self, discoverable=True):
738        self.discoverable = discoverable
739        if self.classic_enabled:
740            # Synthesize an inquiry response if none is set already
741            if self.inquiry_response is None:
742                self.inquiry_response = bytes(
743                    AdvertisingData([
744                        (AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))
745                    ])
746                )
747
748            # Update the controller
749            await self.host.send_command(
750                HCI_Write_Extended_Inquiry_Response_Command(
751                    fec_required              = 0,
752                    extended_inquiry_response = self.inquiry_response
753                )
754            )
755            await self.set_scan_enable(
756                inquiry_scan_enabled = self.discoverable,
757                page_scan_enabled    = self.connectable
758            )
759
760    async def set_connectable(self, connectable=True):
761        self.connectable = connectable
762        if self.classic_enabled:
763            await self.set_scan_enable(
764                inquiry_scan_enabled = self.discoverable,
765                page_scan_enabled    = self.connectable
766            )
767
768    async def connect(self, peer_address, transport=BT_LE_TRANSPORT):
769        '''
770        Request a connection to a peer.
771        This method cannot be called if there is already a pending connection.
772        '''
773
774        # Adjust the transport automatically if we need to
775        if transport == BT_LE_TRANSPORT and not self.le_enabled:
776            transport = BT_BR_EDR_TRANSPORT
777        elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
778            transport = BT_LE_TRANSPORT
779
780        # Check that there isn't already a pending connection
781        if self.is_connecting:
782            raise InvalidStateError('connection already pending')
783
784        if type(peer_address) is str:
785            try:
786                peer_address = Address(peer_address)
787            except ValueError:
788                # If the address is not parssable, assume it is a name instead
789                logger.debug('looking for peer by name')
790                peer_address = await self.find_peer_by_name(peer_address, transport)
791
792        # Create a future so that we can wait for the connection's result
793        pending_connection = asyncio.get_running_loop().create_future()
794        self.on('connection', pending_connection.set_result)
795        self.on('connection_failure', pending_connection.set_exception)
796
797        # Tell the controller to connect
798        if transport == BT_LE_TRANSPORT:
799            # TODO: use real values, not fixed ones
800            result = await self.send_command(HCI_LE_Create_Connection_Command(
801                le_scan_interval        = 96,
802                le_scan_window          = 96,
803                initiator_filter_policy = 0,
804                peer_address_type       = peer_address.address_type,
805                peer_address            = peer_address,
806                own_address_type        = Address.RANDOM_DEVICE_ADDRESS,
807                conn_interval_min       = 12,
808                conn_interval_max       = 24,
809                conn_latency            = 0,
810                supervision_timeout     = 72,
811                minimum_ce_length       = 0,
812                maximum_ce_length       = 0
813            ))
814        else:
815            # TODO: use real values, not fixed ones
816            result = await self.send_command(HCI_Create_Connection_Command(
817                bd_addr                   = peer_address,
818                packet_type               = 0xCC18,  # FIXME: change
819                page_scan_repetition_mode = HCI_R2_PAGE_SCAN_REPETITION_MODE,
820                clock_offset              = 0x0000,
821                allow_role_switch         = 0x01,
822                reserved                  = 0
823            ))
824
825        try:
826            if result.status != HCI_Command_Status_Event.PENDING:
827                raise RuntimeError(f'HCI_LE_Create_Connection_Command failed: {HCI_Constant.status_name(result.status)} ({result.status})')
828
829            # Wait for the connection process to complete
830            self.connecting = True
831            return await pending_connection
832        finally:
833            self.remove_listener('connection', pending_connection.set_result)
834            self.remove_listener('connection_failure', pending_connection.set_exception)
835            self.connecting = False
836
837    @property
838    def is_connecting(self):
839        return self.connecting
840
841    @property
842    def is_disconnecting(self):
843        return self.disconnecting
844
845    async def cancel_connection(self):
846        if not self.is_connecting:
847            return
848        await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
849
850    async def disconnect(self, connection, reason):
851        # Create a future so that we can wait for the disconnection's result
852        pending_disconnection = asyncio.get_running_loop().create_future()
853        connection.on('disconnection', pending_disconnection.set_result)
854        connection.on('disconnection_failure', pending_disconnection.set_exception)
855
856        # Request a disconnection
857        result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
858
859        try:
860            if result.status != HCI_Command_Status_Event.PENDING:
861                raise RuntimeError(f'HCI_Disconnect_Command failed: {HCI_Constant.status_name(result.status)} ({result.status})')
862
863            # Wait for the disconnection process to complete
864            self.disconnecting = True
865            return await pending_disconnection
866        finally:
867            connection.remove_listener('disconnection', pending_disconnection.set_result)
868            connection.remove_listener('disconnection_failure', pending_disconnection.set_exception)
869            self.disconnecting = False
870
871    async def update_connection_parameters(
872        self,
873        connection,
874        conn_interval_min,
875        conn_interval_max,
876        conn_latency,
877        supervision_timeout,
878        minimum_ce_length = 0,
879        maximum_ce_length = 0
880    ):
881        '''
882        NOTE: the name of the parameters may look odd, but it just follows the names used in the Bluetooth spec.
883        '''
884        await self.send_command(HCI_LE_Connection_Update_Command(
885            connection_handle   = connection.handle,
886            conn_interval_min   = conn_interval_min,
887            conn_interval_max   = conn_interval_max,
888            conn_latency        = conn_latency,
889            supervision_timeout = supervision_timeout,
890            minimum_ce_length   = minimum_ce_length,
891            maximum_ce_length   = maximum_ce_length
892        ))
893        # TODO: check result
894
895    async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
896        """
897        Scan for a peer with a give name and return its address and transport
898        """
899
900        # Create a future to wait for an address to be found
901        peer_address = asyncio.get_running_loop().create_future()
902
903        # Scan/inquire with event handlers to handle scan/inquiry results
904        def on_peer_found(address, ad_data):
905            local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME)
906            if local_name is None:
907                local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME)
908            if local_name is not None:
909                if local_name.decode('utf-8') == name:
910                    peer_address.set_result(address)
911        try:
912            handler = None
913            if transport == BT_LE_TRANSPORT:
914                event_name = 'advertisement'
915                handler = self.on(
916                    event_name,
917                    lambda address, ad_data, rssi, connectable:
918                        on_peer_found(address, ad_data)
919                )
920
921                was_scanning = self.scanning
922                if not self.scanning:
923                    await self.start_scanning(filter_duplicates=True)
924
925            elif transport == BT_BR_EDR_TRANSPORT:
926                event_name = 'inquiry_result'
927                handler = self.on(
928                    event_name,
929                    lambda address, class_of_device, eir_data, rssi:
930                        on_peer_found(address, eir_data)
931                )
932
933                was_discovering = self.discovering
934                if not self.discovering:
935                    await self.start_discovery()
936            else:
937                return None
938
939            return await peer_address
940        finally:
941            if handler is not None:
942                self.remove_listener(event_name, handler)
943
944            if transport == BT_LE_TRANSPORT and not was_scanning:
945                await self.stop_scanning()
946            elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
947                await self.stop_discovery()
948
949    @property
950    def pairing_config_factory(self):
951        return self.smp_manager.pairing_config_factory
952
953    @pairing_config_factory.setter
954    def pairing_config_factory(self, pairing_config_factory):
955        self.smp_manager.pairing_config_factory = pairing_config_factory
956
957    async def pair(self, connection):
958        return await self.smp_manager.pair(connection)
959
960    def request_pairing(self, connection):
961        return self.smp_manager.request_pairing(connection)
962
963    async def get_long_term_key(self, connection_handle, rand, ediv):
964        if (connection := self.lookup_connection(connection_handle)) is None:
965            return
966
967        # Start by looking for the key in an SMP session
968        ltk = self.smp_manager.get_long_term_key(connection, rand, ediv)
969        if ltk is not None:
970            return ltk
971
972        # Then look for the key in the keystore
973        if self.keystore is not None:
974            keys = await self.keystore.get(str(connection.peer_address))
975            if keys is not None:
976                logger.debug('found keys in the key store')
977                if keys.ltk:
978                    return keys.ltk.value
979                elif connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
980                    return keys.ltk_central.value
981                elif connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
982                    return keys.ltk_peripheral.value
983
984    async def get_link_key(self, address):
985        # Look for the key in the keystore
986        if self.keystore is not None:
987            keys = await self.keystore.get(str(address))
988            if keys is not None:
989                logger.debug('found keys in the key store')
990                return keys.link_key.value
991
992    # [Classic only]
993    async def authenticate(self, connection):
994        # Set up event handlers
995        pending_authentication = asyncio.get_running_loop().create_future()
996
997        def on_authentication():
998            pending_authentication.set_result(None)
999
1000        def on_authentication_failure(error_code):
1001            pending_authentication.set_exception(HCI_Error(error_code))
1002
1003        connection.on('connection_authentication', on_authentication)
1004        connection.on('connection_authentication_failure',  on_authentication_failure)
1005
1006        # Request the authentication
1007        try:
1008            result = await self.send_command(
1009                HCI_Authentication_Requested_Command(connection_handle = connection.handle)
1010            )
1011            if result.status != HCI_COMMAND_STATUS_PENDING:
1012                logger.warn(f'HCI_Authentication_Requested_Command failed: {HCI_Constant.error_name(result.status)}')
1013                raise HCI_Error(result.status)
1014
1015            # Wait for the authentication to complete
1016            await pending_authentication
1017        finally:
1018            connection.remove_listener('connection_authentication', on_authentication)
1019            connection.remove_listener('connection_authentication_failure',  on_authentication_failure)
1020
1021    async def encrypt(self, connection):
1022        # Set up event handlers
1023        pending_encryption = asyncio.get_running_loop().create_future()
1024
1025        def on_encryption_change():
1026            pending_encryption.set_result(None)
1027
1028        def on_encryption_failure(error_code):
1029            pending_encryption.set_exception(HCI_Error(error_code))
1030
1031        connection.on('connection_encryption_change',  on_encryption_change)
1032        connection.on('connection_encryption_failure', on_encryption_failure)
1033
1034        # Request the encryption
1035        try:
1036            if connection.transport == BT_LE_TRANSPORT:
1037                # Look for a key in the key store
1038                if self.keystore is None:
1039                    raise RuntimeError('no key store')
1040
1041                keys = await self.keystore.get(str(connection.peer_address))
1042                if keys is None:
1043                    raise RuntimeError('keys not found in key store')
1044
1045                if keys.ltk is not None:
1046                    ltk  = keys.ltk.value
1047                    rand = bytes(8)
1048                    ediv = 0
1049                elif keys.ltk_central is not None:
1050                    ltk  = keys.ltk_central.value
1051                    rand = keys.ltk_central.rand
1052                    ediv = keys.ltk_central.ediv
1053                else:
1054                    raise RuntimeError('no LTK found for peer')
1055
1056                if connection.role != HCI_CENTRAL_ROLE:
1057                    raise InvalidStateError('only centrals can start encryption')
1058
1059                result = await self.send_command(
1060                    HCI_LE_Start_Encryption_Command(
1061                        connection_handle     = connection.handle,
1062                        random_number         = rand,
1063                        encrypted_diversifier = ediv,
1064                        long_term_key         = ltk
1065                    )
1066                )
1067
1068                if result.status != HCI_COMMAND_STATUS_PENDING:
1069                    logger.warn(f'HCI_LE_Start_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
1070                    raise HCI_Error(result.status)
1071            else:
1072                result = await self.send_command(
1073                    HCI_Set_Connection_Encryption_Command(
1074                        connection_handle = connection.handle,
1075                        encryption_enable = 0x01
1076                    )
1077                )
1078
1079                if result.status != HCI_COMMAND_STATUS_PENDING:
1080                    logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
1081                    raise HCI_Error(result.status)
1082
1083            # Wait for the result
1084            await pending_encryption
1085        finally:
1086            connection.remove_listener('connection_encryption_change',  on_encryption_change)
1087            connection.remove_listener('connection_encryption_failure', on_encryption_failure)
1088
1089    # [Classic only]
1090    async def request_remote_name(self, connection):
1091        # Set up event handlers
1092        pending_name = asyncio.get_running_loop().create_future()
1093
1094        def on_remote_name():
1095            pending_name.set_result(connection.peer_name)
1096
1097        def on_remote_name_failure(error_code):
1098            pending_name.set_exception(HCI_Error(error_code))
1099
1100        connection.on('remote_name', on_remote_name)
1101        connection.on('remote_name_failure', on_remote_name_failure)
1102
1103        try:
1104            result = await self.send_command(
1105                HCI_Remote_Name_Request_Command(
1106                    bd_addr                   = connection.peer_address,
1107                    page_scan_repetition_mode = HCI_Remote_Name_Request_Command.R0,  # TODO investigate other options
1108                    reserved                  = 0,
1109                    clock_offset              = 0  # TODO investigate non-0 values
1110                )
1111            )
1112
1113            if result.status != HCI_COMMAND_STATUS_PENDING:
1114                logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
1115                raise HCI_Error(result.status)
1116
1117            # Wait for the result
1118            return await pending_name
1119        finally:
1120            connection.remove_listener('remote_name', on_remote_name)
1121            connection.remove_listener('remote_name_failure', on_remote_name_failure)
1122
1123    # [Classic only]
1124    @host_event_handler
1125    def on_link_key(self, bd_addr, link_key, key_type):
1126        # Store the keys in the key store
1127        if self.keystore:
1128            pairing_keys = keys.PairingKeys()
1129            pairing_keys.link_key = keys.PairingKeys.Key(value = link_key)
1130
1131            async def store_keys():
1132                try:
1133                    await self.keystore.update(str(bd_addr), pairing_keys)
1134                except Exception as error:
1135                    logger.warn(f'!!! error while storing keys: {error}')
1136
1137            asyncio.create_task(store_keys())
1138
1139    def add_service(self, service):
1140        self.gatt_server.add_service(service)
1141
1142    def add_services(self, services):
1143        self.gatt_server.add_services(services)
1144
1145    async def notify_subscriber(self, connection, attribute, force=False):
1146        await self.gatt_server.notify_subscriber(connection, attribute, force)
1147
1148    async def notify_subscribers(self, attribute, force=False):
1149        await self.gatt_server.notify_subscribers(attribute, force)
1150
1151    async def indicate_subscriber(self, connection, attribute, force=False):
1152        await self.gatt_server.indicate_subscriber(connection, attribute, force)
1153
1154    async def indicate_subscribers(self, attribute):
1155        await self.gatt_server.indicate_subscribers(attribute)
1156
1157    @host_event_handler
1158    def on_connection(self, connection_handle, transport, peer_address, peer_resolvable_address, role, connection_parameters):
1159        logger.debug(f'*** Connection: [0x{connection_handle:04X}] {peer_address} as {HCI_Constant.role_name(role)}')
1160        if connection_handle in self.connections:
1161            logger.warn('new connection reuses the same handle as a previous connection')
1162
1163        # Resolve the peer address if we can
1164        if self.address_resolver:
1165            if peer_address.is_resolvable:
1166                resolved_address = self.address_resolver.resolve(peer_address)
1167                if resolved_address is not None:
1168                    logger.debug(f'*** Address resolved as {resolved_address}')
1169                    peer_resolvable_address = peer_address
1170                    peer_address = resolved_address
1171
1172        # Create a new connection
1173        connection = Connection(
1174            self,
1175            connection_handle,
1176            transport,
1177            peer_address,
1178            peer_resolvable_address,
1179            role,
1180            connection_parameters
1181        )
1182        self.connections[connection_handle] = connection
1183
1184        # We are no longer advertising
1185        self.advertising = False
1186
1187        # Emit an event to notify listeners of the new connection
1188        self.emit('connection', connection)
1189
1190    @host_event_handler
1191    def on_connection_failure(self, error_code):
1192        logger.debug(f'*** Connection failed: {error_code}')
1193        error = ConnectionError(
1194            error_code,
1195            'hci',
1196            HCI_Constant.error_name(error_code)
1197        )
1198        self.emit('connection_failure', error)
1199
1200    @host_event_handler
1201    @with_connection_from_handle
1202    def on_disconnection(self, connection, reason):
1203        logger.debug(f'*** Disconnection: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, reason={reason}')
1204        connection.emit('disconnection', reason)
1205
1206        # Remove the connection from the map
1207        del self.connections[connection.handle]
1208
1209        # Cleanup subsystems that maintain per-connection state
1210        self.gatt_server.on_disconnection(connection)
1211
1212        # Restart advertising if auto-restart is enabled
1213        if self.auto_restart_advertising:
1214            logger.debug('restarting advertising')
1215            asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
1216
1217    @host_event_handler
1218    @with_connection_from_handle
1219    def on_disconnection_failure(self, connection, error_code):
1220        logger.debug(f'*** Disconnection failed: {error_code}')
1221        error = ConnectionError(
1222            error_code,
1223            'hci',
1224            HCI_Constant.error_name(error_code)
1225        )
1226        connection.emit('disconnection_failure', error)
1227
1228    @host_event_handler
1229    @AsyncRunner.run_in_task()
1230    async def on_inquiry_complete(self):
1231        if self.discovering:
1232            # Inquire again
1233            await self.start_discovery()
1234
1235    @host_event_handler
1236    @with_connection_from_handle
1237    def on_connection_authentication(self, connection):
1238        logger.debug(f'*** Connection Authentication: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
1239        connection.authenticated = True
1240        connection.emit('connection_authentication')
1241
1242    @host_event_handler
1243    @with_connection_from_handle
1244    def on_connection_authentication_failure(self, connection, error):
1245        logger.debug(f'*** Connection Authentication Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
1246        connection.emit('connection_authentication_failure', error)
1247
1248    # [Classic only]
1249    @host_event_handler
1250    @with_connection_from_address
1251    def on_authentication_io_capability_request(self, connection):
1252        # Ask what the pairing config should be for this connection
1253        pairing_config = self.pairing_config_factory(connection)
1254
1255        # Map the SMP IO capability to a Classic IO capability
1256        io_capability = {
1257            smp.SMP_DISPLAY_ONLY_IO_CAPABILITY:       HCI_DISPLAY_ONLY_IO_CAPABILITY,
1258            smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY:     HCI_DISPLAY_YES_NO_IO_CAPABILITY,
1259            smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY:      HCI_KEYBOARD_ONLY_IO_CAPABILITY,
1260            smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
1261            smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   HCI_DISPLAY_YES_NO_IO_CAPABILITY
1262        }.get(pairing_config.delegate.io_capability)
1263
1264        if io_capability is None:
1265            logger.warning(f'cannot map IO capability ({pairing_config.delegate.io_capability}')
1266            io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
1267
1268        # Compute the authentication requirements
1269        authentication_requirements = (
1270            # No Bonding
1271            (
1272                HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
1273                HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS
1274            ),
1275            # General Bonding
1276            (
1277                HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
1278                HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS
1279            )
1280        )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
1281
1282        # Respond
1283        self.host.send_command_sync(
1284            HCI_IO_Capability_Request_Reply_Command(
1285                bd_addr                     = connection.peer_address,
1286                io_capability               = io_capability,
1287                oob_data_present            = 0x00,  # Not present
1288                authentication_requirements = authentication_requirements
1289            )
1290        )
1291
1292    # [Classic only]
1293    @host_event_handler
1294    @with_connection_from_address
1295    def on_authentication_user_confirmation_request(self, connection, code):
1296        # Ask what the pairing config should be for this connection
1297        pairing_config = self.pairing_config_factory(connection)
1298
1299        can_confirm = pairing_config.delegate.io_capability not in {
1300            smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
1301            smp.SMP_DISPLAY_ONLY_IO_CAPABILITY
1302        }
1303
1304        # Respond
1305        if can_confirm and pairing_config.delegate:
1306            async def compare_numbers():
1307                numbers_match = await pairing_config.delegate.compare_numbers(code, digits=6)
1308                if numbers_match:
1309                    self.host.send_command_sync(
1310                        HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
1311                    )
1312                else:
1313                    self.host.send_command_sync(
1314                        HCI_User_Confirmation_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
1315                    )
1316
1317            asyncio.create_task(compare_numbers())
1318        else:
1319            self.host.send_command_sync(
1320                HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
1321            )
1322
1323    # [Classic only]
1324    @host_event_handler
1325    @with_connection_from_address
1326    def on_authentication_user_passkey_request(self, connection):
1327        # Ask what the pairing config should be for this connection
1328        pairing_config = self.pairing_config_factory(connection)
1329
1330        can_input = pairing_config.delegate.io_capability in {
1331            smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
1332            smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
1333        }
1334
1335        # Respond
1336        if can_input and pairing_config.delegate:
1337            async def get_number():
1338                number = await pairing_config.delegate.get_number()
1339                if number is not None:
1340                    self.host.send_command_sync(
1341                        HCI_User_Passkey_Request_Reply_Command(
1342                            bd_addr       = connection.peer_address,
1343                            numeric_value = number)
1344                    )
1345                else:
1346                    self.host.send_command_sync(
1347                        HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
1348                    )
1349
1350            asyncio.create_task(get_number())
1351        else:
1352            self.host.send_command_sync(
1353                HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
1354            )
1355
1356    # [Classic only]
1357    @host_event_handler
1358    @with_connection_from_address
1359    def on_remote_name(self, connection, remote_name):
1360        # Try to decode the name
1361        try:
1362            connection.peer_name = remote_name.decode('utf-8')
1363            connection.emit('remote_name')
1364        except UnicodeDecodeError as error:
1365            logger.warning('peer name is not valid UTF-8')
1366            connection.emit('remote_name_failure', error)
1367
1368    # [Classic only]
1369    @host_event_handler
1370    @with_connection_from_address
1371    def on_remote_name_failure(self, connection, error):
1372        connection.emit('remote_name_failure', error)
1373
1374    @host_event_handler
1375    @with_connection_from_handle
1376    def on_connection_encryption_change(self, connection, encryption):
1377        logger.debug(f'*** Connection Encryption Change: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, encryption={encryption}')
1378        connection.encryption = encryption
1379        connection.emit('connection_encryption_change')
1380
1381    @host_event_handler
1382    @with_connection_from_handle
1383    def on_connection_encryption_failure(self, connection, error):
1384        logger.debug(f'*** Connection Encryption Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
1385        connection.emit('connection_encryption_failure', error)
1386
1387    @host_event_handler
1388    @with_connection_from_handle
1389    def on_connection_encryption_key_refresh(self, connection):
1390        logger.debug(f'*** Connection Key Refresh: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
1391        connection.emit('connection_encryption_key_refresh')
1392
1393    @host_event_handler
1394    @with_connection_from_handle
1395    def on_connection_parameters_update(self, connection, connection_parameters):
1396        logger.debug(f'*** Connection Parameters Update: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, {connection_parameters}')
1397        connection.parameters = connection_parameters
1398        connection.emit('connection_parameters_update')
1399
1400    @host_event_handler
1401    @with_connection_from_handle
1402    def on_connection_parameters_update_failure(self, connection, error):
1403        logger.debug(f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
1404        connection.emit('connection_parameters_update_failure', error)
1405
1406    @host_event_handler
1407    @with_connection_from_handle
1408    def on_connection_phy_update(self, connection, connection_phy):
1409        logger.debug(f'*** Connection PHY Update: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, {connection_phy}')
1410        connection.phy = connection_phy
1411        connection.emit('connection_phy_update')
1412
1413    @host_event_handler
1414    @with_connection_from_handle
1415    def on_connection_phy_update_failure(self, connection, error):
1416        logger.debug(f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
1417        connection.emit('connection_phy_update_failure', error)
1418
1419    @host_event_handler
1420    @with_connection_from_handle
1421    def on_connection_att_mtu_update(self, connection, att_mtu):
1422        logger.debug(f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, {att_mtu}')
1423        connection.att_mtu = att_mtu
1424        connection.emit('connection_att_mtu_update')
1425
1426    @host_event_handler
1427    @with_connection_from_handle
1428    def on_connection_data_length_change(self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time):
1429        logger.debug(f'*** Connection Data Length Change: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
1430        connection.data_length = (max_tx_octets, max_tx_time, max_rx_octets, max_rx_time)
1431        connection.emit('connection_data_length_change')
1432
1433    @with_connection_from_handle
1434    def on_pairing_start(self, connection):
1435        connection.emit('pairing_start')
1436
1437    @with_connection_from_handle
1438    def on_pairing(self, connection, keys):
1439        connection.emit('pairing', keys)
1440
1441    @with_connection_from_handle
1442    def on_pairing_failure(self, connection, reason):
1443        connection.emit('pairing_failure', reason)
1444
1445    @host_event_handler
1446    @with_connection_from_handle
1447    def on_gatt_pdu(self, connection, pdu):
1448        # Parse the L2CAP payload into an ATT PDU object
1449        att_pdu = ATT_PDU.from_bytes(pdu)
1450
1451        # Conveniently, even-numbered op codes are client->server and
1452        # odd-numbered ones are server->client
1453        if att_pdu.op_code & 1:
1454            if connection.gatt_client is None:
1455                logger.warn(color('no GATT client for connection 0x{connection_handle:04X}'))
1456                return
1457            connection.gatt_client.on_gatt_pdu(att_pdu)
1458        else:
1459            if connection.gatt_server is None:
1460                logger.warn(color('no GATT server for connection 0x{connection_handle:04X}'))
1461                return
1462            connection.gatt_server.on_gatt_pdu(connection, att_pdu)
1463
1464    @host_event_handler
1465    @with_connection_from_handle
1466    def on_smp_pdu(self, connection, pdu):
1467        self.smp_manager.on_smp_pdu(connection, pdu)
1468
1469    @host_event_handler
1470    @with_connection_from_handle
1471    def on_l2cap_pdu(self, connection, cid, pdu):
1472        self.l2cap_channel_manager.on_pdu(connection, cid, pdu)
1473
1474    def __str__(self):
1475        return f'Device(name="{self.name}", random_address="{self.random_address}"", public_address="{self.public_address}")'
1476