#!/usr/bin/env python3
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Setup:
This test only requires two fuchsia devices.
"""

from acts import signals
from acts.base_test import BaseTestClass
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size
from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name
from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices
from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name
import time


class BtFuchsiaEPTest(BaseTestClass):
    """Bluetooth tests that run between two Fuchsia devices.

    The first Fuchsia device (``pri_dut``) scans, connects and initiates
    pairing; the second (``sec_dut``) advertises / is made discoverable.
    """

    # Passed as the interval argument of bleStartBleAdvertising.
    # NOTE(review): units are not visible in this file - presumably
    # milliseconds; confirm against ble_lib.
    ble_advertise_interval = 50
    # Timeout handed to the LE and BR/EDR scan-by-name helpers.
    scan_timeout_seconds = 60
    # Not referenced by any test in this file.
    default_iterations = 1000
    # Generated once, at class-definition time, and shared by the BLE tests as
    # the advertised device name.
    adv_name = generate_id_by_size(10)
    # Advertising payload: only the name is populated.
    test_adv_data = {
        "name": adv_name,
        "appearance": None,
        "service_data": None,
        "tx_power_level": None,
        "service_uuids": None,
        "manufacturer_data": None,
        "uris": None,
    }
    test_connectable = True
    test_scan_response = None

    def setup_class(self):
        """Initialize Bluetooth on every Fuchsia device and pick the DUTs.

        The first two entries of ``self.fuchsia_devices`` become ``pri_dut``
        and ``sec_dut`` respectively.
        """
        super().setup_class()
        for fd in self.fuchsia_devices:
            fd.bts_lib.initBluetoothSys()
        self.pri_dut = self.fuchsia_devices[0]
        self.sec_dut = self.fuchsia_devices[1]

    def on_fail(self, test_name, begin_time):
        """Collect bug reports and reset Bluetooth state after a failure.

        Args:
            test_name: Name of the failed test, forwarded to take_bug_report.
            begin_time: Start time of the failed test, forwarded to
                take_bug_report.
        """
        for fd in self.fuchsia_devices:
            fd.take_bug_report(test_name, begin_time)
        self._unbond_all_known_devices()
        # A failing BLE test may have left the advertisement running.
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        self._kill_media_services()

    def teardown_class(self):
        """Kill the A2DP/AVRCP services once the class has finished."""
        self._kill_media_services()

    def _kill_media_services(self):
        """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices.
        """
        ssh_timeout = 30
        for fd in self.fuchsia_devices:
            # The status code check is skipped for both commands.
            fd.send_command_ssh("killall bt-a2dp*",
                                timeout=ssh_timeout,
                                skip_status_code_check=True)
            fd.send_command_ssh("killall bt-avrcp*",
                                timeout=ssh_timeout,
                                skip_status_code_check=True)

    def _unbond_all_known_devices(self):
        """For all Fuchsia devices, unbond any known pairings.

        Sleeps for five seconds before unbonding.
        """
        time.sleep(5)
        for fd in self.fuchsia_devices:
            unbond_all_known_devices(fd, self.log)

    def test_ble_awareness(self):
        """Verify that Fuchsia devices can advertise and scan each other

        Verify a Fuchsia device that starts a BLE advertisement can be
        found by a Fuchsia BLE scanner.

        Steps:
        1. On one Fuchsia device set an advertisement
        2. On one Fuchsia device, scan for the advertisement by name

        Expected Result:
        Verify that the advertisement is found by the scanner.

        Raises:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            self.adv_name,
                                            self.scan_timeout_seconds)
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")
        raise signals.TestPass("Success")

    def test_gatt_central_peripheral(self):
        """Verify that Fuchsia devices can perform GATT operations

        Verify that Fuchsia devices can perform GATT connections and
        interactions.

        Steps:
        1. On one Fuchsia device set an advertisement
        2. On one Fuchsia device, scan for the advertisement by name
        3. Perform GATT connection over LE
        4. Pair both devices.
        5. Perform GATT read/write operations (not yet implemented, see the
           TODO at the end of the test).
        6. Perform GATT disconnection.

        Expected Result:
        Verify that there are no errors after each GATT connection.

        Raises:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """
        self._unbond_all_known_devices()

        source_device_name = generate_id_by_size(10)
        self.pri_dut.bts_lib.setName(source_device_name)

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            self.adv_name,
                                            self.scan_timeout_seconds)
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")

        connect_result = self.pri_dut.gattc_lib.bleConnectToPeripheral(
            device["id"])
        if connect_result.get("error") is not None:
            raise signals.TestFailure("GATT Connection failed with: {}".format(
                connect_result.get("error")))

        # The peripheral is looked up by its advertised name, so report that
        # same name on failure.
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           self.adv_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(self.adv_name))

        if not verify_device_state_by_name(
                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(source_device_name))

        security_level = "ENCRYPTED"
        non_bondable = False
        transport = 2  # LE
        self.pri_dut.bts_lib.pair(device["id"], security_level, non_bondable,
                                  transport)

        services = None
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           self.adv_name, "BONDED", services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(self.adv_name))

        if not verify_device_state_by_name(self.sec_dut, self.log,
                                           source_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(source_device_name))

        disconnect_result = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
            device["id"])
        if disconnect_result.get("error") is not None:
            raise signals.TestFailure(
                "GATT Disconnection failed with: {}".format(
                    disconnect_result.get("error")))

        self.sec_dut.ble_lib.bleStopBleAdvertising()

        # TODO: Setup Proper GATT server and verify services published are found

        raise signals.TestPass("Success")

    def test_pairing_a2dp(self):
        """Verify that Fuchsia devices can pair to each other and establish
            an A2DP connection

            Verify that Fuchsia devices can pair to each other and establish
            an A2DP connection

            Steps:
            1. Clear out all bonded devices
            2. Stop any A2DP services running on the device
                Needed to take ownership of the services
            3. Init sink and source opposite devices
            4. Start pairing delegate for all Fuchsia devices
            5. Set sink device to be discoverable
            6. Discover sink device from source device
            7. Connect to sink device from source device
            8. Pair to sink device
            9. Validate paired devices and services present

            Expected Result:
            Verify devices are successfully paired and appropriate a2dp
            services are running.

            Raises:
            signals.TestPass if no errors
            signals.TestFailure if there are any errors during the test.

            TAGS: BREDR, A2DP
            Priority: 0
        """
        self._unbond_all_known_devices()
        self._kill_media_services()

        source_device_name = generate_id_by_size(10)
        target_device_name = generate_id_by_size(10)

        self.pri_dut.bts_lib.setName(source_device_name)
        self.sec_dut.bts_lib.setName(target_device_name)

        input_capabilities = "NONE"
        output_capabilities = "NONE"
        self.pri_dut.avdtp_lib.init()
        self.pri_dut.control_daemon("bt-avrcp.cmx", "start")
        self.sec_dut.avdtp_lib.init(initiator_delay=2000)
        self.sec_dut.control_daemon("bt-avrcp-target.cmx", "start")
        self.pri_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)

        self.sec_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)
        self.sec_dut.bts_lib.setDiscoverable(True)

        unique_mac_addr_id = bredr_scan_for_device_by_name(
            self.pri_dut, self.log, target_device_name,
            self.scan_timeout_seconds)

        if not unique_mac_addr_id:
            raise signals.TestFailure(
                "Failed to find device {}.".format(target_device_name))

        connect_result = self.pri_dut.bts_lib.connectDevice(unique_mac_addr_id)
        if connect_result.get("error") is not None:
            raise signals.TestFailure("Failed to connect with {}.".format(
                connect_result.get("error")))

        if not verify_device_state_by_name(
                self.pri_dut, self.log, target_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(target_device_name))

        if not verify_device_state_by_name(
                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(source_device_name))

        security_level = "NONE"
        # NOTE(review): test_gatt_central_peripheral passes a flag named
        # `non_bondable` (False) in this argument position, while this test
        # passes `bondable` (True). Confirm the intended polarity against the
        # signature of bts_lib.pair.
        bondable = True
        transport = 1  # BREDR
        pair_result = self.pri_dut.bts_lib.pair(unique_mac_addr_id,
                                                security_level, bondable,
                                                transport)
        if pair_result.get("error") is not None:
            raise signals.TestFailure("Failed to pair with {}.".format(
                pair_result.get("error")))

        #TODO: Validation of services and paired devices (b/175641870)
        # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb
        # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb
        #TODO: Make an easy function for checking/updating devices
        services = None
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           target_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(target_device_name))

        if not verify_device_state_by_name(self.sec_dut, self.log,
                                           source_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(source_device_name))

        raise signals.TestPass("Success")