1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18 19from bumble.hci import ( 20 HCI_DISCONNECT_COMMAND, 21 HCI_LE_1M_PHY_BIT, 22 HCI_LE_CODED_PHY_BIT, 23 HCI_LE_READ_BUFFER_SIZE_COMMAND, 24 HCI_RESET_COMMAND, 25 HCI_SUCCESS, 26 HCI_LE_CONNECTION_COMPLETE_EVENT, 27 HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, 28 Address, 29 CodingFormat, 30 CodecID, 31 HCI_Command, 32 HCI_Command_Complete_Event, 33 HCI_Command_Status_Event, 34 HCI_CustomPacket, 35 HCI_Disconnect_Command, 36 HCI_Event, 37 HCI_IsoDataPacket, 38 HCI_LE_Add_Device_To_Filter_Accept_List_Command, 39 HCI_LE_Advertising_Report_Event, 40 HCI_LE_Channel_Selection_Algorithm_Event, 41 HCI_LE_Connection_Complete_Event, 42 HCI_LE_Connection_Update_Command, 43 HCI_LE_Connection_Update_Complete_Event, 44 HCI_LE_Create_Connection_Command, 45 HCI_LE_Extended_Create_Connection_Command, 46 HCI_LE_Read_Buffer_Size_Command, 47 HCI_LE_Read_Remote_Features_Command, 48 HCI_LE_Read_Remote_Features_Complete_Event, 49 HCI_LE_Remove_Device_From_Filter_Accept_List_Command, 50 HCI_LE_Set_Advertising_Data_Command, 51 HCI_LE_Set_Advertising_Parameters_Command, 52 HCI_LE_Set_Default_PHY_Command, 53 HCI_LE_Set_Event_Mask_Command, 54 HCI_LE_Set_Extended_Advertising_Enable_Command, 55 HCI_LE_Set_Extended_Scan_Parameters_Command, 56 HCI_LE_Set_Random_Address_Command, 57 HCI_LE_Set_Scan_Enable_Command, 58 HCI_LE_Set_Scan_Parameters_Command, 59 HCI_LE_Setup_ISO_Data_Path_Command, 60 HCI_Number_Of_Completed_Packets_Event, 61 HCI_Packet, 62 HCI_PIN_Code_Request_Reply_Command, 63 HCI_Read_Local_Supported_Commands_Command, 64 HCI_Read_Local_Supported_Features_Command, 65 HCI_Read_Local_Version_Information_Command, 66 HCI_Reset_Command, 67 HCI_Set_Event_Mask_Command, 68) 69 70 71# ----------------------------------------------------------------------------- 72# pylint: disable=invalid-name 73 74 75def basic_check(x): 76 packet = x.to_bytes() 77 print(packet.hex()) 78 parsed = HCI_Packet.from_bytes(packet) 79 x_str = str(x) 80 parsed_str = str(parsed) 81 print(x_str) 82 parsed_bytes = parsed.to_bytes() 83 assert x_str == parsed_str 84 assert packet == parsed_bytes 85 86 87# ----------------------------------------------------------------------------- 88def test_HCI_Event(): 89 event = HCI_Event(0xF9) 90 basic_check(event) 91 92 event = HCI_Event(0xF8, bytes.fromhex('AABBCC')) 93 basic_check(event) 94 95 96# ----------------------------------------------------------------------------- 97def test_HCI_LE_Connection_Complete_Event(): 98 address = Address('00:11:22:33:44:55') 99 event = HCI_LE_Connection_Complete_Event( 100 status=HCI_SUCCESS, 101 connection_handle=1, 102 role=1, 103 peer_address_type=1, 104 peer_address=address, 105 connection_interval=3, 106 peripheral_latency=4, 107 supervision_timeout=5, 108 central_clock_accuracy=6, 109 ) 110 basic_check(event) 111 112 113# ----------------------------------------------------------------------------- 114def test_HCI_LE_Advertising_Report_Event(): 115 address = Address('00:11:22:33:44:55/P') 116 report = HCI_LE_Advertising_Report_Event.Report( 117 HCI_LE_Advertising_Report_Event.Report.FIELDS, 118 event_type=HCI_LE_Advertising_Report_Event.ADV_IND, 119 address_type=Address.PUBLIC_DEVICE_ADDRESS, 120 address=address, 121 data=bytes.fromhex( 122 '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03' 123 ), 124 rssi=100, 125 ) 126 event = HCI_LE_Advertising_Report_Event([report]) 127 basic_check(event) 128 129 130# ----------------------------------------------------------------------------- 131def test_HCI_LE_Read_Remote_Features_Complete_Event(): 132 event = HCI_LE_Read_Remote_Features_Complete_Event( 133 status=HCI_SUCCESS, 134 connection_handle=0x007, 135 le_features=bytes.fromhex('0011223344556677'), 136 ) 137 basic_check(event) 138 139 140# ----------------------------------------------------------------------------- 141def test_HCI_LE_Connection_Update_Complete_Event(): 142 event = HCI_LE_Connection_Update_Complete_Event( 143 status=HCI_SUCCESS, 144 connection_handle=0x007, 145 connection_interval=10, 146 peripheral_latency=3, 147 supervision_timeout=5, 148 ) 149 basic_check(event) 150 151 152# ----------------------------------------------------------------------------- 153def test_HCI_LE_Channel_Selection_Algorithm_Event(): 154 event = HCI_LE_Channel_Selection_Algorithm_Event( 155 connection_handle=7, channel_selection_algorithm=1 156 ) 157 basic_check(event) 158 159 160# ----------------------------------------------------------------------------- 161def test_HCI_Command_Complete_Event(): 162 # With a serializable object 163 event = HCI_Command_Complete_Event( 164 num_hci_command_packets=34, 165 command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND, 166 return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters( 167 status=0, 168 hc_le_acl_data_packet_length=1234, 169 hc_total_num_le_acl_data_packets=56, 170 ), 171 ) 172 basic_check(event) 173 174 # With an arbitrary byte array 175 event = HCI_Command_Complete_Event( 176 num_hci_command_packets=1, 177 command_opcode=HCI_RESET_COMMAND, 178 return_parameters=bytes([1, 2, 3, 4]), 179 ) 180 basic_check(event) 181 182 # With a simple status as a 1-byte array 183 event = HCI_Command_Complete_Event( 184 num_hci_command_packets=1, 185 command_opcode=HCI_RESET_COMMAND, 186 return_parameters=bytes([7]), 187 ) 188 basic_check(event) 189 event = HCI_Packet.from_bytes(event.to_bytes()) 190 assert event.return_parameters == 7 191 192 # With a simple status as an integer status 193 event = HCI_Command_Complete_Event( 194 num_hci_command_packets=1, command_opcode=HCI_RESET_COMMAND, return_parameters=9 195 ) 196 basic_check(event) 197 assert event.return_parameters == 9 198 199 200# ----------------------------------------------------------------------------- 201def test_HCI_Command_Status_Event(): 202 event = HCI_Command_Status_Event( 203 status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND 204 ) 205 basic_check(event) 206 207 208# ----------------------------------------------------------------------------- 209def test_HCI_Number_Of_Completed_Packets_Event(): 210 event = HCI_Number_Of_Completed_Packets_Event([(1, 2), (3, 4)]) 211 basic_check(event) 212 213 214# ----------------------------------------------------------------------------- 215def test_HCI_Command(): 216 command = HCI_Command(0x5566) 217 basic_check(command) 218 219 command = HCI_Command(0x5566, bytes.fromhex('AABBCC')) 220 basic_check(command) 221 222 223# ----------------------------------------------------------------------------- 224def test_HCI_PIN_Code_Request_Reply_Command(): 225 pin_code = b'1234' 226 pin_code_length = len(pin_code) 227 # here to make the test pass, we need to 228 # pad pin_code, as HCI_Object.format_fields 229 # does not do it for us 230 padded_pin_code = pin_code + bytes(16 - pin_code_length) 231 command = HCI_PIN_Code_Request_Reply_Command( 232 bd_addr=Address( 233 '00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS 234 ), 235 pin_code_length=pin_code_length, 236 pin_code=padded_pin_code, 237 ) 238 basic_check(command) 239 240 241def test_HCI_Reset_Command(): 242 command = HCI_Reset_Command() 243 basic_check(command) 244 245 246# ----------------------------------------------------------------------------- 247def test_HCI_Read_Local_Version_Information_Command(): 248 command = HCI_Read_Local_Version_Information_Command() 249 basic_check(command) 250 251 252# ----------------------------------------------------------------------------- 253def test_HCI_Read_Local_Supported_Commands_Command(): 254 command = HCI_Read_Local_Supported_Commands_Command() 255 basic_check(command) 256 257 258# ----------------------------------------------------------------------------- 259def test_HCI_Read_Local_Supported_Features_Command(): 260 command = HCI_Read_Local_Supported_Features_Command() 261 basic_check(command) 262 263 264# ----------------------------------------------------------------------------- 265def test_HCI_Disconnect_Command(): 266 command = HCI_Disconnect_Command(connection_handle=123, reason=0x11) 267 basic_check(command) 268 269 270# ----------------------------------------------------------------------------- 271def test_HCI_Set_Event_Mask_Command(): 272 command = HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('0011223344556677')) 273 basic_check(command) 274 275 276# ----------------------------------------------------------------------------- 277def test_HCI_LE_Set_Event_Mask_Command(): 278 command = HCI_LE_Set_Event_Mask_Command( 279 le_event_mask=HCI_LE_Set_Event_Mask_Command.mask( 280 [ 281 HCI_LE_CONNECTION_COMPLETE_EVENT, 282 HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, 283 ] 284 ) 285 ) 286 assert command.le_event_mask == bytes.fromhex('0100000000010000') 287 basic_check(command) 288 289 290# ----------------------------------------------------------------------------- 291def test_HCI_LE_Set_Random_Address_Command(): 292 command = HCI_LE_Set_Random_Address_Command( 293 random_address=Address('00:11:22:33:44:55') 294 ) 295 basic_check(command) 296 297 298# ----------------------------------------------------------------------------- 299def test_HCI_LE_Set_Advertising_Parameters_Command(): 300 command = HCI_LE_Set_Advertising_Parameters_Command( 301 advertising_interval_min=20, 302 advertising_interval_max=30, 303 advertising_type=HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND, 304 own_address_type=Address.PUBLIC_DEVICE_ADDRESS, 305 peer_address_type=Address.RANDOM_DEVICE_ADDRESS, 306 peer_address=Address('00:11:22:33:44:55'), 307 advertising_channel_map=0x03, 308 advertising_filter_policy=1, 309 ) 310 basic_check(command) 311 312 313# ----------------------------------------------------------------------------- 314def test_HCI_LE_Set_Advertising_Data_Command(): 315 command = HCI_LE_Set_Advertising_Data_Command( 316 advertising_data=bytes.fromhex('AABBCC') 317 ) 318 basic_check(command) 319 320 321# ----------------------------------------------------------------------------- 322def test_HCI_LE_Set_Scan_Parameters_Command(): 323 command = HCI_LE_Set_Scan_Parameters_Command( 324 le_scan_type=1, 325 le_scan_interval=20, 326 le_scan_window=10, 327 own_address_type=1, 328 scanning_filter_policy=0, 329 ) 330 basic_check(command) 331 332 333# ----------------------------------------------------------------------------- 334def test_HCI_LE_Set_Scan_Enable_Command(): 335 command = HCI_LE_Set_Scan_Enable_Command(le_scan_enable=1, filter_duplicates=0) 336 basic_check(command) 337 338 339# ----------------------------------------------------------------------------- 340def test_HCI_LE_Create_Connection_Command(): 341 command = HCI_LE_Create_Connection_Command( 342 le_scan_interval=4, 343 le_scan_window=5, 344 initiator_filter_policy=1, 345 peer_address_type=1, 346 peer_address=Address('00:11:22:33:44:55'), 347 own_address_type=2, 348 connection_interval_min=7, 349 connection_interval_max=8, 350 max_latency=9, 351 supervision_timeout=10, 352 min_ce_length=11, 353 max_ce_length=12, 354 ) 355 basic_check(command) 356 357 358# ----------------------------------------------------------------------------- 359def test_HCI_LE_Extended_Create_Connection_Command(): 360 command = HCI_LE_Extended_Create_Connection_Command( 361 initiator_filter_policy=0, 362 own_address_type=0, 363 peer_address_type=1, 364 peer_address=Address('00:11:22:33:44:55'), 365 initiating_phys=3, 366 scan_intervals=(10, 11), 367 scan_windows=(12, 13), 368 connection_interval_mins=(14, 15), 369 connection_interval_maxs=(16, 17), 370 max_latencies=(18, 19), 371 supervision_timeouts=(20, 21), 372 min_ce_lengths=(100, 101), 373 max_ce_lengths=(102, 103), 374 ) 375 basic_check(command) 376 377 378# ----------------------------------------------------------------------------- 379def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command(): 380 command = HCI_LE_Add_Device_To_Filter_Accept_List_Command( 381 address_type=1, address=Address('00:11:22:33:44:55') 382 ) 383 basic_check(command) 384 385 386# ----------------------------------------------------------------------------- 387def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command(): 388 command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command( 389 address_type=1, address=Address('00:11:22:33:44:55') 390 ) 391 basic_check(command) 392 393 394# ----------------------------------------------------------------------------- 395def test_HCI_LE_Connection_Update_Command(): 396 command = HCI_LE_Connection_Update_Command( 397 connection_handle=0x0002, 398 connection_interval_min=10, 399 connection_interval_max=20, 400 max_latency=7, 401 supervision_timeout=3, 402 min_ce_length=100, 403 max_ce_length=200, 404 ) 405 basic_check(command) 406 407 408# ----------------------------------------------------------------------------- 409def test_HCI_LE_Read_Remote_Features_Command(): 410 command = HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002) 411 basic_check(command) 412 413 414# ----------------------------------------------------------------------------- 415def test_HCI_LE_Set_Default_PHY_Command(): 416 command = HCI_LE_Set_Default_PHY_Command(all_phys=0, tx_phys=1, rx_phys=1) 417 basic_check(command) 418 419 420# ----------------------------------------------------------------------------- 421def test_HCI_LE_Set_Extended_Scan_Parameters_Command(): 422 command = HCI_LE_Set_Extended_Scan_Parameters_Command( 423 own_address_type=Address.RANDOM_DEVICE_ADDRESS, 424 # pylint: disable-next=line-too-long 425 scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, 426 scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4), 427 scan_types=[ 428 HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, 429 HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, 430 HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING, 431 ], 432 scan_intervals=[1, 2, 3], 433 scan_windows=[4, 5, 6], 434 ) 435 basic_check(command) 436 437 438# ----------------------------------------------------------------------------- 439def test_HCI_LE_Set_Extended_Advertising_Enable_Command(): 440 command = HCI_Packet.from_bytes( 441 bytes.fromhex('0139200e010301050008020600090307000a') 442 ) 443 assert command.enable == 1 444 assert command.advertising_handles == [1, 2, 3] 445 assert command.durations == [5, 6, 7] 446 assert command.max_extended_advertising_events == [8, 9, 10] 447 448 command = HCI_LE_Set_Extended_Advertising_Enable_Command( 449 enable=1, 450 advertising_handles=[1, 2, 3], 451 durations=[5, 6, 7], 452 max_extended_advertising_events=[8, 9, 10], 453 ) 454 basic_check(command) 455 456 457# ----------------------------------------------------------------------------- 458def test_HCI_LE_Setup_ISO_Data_Path_Command(): 459 command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000')) 460 461 assert command.connection_handle == 0x0060 462 assert command.data_path_direction == 0x00 463 assert command.data_path_id == 0x01 464 assert command.codec_id == CodingFormat(CodecID.TRANSPARENT) 465 assert command.controller_delay == 0 466 assert command.codec_configuration == b'' 467 468 command = HCI_LE_Setup_ISO_Data_Path_Command( 469 connection_handle=0x0060, 470 data_path_direction=0x00, 471 data_path_id=0x01, 472 codec_id=CodingFormat(CodecID.TRANSPARENT), 473 controller_delay=0x00, 474 codec_configuration=b'', 475 ) 476 basic_check(command) 477 478 479# ----------------------------------------------------------------------------- 480def test_address(): 481 a = Address('C4:F2:17:1A:1D:BB') 482 assert not a.is_public 483 assert a.is_random 484 assert a.address_type == Address.RANDOM_DEVICE_ADDRESS 485 assert not a.is_resolvable 486 assert not a.is_resolved 487 assert a.is_static 488 489 490# ----------------------------------------------------------------------------- 491def test_custom(): 492 data = bytes([0x77, 0x02, 0x01, 0x03]) 493 packet = HCI_CustomPacket(data) 494 assert packet.hci_packet_type == 0x77 495 assert packet.payload == data 496 497 498# ----------------------------------------------------------------------------- 499def test_iso_data_packet(): 500 data = bytes.fromhex( 501 '05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697' 502 '52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2' 503 ) 504 packet = HCI_IsoDataPacket.from_bytes(data) 505 assert packet.connection_handle == 0x0061 506 assert packet.packet_status_flag == 0 507 assert packet.pb_flag == 0x02 508 assert packet.ts_flag == 0x01 509 assert packet.data_total_length == 68 510 assert packet.time_stamp == 2716911914 511 assert packet.packet_sequence_number == 147 512 assert packet.iso_sdu_length == 60 513 assert packet.iso_sdu_fragment == bytes.fromhex( 514 'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3' 515 '6281bc77ed6a3206d984bcdabee6be831c699cb50e2' 516 ) 517 518 assert packet.to_bytes() == data 519 520 521# ----------------------------------------------------------------------------- 522def run_test_events(): 523 test_HCI_Event() 524 test_HCI_LE_Connection_Complete_Event() 525 test_HCI_LE_Advertising_Report_Event() 526 test_HCI_LE_Connection_Update_Complete_Event() 527 test_HCI_LE_Read_Remote_Features_Complete_Event() 528 test_HCI_LE_Channel_Selection_Algorithm_Event() 529 test_HCI_Command_Complete_Event() 530 test_HCI_Command_Status_Event() 531 test_HCI_Number_Of_Completed_Packets_Event() 532 533 534# ----------------------------------------------------------------------------- 535def run_test_commands(): 536 test_HCI_Command() 537 test_HCI_Reset_Command() 538 test_HCI_PIN_Code_Request_Reply_Command() 539 test_HCI_Read_Local_Version_Information_Command() 540 test_HCI_Read_Local_Supported_Commands_Command() 541 test_HCI_Read_Local_Supported_Features_Command() 542 test_HCI_Disconnect_Command() 543 test_HCI_Set_Event_Mask_Command() 544 test_HCI_LE_Set_Event_Mask_Command() 545 test_HCI_LE_Set_Random_Address_Command() 546 test_HCI_LE_Set_Advertising_Parameters_Command() 547 test_HCI_LE_Set_Advertising_Data_Command() 548 test_HCI_LE_Set_Scan_Parameters_Command() 549 test_HCI_LE_Set_Scan_Enable_Command() 550 test_HCI_LE_Create_Connection_Command() 551 test_HCI_LE_Extended_Create_Connection_Command() 552 test_HCI_LE_Add_Device_To_Filter_Accept_List_Command() 553 test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command() 554 test_HCI_LE_Connection_Update_Command() 555 test_HCI_LE_Read_Remote_Features_Command() 556 test_HCI_LE_Set_Default_PHY_Command() 557 test_HCI_LE_Set_Extended_Scan_Parameters_Command() 558 test_HCI_LE_Set_Extended_Advertising_Enable_Command() 559 test_HCI_LE_Setup_ISO_Data_Path_Command() 560 561 562# ----------------------------------------------------------------------------- 563if __name__ == '__main__': 564 run_test_events() 565 run_test_commands() 566 test_address() 567 test_custom() 568 test_iso_data_packet() 569