• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import time
22
23import click
24
25from bumble.company_ids import COMPANY_IDENTIFIERS
26from bumble.colors import color
27from bumble.core import name_or_number
28from bumble.hci import (
29    map_null_terminated_utf8_string,
30    LeFeatureMask,
31    HCI_SUCCESS,
32    HCI_VERSION_NAMES,
33    LMP_VERSION_NAMES,
34    HCI_Command,
35    HCI_Command_Complete_Event,
36    HCI_Command_Status_Event,
37    HCI_READ_BUFFER_SIZE_COMMAND,
38    HCI_Read_Buffer_Size_Command,
39    HCI_READ_BD_ADDR_COMMAND,
40    HCI_Read_BD_ADDR_Command,
41    HCI_READ_LOCAL_NAME_COMMAND,
42    HCI_Read_Local_Name_Command,
43    HCI_LE_READ_BUFFER_SIZE_COMMAND,
44    HCI_LE_Read_Buffer_Size_Command,
45    HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
46    HCI_LE_Read_Maximum_Data_Length_Command,
47    HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
48    HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
49    HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
50    HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
51    HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
52    HCI_LE_Read_Suggested_Default_Data_Length_Command,
53    HCI_Read_Local_Version_Information_Command,
54)
55from bumble.host import Host
56from bumble.transport import open_transport_or_link
57
58
59# -----------------------------------------------------------------------------
def command_succeeded(response):
    """Tell whether an HCI command response carries a success status.

    Args:
        response: The object returned for a sent command. Only
            HCI_Command_Status_Event and HCI_Command_Complete_Event
            instances are inspected.

    Returns:
        The result of comparing the response status with HCI_SUCCESS, or
        False when the response is neither of the two event types.
    """
    # The two event types keep their status in different places.
    if isinstance(response, HCI_Command_Status_Event):
        status = response.status
    elif isinstance(response, HCI_Command_Complete_Event):
        status = response.return_parameters.status
    else:
        return False
    return status == HCI_SUCCESS
66
67
68# -----------------------------------------------------------------------------
async def get_classic_info(host: Host) -> None:
    """Print the controller's Classic address and local name.

    Each item is queried only when the host reports the matching command as
    supported, and printed only when the command succeeded; otherwise it is
    silently skipped.

    Args:
        host: The host used to query the controller.
    """
    if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
        address_response = await host.send_command(HCI_Read_BD_ADDR_Command())
        if command_succeeded(address_response):
            print()
            label = color('Classic Address:', 'yellow')
            address = address_response.return_parameters.bd_addr.to_string(False)
            print(label, address)

    if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
        name_response = await host.send_command(HCI_Read_Local_Name_Command())
        if command_succeeded(name_response):
            print()
            label = color('Local Name:', 'yellow')
            local_name = map_null_terminated_utf8_string(
                name_response.return_parameters.local_name
            )
            print(label, local_name)
87
88
89# -----------------------------------------------------------------------------
async def get_le_info(host: Host) -> None:
    """Print LE-related limits and the LE features reported by the host.

    Each limit is queried only when the host reports the matching command as
    supported, and printed only when the command succeeded. The list of LE
    features is always printed.

    Args:
        host: The host used to query the controller.
    """
    print()

    if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
        sets_response = await host.send_command(
            HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
        )
        if command_succeeded(sets_response):
            label = color('LE Number Of Supported Advertising Sets:', 'yellow')
            count = sets_response.return_parameters.num_supported_advertising_sets
            print(label, count, '\n')

    if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
        adv_response = await host.send_command(
            HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
        )
        if command_succeeded(adv_response):
            label = color('LE Maximum Advertising Data Length:', 'yellow')
            max_length = adv_response.return_parameters.max_advertising_data_length
            print(label, max_length, '\n')

    if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
        max_response = await host.send_command(
            HCI_LE_Read_Maximum_Data_Length_Command()
        )
        if command_succeeded(max_response):
            label = color('Maximum Data Length:', 'yellow')
            params = max_response.return_parameters
            # Shown as "tx:<octets>/<time>, rx:<octets>/<time>".
            tx_limits = (
                f'tx:{params.supported_max_tx_octets}/'
                f'{params.supported_max_tx_time}'
            )
            rx_limits = (
                f'rx:{params.supported_max_rx_octets}/'
                f'{params.supported_max_rx_time}'
            )
            print(label, f'{tx_limits}, {rx_limits}', '\n')

    if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
        suggested_response = await host.send_command(
            HCI_LE_Read_Suggested_Default_Data_Length_Command()
        )
        if command_succeeded(suggested_response):
            label = color('Suggested Default Data Length:', 'yellow')
            params = suggested_response.return_parameters
            suggested = (
                f'{params.suggested_max_tx_octets}/'
                f'{params.suggested_max_tx_time}'
            )
            print(label, suggested, '\n')

    print(color('LE Features:', 'yellow'))
    for feature_value in host.supported_le_features:
        print(LeFeatureMask(feature_value).name)
144
145
146# -----------------------------------------------------------------------------
async def get_acl_flow_control_info(host: Host) -> None:
    """Print the ACL buffer counts and sizes for Classic and LE.

    Each query is sent only when the host reports the matching command as
    supported. Commands are sent with check_result=True.
    NOTE(review): presumably this makes send_command raise on a failure
    status rather than return it -- confirm against Host.send_command.

    Args:
        host: The host used to query the controller.
    """
    print()

    if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
        classic_response = await host.send_command(
            HCI_Read_Buffer_Size_Command(), check_result=True
        )
        label = color('ACL Flow Control:', 'yellow')
        params = classic_response.return_parameters
        summary = (
            f'{params.hc_total_num_acl_data_packets} '
            f'packets of size {params.hc_acl_data_packet_length}'
        )
        print(label, summary)

    if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
        le_response = await host.send_command(
            HCI_LE_Read_Buffer_Size_Command(), check_result=True
        )
        label = color('LE ACL Flow Control:', 'yellow')
        params = le_response.return_parameters
        summary = (
            f'{params.hc_total_num_le_acl_data_packets} '
            f'packets of size {params.hc_le_acl_data_packet_length}'
        )
        print(label, summary)
169
170
171# -----------------------------------------------------------------------------
async def async_main(latency_probes, transport):
    """Connect to an HCI controller and print a report describing it.

    The report contains, in order: optional command latency statistics, the
    local version information, the Classic info, the LE info, the ACL flow
    control info and the list of supported commands.

    Args:
        latency_probes: Number of commands to send in order to measure the
            command round-trip latency. The measurement is skipped when this
            is None, zero or negative.
        transport: Transport specification passed to open_transport_or_link.
    """
    print('<<< connecting to HCI...')
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
        print('<<< connected')

        host = Host(hci_source, hci_sink)
        await host.reset()

        # Measure the latency if requested. A non-positive count is skipped so
        # that min()/max() are never called on an empty list.
        if latency_probes is not None and latency_probes > 0:
            latencies = []
            for _ in range(latency_probes):
                # perf_counter() is monotonic and high-resolution, unlike the
                # wall clock, which may be adjusted while a probe is in flight.
                start = time.perf_counter()
                await host.send_command(HCI_Read_Local_Version_Information_Command())
                # Convert the elapsed seconds to milliseconds.
                latencies.append(1000 * (time.perf_counter() - start))
            print(
                color('HCI Command Latency:', 'yellow'),
                (
                    f'min={min(latencies):.2f}, '
                    f'max={max(latencies):.2f}, '
                    f'average={sum(latencies)/len(latencies):.2f}'
                ),
                '\n',
            )

        # Print version
        print(color('Version:', 'yellow'))
        version = host.local_version
        print(
            color('  Manufacturer:  ', 'green'),
            name_or_number(COMPANY_IDENTIFIERS, version.company_identifier),
        )
        print(
            color('  HCI Version:   ', 'green'),
            name_or_number(HCI_VERSION_NAMES, version.hci_version),
        )
        print(color('  HCI Subversion:', 'green'), version.hci_subversion)
        print(
            color('  LMP Version:   ', 'green'),
            name_or_number(LMP_VERSION_NAMES, version.lmp_version),
        )
        print(color('  LMP Subversion:', 'green'), version.lmp_subversion)

        # Get the Classic info
        await get_classic_info(host)

        # Get the LE info
        await get_le_info(host)

        # Print the ACL flow control info
        await get_acl_flow_control_info(host)

        # Print the list of commands supported by the controller
        print()
        print(color('Supported Commands:', 'yellow'))
        for command in host.supported_commands:
            print('  ', HCI_Command.command_name(command))
228
229
230# -----------------------------------------------------------------------------
@click.command()
@click.option(
    '--latency-probes',
    type=int,
    metavar='N',
    help='Send N commands to measure HCI transport latency statistics',
)
@click.argument('transport')
def main(latency_probes, transport):
    # Command-line entry point: configure logging, then run the async report.
    # Documented with comments rather than a docstring, because click uses a
    # command function's docstring as its --help text.

    # The log level name is read from BUMBLE_LOGLEVEL, defaulting to WARNING.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()
    logging.basicConfig(level=log_level)

    report = async_main(latency_probes, transport)
    asyncio.run(report)
242
243
244# -----------------------------------------------------------------------------
# Invoke the click command only when this file is run as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    main()
247