#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.captures import HalCaptures, HciCaptures
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.py_hal import PyHal
from blueberry.tests.gd.cert.py_hci import PyHci
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd.cert import gd_base_test
from blueberry.facade import common_pb2 as common
from mobly import test_runner

import hci_packets as hci
from blueberry.utils import bluetooth


class DirectHciTest(gd_base_test.GdBaseTestClass):
    """Tests that drive the DUT through its HCI module and the cert device through its HAL module.

    Each test sends raw HCI commands to one or both devices and asserts on the
    events / ACL data observed on the corresponding streams.
    """

    # Addresses used repeatedly by the LE tests below. Kept as class constants
    # so the DUT side and the cert side are guaranteed to agree on them.
    _DUT_RANDOM_ADDRESS = '0D:05:04:03:02:01'
    _CERT_RANDOM_ADDRESS = '0C:05:04:03:02:01'
    _ADVERTISING_PEER_ADDRESS = 'A6:A5:A4:A3:A2:A1'

    def setup_class(self):
        """Start the base test class with the HCI module on the DUT and the HAL module on the cert device."""
        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HCI', cert_module='HAL')

    def setup_test(self):
        """Create the per-test HCI/HAL wrappers and send an HCI Reset through the cert HAL."""
        gd_base_test.GdBaseTestClass.setup_test(self)
        self.dut_hci = PyHci(self.dut, acl_streaming=True)
        self.cert_hal = PyHal(self.cert)
        self.cert_hal.send_hci_command(hci.Reset())

    def teardown_test(self):
        """Close both wrappers and run the base-class teardown.

        The calls are chained with try/finally so that a failure while closing
        one wrapper cannot prevent the other wrapper from being closed or the
        base-class teardown from running.
        """
        try:
            self.dut_hci.close()
        finally:
            try:
                self.cert_hal.close()
            finally:
                gd_base_test.GdBaseTestClass.teardown_test(self)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
        """Build an ACL packet and send its serialized form through the DUT's HCI facade.

        Args:
            handle: connection handle placed in the ACL header.
            pb_flag: value for the packet's packet_boundary_flag field.
            b_flag: value for the packet's broadcast_flag field.
            data: payload of the ACL packet.
        """
        acl = hci.Acl(handle=handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
        self.dut.hci.SendAcl(common.Data(payload=acl.serialize()))

    def _start_dut_extended_scan(self):
        """Set the DUT's random address, then configure and enable extended scanning on the DUT."""
        self.dut_hci.send_command(
            hci.LeSetRandomAddress(random_address=bluetooth.Address(self._DUT_RANDOM_ADDRESS)))

        self.dut_hci.send_command(
            hci.LeSetExtendedScanParameters(own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
                                            scanning_phys=1,
                                            parameters=[
                                                hci.PhyScanParameters(le_scan_type=hci.LeScanType.ACTIVE,
                                                                      le_scan_interval=6553,
                                                                      le_scan_window=6553)
                                            ]))

        self.dut_hci.send_command(
            hci.LeSetExtendedScanEnable(enable=hci.Enable.ENABLED,
                                        filter_duplicates=hci.FilterDuplicates.DISABLED,
                                        duration=0,
                                        period=0))

    def _dut_connect_via_filter_accept_list(self):
        """Add the cert's random address to the DUT's filter accept list and start an extended create-connection."""
        self.dut_hci.send_command(
            hci.LeSetRandomAddress(random_address=bluetooth.Address(self._DUT_RANDOM_ADDRESS)))
        self.dut_hci.send_command(
            hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
                                              address=bluetooth.Address(self._CERT_RANDOM_ADDRESS)))
        self.dut_hci.send_command(
            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST,
                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                           initiating_phys=1,
                                           phy_scan_parameters=[self._create_phy_scan_params()]))

    def test_local_hci_cmd_and_event(self):
        """Enable local loopback on the DUT and expect a sent command to be looped back as an event."""
        # Loopback mode responds with ACL and SCO connection complete
        self.dut_hci.register_for_events(hci.EventCode.LOOPBACK_COMMAND)
        self.dut_hci.send_command(hci.WriteLoopbackMode(loopback_mode=hci.LoopbackMode.ENABLE_LOCAL))

        self.dut_hci.send_command(hci.ReadLocalName())
        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.LoopbackOf(hci.ReadLocalName().serialize()))

    def test_inquiry_from_dut(self):
        """Start an inquiry on the DUT while the cert has inquiry/page scan enabled; expect an INQUIRY_RESULT event."""
        self.dut_hci.register_for_events(hci.EventCode.INQUIRY_RESULT)

        self.cert_hal.enable_inquiry_and_page_scan()
        self.dut_hci.send_command(hci.Inquiry(lap=hci.Lap(lap=0x33), inquiry_length=0x30, num_responses=0xff))
        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.EventWithCode(hci.EventCode.INQUIRY_RESULT))

    def test_le_ad_scan_cert_advertises(self):
        """DUT scans while the cert advertises through raw extended-advertising commands.

        Passes when the DUT's LE event stream emits a packet whose payload
        contains the advertised name; advertising and scanning are then disabled.
        """
        self.dut_hci.register_for_le_events(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
                                            hci.SubeventCode.ADVERTISING_REPORT)

        # DUT Scans
        self._start_dut_extended_scan()

        # CERT Advertises
        advertising_handle = 0
        self.cert_hal.send_hci_command(
            hci.LeSetExtendedAdvertisingParametersLegacy(
                advertising_handle=advertising_handle,
                legacy_advertising_event_properties=hci.LegacyAdvertisingEventProperties.ADV_IND,
                primary_advertising_interval_min=512,
                primary_advertising_interval_max=768,
                primary_advertising_channel_map=7,
                own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                peer_address=bluetooth.Address(self._ADVERTISING_PEER_ADDRESS),
                advertising_filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES,
                advertising_tx_power=0xF7,
                advertising_sid=1,
                scan_request_notification_enable=hci.Enable.DISABLED))

        self.cert_hal.send_hci_command(
            hci.LeSetAdvertisingSetRandomAddress(advertising_handle=advertising_handle,
                                                 random_address=bluetooth.Address(self._CERT_RANDOM_ADDRESS)))

        self.cert_hal.send_hci_command(
            hci.LeSetExtendedAdvertisingData(
                advertising_handle=advertising_handle,
                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
                advertising_data=[hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(b'Im_A_Cert'))]))

        self.cert_hal.send_hci_command(
            hci.LeSetExtendedScanResponseData(
                advertising_handle=advertising_handle,
                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
                scan_response_data=[hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME, data=list(b'Im_A_C'))]))

        self.cert_hal.send_hci_command(
            hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.ENABLED,
                                               enabled_sets=[
                                                   hci.EnabledSet(advertising_handle=advertising_handle,
                                                                  duration=0,
                                                                  max_extended_advertising_events=0)
                                               ]))

        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        self.cert_hal.send_hci_command(
            hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.DISABLED,
                                               enabled_sets=[
                                                   hci.EnabledSet(advertising_handle=advertising_handle,
                                                                  duration=0,
                                                                  max_extended_advertising_events=0)
                                               ]))

        self.dut_hci.send_command(hci.LeSetExtendedScanEnable(enable=hci.Enable.DISABLED))

    def _verify_le_connection_complete(self):
        """Wait for an LE connection-complete on both devices.

        Returns:
            A ``(dut_handle, cert_handle)`` tuple with the connection handle
            captured on each side.
        """
        cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_conn_complete_capture)
        cert_handle = cert_conn_complete_capture.get().connection_handle

        dut_conn_complete_capture = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.dut_hci.get_le_event_stream()).emits(dut_conn_complete_capture)
        dut_handle = dut_conn_complete_capture.get().connection_handle

        return (dut_handle, cert_handle)

    @staticmethod
    def _create_phy_scan_params():
        """Return the LeCreateConnPhyScanParameters shared by every LeExtendedCreateConnection in this file."""
        return hci.LeCreateConnPhyScanParameters(scan_interval=0x60,
                                                 scan_window=0x30,
                                                 conn_interval_min=0x18,
                                                 conn_interval_max=0x28,
                                                 conn_latency=0,
                                                 supervision_timeout=0x1f4,
                                                 min_ce_length=0,
                                                 max_ce_length=0)

    def test_le_connection_dut_advertises(self):
        """Cert initiates an LE connection to the advertising DUT, then ACL data is exchanged both ways."""
        self.dut_hci.register_for_le_events(hci.SubeventCode.CONNECTION_COMPLETE,
                                            hci.SubeventCode.ADVERTISING_SET_TERMINATED,
                                            hci.SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
        # Cert Connects
        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.cert_hal.send_hci_command(
            hci.LeSetRandomAddress(random_address=bluetooth.Address(self._CERT_RANDOM_ADDRESS)))
        self.cert_hal.send_hci_command(
            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                           peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
                                           peer_address=bluetooth.Address(self._DUT_RANDOM_ADDRESS),
                                           initiating_phys=1,
                                           phy_scan_parameters=[self._create_phy_scan_params()]))

        advertisement = self.dut_hci.create_advertisement(0, self._DUT_RANDOM_ADDRESS)
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        (dut_handle, cert_handle) = self._verify_le_connection_complete()

        self.dut_hci.send_command(hci.LeReadRemoteFeatures(connection_handle=dut_handle))
        # Match on the raw payload: byte 0 is compared with the LE meta event
        # code and byte 2 with the READ_REMOTE_FEATURES_COMPLETE subevent code.
        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: packet.payload[0] == int(
            hci.EventCode.LE_META_EVENT) and packet.payload[2] == int(hci.SubeventCode.READ_REMOTE_FEATURES_COMPLETE))

        # Send ACL Data
        self.enqueue_acl_data(dut_handle, hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci.BroadcastFlag.POINT_TO_POINT, b'Just SomeAclData')
        self.cert_hal.send_acl_first(cert_handle, b'Just SomeMoreAclData')

        # logging.debug() returns None, so `or` always falls through to the
        # membership test; the call is only there to log every candidate payload.
        assertThat(self.cert_hal.get_acl_stream()).emits(
            lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(
            lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)

    def test_le_filter_accept_list_connection_cert_advertises(self):
        """DUT connects via its filter accept list while the cert advertises with create_advertisement()."""
        # DUT Connects
        self._dut_connect_via_filter_accept_list()

        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        advertisement = self.cert_hal.create_advertisement(1,
                                                           self._CERT_RANDOM_ADDRESS,
                                                           min_interval=512,
                                                           max_interval=768,
                                                           peer_address=self._ADVERTISING_PEER_ADDRESS,
                                                           tx_power=0x7f,
                                                           sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # LeConnectionComplete
        self._verify_le_connection_complete()

    def test_le_filter_accept_list_connection_cert_advertises_legacy(self):
        """DUT connects via its filter accept list while the cert advertises with create_legacy_advertisement()."""
        # DUT Connects
        self._dut_connect_via_filter_accept_list()

        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.cert_hal.send_hci_command(
            hci.LeSetRandomAddress(random_address=bluetooth.Address(self._CERT_RANDOM_ADDRESS)))

        advertisement = self.cert_hal.create_legacy_advertisement(min_interval=512,
                                                                  max_interval=768,
                                                                  peer_address=self._ADVERTISING_PEER_ADDRESS)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # LeConnectionComplete
        self._verify_le_connection_complete()

    def test_le_ad_scan_cert_advertises_legacy(self):
        """DUT scans while the cert advertises with create_legacy_advertisement(); expect a matching LE advertisement."""
        self.dut_hci.register_for_le_events(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
                                            hci.SubeventCode.ADVERTISING_REPORT)

        # DUT Scans
        self._start_dut_extended_scan()

        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.cert_hal.send_hci_command(
            hci.LeSetRandomAddress(random_address=bluetooth.Address(self._CERT_RANDOM_ADDRESS)))

        advertisement = self.cert_hal.create_legacy_advertisement(min_interval=512,
                                                                  max_interval=768,
                                                                  peer_address=self._ADVERTISING_PEER_ADDRESS)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        assertThat(self.dut_hci.get_le_event_stream()).emits(
            HciMatchers.LeAdvertisement(address=self._CERT_RANDOM_ADDRESS, data=b'Im_A_Cert'))

    def test_connection_dut_connects(self):
        """DUT initiates a connection to the cert's own address, then ACL data is exchanged both ways."""
        self.dut_hci.send_command(hci.WritePageTimeout(page_timeout=0x4000))

        self.cert_hal.enable_inquiry_and_page_scan()
        address = self.cert_hal.read_own_address()

        self.dut_hci.initiate_connection(address)
        cert_acl = self.cert_hal.accept_connection()
        dut_acl = self.dut_hci.complete_connection()

        # Send ACL Data
        dut_acl.send_first(b'Just SomeAclData')
        cert_acl.send_first(b'Just SomeMoreAclData')

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_connection_cert_connects(self):
        """Cert initiates a connection to the DUT's own address, then ACL data is exchanged both ways."""
        self.cert_hal.send_hci_command(hci.WritePageTimeout(page_timeout=0x4000))

        self.dut_hci.enable_inquiry_and_page_scan()
        address = self.dut_hci.read_own_address()

        self.cert_hal.initiate_connection(address)
        dut_acl = self.dut_hci.accept_connection()
        cert_acl = self.cert_hal.complete_connection()

        # Send ACL Data
        dut_acl.send_first(b'This is just SomeAclData')
        cert_acl.send_first(b'This is just SomeMoreAclData')

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)


if __name__ == '__main__':
    test_runner.main()