1#!/usr/bin/env python3 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Setup: 18This test only requires two fuchsia devices. 19""" 20 21from acts import signals 22from acts.base_test import BaseTestClass 23from acts.test_decorators import test_tracker_info 24from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size 25from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name 26from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name 27from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices 28from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name 29import time 30 31 32class BtFuchsiaEPTest(BaseTestClass): 33 ble_advertise_interval = 50 34 scan_timeout_seconds = 60 35 default_iterations = 1000 36 adv_name = generate_id_by_size(10) 37 test_adv_data = { 38 "name": adv_name, 39 "appearance": None, 40 "service_data": None, 41 "tx_power_level": None, 42 "service_uuids": None, 43 "manufacturer_data": None, 44 "uris": None, 45 } 46 test_connectable = True 47 test_scan_response = None 48 49 def setup_class(self): 50 super().setup_class() 51 for fd in self.fuchsia_devices: 52 fd.bts_lib.initBluetoothSys() 53 self.pri_dut = self.fuchsia_devices[0] 54 self.sec_dut = self.fuchsia_devices[1] 55 56 def on_fail(self, test_name, begin_time): 57 for fd in self.fuchsia_devices: 58 fd.take_bug_report(test_name, begin_time) 59 self._unbond_all_known_devices() 60 self.sec_dut.ble_lib.bleStopBleAdvertising() 61 self._kill_media_services() 62 63 def teardown_class(self): 64 self._kill_media_services() 65 66 def _kill_media_services(self): 67 """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices. 68 """ 69 ssh_timeout = 30 70 for fd in self.fuchsia_devices: 71 fd.send_command_ssh("killall bt-a2dp*", 72 timeout=ssh_timeout, 73 skip_status_code_check=True) 74 fd.send_command_ssh("killall bt-avrcp*", 75 timeout=ssh_timeout, 76 skip_status_code_check=True) 77 78 def _unbond_all_known_devices(self): 79 """For all Fuchsia devices, unbond any known pairings. 80 """ 81 time.sleep(5) 82 for fd in self.fuchsia_devices: 83 unbond_all_known_devices(fd, self.log) 84 85 def test_ble_awareness(self): 86 """Verify that Fuchsia devices can advertise and scan each other 87 88 Verify a Fuchsia device that starts a BLE advertisesement can be 89 found by a Fuchsia BLE scanner. 90 91 Steps: 92 1. On one Fuchsia device set an advertisement 93 2. On one Fuchsia device, scan for the advertisement by name 94 95 Expected Result: 96 Verify that there are no errors after each GATT connection. 97 98 Returns: 99 signals.TestPass if no errors 100 signals.TestFailure if there are any errors during the test. 101 102 TAGS: BLE 103 Priority: 0 104 """ 105 106 self.sec_dut.ble_lib.bleStartBleAdvertising( 107 self.test_adv_data, self.test_scan_response, 108 self.ble_advertise_interval, self.test_connectable) 109 110 device = le_scan_for_device_by_name(self.pri_dut, self.log, 111 self.adv_name, 112 self.scan_timeout_seconds) 113 self.sec_dut.ble_lib.bleStopBleAdvertising() 114 if device is None: 115 raise signals.TestFailure("Scanner unable to find advertisement.") 116 raise signals.TestPass("Success") 117 118 def test_gatt_central_peripheral(self): 119 """Verify that Fuchsia devices can perform GATT operations 120 121 Verify a Fuchsia devices can perform GATT connections and interactions. 122 123 Steps: 124 1. On one Fuchsia device set an advertisement 125 2. On one Fuchsia device, scan for the advertisement by name 126 3. Perform GATT connection over LE 127 4. Pair both devices. 128 5. Perform GATT read/write operations. 129 6. Perform GATT disconnection. 130 131 Expected Result: 132 Verify that there are no errors after each GATT connection. 133 134 Returns: 135 signals.TestPass if no errors 136 signals.TestFailure if there are any errors during the test. 137 138 TAGS: BLE 139 Priority: 0 140 """ 141 self._unbond_all_known_devices() 142 143 source_device_name = generate_id_by_size(10) 144 self.pri_dut.bts_lib.setName(source_device_name) 145 146 self.sec_dut.ble_lib.bleStartBleAdvertising( 147 self.test_adv_data, self.test_scan_response, 148 self.ble_advertise_interval, self.test_connectable) 149 150 device = le_scan_for_device_by_name(self.pri_dut, self.log, 151 self.adv_name, 152 self.scan_timeout_seconds) 153 if device is None: 154 raise signals.TestFailure("Scanner unable to find advertisement.") 155 156 connect_result = self.pri_dut.gattc_lib.bleConnectToPeripheral( 157 device["id"]) 158 if connect_result.get("error") is not None: 159 raise signals.TestFailure("GATT Connection failed with: {}".format( 160 connect_result.get("error"))) 161 162 if not verify_device_state_by_name(self.pri_dut, self.log, 163 self.adv_name, "CONNECTED", None): 164 raise signals.TestFailure( 165 "Failed to connect to device {}.".format(target_device_name)) 166 167 if not verify_device_state_by_name( 168 self.sec_dut, self.log, source_device_name, "CONNECTED", None): 169 raise signals.TestFailure( 170 "Failed to connect to device {}.".format(source_device_name)) 171 172 security_level = "ENCRYPTED" 173 non_bondable = False 174 transport = 2 #LE 175 self.pri_dut.bts_lib.pair(device["id"], security_level, non_bondable, 176 transport) 177 178 services = None 179 if not verify_device_state_by_name(self.pri_dut, self.log, 180 self.adv_name, "BONDED", services): 181 raise signals.TestFailure( 182 "Failed to pair device {}.".format(target_device_name)) 183 184 if not verify_device_state_by_name(self.sec_dut, self.log, 185 source_device_name, "BONDED", 186 services): 187 raise signals.TestFailure( 188 "Failed to pair device {}.".format(source_device_name)) 189 190 disconnect_result = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 191 device["id"]) 192 if disconnect_result.get("error") is not None: 193 raise signals.TestFailure( 194 "GATT Disconnection failed with: {}".format( 195 connect_result.get("error"))) 196 197 self.sec_dut.ble_lib.bleStopBleAdvertising() 198 199 # TODO: Setup Proper GATT server and verify services published are found 200 201 raise signals.TestPass("Success") 202 203 def test_pairing_a2dp(self): 204 """Verify that Fuchsia devices can pair to each other and establish 205 an A2DP connection 206 207 Verify that Fuchsia devices can pair to each other and establish 208 an A2DP connection 209 210 Steps: 211 1. Clear out all bonded devices 212 2. Stop any A2DP services running on the device 213 Needed to take ownership of the services 214 3. Init sink and source opposite devices 215 4. Start pairing delegate for all Fuchsia devices 216 5. Set sink device to be discoverable 217 6. Discover sink device from source device 218 7. Connect to sink device from source device 219 8. Pair to sink device 220 9. Validate paired devices and services present 221 222 Expected Result: 223 Verify devices are successfully paired and appropriate a2dp 224 services are running. 225 226 Returns: 227 signals.TestPass if no errors 228 signals.TestFailure if there are any errors during the test. 229 230 TAGS: BREDR, A2DP 231 Priority: 0 232 """ 233 self._unbond_all_known_devices() 234 self._kill_media_services() 235 236 source_device_name = generate_id_by_size(10) 237 target_device_name = generate_id_by_size(10) 238 239 self.pri_dut.bts_lib.setName(source_device_name) 240 self.sec_dut.bts_lib.setName(target_device_name) 241 242 input_capabilities = "NONE" 243 output_capabilities = "NONE" 244 245 # Initialize a2dp on both devices. 246 self.pri_dut.avdtp_lib.init() 247 self.sec_dut.avdtp_lib.init() 248 249 self.pri_dut.bts_lib.acceptPairing(input_capabilities, 250 output_capabilities) 251 252 self.sec_dut.bts_lib.acceptPairing(input_capabilities, 253 output_capabilities) 254 self.sec_dut.bts_lib.setDiscoverable(True) 255 256 unique_mac_addr_id = bredr_scan_for_device_by_name( 257 self.pri_dut, self.log, target_device_name, 258 self.scan_timeout_seconds) 259 260 if not unique_mac_addr_id: 261 raise signals.TestFailure( 262 "Failed to find device {}.".format(target_device_name)) 263 264 connect_result = self.pri_dut.bts_lib.connectDevice(unique_mac_addr_id) 265 if connect_result.get("error") is not None: 266 raise signals.TestFailure("Failed to connect with {}.".format( 267 connect_result.get("error"))) 268 269 # We pair before checking the CONNECTED status because BR/EDR semantics 270 # were recently changed such that if pairing is not confirmed, then bt 271 # does not report connected = True. 272 security_level = "NONE" 273 bondable = True 274 transport = 1 #BREDR 275 pair_result = self.pri_dut.bts_lib.pair(unique_mac_addr_id, 276 security_level, bondable, 277 transport) 278 if pair_result.get("error") is not None: 279 raise signals.TestFailure("Failed to pair with {}.".format( 280 pair_result.get("error"))) 281 282 if not verify_device_state_by_name( 283 self.pri_dut, self.log, target_device_name, "CONNECTED", None): 284 raise signals.TestFailure( 285 "Failed to connect to device {}.".format(target_device_name)) 286 287 if not verify_device_state_by_name( 288 self.sec_dut, self.log, source_device_name, "CONNECTED", None): 289 raise signals.TestFailure( 290 "Failed to connect to device {}.".format(source_device_name)) 291 292 #TODO: Validation of services and paired devices (b/175641870) 293 # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb 294 # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb 295 #TODO: Make an easy function for checking/updating devices 296 services = None 297 if not verify_device_state_by_name(self.pri_dut, self.log, 298 target_device_name, "BONDED", 299 services): 300 raise signals.TestFailure( 301 "Failed to pair device {}.".format(target_device_name)) 302 303 if not verify_device_state_by_name(self.sec_dut, self.log, 304 source_device_name, "BONDED", 305 services): 306 raise signals.TestFailure( 307 "Failed to pair device {}.".format(source_device_name)) 308 309 raise signals.TestPass("Success") 310