#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import timedelta

from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts
from google.protobuf import empty_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from hal import facade_pb2 as hal_facade_pb2
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets


class SimpleHalTest(GdFacadeOnlyBaseTestClass):
    """HAL facade tests driving a device under test (DUT) and a cert device.

    Each test sends raw HCI commands / ACL data through the `hal` facade of
    one or both devices and asserts on the HCI events or ACL data streamed
    back by the facade.
    """

    @staticmethod
    def _build_acl_payload(handle, pb_flag, b_flag, acl):
        """Return the raw bytes of an HCI ACL data packet.

        The 4-byte header is two little-endian 16-bit fields:
          * bits 0-11 connection handle, bits 12-13 Packet Boundary flag,
            bits 14-15 Broadcast flag
          * data total length

        Args:
            handle: connection handle (only the low 12 bits are used).
            pb_flag: Packet Boundary flag; converted with int().
            b_flag: Broadcast flag; converted with int().
            acl: the ACL payload, as bytes or an iterable of ints.
        """
        # The flags live above the 12-bit handle; placing them any lower
        # would overwrite the upper handle bits.
        handle_and_flags = ((handle & 0xfff) | ((int(pb_flag) & 0x3) << 12) |
                            ((int(b_flag) & 0x3) << 14))
        length = len(acl) & 0xffff
        header = bytes([
            handle_and_flags & 0xff,
            handle_and_flags >> 8,
            length & 0xff,
            length >> 8,
        ])
        return header + bytes(acl)

    def send_cert_hci_command(self, command):
        """Serialize `command` and send it through the cert device's HAL."""
        self.cert_device.hal.SendHciCommand(
            hal_facade_pb2.HciCommandPacket(payload=bytes(command.Serialize())))

    def send_cert_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send `acl` as an HCI ACL data packet through the cert device's HAL.

        Args:
            handle: connection handle to address the packet to.
            pb_flag: Packet Boundary flag for the ACL header.
            b_flag: Broadcast flag for the ACL header.
            acl: payload bytes.
        """
        self.cert_device.hal.SendHciAcl(
            hal_facade_pb2.HciAclPacket(
                payload=self._build_acl_payload(handle, pb_flag, b_flag, acl)))

    def send_dut_hci_command(self, command):
        """Serialize `command` and send it through the DUT's HAL."""
        self.device_under_test.hal.SendHciCommand(
            hal_facade_pb2.HciCommandPacket(payload=bytes(command.Serialize())))

    def send_dut_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send `acl` as an HCI ACL data packet through the DUT's HAL.

        Args:
            handle: connection handle to address the packet to.
            pb_flag: Packet Boundary flag for the ACL header.
            b_flag: Broadcast flag for the ACL header.
            acl: payload bytes.
        """
        self.device_under_test.hal.SendHciAcl(
            hal_facade_pb2.HciAclPacket(
                payload=self._build_acl_payload(handle, pb_flag, b_flag, acl)))

    def setup_test(self):
        """Start the HAL module on both devices and send each an HCI Reset."""
        self.device_under_test.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
                    'HAL'),))
        self.cert_device.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
                    'HAL'),))

        self.device_under_test.wait_channel_ready()
        self.cert_device.wait_channel_ready()

        self.send_dut_hci_command(hci_packets.ResetBuilder())
        self.send_cert_hci_command(hci_packets.ResetBuilder())

    def teardown_test(self):
        """Stop the stack on both devices."""
        self.device_under_test.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest())
        self.cert_device.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest())

    def test_none_event(self):
        """Assert that the DUT emits no HCI event for one second."""
        with EventCallbackStream(
                self.device_under_test.hal.FetchHciEvent(
                    empty_pb2.Empty())) as hci_event_stream:
            hci_event_asserts = EventAsserts(hci_event_stream)
            hci_event_asserts.assert_none(timeout=timedelta(seconds=1))

    def test_fetch_hci_event(self):
        """Send a white list command to the DUT and expect its complete event."""
        with EventCallbackStream(
                self.device_under_test.hal.FetchHciEvent(
                    empty_pb2.Empty())) as hci_event_stream:

            hci_event_asserts = EventAsserts(hci_event_stream)

            self.send_dut_hci_command(
                hci_packets.LeAddDeviceToWhiteListBuilder(
                    hci_packets.WhiteListAddressType.RANDOM,
                    '0C:05:04:03:02:01'))
            event = hci_packets.LeAddDeviceToWhiteListCompleteBuilder(
                1, hci_packets.ErrorCode.SUCCESS)
            hci_event_asserts.assert_event_occurs(
                lambda packet: bytes(event.Serialize()) in packet.payload)

    def test_loopback_hci_command(self):
        """Enable local loopback on the DUT and expect a command echoed back."""
        with EventCallbackStream(
                self.device_under_test.hal.FetchHciEvent(
                    empty_pb2.Empty())) as hci_event_stream:

            self.send_dut_hci_command(
                hci_packets.WriteLoopbackModeBuilder(
                    hci_packets.LoopbackMode.ENABLE_LOCAL))

            hci_event_asserts = EventAsserts(hci_event_stream)

            command = hci_packets.LeAddDeviceToWhiteListBuilder(
                hci_packets.WhiteListAddressType.RANDOM, '0C:05:04:03:02:01')
            self.send_dut_hci_command(command)
            # In loopback mode the serialized command itself should show up
            # inside an event payload.
            hci_event_asserts.assert_event_occurs(
                lambda packet: bytes(command.Serialize()) in packet.payload)

    def test_inquiry_from_dut(self):
        """Make the cert device discoverable and run an inquiry from the DUT."""
        with EventCallbackStream(
                self.device_under_test.hal.FetchHciEvent(
                    empty_pb2.Empty())) as hci_event_stream:
            hci_event_asserts = EventAsserts(hci_event_stream)
            self.send_cert_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
            lap = hci_packets.Lap()
            lap.lap = 0x33
            self.send_dut_hci_command(
                hci_packets.InquiryBuilder(lap, 0x30, 0xff))
            hci_event_asserts.assert_event_occurs(
                lambda packet: b'\x02\x0f' in packet.payload
                # Expecting an HCI Event (code 0x02, length 0x0f)
            )

    def test_le_ad_scan_cert_advertises(self):
        """Scan from the DUT while the cert device advertises its name."""
        with EventCallbackStream(
                self.device_under_test.hal.FetchHciEvent(
                    empty_pb2.Empty())) as hci_event_stream:
            hci_event_asserts = EventAsserts(hci_event_stream)

            # DUT scans
            self.send_dut_hci_command(
                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
            phy_scan_params = hci_packets.PhyScanParameters()
            phy_scan_params.le_scan_interval = 6553
            phy_scan_params.le_scan_window = 6553
            phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

            self.send_dut_hci_command(
                hci_packets.LeSetExtendedScanParametersBuilder(
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.LeSetScanningFilterPolicy.ACCEPT_ALL, 1,
                    [phy_scan_params]))
            self.send_dut_hci_command(
                hci_packets.LeSetExtendedScanEnableBuilder(
                    hci_packets.Enable.ENABLED,
                    hci_packets.FilterDuplicates.DISABLED, 0, 0))

            # CERT Advertises
            advertising_handle = 0
            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                    advertising_handle,
                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
                    512,
                    768,
                    7,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.PeerAddressType.
                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                    'A6:A5:A4:A3:A2:A1',
                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                    0x7F,
                    0,  # SID
                    hci_packets.Enable.DISABLED  # Scan request notification
                ))

            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
                    advertising_handle, '0C:05:04:03:02:01'))

            gap_name = hci_packets.GapData()
            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
            gap_name.data = list(b'Im_A_Cert!')  # TODO: Fix and remove !

            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingDataBuilder(
                    advertising_handle,
                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                    [gap_name]))
            enabled_set = hci_packets.EnabledSet()
            enabled_set.advertising_handle = advertising_handle
            enabled_set.duration = 0
            enabled_set.max_extended_advertising_events = 0
            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                    hci_packets.Enable.ENABLED, [enabled_set]))

            # The advertised name must appear in an event seen by the DUT.
            hci_event_asserts.assert_event_occurs(
                lambda packet: b'Im_A_Cert' in packet.payload)

            # Disable Advertising
            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                    hci_packets.Enable.DISABLED, [enabled_set]))

            # Disable Scanning
            self.send_dut_hci_command(
                hci_packets.LeSetExtendedScanEnableBuilder(
                    hci_packets.Enable.DISABLED,
                    hci_packets.FilterDuplicates.DISABLED, 0, 0))

    def test_le_connection_dut_advertises(self):
        """Connect from the cert device to an advertising DUT and swap ACL data."""
        with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_pb2.Empty())) as cert_hci_event_stream, \
            EventCallbackStream(self.device_under_test.hal.FetchHciAcl(empty_pb2.Empty())) as acl_data_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_pb2.Empty())) as cert_acl_data_stream:

            hci_event_asserts = EventAsserts(hci_event_stream)
            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # Cert Connects
            self.send_cert_hci_command(
                hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
            phy_scan_params.scan_interval = 0x60
            phy_scan_params.scan_window = 0x30
            phy_scan_params.conn_interval_min = 0x18
            phy_scan_params.conn_interval_max = 0x28
            phy_scan_params.conn_latency = 0
            phy_scan_params.supervision_timeout = 0x1f4
            phy_scan_params.min_ce_length = 0
            phy_scan_params.max_ce_length = 0
            self.send_cert_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(
                    hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                    '0D:05:04:03:02:01', 1, [phy_scan_params]))

            # DUT Advertises
            advertising_handle = 0
            self.send_dut_hci_command(
                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                    advertising_handle,
                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
                    400,
                    450,
                    7,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.PeerAddressType.
                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                    '00:00:00:00:00:00',
                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                    0xF8,
                    1,  # SID
                    hci_packets.Enable.DISABLED  # Scan request notification
                ))

            self.send_dut_hci_command(
                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
                    advertising_handle, '0D:05:04:03:02:01'))

            gap_name = hci_packets.GapData()
            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
            gap_name.data = list(b'Im_The_DUT!')  # TODO: Fix and remove !

            self.send_dut_hci_command(
                hci_packets.LeSetExtendedAdvertisingDataBuilder(
                    advertising_handle,
                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                    [gap_name]))

            gap_short_name = hci_packets.GapData()
            gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
            gap_short_name.data = list(b'Im_The_D')

            self.send_dut_hci_command(
                hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                    advertising_handle,
                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                    [gap_short_name]))

            enabled_set = hci_packets.EnabledSet()
            enabled_set.advertising_handle = advertising_handle
            enabled_set.duration = 0
            enabled_set.max_extended_advertising_events = 0
            self.send_dut_hci_command(
                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                    hci_packets.Enable.ENABLED, [enabled_set]))

            # Placeholder until a connection complete event supplies the
            # real handle; written by payload_handle() below.
            conn_handle = 0xfff

            def payload_handle(packet):
                """Match a connection complete event and record its handle."""
                nonlocal conn_handle
                packet_bytes = packet.payload
                # LE Meta event (0x3e), length 0x13, subevent 0x01, status 0.
                if b'\x3e\x13\x01\x00' in packet_bytes:
                    cc_view = hci_packets.LeConnectionCompleteView(
                        hci_packets.LeMetaEventView(
                            hci_packets.EventPacketView(
                                bt_packets.PacketViewLittleEndian(
                                    list(packet_bytes)))))
                    conn_handle = cc_view.GetConnectionHandle()
                    return True
                return False

            cert_hci_event_asserts.assert_event_occurs(payload_handle)
            cert_handle = conn_handle
            # Reset so the DUT's handle is not confused with the cert's.
            conn_handle = 0xfff
            hci_event_asserts.assert_event_occurs(payload_handle)
            dut_handle = conn_handle

            # Send ACL Data
            self.send_dut_acl_data(
                dut_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                b'Just SomeAclData')
            self.send_cert_acl_data(
                cert_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                b'Just SomeMoreAclData')

            # Each side must receive what the other side sent.
            cert_acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeAclData' in packet.payload)
            acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_le_white_list_connection_cert_advertises(self):
        """Connect from the DUT via its white list to an advertising cert device."""
        with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_pb2.Empty())) as cert_hci_event_stream:
            hci_event_asserts = EventAsserts(hci_event_stream)
            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)

            # DUT Connects
            self.send_dut_hci_command(
                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
            self.send_dut_hci_command(
                hci_packets.LeAddDeviceToWhiteListBuilder(
                    hci_packets.WhiteListAddressType.RANDOM,
                    '0C:05:04:03:02:01'))
            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
            phy_scan_params.scan_interval = 0x60
            phy_scan_params.scan_window = 0x30
            phy_scan_params.conn_interval_min = 0x18
            phy_scan_params.conn_interval_max = 0x28
            phy_scan_params.conn_latency = 0
            phy_scan_params.supervision_timeout = 0x1f4
            phy_scan_params.min_ce_length = 0
            phy_scan_params.max_ce_length = 0
            # The peer address passed here differs from the cert's
            # advertising address; the filter policy selects the white list.
            self.send_dut_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(
                    hci_packets.InitiatorFilterPolicy.USE_WHITE_LIST,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                    'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

            # CERT Advertises
            advertising_handle = 1
            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                    advertising_handle,
                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
                    512,
                    768,
                    7,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.PeerAddressType.
                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                    'A6:A5:A4:A3:A2:A1',
                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                    0x7F,
                    0,  # SID
                    hci_packets.Enable.DISABLED  # Scan request notification
                ))

            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
                    advertising_handle, '0C:05:04:03:02:01'))

            gap_name = hci_packets.GapData()
            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
            gap_name.data = list(b'Im_A_Cert!')  # TODO: Fix and remove !

            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingDataBuilder(
                    advertising_handle,
                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                    [gap_name]))
            enabled_set = hci_packets.EnabledSet()
            enabled_set.advertising_handle = advertising_handle
            enabled_set.duration = 0
            enabled_set.max_extended_advertising_events = 0
            self.send_cert_hci_command(
                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                    hci_packets.Enable.ENABLED, [enabled_set]))

            # LeConnectionComplete
            cert_hci_event_asserts.assert_event_occurs(
                lambda packet: b'\x3e\x13\x01\x00' in packet.payload,
                timeout=timedelta(seconds=20))
            hci_event_asserts.assert_event_occurs(
                lambda packet: b'\x3e\x13\x01\x00' in packet.payload,
                timeout=timedelta(seconds=20))