1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21import time 22 23import click 24 25from bumble.company_ids import COMPANY_IDENTIFIERS 26from bumble.colors import color 27from bumble.core import name_or_number 28from bumble.hci import ( 29 map_null_terminated_utf8_string, 30 LeFeatureMask, 31 HCI_SUCCESS, 32 HCI_VERSION_NAMES, 33 LMP_VERSION_NAMES, 34 HCI_Command, 35 HCI_Command_Complete_Event, 36 HCI_Command_Status_Event, 37 HCI_READ_BUFFER_SIZE_COMMAND, 38 HCI_Read_Buffer_Size_Command, 39 HCI_READ_BD_ADDR_COMMAND, 40 HCI_Read_BD_ADDR_Command, 41 HCI_READ_LOCAL_NAME_COMMAND, 42 HCI_Read_Local_Name_Command, 43 HCI_LE_READ_BUFFER_SIZE_COMMAND, 44 HCI_LE_Read_Buffer_Size_Command, 45 HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND, 46 HCI_LE_Read_Maximum_Data_Length_Command, 47 HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND, 48 HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command, 49 HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND, 50 HCI_LE_Read_Maximum_Advertising_Data_Length_Command, 51 HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, 52 HCI_LE_Read_Suggested_Default_Data_Length_Command, 53 HCI_Read_Local_Version_Information_Command, 54) 55from bumble.host import Host 56from bumble.transport import open_transport_or_link 57 58 59# ----------------------------------------------------------------------------- 60def command_succeeded(response): 61 if isinstance(response, HCI_Command_Status_Event): 62 return response.status == HCI_SUCCESS 63 if isinstance(response, HCI_Command_Complete_Event): 64 return response.return_parameters.status == HCI_SUCCESS 65 return False 66 67 68# ----------------------------------------------------------------------------- 69async def get_classic_info(host: Host) -> None: 70 if host.supports_command(HCI_READ_BD_ADDR_COMMAND): 71 response = await host.send_command(HCI_Read_BD_ADDR_Command()) 72 if command_succeeded(response): 73 print() 74 print( 75 color('Classic Address:', 'yellow'), 76 response.return_parameters.bd_addr.to_string(False), 77 ) 78 79 if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND): 80 response = await host.send_command(HCI_Read_Local_Name_Command()) 81 if command_succeeded(response): 82 print() 83 print( 84 color('Local Name:', 'yellow'), 85 map_null_terminated_utf8_string(response.return_parameters.local_name), 86 ) 87 88 89# ----------------------------------------------------------------------------- 90async def get_le_info(host: Host) -> None: 91 print() 92 93 if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND): 94 response = await host.send_command( 95 HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command() 96 ) 97 if command_succeeded(response): 98 print( 99 color('LE Number Of Supported Advertising Sets:', 'yellow'), 100 response.return_parameters.num_supported_advertising_sets, 101 '\n', 102 ) 103 104 if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND): 105 response = await host.send_command( 106 HCI_LE_Read_Maximum_Advertising_Data_Length_Command() 107 ) 108 if command_succeeded(response): 109 print( 110 color('LE Maximum Advertising Data Length:', 'yellow'), 111 response.return_parameters.max_advertising_data_length, 112 '\n', 113 ) 114 115 if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND): 116 response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command()) 117 if command_succeeded(response): 118 print( 119 color('Maximum Data Length:', 'yellow'), 120 ( 121 f'tx:{response.return_parameters.supported_max_tx_octets}/' 122 f'{response.return_parameters.supported_max_tx_time}, ' 123 f'rx:{response.return_parameters.supported_max_rx_octets}/' 124 f'{response.return_parameters.supported_max_rx_time}' 125 ), 126 '\n', 127 ) 128 129 if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND): 130 response = await host.send_command( 131 HCI_LE_Read_Suggested_Default_Data_Length_Command() 132 ) 133 if command_succeeded(response): 134 print( 135 color('Suggested Default Data Length:', 'yellow'), 136 f'{response.return_parameters.suggested_max_tx_octets}/' 137 f'{response.return_parameters.suggested_max_tx_time}', 138 '\n', 139 ) 140 141 print(color('LE Features:', 'yellow')) 142 for feature in host.supported_le_features: 143 print(LeFeatureMask(feature).name) 144 145 146# ----------------------------------------------------------------------------- 147async def get_acl_flow_control_info(host: Host) -> None: 148 print() 149 150 if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND): 151 response = await host.send_command( 152 HCI_Read_Buffer_Size_Command(), check_result=True 153 ) 154 print( 155 color('ACL Flow Control:', 'yellow'), 156 f'{response.return_parameters.hc_total_num_acl_data_packets} ' 157 f'packets of size {response.return_parameters.hc_acl_data_packet_length}', 158 ) 159 160 if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): 161 response = await host.send_command( 162 HCI_LE_Read_Buffer_Size_Command(), check_result=True 163 ) 164 print( 165 color('LE ACL Flow Control:', 'yellow'), 166 f'{response.return_parameters.hc_total_num_le_acl_data_packets} ' 167 f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}', 168 ) 169 170 171# ----------------------------------------------------------------------------- 172async def async_main(latency_probes, transport): 173 print('<<< connecting to HCI...') 174 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 175 print('<<< connected') 176 177 host = Host(hci_source, hci_sink) 178 await host.reset() 179 180 # Measure the latency if requested 181 latencies = [] 182 if latency_probes: 183 for _ in range(latency_probes): 184 start = time.time() 185 await host.send_command(HCI_Read_Local_Version_Information_Command()) 186 latencies.append(1000 * (time.time() - start)) 187 print( 188 color('HCI Command Latency:', 'yellow'), 189 ( 190 f'min={min(latencies):.2f}, ' 191 f'max={max(latencies):.2f}, ' 192 f'average={sum(latencies)/len(latencies):.2f}' 193 ), 194 '\n', 195 ) 196 197 # Print version 198 print(color('Version:', 'yellow')) 199 print( 200 color(' Manufacturer: ', 'green'), 201 name_or_number(COMPANY_IDENTIFIERS, host.local_version.company_identifier), 202 ) 203 print( 204 color(' HCI Version: ', 'green'), 205 name_or_number(HCI_VERSION_NAMES, host.local_version.hci_version), 206 ) 207 print(color(' HCI Subversion:', 'green'), host.local_version.hci_subversion) 208 print( 209 color(' LMP Version: ', 'green'), 210 name_or_number(LMP_VERSION_NAMES, host.local_version.lmp_version), 211 ) 212 print(color(' LMP Subversion:', 'green'), host.local_version.lmp_subversion) 213 214 # Get the Classic info 215 await get_classic_info(host) 216 217 # Get the LE info 218 await get_le_info(host) 219 220 # Print the ACL flow control info 221 await get_acl_flow_control_info(host) 222 223 # Print the list of commands supported by the controller 224 print() 225 print(color('Supported Commands:', 'yellow')) 226 for command in host.supported_commands: 227 print(' ', HCI_Command.command_name(command)) 228 229 230# ----------------------------------------------------------------------------- 231@click.command() 232@click.option( 233 '--latency-probes', 234 metavar='N', 235 type=int, 236 help='Send N commands to measure HCI transport latency statistics', 237) 238@click.argument('transport') 239def main(latency_probes, transport): 240 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) 241 asyncio.run(async_main(latency_probes, transport)) 242 243 244# ----------------------------------------------------------------------------- 245if __name__ == '__main__': 246 main() 247