• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19import asyncio
20import itertools
21import random
22import struct
23from bumble.colors import color
24from bumble.core import (
25    BT_CENTRAL_ROLE,
26    BT_PERIPHERAL_ROLE,
27    BT_LE_TRANSPORT,
28    BT_BR_EDR_TRANSPORT,
29)
30
31from bumble.hci import (
32    HCI_ACL_DATA_PACKET,
33    HCI_COMMAND_DISALLOWED_ERROR,
34    HCI_COMMAND_PACKET,
35    HCI_COMMAND_STATUS_PENDING,
36    HCI_CONNECTION_TIMEOUT_ERROR,
37    HCI_CONTROLLER_BUSY_ERROR,
38    HCI_EVENT_PACKET,
39    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
40    HCI_LE_1M_PHY,
41    HCI_SUCCESS,
42    HCI_UNKNOWN_HCI_COMMAND_ERROR,
43    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
44    HCI_VERSION_BLUETOOTH_CORE_5_0,
45    Address,
46    HCI_AclDataPacket,
47    HCI_AclDataPacketAssembler,
48    HCI_Command_Complete_Event,
49    HCI_Command_Status_Event,
50    HCI_Connection_Complete_Event,
51    HCI_Connection_Request_Event,
52    HCI_Disconnection_Complete_Event,
53    HCI_Encryption_Change_Event,
54    HCI_LE_Advertising_Report_Event,
55    HCI_LE_Connection_Complete_Event,
56    HCI_LE_Read_Remote_Features_Complete_Event,
57    HCI_Number_Of_Completed_Packets_Event,
58    HCI_Packet,
59    HCI_Role_Change_Event,
60)
61from typing import Optional, Union, Dict
62
63
64# -----------------------------------------------------------------------------
65# Logging
66# -----------------------------------------------------------------------------
67logger = logging.getLogger(__name__)
68
69
70# -----------------------------------------------------------------------------
71# Utils
72# -----------------------------------------------------------------------------
class DataObject:
    '''Empty class with no fields or methods of its own.'''
75
76
77# -----------------------------------------------------------------------------
class Connection:
    '''
    One connection held by a controller, identified by its handle.

    ACL packets coming from the host are fed to a packet assembler, which is
    given `on_acl_pdu` as its callback; `on_acl_pdu` forwards data to the peer
    over the link.
    '''

    def __init__(self, controller, handle, role, peer_address, link, transport):
        # Identity of the connection
        self.handle = handle
        self.role = role
        self.transport = transport
        self.peer_address = peer_address

        # Owning controller, and the link used to reach the peer (may be None)
        self.controller = controller
        self.link = link

        # Assembler for the ACL packets received from the host
        self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)

    def on_hci_acl_data_packet(self, packet):
        '''
        Feed an ACL packet to the assembler, then tell the host that one packet
        has completed for this connection's handle.
        '''
        self.assembler.feed_packet(packet)
        completed = HCI_Number_Of_Completed_Packets_Event([(self.handle, 1)])
        self.controller.send_hci_packet(completed)

    def on_acl_pdu(self, data):
        '''
        Send `data` to the peer over the link; does nothing without a link.
        '''
        if not self.link:
            return
        self.link.send_acl_data(
            self.controller, self.peer_address, self.transport, data
        )
99
100
101# -----------------------------------------------------------------------------
102class Controller:
103    def __init__(
104        self,
105        name,
106        host_source=None,
107        host_sink=None,
108        link=None,
109        public_address: Optional[Union[bytes, str, Address]] = None,
110    ):
111        self.name = name
112        self.hci_sink = None
113        self.link = link
114
115        self.central_connections: Dict[
116            Address, Connection
117        ] = {}  # Connections where this controller is the central
118        self.peripheral_connections: Dict[
119            Address, Connection
120        ] = {}  # Connections where this controller is the peripheral
121        self.classic_connections: Dict[
122            Address, Connection
123        ] = {}  # Connections in BR/EDR
124
125        self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
126        self.hci_revision = 0
127        self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0
128        self.lmp_subversion = 0
129        self.lmp_features = bytes.fromhex(
130            '0000000060000000'
131        )  # BR/EDR Not Supported, LE Supported (Controller)
132        self.manufacturer_name = 0xFFFF
133        self.hc_le_data_packet_length = 27
134        self.hc_total_num_le_data_packets = 64
135        self.event_mask = 0
136        self.event_mask_page_2 = 0
137        self.supported_commands = bytes.fromhex(
138            '2000800000c000000000e40000002822000000000000040000f7ffff7f000000'
139            '30f0f9ff01008004000000000000000000000000000000000000000000000000'
140        )
141        self.le_event_mask = 0
142        self.advertising_parameters = None
143        self.le_features = bytes.fromhex('ff49010000000000')
144        self.le_states = bytes.fromhex('ffff3fffff030000')
145        self.advertising_channel_tx_power = 0
146        self.filter_accept_list_size = 8
147        self.filter_duplicates = False
148        self.resolving_list_size = 8
149        self.supported_max_tx_octets = 27
150        self.supported_max_tx_time = 10000  # microseconds
151        self.supported_max_rx_octets = 27
152        self.supported_max_rx_time = 10000  # microseconds
153        self.suggested_max_tx_octets = 27
154        self.suggested_max_tx_time = 0x0148  # microseconds
155        self.default_phy = bytes([0, 0, 0])
156        self.le_scan_type = 0
157        self.le_scan_interval = 0x10
158        self.le_scan_window = 0x10
159        self.le_scan_enable = 0
160        self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS
161        self.le_scanning_filter_policy = 0
162        self.le_scan_response_data = None
163        self.le_address_resolution = False
164        self.le_rpa_timeout = 0
165        self.sync_flow_control = False
166        self.local_name = 'Bumble'
167
168        self.advertising_interval = 2000  # Fixed for now
169        self.advertising_data = None
170        self.advertising_timer_handle = None
171
172        self._random_address = Address('00:00:00:00:00:00')
173        if isinstance(public_address, Address):
174            self._public_address = public_address
175        elif public_address is not None:
176            self._public_address = Address(
177                public_address, Address.PUBLIC_DEVICE_ADDRESS
178            )
179        else:
180            self._public_address = Address('00:00:00:00:00:00')
181
182        # Set the source and sink interfaces
183        if host_source:
184            host_source.set_packet_sink(self)
185        self.host = host_sink
186
187        # Add this controller to the link if specified
188        if link:
189            link.add_controller(self)
190
191    @property
192    def host(self):
193        return self.hci_sink
194
195    @host.setter
196    def host(self, host):
197        '''
198        Sets the host (sink) for this controller, and set this controller as the
199        controller (sink) for the host
200        '''
201        self.set_packet_sink(host)
202        if host:
203            host.controller = self
204
205    def set_packet_sink(self, sink):
206        '''
207        Method from the Packet Source interface
208        '''
209        self.hci_sink = sink
210
211    @property
212    def public_address(self):
213        return self._public_address
214
215    @public_address.setter
216    def public_address(self, address):
217        if isinstance(address, str):
218            address = Address(address)
219        self._public_address = address
220
221    @property
222    def random_address(self):
223        return self._random_address
224
225    @random_address.setter
226    def random_address(self, address):
227        if isinstance(address, str):
228            address = Address(address)
229        self._random_address = address
230        logger.debug(f'new random address: {address}')
231
232        if self.link:
233            self.link.on_address_changed(self)
234
235    # Packet Sink protocol (packets coming from the host via HCI)
236    def on_packet(self, packet):
237        self.on_hci_packet(HCI_Packet.from_bytes(packet))
238
239    def on_hci_packet(self, packet):
240        logger.debug(
241            f'{color("<<<", "blue")} [{self.name}] '
242            f'{color("HOST -> CONTROLLER", "blue")}: {packet}'
243        )
244
245        # If the packet is a command, invoke the handler for this packet
246        if packet.hci_packet_type == HCI_COMMAND_PACKET:
247            self.on_hci_command_packet(packet)
248        elif packet.hci_packet_type == HCI_EVENT_PACKET:
249            self.on_hci_event_packet(packet)
250        elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
251            self.on_hci_acl_data_packet(packet)
252        else:
253            logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
254
255    def on_hci_command_packet(self, command):
256        handler_name = f'on_{command.name.lower()}'
257        handler = getattr(self, handler_name, self.on_hci_command)
258        result = handler(command)
259        if isinstance(result, bytes):
260            self.send_hci_packet(
261                HCI_Command_Complete_Event(
262                    num_hci_command_packets=1,
263                    command_opcode=command.op_code,
264                    return_parameters=result,
265                )
266            )
267
268    def on_hci_event_packet(self, _event):
269        logger.warning('!!! unexpected event packet')
270
271    def on_hci_acl_data_packet(self, packet):
272        # Look for the connection to which this data belongs
273        connection = self.find_connection_by_handle(packet.connection_handle)
274        if connection is None:
275            logger.warning(
276                f'!!! no connection for handle 0x{packet.connection_handle:04X}'
277            )
278            return
279
280        # Pass the packet to the connection
281        connection.on_hci_acl_data_packet(packet)
282
283    def send_hci_packet(self, packet):
284        logger.debug(
285            f'{color(">>>", "green")} [{self.name}] '
286            f'{color("CONTROLLER -> HOST", "green")}: {packet}'
287        )
288        if self.host:
289            self.host.on_packet(packet.to_bytes())
290
    # This method allows the controller to emulate the same API as a transport source
292    async def wait_for_termination(self):
293        # For now, just wait forever
294        await asyncio.get_running_loop().create_future()
295
296    ############################################################
297    # Link connections
298    ############################################################
299    def allocate_connection_handle(self):
300        handle = 0
301        max_handle = 0
302        for connection in itertools.chain(
303            self.central_connections.values(),
304            self.peripheral_connections.values(),
305            self.classic_connections.values(),
306        ):
307            max_handle = max(max_handle, connection.handle)
308            if connection.handle == handle:
309                # Already used, continue searching after the current max
310                handle = max_handle + 1
311        return handle
312
313    def find_le_connection_by_address(self, address):
314        return self.central_connections.get(address) or self.peripheral_connections.get(
315            address
316        )
317
318    def find_classic_connection_by_address(self, address):
319        return self.classic_connections.get(address)
320
321    def find_connection_by_handle(self, handle):
322        for connection in itertools.chain(
323            self.central_connections.values(),
324            self.peripheral_connections.values(),
325            self.classic_connections.values(),
326        ):
327            if connection.handle == handle:
328                return connection
329        return None
330
331    def find_central_connection_by_handle(self, handle):
332        for connection in self.central_connections.values():
333            if connection.handle == handle:
334                return connection
335        return None
336
337    def find_classic_connection_by_handle(self, handle):
338        for connection in self.classic_connections.values():
339            if connection.handle == handle:
340                return connection
341        return None
342
343    def on_link_central_connected(self, central_address):
344        '''
345        Called when an incoming connection occurs from a central on the link
346        '''
347
348        # Allocate (or reuse) a connection handle
349        peer_address = central_address
350        peer_address_type = central_address.address_type
351        connection = self.peripheral_connections.get(peer_address)
352        if connection is None:
353            connection_handle = self.allocate_connection_handle()
354            connection = Connection(
355                self,
356                connection_handle,
357                BT_PERIPHERAL_ROLE,
358                peer_address,
359                self.link,
360                BT_LE_TRANSPORT,
361            )
362            self.peripheral_connections[peer_address] = connection
363            logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
364
365        # Then say that the connection has completed
366        self.send_hci_packet(
367            HCI_LE_Connection_Complete_Event(
368                status=HCI_SUCCESS,
369                connection_handle=connection.handle,
370                role=connection.role,
371                peer_address_type=peer_address_type,
372                peer_address=peer_address,
373                connection_interval=10,  # FIXME
374                peripheral_latency=0,  # FIXME
375                supervision_timeout=10,  # FIXME
376                central_clock_accuracy=7,  # FIXME
377            )
378        )
379
380    def on_link_central_disconnected(self, peer_address, reason):
381        '''
382        Called when an active disconnection occurs from a peer
383        '''
384
385        # Send a disconnection complete event
386        if connection := self.peripheral_connections.get(peer_address):
387            self.send_hci_packet(
388                HCI_Disconnection_Complete_Event(
389                    status=HCI_SUCCESS,
390                    connection_handle=connection.handle,
391                    reason=reason,
392                )
393            )
394
395            # Remove the connection
396            del self.peripheral_connections[peer_address]
397        else:
398            logger.warning(f'!!! No peripheral connection found for {peer_address}')
399
400    def on_link_peripheral_connection_complete(
401        self, le_create_connection_command, status
402    ):
403        '''
404        Called by the link when a connection has been made or has failed to be made
405        '''
406
407        if status == HCI_SUCCESS:
408            # Allocate (or reuse) a connection handle
409            peer_address = le_create_connection_command.peer_address
410            connection = self.central_connections.get(peer_address)
411            if connection is None:
412                connection_handle = self.allocate_connection_handle()
413                connection = Connection(
414                    self,
415                    connection_handle,
416                    BT_CENTRAL_ROLE,
417                    peer_address,
418                    self.link,
419                    BT_LE_TRANSPORT,
420                )
421                self.central_connections[peer_address] = connection
422                logger.debug(
423                    f'New CENTRAL connection handle: 0x{connection_handle:04X}'
424                )
425        else:
426            connection = None
427
428        # Say that the connection has completed
429        self.send_hci_packet(
430            # pylint: disable=line-too-long
431            HCI_LE_Connection_Complete_Event(
432                status=status,
433                connection_handle=connection.handle if connection else 0,
434                role=BT_CENTRAL_ROLE,
435                peer_address_type=le_create_connection_command.peer_address_type,
436                peer_address=le_create_connection_command.peer_address,
437                connection_interval=le_create_connection_command.connection_interval_min,
438                peripheral_latency=le_create_connection_command.max_latency,
439                supervision_timeout=le_create_connection_command.supervision_timeout,
440                central_clock_accuracy=0,
441            )
442        )
443
444    def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
445        '''
446        Called when a disconnection has been completed
447        '''
448
449        # Send a disconnection complete event
450        self.send_hci_packet(
451            HCI_Disconnection_Complete_Event(
452                status=status,
453                connection_handle=disconnection_command.connection_handle,
454                reason=disconnection_command.reason,
455            )
456        )
457
458        # Remove the connection
459        if connection := self.find_central_connection_by_handle(
460            disconnection_command.connection_handle
461        ):
462            logger.debug(f'CENTRAL Connection removed: {connection}')
463            del self.central_connections[connection.peer_address]
464
465    def on_link_peripheral_disconnected(self, peer_address):
466        '''
467        Called when a connection to a peripheral is broken
468        '''
469
470        # Send a disconnection complete event
471        if connection := self.central_connections.get(peer_address):
472            self.send_hci_packet(
473                HCI_Disconnection_Complete_Event(
474                    status=HCI_SUCCESS,
475                    connection_handle=connection.handle,
476                    reason=HCI_CONNECTION_TIMEOUT_ERROR,
477                )
478            )
479
480            # Remove the connection
481            del self.central_connections[peer_address]
482        else:
483            logger.warning(f'!!! No central connection found for {peer_address}')
484
485    def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk):
486        # For now, just setup the encryption without asking the host
487        if connection := self.find_le_connection_by_address(peer_address):
488            self.send_hci_packet(
489                HCI_Encryption_Change_Event(
490                    status=0, connection_handle=connection.handle, encryption_enabled=1
491                )
492            )
493
494    def on_link_acl_data(self, sender_address, transport, data):
495        # Look for the connection to which this data belongs
496        if transport == BT_LE_TRANSPORT:
497            connection = self.find_le_connection_by_address(sender_address)
498        else:
499            connection = self.find_classic_connection_by_address(sender_address)
500        if connection is None:
501            logger.warning(f'!!! no connection for {sender_address}')
502            return
503
504        # Send the data to the host
505        # TODO: should fragment
506        acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data)
507        self.send_hci_packet(acl_packet)
508
509    def on_link_advertising_data(self, sender_address, data):
510        # Ignore if we're not scanning
511        if self.le_scan_enable == 0:
512            return
513
514        # Send a scan report
515        report = HCI_LE_Advertising_Report_Event.Report(
516            HCI_LE_Advertising_Report_Event.Report.FIELDS,
517            event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
518            address_type=sender_address.address_type,
519            address=sender_address,
520            data=data,
521            rssi=-50,
522        )
523        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
524
525        # Simulate a scan response
526        report = HCI_LE_Advertising_Report_Event.Report(
527            HCI_LE_Advertising_Report_Event.Report.FIELDS,
528            event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP,
529            address_type=sender_address.address_type,
530            address=sender_address,
531            data=data,
532            rssi=-50,
533        )
534        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
535
536    ############################################################
537    # Classic link connections
538    ############################################################
539
540    def on_classic_connection_request(self, peer_address, link_type):
541        self.send_hci_packet(
542            HCI_Connection_Request_Event(
543                bd_addr=peer_address,
544                class_of_device=0,
545                link_type=link_type,
546            )
547        )
548
549    def on_classic_connection_complete(self, peer_address, status):
550        if status == HCI_SUCCESS:
551            # Allocate (or reuse) a connection handle
552            peer_address = peer_address
553            connection = self.classic_connections.get(peer_address)
554            if connection is None:
555                connection_handle = self.allocate_connection_handle()
556                connection = Connection(
557                    controller=self,
558                    handle=connection_handle,
559                    # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
560                    role=BT_CENTRAL_ROLE,
561                    peer_address=peer_address,
562                    link=self.link,
563                    transport=BT_BR_EDR_TRANSPORT,
564                )
565                self.classic_connections[peer_address] = connection
566                logger.debug(
567                    f'New CLASSIC connection handle: 0x{connection_handle:04X}'
568                )
569            else:
570                connection_handle = connection.handle
571            self.send_hci_packet(
572                HCI_Connection_Complete_Event(
573                    status=status,
574                    connection_handle=connection_handle,
575                    bd_addr=peer_address,
576                    encryption_enabled=False,
577                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
578                )
579            )
580        else:
581            connection = None
582            self.send_hci_packet(
583                HCI_Connection_Complete_Event(
584                    status=status,
585                    connection_handle=0,
586                    bd_addr=peer_address,
587                    encryption_enabled=False,
588                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
589                )
590            )
591
592    def on_classic_disconnected(self, peer_address, reason):
593        # Send a disconnection complete event
594        if connection := self.classic_connections.get(peer_address):
595            self.send_hci_packet(
596                HCI_Disconnection_Complete_Event(
597                    status=HCI_SUCCESS,
598                    connection_handle=connection.handle,
599                    reason=reason,
600                )
601            )
602
603            # Remove the connection
604            del self.classic_connections[peer_address]
605        else:
606            logger.warning(f'!!! No classic connection found for {peer_address}')
607
608    def on_classic_role_change(self, peer_address, new_role):
609        self.send_hci_packet(
610            HCI_Role_Change_Event(
611                status=HCI_SUCCESS,
612                bd_addr=peer_address,
613                new_role=new_role,
614            )
615        )
616
617    ############################################################
618    # Advertising support
619    ############################################################
620    def on_advertising_timer_fired(self):
621        self.send_advertising_data()
622        self.advertising_timer_handle = asyncio.get_running_loop().call_later(
623            self.advertising_interval / 1000.0, self.on_advertising_timer_fired
624        )
625
626    def start_advertising(self):
627        # Stop any ongoing advertising before we start again
628        self.stop_advertising()
629
630        # Advertise now
631        self.advertising_timer_handle = asyncio.get_running_loop().call_soon(
632            self.on_advertising_timer_fired
633        )
634
635    def stop_advertising(self):
636        if self.advertising_timer_handle is not None:
637            self.advertising_timer_handle.cancel()
638            self.advertising_timer_handle = None
639
640    def send_advertising_data(self):
641        if self.link and self.advertising_data:
642            self.link.send_advertising_data(self.random_address, self.advertising_data)
643
644    @property
645    def is_advertising(self):
646        return self.advertising_timer_handle is not None
647
648    ############################################################
649    # HCI handlers
650    ############################################################
651    def on_hci_command(self, command):
652        logger.warning(color(f'--- Unsupported command {command}', 'red'))
653        return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR])
654
655    def on_hci_create_connection_command(self, command):
656        '''
657        See Bluetooth spec Vol 2, Part E - 7.1.5 Create Connection command
658        '''
659
660        if self.link is None:
661            return
662        logger.debug(f'Connection request to {command.bd_addr}')
663
664        # Check that we don't already have a pending connection
665        if self.link.get_pending_connection():
666            self.send_hci_packet(
667                HCI_Command_Status_Event(
668                    status=HCI_CONTROLLER_BUSY_ERROR,
669                    num_hci_command_packets=1,
670                    command_opcode=command.op_code,
671                )
672            )
673            return
674
675        self.link.classic_connect(self, command.bd_addr)
676
677        # Say that the connection is pending
678        self.send_hci_packet(
679            HCI_Command_Status_Event(
680                status=HCI_COMMAND_STATUS_PENDING,
681                num_hci_command_packets=1,
682                command_opcode=command.op_code,
683            )
684        )
685
686    def on_hci_disconnect_command(self, command):
687        '''
688        See Bluetooth spec Vol 2, Part E - 7.1.6 Disconnect Command
689        '''
690        # First, say that the disconnection is pending
691        self.send_hci_packet(
692            HCI_Command_Status_Event(
693                status=HCI_COMMAND_STATUS_PENDING,
694                num_hci_command_packets=1,
695                command_opcode=command.op_code,
696            )
697        )
698
699        # Notify the link of the disconnection
700        handle = command.connection_handle
701        if connection := self.find_central_connection_by_handle(handle):
702            if self.link:
703                self.link.disconnect(
704                    self.random_address, connection.peer_address, command
705                )
706            else:
707                # Remove the connection
708                del self.central_connections[connection.peer_address]
709        elif connection := self.find_classic_connection_by_handle(handle):
710            if self.link:
711                self.link.classic_disconnect(
712                    self,
713                    connection.peer_address,
714                    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
715                )
716            else:
717                # Remove the connection
718                del self.classic_connections[connection.peer_address]
719
720    def on_hci_accept_connection_request_command(self, command):
721        '''
722        See Bluetooth spec Vol 2, Part E - 7.1.8 Accept Connection Request command
723        '''
724
725        if self.link is None:
726            return
727        self.send_hci_packet(
728            HCI_Command_Status_Event(
729                status=HCI_SUCCESS,
730                num_hci_command_packets=1,
731                command_opcode=command.op_code,
732            )
733        )
734        self.link.classic_accept_connection(self, command.bd_addr, command.role)
735
736    def on_hci_switch_role_command(self, command):
737        '''
738        See Bluetooth spec Vol 2, Part E - 7.2.8 Switch Role command
739        '''
740
741        if self.link is None:
742            return
743        self.send_hci_packet(
744            HCI_Command_Status_Event(
745                status=HCI_SUCCESS,
746                num_hci_command_packets=1,
747                command_opcode=command.op_code,
748            )
749        )
750        self.link.classic_switch_role(self, command.bd_addr, command.role)
751
752    def on_hci_set_event_mask_command(self, command):
753        '''
754        See Bluetooth spec Vol 2, Part E - 7.3.1 Set Event Mask Command
755        '''
756        self.event_mask = command.event_mask
757        return bytes([HCI_SUCCESS])
758
759    def on_hci_reset_command(self, _command):
760        '''
761        See Bluetooth spec Vol 2, Part E - 7.3.2 Reset Command
762        '''
763        # TODO: cleanup what needs to be reset
764        return bytes([HCI_SUCCESS])
765
766    def on_hci_write_local_name_command(self, command):
767        '''
768        See Bluetooth spec Vol 2, Part E - 7.3.11 Write Local Name Command
769        '''
770        local_name = command.local_name
771        if len(local_name):
772            try:
773                first_null = local_name.find(0)
774                if first_null >= 0:
775                    local_name = local_name[:first_null]
776                self.local_name = str(local_name, 'utf-8')
777            except UnicodeDecodeError:
778                pass
779        return bytes([HCI_SUCCESS])
780
781    def on_hci_read_local_name_command(self, _command):
782        '''
783        See Bluetooth spec Vol 2, Part E - 7.3.12 Read Local Name Command
784        '''
785        local_name = bytes(self.local_name, 'utf-8')[:248]
786        if len(local_name) < 248:
787            local_name = local_name + bytes(248 - len(local_name))
788
789        return bytes([HCI_SUCCESS]) + local_name
790
791    def on_hci_read_class_of_device_command(self, _command):
792        '''
793        See Bluetooth spec Vol 2, Part E - 7.3.25 Read Class of Device Command
794        '''
795        return bytes([HCI_SUCCESS, 0, 0, 0])
796
797    def on_hci_write_class_of_device_command(self, _command):
798        '''
799        See Bluetooth spec Vol 2, Part E - 7.3.26 Write Class of Device Command
800        '''
801        return bytes([HCI_SUCCESS])
802
803    def on_hci_read_synchronous_flow_control_enable_command(self, _command):
804        '''
805        See Bluetooth spec Vol 2, Part E - 7.3.36 Read Synchronous Flow Control Enable
806        Command
807        '''
808        if self.sync_flow_control:
809            ret = 1
810        else:
811            ret = 0
812        return bytes([HCI_SUCCESS, ret])
813
814    def on_hci_write_synchronous_flow_control_enable_command(self, command):
815        '''
816        See Bluetooth spec Vol 2, Part E - 7.3.37 Write Synchronous Flow Control Enable
817        Command
818        '''
819        ret = HCI_SUCCESS
820        if command.synchronous_flow_control_enable == 1:
821            self.sync_flow_control = True
822        elif command.synchronous_flow_control_enable == 0:
823            self.sync_flow_control = False
824        else:
825            ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
826        return bytes([ret])
827
828    def on_hci_write_extended_inquiry_response_command(self, _command):
829        '''
830        See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
831        '''
832        return bytes([HCI_SUCCESS])
833
834    def on_hci_write_simple_pairing_mode_command(self, _command):
835        '''
836        See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
837        '''
838        return bytes([HCI_SUCCESS])
839
840    def on_hci_set_event_mask_page_2_command(self, command):
841        '''
842        See Bluetooth spec Vol 2, Part E - 7.3.69 Set Event Mask Page 2 Command
843        '''
844        self.event_mask_page_2 = command.event_mask_page_2
845        return bytes([HCI_SUCCESS])
846
847    def on_hci_read_le_host_support_command(self, _command):
848        '''
849        See Bluetooth spec Vol 2, Part E - 7.3.78 Write LE Host Support Command
850        '''
851        return bytes([HCI_SUCCESS, 1, 0])
852
853    def on_hci_write_le_host_support_command(self, _command):
854        '''
855        See Bluetooth spec Vol 2, Part E - 7.3.79 Write LE Host Support Command
856        '''
857        # TODO / Just ignore for now
858        return bytes([HCI_SUCCESS])
859
860    def on_hci_write_authenticated_payload_timeout_command(self, command):
861        '''
862        See Bluetooth spec Vol 2, Part E - 7.3.94 Write Authenticated Payload Timeout
863        Command
864        '''
865        # TODO
866        return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
867
868    def on_hci_read_local_version_information_command(self, _command):
869        '''
870        See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
871        '''
872        return struct.pack(
873            '<BBHBHH',
874            HCI_SUCCESS,
875            self.hci_version,
876            self.hci_revision,
877            self.lmp_version,
878            self.manufacturer_name,
879            self.lmp_subversion,
880        )
881
882    def on_hci_read_local_supported_commands_command(self, _command):
883        '''
884        See Bluetooth spec Vol 2, Part E - 7.4.2 Read Local Supported Commands Command
885        '''
886        return bytes([HCI_SUCCESS]) + self.supported_commands
887
888    def on_hci_read_local_supported_features_command(self, _command):
889        '''
890        See Bluetooth spec Vol 2, Part E - 7.4.3 Read Local Supported Features Command
891        '''
892        return bytes([HCI_SUCCESS]) + self.lmp_features
893
894    def on_hci_read_bd_addr_command(self, _command):
895        '''
896        See Bluetooth spec Vol 2, Part E - 7.4.6 Read BD_ADDR Command
897        '''
898        bd_addr = (
899            self._public_address.to_bytes()
900            if self._public_address is not None
901            else bytes(6)
902        )
903        return bytes([HCI_SUCCESS]) + bd_addr
904
905    def on_hci_le_set_event_mask_command(self, command):
906        '''
907        See Bluetooth spec Vol 2, Part E - 7.8.1 LE Set Event Mask Command
908        '''
909        self.le_event_mask = command.le_event_mask
910        return bytes([HCI_SUCCESS])
911
912    def on_hci_le_read_buffer_size_command(self, _command):
913        '''
914        See Bluetooth spec Vol 2, Part E - 7.8.2 LE Read Buffer Size Command
915        '''
916        return struct.pack(
917            '<BHB',
918            HCI_SUCCESS,
919            self.hc_le_data_packet_length,
920            self.hc_total_num_le_data_packets,
921        )
922
923    def on_hci_le_read_local_supported_features_command(self, _command):
924        '''
925        See Bluetooth spec Vol 2, Part E - 7.8.3 LE Read Local Supported Features
926        Command
927        '''
928        return bytes([HCI_SUCCESS]) + self.le_features
929
930    def on_hci_le_set_random_address_command(self, command):
931        '''
932        See Bluetooth spec Vol 2, Part E - 7.8.4 LE Set Random Address Command
933        '''
934        self.random_address = command.random_address
935        return bytes([HCI_SUCCESS])
936
937    def on_hci_le_set_advertising_parameters_command(self, command):
938        '''
939        See Bluetooth spec Vol 2, Part E - 7.8.5 LE Set Advertising Parameters Command
940        '''
941        self.advertising_parameters = command
942        return bytes([HCI_SUCCESS])
943
944    def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
945        '''
946        See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Physical Channel
947        Tx Power Command
948        '''
949        return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
950
951    def on_hci_le_set_advertising_data_command(self, command):
952        '''
953        See Bluetooth spec Vol 2, Part E - 7.8.7 LE Set Advertising Data Command
954        '''
955        self.advertising_data = command.advertising_data
956        return bytes([HCI_SUCCESS])
957
958    def on_hci_le_set_scan_response_data_command(self, command):
959        '''
960        See Bluetooth spec Vol 2, Part E - 7.8.8 LE Set Scan Response Data Command
961        '''
962        self.le_scan_response_data = command.scan_response_data
963        return bytes([HCI_SUCCESS])
964
965    def on_hci_le_set_advertising_enable_command(self, command):
966        '''
967        See Bluetooth spec Vol 2, Part E - 7.8.9 LE Set Advertising Enable Command
968        '''
969        if command.advertising_enable:
970            self.start_advertising()
971        else:
972            self.stop_advertising()
973
974        return bytes([HCI_SUCCESS])
975
976    def on_hci_le_set_scan_parameters_command(self, command):
977        '''
978        See Bluetooth spec Vol 2, Part E - 7.8.10 LE Set Scan Parameters Command
979        '''
980        self.le_scan_type = command.le_scan_type
981        self.le_scan_interval = command.le_scan_interval
982        self.le_scan_window = command.le_scan_window
983        self.le_scan_own_address_type = command.own_address_type
984        self.le_scanning_filter_policy = command.scanning_filter_policy
985        return bytes([HCI_SUCCESS])
986
987    def on_hci_le_set_scan_enable_command(self, command):
988        '''
989        See Bluetooth spec Vol 2, Part E - 7.8.11 LE Set Scan Enable Command
990        '''
991        self.le_scan_enable = command.le_scan_enable
992        self.filter_duplicates = command.filter_duplicates
993        return bytes([HCI_SUCCESS])
994
995    def on_hci_le_create_connection_command(self, command):
996        '''
997        See Bluetooth spec Vol 2, Part E - 7.8.12 LE Create Connection Command
998        '''
999
1000        if not self.link:
1001            return
1002
1003        logger.debug(f'Connection request to {command.peer_address}')
1004
1005        # Check that we don't already have a pending connection
1006        if self.link.get_pending_connection():
1007            self.send_hci_packet(
1008                HCI_Command_Status_Event(
1009                    status=HCI_COMMAND_DISALLOWED_ERROR,
1010                    num_hci_command_packets=1,
1011                    command_opcode=command.op_code,
1012                )
1013            )
1014            return
1015
1016        # Initiate the connection
1017        self.link.connect(self.random_address, command)
1018
1019        # Say that the connection is pending
1020        self.send_hci_packet(
1021            HCI_Command_Status_Event(
1022                status=HCI_COMMAND_STATUS_PENDING,
1023                num_hci_command_packets=1,
1024                command_opcode=command.op_code,
1025            )
1026        )
1027
1028    def on_hci_le_create_connection_cancel_command(self, _command):
1029        '''
1030        See Bluetooth spec Vol 2, Part E - 7.8.13 LE Create Connection Cancel Command
1031        '''
1032        return bytes([HCI_SUCCESS])
1033
1034    def on_hci_le_read_filter_accept_list_size_command(self, _command):
1035        '''
1036        See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size
1037        Command
1038        '''
1039        return bytes([HCI_SUCCESS, self.filter_accept_list_size])
1040
1041    def on_hci_le_clear_filter_accept_list_command(self, _command):
1042        '''
1043        See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
1044        '''
1045        return bytes([HCI_SUCCESS])
1046
1047    def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
1048        '''
1049        See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List
1050        Command
1051        '''
1052        return bytes([HCI_SUCCESS])
1053
1054    def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
1055        '''
1056        See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept
1057        List Command
1058        '''
1059        return bytes([HCI_SUCCESS])
1060
1061    def on_hci_le_read_remote_features_command(self, command):
1062        '''
1063        See Bluetooth spec Vol 2, Part E - 7.8.21 LE Read Remote Features Command
1064        '''
1065
1066        # First, say that the command is pending
1067        self.send_hci_packet(
1068            HCI_Command_Status_Event(
1069                status=HCI_COMMAND_STATUS_PENDING,
1070                num_hci_command_packets=1,
1071                command_opcode=command.op_code,
1072            )
1073        )
1074
1075        # Then send the remote features
1076        self.send_hci_packet(
1077            HCI_LE_Read_Remote_Features_Complete_Event(
1078                status=HCI_SUCCESS,
1079                connection_handle=0,
1080                le_features=bytes.fromhex('dd40000000000000'),
1081            )
1082        )
1083
1084    def on_hci_le_rand_command(self, _command):
1085        '''
1086        See Bluetooth spec Vol 2, Part E - 7.8.23 LE Rand Command
1087        '''
1088        return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
1089
1090    def on_hci_le_enable_encryption_command(self, command):
1091        '''
1092        See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
1093        '''
1094
1095        # Check the parameters
1096        if not (
1097            connection := self.find_central_connection_by_handle(
1098                command.connection_handle
1099            )
1100        ):
1101            logger.warning('connection not found')
1102            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1103
1104        # Notify that the connection is now encrypted
1105        self.link.on_connection_encrypted(
1106            self.random_address,
1107            connection.peer_address,
1108            command.random_number,
1109            command.encrypted_diversifier,
1110            command.long_term_key,
1111        )
1112
1113        self.send_hci_packet(
1114            HCI_Command_Status_Event(
1115                status=HCI_COMMAND_STATUS_PENDING,
1116                num_hci_command_packets=1,
1117                command_opcode=command.op_code,
1118            )
1119        )
1120
1121        return None
1122
1123    def on_hci_le_read_supported_states_command(self, _command):
1124        '''
1125        See Bluetooth spec Vol 2, Part E - 7.8.27 LE Read Supported States Command
1126        '''
1127        return bytes([HCI_SUCCESS]) + self.le_states
1128
1129    def on_hci_le_read_suggested_default_data_length_command(self, _command):
1130        '''
1131        See Bluetooth spec Vol 2, Part E - 7.8.34 LE Read Suggested Default Data Length
1132        Command
1133        '''
1134        return struct.pack(
1135            '<BHH',
1136            HCI_SUCCESS,
1137            self.suggested_max_tx_octets,
1138            self.suggested_max_tx_time,
1139        )
1140
1141    def on_hci_le_write_suggested_default_data_length_command(self, command):
1142        '''
1143        See Bluetooth spec Vol 2, Part E - 7.8.35 LE Write Suggested Default Data Length
1144        Command
1145        '''
1146        self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
1147            '<HH', command.parameters[:4]
1148        )
1149        return bytes([HCI_SUCCESS])
1150
1151    def on_hci_le_read_local_p_256_public_key_command(self, _command):
1152        '''
1153        See Bluetooth spec Vol 2, Part E - 7.8.36 LE Read P-256 Public Key Command
1154        '''
1155        # TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
1156        return bytes([HCI_SUCCESS])
1157
1158    def on_hci_le_add_device_to_resolving_list_command(self, _command):
1159        '''
1160        See Bluetooth spec Vol 2, Part E - 7.8.38 LE Add Device To Resolving List
1161        Command
1162        '''
1163        return bytes([HCI_SUCCESS])
1164
1165    def on_hci_le_clear_resolving_list_command(self, _command):
1166        '''
1167        See Bluetooth spec Vol 2, Part E - 7.8.40 LE Clear Resolving List Command
1168        '''
1169        return bytes([HCI_SUCCESS])
1170
1171    def on_hci_le_read_resolving_list_size_command(self, _command):
1172        '''
1173        See Bluetooth spec Vol 2, Part E - 7.8.41 LE Read Resolving List Size Command
1174        '''
1175        return bytes([HCI_SUCCESS, self.resolving_list_size])
1176
1177    def on_hci_le_set_address_resolution_enable_command(self, command):
1178        '''
1179        See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable
1180        Command
1181        '''
1182        ret = HCI_SUCCESS
1183        if command.address_resolution_enable == 1:
1184            self.le_address_resolution = True
1185        elif command.address_resolution_enable == 0:
1186            self.le_address_resolution = False
1187        else:
1188            ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
1189        return bytes([ret])
1190
1191    def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
1192        '''
1193        See Bluetooth spec Vol 2, Part E - 7.8.45 LE Set Resolvable Private Address
1194        Timeout Command
1195        '''
1196        self.le_rpa_timeout = command.rpa_timeout
1197        return bytes([HCI_SUCCESS])
1198
1199    def on_hci_le_read_maximum_data_length_command(self, _command):
1200        '''
1201        See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
1202        '''
1203        return struct.pack(
1204            '<BHHHH',
1205            HCI_SUCCESS,
1206            self.supported_max_tx_octets,
1207            self.supported_max_tx_time,
1208            self.supported_max_rx_octets,
1209            self.supported_max_rx_time,
1210        )
1211
1212    def on_hci_le_read_phy_command(self, command):
1213        '''
1214        See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY Command
1215        '''
1216        return struct.pack(
1217            '<BHBB',
1218            HCI_SUCCESS,
1219            command.connection_handle,
1220            HCI_LE_1M_PHY,
1221            HCI_LE_1M_PHY,
1222        )
1223
1224    def on_hci_le_set_default_phy_command(self, command):
1225        '''
1226        See Bluetooth spec Vol 2, Part E - 7.8.48 LE Set Default PHY Command
1227        '''
1228        self.default_phy = {
1229            'all_phys': command.all_phys,
1230            'tx_phys': command.tx_phys,
1231            'rx_phys': command.rx_phys,
1232        }
1233        return bytes([HCI_SUCCESS])
1234
1235    def on_hci_le_read_transmit_power_command(self, _command):
1236        '''
1237        See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
1238        '''
1239        return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
1240