• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18
19from bumble.hci import (
20    HCI_DISCONNECT_COMMAND,
21    HCI_LE_1M_PHY_BIT,
22    HCI_LE_CODED_PHY_BIT,
23    HCI_LE_READ_BUFFER_SIZE_COMMAND,
24    HCI_RESET_COMMAND,
25    HCI_SUCCESS,
26    HCI_LE_CONNECTION_COMPLETE_EVENT,
27    HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
28    Address,
29    CodingFormat,
30    CodecID,
31    HCI_Command,
32    HCI_Command_Complete_Event,
33    HCI_Command_Status_Event,
34    HCI_CustomPacket,
35    HCI_Disconnect_Command,
36    HCI_Event,
37    HCI_IsoDataPacket,
38    HCI_LE_Add_Device_To_Filter_Accept_List_Command,
39    HCI_LE_Advertising_Report_Event,
40    HCI_LE_Channel_Selection_Algorithm_Event,
41    HCI_LE_Connection_Complete_Event,
42    HCI_LE_Connection_Update_Command,
43    HCI_LE_Connection_Update_Complete_Event,
44    HCI_LE_Create_Connection_Command,
45    HCI_LE_Extended_Create_Connection_Command,
46    HCI_LE_Read_Buffer_Size_Command,
47    HCI_LE_Read_Remote_Features_Command,
48    HCI_LE_Read_Remote_Features_Complete_Event,
49    HCI_LE_Remove_Device_From_Filter_Accept_List_Command,
50    HCI_LE_Set_Advertising_Data_Command,
51    HCI_LE_Set_Advertising_Parameters_Command,
52    HCI_LE_Set_Default_PHY_Command,
53    HCI_LE_Set_Event_Mask_Command,
54    HCI_LE_Set_Extended_Advertising_Enable_Command,
55    HCI_LE_Set_Extended_Scan_Parameters_Command,
56    HCI_LE_Set_Random_Address_Command,
57    HCI_LE_Set_Scan_Enable_Command,
58    HCI_LE_Set_Scan_Parameters_Command,
59    HCI_LE_Setup_ISO_Data_Path_Command,
60    HCI_Number_Of_Completed_Packets_Event,
61    HCI_Packet,
62    HCI_PIN_Code_Request_Reply_Command,
63    HCI_Read_Local_Supported_Commands_Command,
64    HCI_Read_Local_Supported_Features_Command,
65    HCI_Read_Local_Version_Information_Command,
66    HCI_Reset_Command,
67    HCI_Set_Event_Mask_Command,
68)
69
70
71# -----------------------------------------------------------------------------
72# pylint: disable=invalid-name
73
74
def basic_check(x):
    """Assert that ``x`` survives a serialize/parse round trip.

    ``x`` is serialized with ``to_bytes()``, parsed back with
    ``HCI_Packet.from_bytes()``, and the parsed object is then required to
    have the same ``str()`` form and the same serialized bytes as ``x``.
    The hex dump and the string form of ``x`` are printed along the way.
    """
    serialized = x.to_bytes()
    print(serialized.hex())

    reparsed = HCI_Packet.from_bytes(serialized)
    original_text = str(x)
    reparsed_text = str(reparsed)
    print(original_text)

    reserialized = reparsed.to_bytes()
    assert original_text == reparsed_text
    assert serialized == reserialized
85
86
87# -----------------------------------------------------------------------------
def test_HCI_Event():
    """Round-trip generic HCI events built with and without extra bytes."""
    # Event constructed from an event code only.
    code_only_event = HCI_Event(0xF9)
    basic_check(code_only_event)

    # Event constructed from an event code plus three raw bytes.
    raw_bytes = bytes.fromhex('AABBCC')
    event_with_bytes = HCI_Event(0xF8, raw_bytes)
    basic_check(event_with_bytes)
94
95
96# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Complete_Event():
    """Round-trip an HCI_LE_Connection_Complete_Event through basic_check."""
    peer = Address('00:11:22:33:44:55')
    fields = {
        'status': HCI_SUCCESS,
        'connection_handle': 1,
        'role': 1,
        'peer_address_type': 1,
        'peer_address': peer,
        'connection_interval': 3,
        'peripheral_latency': 4,
        'supervision_timeout': 5,
        'central_clock_accuracy': 6,
    }
    basic_check(HCI_LE_Connection_Complete_Event(**fields))
111
112
113# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
    """Round-trip an advertising report event that carries a single report."""
    peer = Address('00:11:22:33:44:55/P')
    advertising_payload = bytes.fromhex(
        '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
    )
    report_class = HCI_LE_Advertising_Report_Event.Report
    single_report = report_class(
        report_class.FIELDS,
        event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
        address_type=Address.PUBLIC_DEVICE_ADDRESS,
        address=peer,
        data=advertising_payload,
        rssi=100,
    )
    basic_check(HCI_LE_Advertising_Report_Event([single_report]))
128
129
130# -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Complete_Event():
    """Round-trip an HCI_LE_Read_Remote_Features_Complete_Event."""
    feature_bytes = bytes.fromhex('0011223344556677')
    fields = {
        'status': HCI_SUCCESS,
        'connection_handle': 0x007,
        'le_features': feature_bytes,
    }
    basic_check(HCI_LE_Read_Remote_Features_Complete_Event(**fields))
138
139
140# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Complete_Event():
    """Round-trip an HCI_LE_Connection_Update_Complete_Event."""
    fields = {
        'status': HCI_SUCCESS,
        'connection_handle': 0x007,
        'connection_interval': 10,
        'peripheral_latency': 3,
        'supervision_timeout': 5,
    }
    basic_check(HCI_LE_Connection_Update_Complete_Event(**fields))
150
151
152# -----------------------------------------------------------------------------
def test_HCI_LE_Channel_Selection_Algorithm_Event():
    """Round-trip an HCI_LE_Channel_Selection_Algorithm_Event."""
    algorithm_event = HCI_LE_Channel_Selection_Algorithm_Event(
        connection_handle=7,
        channel_selection_algorithm=1,
    )
    basic_check(algorithm_event)
158
159
160# -----------------------------------------------------------------------------
def test_HCI_Command_Complete_Event():
    """Round-trip command-complete events with several return-parameter forms.

    Four forms of ``return_parameters`` are exercised: an object produced by
    ``create_return_parameters``, a multi-byte ``bytes`` value, a one-byte
    ``bytes`` value, and a plain integer.
    """
    # Return parameters built by the command class itself.
    buffer_size_result = HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
        status=0,
        hc_le_acl_data_packet_length=1234,
        hc_total_num_le_acl_data_packets=56,
    )
    structured_event = HCI_Command_Complete_Event(
        num_hci_command_packets=34,
        command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
        return_parameters=buffer_size_result,
    )
    basic_check(structured_event)

    # Return parameters given as an arbitrary multi-byte array.
    raw_bytes_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([1, 2, 3, 4]),
    )
    basic_check(raw_bytes_event)

    # Return parameters given as a one-byte array; after re-parsing the
    # serialized event, the value is expected to compare equal to the int 7.
    single_byte_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([7]),
    )
    basic_check(single_byte_event)
    reparsed_event = HCI_Packet.from_bytes(single_byte_event.to_bytes())
    assert reparsed_event.return_parameters == 7

    # Return parameters given directly as an integer status.
    integer_status_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=9,
    )
    basic_check(integer_status_event)
    assert integer_status_event.return_parameters == 9
198
199
200# -----------------------------------------------------------------------------
def test_HCI_Command_Status_Event():
    """Round-trip an HCI_Command_Status_Event for the disconnect opcode."""
    status_event = HCI_Command_Status_Event(
        status=0,
        num_hci_command_packets=37,
        command_opcode=HCI_DISCONNECT_COMMAND,
    )
    basic_check(status_event)
206
207
208# -----------------------------------------------------------------------------
def test_HCI_Number_Of_Completed_Packets_Event():
    """Round-trip an HCI_Number_Of_Completed_Packets_Event with two entries."""
    entries = [(1, 2), (3, 4)]
    basic_check(HCI_Number_Of_Completed_Packets_Event(entries))
212
213
214# -----------------------------------------------------------------------------
def test_HCI_Command():
    """Round-trip generic HCI commands built with and without extra bytes."""
    # Command constructed from an opcode only.
    opcode_only_command = HCI_Command(0x5566)
    basic_check(opcode_only_command)

    # Command constructed from an opcode plus three raw bytes.
    raw_bytes = bytes.fromhex('AABBCC')
    command_with_bytes = HCI_Command(0x5566, raw_bytes)
    basic_check(command_with_bytes)
221
222
223# -----------------------------------------------------------------------------
def test_HCI_PIN_Code_Request_Reply_Command():
    """Round-trip an HCI_PIN_Code_Request_Reply_Command with a 4-byte PIN."""
    pin = b'1234'
    pin_length = len(pin)
    # The PIN has to be zero-padded to 16 bytes by hand for the test to pass,
    # because HCI_Object.format_fields does not pad it for us.
    pin_padded_to_16 = pin.ljust(16, b'\x00')
    device_address = Address(
        '00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS
    )
    reply_command = HCI_PIN_Code_Request_Reply_Command(
        bd_addr=device_address,
        pin_code_length=pin_length,
        pin_code=pin_padded_to_16,
    )
    basic_check(reply_command)
239
240
def test_HCI_Reset_Command():
    """Round-trip an argument-less HCI_Reset_Command."""
    basic_check(HCI_Reset_Command())
244
245
246# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Version_Information_Command():
    """Round-trip an argument-less HCI_Read_Local_Version_Information_Command."""
    basic_check(HCI_Read_Local_Version_Information_Command())
250
251
252# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Commands_Command():
    """Round-trip an argument-less HCI_Read_Local_Supported_Commands_Command."""
    basic_check(HCI_Read_Local_Supported_Commands_Command())
256
257
258# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Features_Command():
    """Round-trip an argument-less HCI_Read_Local_Supported_Features_Command."""
    basic_check(HCI_Read_Local_Supported_Features_Command())
262
263
264# -----------------------------------------------------------------------------
def test_HCI_Disconnect_Command():
    """Round-trip an HCI_Disconnect_Command for handle 123 and reason 0x11."""
    disconnect_command = HCI_Disconnect_Command(
        connection_handle=123,
        reason=0x11,
    )
    basic_check(disconnect_command)
268
269
270# -----------------------------------------------------------------------------
def test_HCI_Set_Event_Mask_Command():
    """Round-trip an HCI_Set_Event_Mask_Command with an 8-byte mask."""
    mask_bytes = bytes.fromhex('0011223344556677')
    basic_check(HCI_Set_Event_Mask_Command(event_mask=mask_bytes))
274
275
276# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command():
    """Check the mask built from two LE event codes, then round-trip it.

    The mask produced by ``HCI_LE_Set_Event_Mask_Command.mask`` for the two
    selected events must equal the bytes ``0100000000010000`` (hex).
    """
    selected_events = [
        HCI_LE_CONNECTION_COMPLETE_EVENT,
        HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
    ]
    computed_mask = HCI_LE_Set_Event_Mask_Command.mask(selected_events)
    mask_command = HCI_LE_Set_Event_Mask_Command(le_event_mask=computed_mask)
    assert mask_command.le_event_mask == bytes.fromhex('0100000000010000')
    basic_check(mask_command)
288
289
290# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Random_Address_Command():
    """Round-trip an HCI_LE_Set_Random_Address_Command."""
    new_address = Address('00:11:22:33:44:55')
    basic_check(HCI_LE_Set_Random_Address_Command(random_address=new_address))
296
297
298# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Parameters_Command():
    """Round-trip an HCI_LE_Set_Advertising_Parameters_Command."""
    command_class = HCI_LE_Set_Advertising_Parameters_Command
    fields = {
        'advertising_interval_min': 20,
        'advertising_interval_max': 30,
        'advertising_type': command_class.ADV_NONCONN_IND,
        'own_address_type': Address.PUBLIC_DEVICE_ADDRESS,
        'peer_address_type': Address.RANDOM_DEVICE_ADDRESS,
        'peer_address': Address('00:11:22:33:44:55'),
        'advertising_channel_map': 0x03,
        'advertising_filter_policy': 1,
    }
    basic_check(command_class(**fields))
311
312
313# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Data_Command():
    """Round-trip an HCI_LE_Set_Advertising_Data_Command with 3 data bytes."""
    data_bytes = bytes.fromhex('AABBCC')
    basic_check(HCI_LE_Set_Advertising_Data_Command(advertising_data=data_bytes))
319
320
321# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Parameters_Command():
    """Round-trip an HCI_LE_Set_Scan_Parameters_Command."""
    fields = {
        'le_scan_type': 1,
        'le_scan_interval': 20,
        'le_scan_window': 10,
        'own_address_type': 1,
        'scanning_filter_policy': 0,
    }
    basic_check(HCI_LE_Set_Scan_Parameters_Command(**fields))
331
332
333# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Enable_Command():
    """Round-trip an HCI_LE_Set_Scan_Enable_Command."""
    enable_command = HCI_LE_Set_Scan_Enable_Command(
        le_scan_enable=1,
        filter_duplicates=0,
    )
    basic_check(enable_command)
337
338
339# -----------------------------------------------------------------------------
def test_HCI_LE_Create_Connection_Command():
    """Round-trip an HCI_LE_Create_Connection_Command."""
    fields = {
        'le_scan_interval': 4,
        'le_scan_window': 5,
        'initiator_filter_policy': 1,
        'peer_address_type': 1,
        'peer_address': Address('00:11:22:33:44:55'),
        'own_address_type': 2,
        'connection_interval_min': 7,
        'connection_interval_max': 8,
        'max_latency': 9,
        'supervision_timeout': 10,
        'min_ce_length': 11,
        'max_ce_length': 12,
    }
    basic_check(HCI_LE_Create_Connection_Command(**fields))
356
357
358# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
    """Round-trip an HCI_LE_Extended_Create_Connection_Command.

    Each plural field is given as a two-element tuple.
    """
    fields = {
        'initiator_filter_policy': 0,
        'own_address_type': 0,
        'peer_address_type': 1,
        'peer_address': Address('00:11:22:33:44:55'),
        'initiating_phys': 3,
        'scan_intervals': (10, 11),
        'scan_windows': (12, 13),
        'connection_interval_mins': (14, 15),
        'connection_interval_maxs': (16, 17),
        'max_latencies': (18, 19),
        'supervision_timeouts': (20, 21),
        'min_ce_lengths': (100, 101),
        'max_ce_lengths': (102, 103),
    }
    basic_check(HCI_LE_Extended_Create_Connection_Command(**fields))
376
377
378# -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
    """Round-trip an HCI_LE_Add_Device_To_Filter_Accept_List_Command."""
    device_address = Address('00:11:22:33:44:55')
    add_command = HCI_LE_Add_Device_To_Filter_Accept_List_Command(
        address_type=1,
        address=device_address,
    )
    basic_check(add_command)
384
385
386# -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
    """Round-trip an HCI_LE_Remove_Device_From_Filter_Accept_List_Command."""
    device_address = Address('00:11:22:33:44:55')
    remove_command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
        address_type=1,
        address=device_address,
    )
    basic_check(remove_command)
392
393
394# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command():
    """Round-trip an HCI_LE_Connection_Update_Command."""
    fields = {
        'connection_handle': 0x0002,
        'connection_interval_min': 10,
        'connection_interval_max': 20,
        'max_latency': 7,
        'supervision_timeout': 3,
        'min_ce_length': 100,
        'max_ce_length': 200,
    }
    basic_check(HCI_LE_Connection_Update_Command(**fields))
406
407
408# -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Command():
    """Round-trip an HCI_LE_Read_Remote_Features_Command for handle 0x0002."""
    basic_check(HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002))
412
413
414# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Default_PHY_Command():
    """Round-trip an HCI_LE_Set_Default_PHY_Command."""
    phy_command = HCI_LE_Set_Default_PHY_Command(
        all_phys=0,
        tx_phys=1,
        rx_phys=1,
    )
    basic_check(phy_command)
418
419
420# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
    """Round-trip an HCI_LE_Set_Extended_Scan_Parameters_Command.

    The PHY bitmask has three bits set, and the per-PHY lists each carry
    three entries.
    """
    command_class = HCI_LE_Set_Extended_Scan_Parameters_Command
    # Bitmask with the 1M bit, the coded bit, and bit 4 set.
    phy_mask = (1 << HCI_LE_1M_PHY_BIT) | (1 << HCI_LE_CODED_PHY_BIT) | (1 << 4)
    per_phy_scan_types = [
        command_class.ACTIVE_SCANNING,
        command_class.ACTIVE_SCANNING,
        command_class.PASSIVE_SCANNING,
    ]
    scan_command = command_class(
        own_address_type=Address.RANDOM_DEVICE_ADDRESS,
        scanning_filter_policy=command_class.BASIC_FILTERED_POLICY,
        scanning_phys=phy_mask,
        scan_types=per_phy_scan_types,
        scan_intervals=[1, 2, 3],
        scan_windows=[4, 5, 6],
    )
    basic_check(scan_command)
436
437
438# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
    """Parse a raw enable command, check its fields, then round-trip one.

    The first half decodes a fixed byte string and checks the decoded
    attributes; the second half builds the command from the same values and
    runs it through basic_check.
    """
    raw_command = bytes.fromhex('0139200e010301050008020600090307000a')
    decoded = HCI_Packet.from_bytes(raw_command)
    assert decoded.enable == 1
    assert decoded.advertising_handles == [1, 2, 3]
    assert decoded.durations == [5, 6, 7]
    assert decoded.max_extended_advertising_events == [8, 9, 10]

    fields = {
        'enable': 1,
        'advertising_handles': [1, 2, 3],
        'durations': [5, 6, 7],
        'max_extended_advertising_events': [8, 9, 10],
    }
    basic_check(HCI_LE_Set_Extended_Advertising_Enable_Command(**fields))
455
456
457# -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command():
    """Parse a raw ISO data path command, check it, then round-trip one.

    The first half decodes a fixed byte string and checks the decoded
    attributes; the second half builds the command from the same values and
    runs it through basic_check.
    """
    raw_command = bytes.fromhex('016e200d60000001030000000000000000')
    decoded = HCI_Packet.from_bytes(raw_command)

    assert decoded.connection_handle == 0x0060
    assert decoded.data_path_direction == 0x00
    assert decoded.data_path_id == 0x01
    assert decoded.codec_id == CodingFormat(CodecID.TRANSPARENT)
    assert decoded.controller_delay == 0
    assert decoded.codec_configuration == b''

    fields = {
        'connection_handle': 0x0060,
        'data_path_direction': 0x00,
        'data_path_id': 0x01,
        'codec_id': CodingFormat(CodecID.TRANSPARENT),
        'controller_delay': 0x00,
        'codec_configuration': b'',
    }
    basic_check(HCI_LE_Setup_ISO_Data_Path_Command(**fields))
477
478
479# -----------------------------------------------------------------------------
def test_address():
    """Check the classification flags of an Address built from a bare string."""
    addr = Address('C4:F2:17:1A:1D:BB')

    # Public vs. random classification.
    assert not addr.is_public
    assert addr.is_random
    assert addr.address_type == Address.RANDOM_DEVICE_ADDRESS

    # Sub-type flags.
    assert not addr.is_resolvable
    assert not addr.is_resolved
    assert addr.is_static
488
489
490# -----------------------------------------------------------------------------
def test_custom():
    """Check that HCI_CustomPacket exposes its packet type and raw payload."""
    raw = bytes([0x77, 0x02, 0x01, 0x03])
    custom_packet = HCI_CustomPacket(raw)
    # The packet type is expected to equal the first byte of the input, and
    # the payload is expected to be the whole input.
    assert custom_packet.hci_packet_type == 0x77
    assert custom_packet.payload == raw
496
497
498# -----------------------------------------------------------------------------
def test_iso_data_packet():
    """Parse a fixed ISO data packet, check its fields, and re-serialize it."""
    raw_packet = bytes.fromhex(
        '05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
        '52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
    )
    expected_fragment = bytes.fromhex(
        'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
        '6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
    )

    iso_packet = HCI_IsoDataPacket.from_bytes(raw_packet)

    # Header fields.
    assert iso_packet.connection_handle == 0x0061
    assert iso_packet.packet_status_flag == 0
    assert iso_packet.pb_flag == 0x02
    assert iso_packet.ts_flag == 0x01
    assert iso_packet.data_total_length == 68

    # Fields that follow the header.
    assert iso_packet.time_stamp == 2716911914
    assert iso_packet.packet_sequence_number == 147
    assert iso_packet.iso_sdu_length == 60
    assert iso_packet.iso_sdu_fragment == expected_fragment

    # Serializing the parsed packet must reproduce the input bytes.
    assert iso_packet.to_bytes() == raw_packet
519
520
521# -----------------------------------------------------------------------------
def run_test_events():
    """Run every event test defined in this module, in a fixed order."""
    event_tests = (
        test_HCI_Event,
        test_HCI_LE_Connection_Complete_Event,
        test_HCI_LE_Advertising_Report_Event,
        test_HCI_LE_Connection_Update_Complete_Event,
        test_HCI_LE_Read_Remote_Features_Complete_Event,
        test_HCI_LE_Channel_Selection_Algorithm_Event,
        test_HCI_Command_Complete_Event,
        test_HCI_Command_Status_Event,
        test_HCI_Number_Of_Completed_Packets_Event,
    )
    for event_test in event_tests:
        event_test()
532
533
534# -----------------------------------------------------------------------------
def run_test_commands():
    """Run every command test defined in this module, in a fixed order."""
    command_tests = (
        test_HCI_Command,
        test_HCI_Reset_Command,
        test_HCI_PIN_Code_Request_Reply_Command,
        test_HCI_Read_Local_Version_Information_Command,
        test_HCI_Read_Local_Supported_Commands_Command,
        test_HCI_Read_Local_Supported_Features_Command,
        test_HCI_Disconnect_Command,
        test_HCI_Set_Event_Mask_Command,
        test_HCI_LE_Set_Event_Mask_Command,
        test_HCI_LE_Set_Random_Address_Command,
        test_HCI_LE_Set_Advertising_Parameters_Command,
        test_HCI_LE_Set_Advertising_Data_Command,
        test_HCI_LE_Set_Scan_Parameters_Command,
        test_HCI_LE_Set_Scan_Enable_Command,
        test_HCI_LE_Create_Connection_Command,
        test_HCI_LE_Extended_Create_Connection_Command,
        test_HCI_LE_Add_Device_To_Filter_Accept_List_Command,
        test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command,
        test_HCI_LE_Connection_Update_Command,
        test_HCI_LE_Read_Remote_Features_Command,
        test_HCI_LE_Set_Default_PHY_Command,
        test_HCI_LE_Set_Extended_Scan_Parameters_Command,
        test_HCI_LE_Set_Extended_Advertising_Enable_Command,
        test_HCI_LE_Setup_ISO_Data_Path_Command,
    )
    for command_test in command_tests:
        command_test()
560
561
562# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # When executed as a script, run the grouped event and command tests
    # first, then the three stand-alone tests, in this order.
    for entry_point in (
        run_test_events,
        run_test_commands,
        test_address,
        test_custom,
        test_iso_data_packet,
    ):
        entry_point()
569