#!/usr/bin/env python3
#
#   Copyright (C) 2019 The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may not
#   use this file except in compliance with the License. You may obtain a copy of
#   the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations under
#   the License.
"""This is the PTS base class that is inherited by all PTS tests."""

import ctypes
import functools
import random
import re
import time
import traceback

from ctypes import *
from datetime import datetime

from acts import signals
from acts.base_test import BaseTestClass
from acts.controllers.bluetooth_pts_device import VERDICT_STRINGS
from acts.controllers.fuchsia_device import FuchsiaDevice
from acts.signals import TestSignal
from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
from acts_contrib.test_utils.bt.bt_constants import gatt_transport
from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name


class PtsBaseClass(BaseTestClass):
    """Class for representing common functionality across all PTS tests.

    This includes the ability to rerun tests due to PTS instability,
    common PTS action mappings, and setup/teardown of related devices.
    """
    # Timeout, in seconds, handed to the DUT's name-filtered LE scan.
    scan_timeout_seconds = 10
    # Value returned by the DUT's LE scan in mmi_iut_initiate_connection;
    # passed to every GATT client call made by the MMI handlers.
    peer_identifier = None
    # Local name placed in the advertising data by mmi_make_iut_connectable.
    # The code previously read an undefined global (`fuchsia_name`) here and
    # always raised NameError. Subclasses may override this attribute.
    # NOTE(review): confirm the DUT accepts a None name, or set a real one.
    iut_advertising_name = None

    def setup_class(self):
        """Create the DUT and PTS handles and register the MMI handlers.

        Reads the optional 'dut' user param ('fuchsia_devices' or
        'android_devices'; Fuchsia when absent), several optional GATT
        handle/uuid user params, builds the per-profile MMI dispatch table
        and binds process_next_action as the PTS callback.

        Raises:
            ValueError: if the 'dut' user param has an unsupported value.
        """
        super().setup_class()
        # Default is a Fuchsia device.
        dut_type = self.user_params.get('dut', 'fuchsia_devices')
        if dut_type == 'fuchsia_devices':
            self.dut = create_bluetooth_device(self.fuchsia_devices[0])
        elif dut_type == 'android_devices':
            self.dut = create_bluetooth_device(self.android_devices[0])
        else:
            raise ValueError('Invalid DUT specified in config. (%s)' %
                             self.user_params['dut'])

        # Optional per-testbed values; each is None when not configured.
        self.characteristic_read_not_permitted_uuid = self.user_params.get(
            "characteristic_read_not_permitted_uuid")
        self.characteristic_read_not_permitted_handle = self.user_params.get(
            "characteristic_read_not_permitted_handle")
        self.characteristic_read_invalid_handle = self.user_params.get(
            "characteristic_read_invalid_handle")
        self.characteristic_attribute_not_found_uuid = self.user_params.get(
            "characteristic_attribute_not_found_uuid")
        self.write_characteristic_not_permitted_handle = self.user_params.get(
            "write_characteristic_not_permitted_handle")

        self.pts = self.bluetooth_pts_device[0]
        # MMI functions commented out until implemented. Added for tracking
        # purposes.
        self.pts_action_mapping = {
            "A2DP": {
                1: self.a2dp_mmi_iut_connectable,
                1002: self.a2dp_mmi_iut_accept_connect,
                1020: self.a2dp_mmi_initiate_open_stream,
            },
            "GATT": {
                1: self.mmi_make_iut_connectable,
                2: self.mmi_iut_initiate_connection,
                3: self.mmi_iut_initiate_disconnection,
                # 4: self.mmi_iut_no_security,
                # 5: self.mmi_iut_initiate_br_connection,
                10: self.mmi_discover_primary_service,
                # 11: self.mmi_confirm_no_primary_service_small,
                # 12: self.mmi_iut_mtu_exchange,
                # 13: self.mmi_discover_all_service_record,
                # 14: self.mmi_iut_discover_gatt_service_record,
                15: self.mmi_iut_find_included_services,
                # 16: self.mmi_confirm_no_characteristic_uuid_small,
                17: self.mmi_confirm_primary_service,
                # 18: self.mmi_send_primary_service_uuid,
                # 19: self.mmi_confirm_primary_service_uuid,
                # 22: self.confirm_primary_service_1801,
                24: self.mmi_confirm_include_service,
                26: self.mmi_confirm_characteristic_service,
                # 27: self.perform_read_all_characteristics,
                # AKA: discover service by uuid
                29: self.mmi_discover_service_uuid_range,
                # 31: self.perform_read_all_descriptors,
                48: self.mmi_iut_send_read_characteristic_handle,
                58: self.mmi_iut_send_read_descriptor_handle,
                70: self.mmi_send_write_command,
                74: self.mmi_send_write_request,
                76: self.mmi_send_prepare_write,
                77: self.mmi_iut_send_prepare_write_greater_offset,
                80: self.mmi_iut_send_prepare_write_greater,
                110: self.mmi_iut_enter_handle_read_not_permitted,
                111: self.mmi_iut_enter_uuid_read_not_permitted,
                118: self.mmi_iut_enter_handle_invalid,
                119: self.mmi_iut_enter_uuid_attribute_not_found,
                120: self.mmi_iut_enter_handle_write_not_permitted,
                2000: self.mmi_verify_secure_id,  # Enter pairing pin from DUT.
            },
            "SDP": {
                # TODO: Implement MMIs as necessary
            }
        }
        self.pts.bind_to(self.process_next_action)

    def teardown_class(self):
        """Clean up the PTS device."""
        self.pts.clean_up()

    def setup_test(self):
        """Reset the PTS verdict before each test."""
        # Always start the test with RESULT_INCOMP
        self.pts.pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']

    def teardown_test(self):
        """No per-test cleanup is needed; always returns True."""
        return True

    @staticmethod
    def pts_test_wrap(fn):
        """Decorate a PTS test case with logging and automatic reruns.

        The wrapped test is run once. While its result is not exactly True
        it is rerun, up to the 'pts_auto_rerun_count' user param (default 0)
        additional times, with teardown_test/setup_test called in between.

        Args:
            fn: The test method to wrap; called as fn(self, *args, **kwargs).

        Returns:
            The wrapping function, which returns the result of the last run.
            TestSignal exceptions propagate untouched; any other exception
            is logged with its traceback and re-raised.
        """
        @functools.wraps(fn)
        def _safe_wrap_test_case(self, *args, **kwargs):
            test_id = "{}:{}:{}".format(self.__class__.__name__, fn.__name__,
                                        time.time())
            log_string = "[Test ID] {}".format(test_id)
            self.log.info(log_string)
            try:
                self.dut.log_info("Started " + log_string)
                result = fn(self, *args, **kwargs)
                self.dut.log_info("Finished " + log_string)
                rerun_count = int(
                    self.user_params.get("pts_auto_rerun_count", 0))
                for i in range(rerun_count):
                    if result is True:
                        break
                    self.teardown_test()
                    log_string = "[Rerun Test ID] {}. Run #{} run failed... Retrying".format(
                        test_id, i + 1)
                    self.log.info(log_string)
                    self.setup_test()
                    self.dut.log_info("Rerun Started " + log_string)
                    result = fn(self, *args, **kwargs)
                return result
            except TestSignal:
                raise
            except Exception as e:
                self.log.error(traceback.format_exc())
                self.log.error(str(e))
                raise

        return _safe_wrap_test_case

    def process_next_action(self, action):
        """Run the MMI handler registered for the current profile and action.

        Bound to the PTS device as its callback in setup_class. Profiles or
        actions without a registered handler are ignored.

        Args:
            action: Key into the handler table of the profile named by
                self.pts.pts_profile_mmi_request.
        """
        profile_handlers = self.pts_action_mapping.get(
            self.pts.pts_profile_mmi_request, {})
        handler = profile_handlers.get(action)
        if handler is not None:
            # Handlers are invoked without arguments.
            handler()

    ### BEGIN A2DP MMI Actions ###

    def a2dp_mmi_iut_connectable(self):
        """Start the A2DP sink profile on the DUT and make it discoverable."""
        self.dut.start_profile_a2dp_sink()
        self.dut.set_discoverable(True)

    def a2dp_mmi_iut_accept_connect(self):
        """Start the A2DP sink profile on the DUT and make it discoverable."""
        self.dut.start_profile_a2dp_sink()
        self.dut.set_discoverable(True)

    def a2dp_mmi_initiate_open_stream(self):
        """Ask the DUT to initiate opening an A2DP stream."""
        self.dut.a2dp_initiate_open_stream()

    ### END A2DP MMI Actions ###

    ### BEGIN GATT MMI Actions ###

    def _search_send_description(self, pattern):
        """Return group 1 of `pattern` found in the current MMI description.

        Args:
            pattern: Regular expression with one capture group, searched in
                self.pts.current_implicit_send_description.

        Raises:
            signals.TestFailure: if the pattern is not found.
        """
        description = self.pts.current_implicit_send_description
        match = re.search(pattern, description)
        if match is None:
            raise signals.TestFailure(
                "Unable to find %r in MMI description: %r" %
                (pattern, description))
        return match.group(1)

    def create_write_value_by_size(self, size):
        """Return a list of `size` byte values counting 0..255 and wrapping."""
        return [i % 256 for i in range(size)]

    def mmi_send_write_command(self):
        """Write without response to the handle and size named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O with"),
                     16)
        size = int(self._search_send_description(r"with <= '(.*)' byte"))
        self.dut.gatt_client_write_characteristic_without_response_by_handle(
            self.peer_identifier, handle,
            self.create_write_value_by_size(size))

    def mmi_send_write_request(self):
        """Write, at offset 0, to the handle and size named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O with"),
                     16)
        size = int(self._search_send_description(r"with <= '(.*)' byte"))
        offset = 0
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_send_prepare_write(self):
        """Write half the size named by PTS, at an offset of half that."""
        handle = int(self._search_send_description(r"handle = '(.*)'O <="),
                     16)
        # Floor division replaces math.floor(); `math` was never imported.
        size = int(self._search_send_description(r"<= '(.*)' byte")) // 2
        offset = size // 2
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_iut_send_prepare_write_greater_offset(self):
        """Write one byte at the offset named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O and"),
                     16)
        offset = int(
            self._search_send_description(r"greater than '(.*)' byte"))
        size = 1
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_iut_send_prepare_write_greater(self):
        """Write, at offset 0, the number of bytes named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O with"),
                     16)
        size = int(self._search_send_description(r"greater than '(.*)' byte"))
        offset = 0
        self.dut.gatt_client_write_characteristic_by_handle(
            self.peer_identifier, handle, offset,
            self.create_write_value_by_size(size))

    def mmi_make_iut_connectable(self):
        """Start a connectable LE advertisement on the DUT."""
        adv_data = {
            "name": self.iut_advertising_name,
            "appearance": None,
            "service_data": None,
            "tx_power_level": None,
            "service_uuids": None,
            "manufacturer_data": None,
            "uris": None,
        }
        scan_response = None
        connectable = True
        interval = 1000

        self.dut.start_le_advertisement(adv_data, scan_response, interval,
                                        connectable)

    def mmi_iut_enter_uuid_read_not_permitted(self):
        """Append the configured read-not-permitted UUID to the PTS answers."""
        self.pts.extra_answers.append(
            self.characteristic_read_not_permitted_uuid)

    def mmi_iut_enter_handle_read_not_permitted(self):
        """Append the configured read-not-permitted handle to the PTS answers."""
        self.pts.extra_answers.append(
            self.characteristic_read_not_permitted_handle)

    def mmi_iut_enter_handle_invalid(self):
        """Append the configured invalid read handle to the PTS answers."""
        self.pts.extra_answers.append(self.characteristic_read_invalid_handle)

    def mmi_iut_enter_uuid_attribute_not_found(self):
        """Append the configured attribute-not-found UUID to the PTS answers."""
        self.pts.extra_answers.append(
            self.characteristic_attribute_not_found_uuid)

    def mmi_iut_enter_handle_write_not_permitted(self):
        """Append the configured write-not-permitted handle to the PTS answers."""
        self.pts.extra_answers.append(
            self.write_characteristic_not_permitted_handle)

    def mmi_verify_secure_id(self):
        """Append the DUT's pairing pin to the PTS answers."""
        self.pts.extra_answers.append(self.dut.get_pairing_pin())

    def mmi_discover_service_uuid_range(self, uuid):
        """Ask the DUT to discover services matching `uuid` on the peer.

        NOTE(review): process_next_action calls handlers with no arguments,
        so dispatching MMI 29 raises TypeError until a uuid source is chosen.
        """
        self.dut.gatt_client_mmi_discover_service_uuid_range(
            self.peer_identifier, uuid)

    def mmi_iut_initiate_connection(self):
        """Scan for the PTS advertisement and connect to it over LE.

        Stores the scan result in self.peer_identifier and attempts the
        GATT connection up to three times.

        Raises:
            signals.TestFailure: if the scan finds nothing or every
                connection attempt fails.
        """
        autoconnect = False
        transport = gatt_transport['le']
        adv_name = "PTS"
        self.peer_identifier = self.dut.le_scan_with_name_filter(
            adv_name, self.scan_timeout_seconds)
        if self.peer_identifier is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")
        tries = 3
        for _ in range(tries):
            if self.dut.gatt_connect(self.peer_identifier, transport,
                                     autoconnect):
                return

        raise signals.TestFailure("Unable to connect to peripheral.")

    def mmi_iut_initiate_disconnection(self):
        """Disconnect from the peer.

        Raises:
            signals.TestFailure: if the DUT reports the disconnect failed.
        """
        if not self.dut.gatt_disconnect(self.peer_identifier):
            raise signals.TestFailure("Failed to disconnect from peer.")

    def mmi_discover_primary_service(self):
        """Refresh the DUT's GATT state."""
        self.dut.gatt_refresh()

    def mmi_iut_find_included_services(self):
        """Refresh the DUT's GATT state."""
        # A stray `self.pts.execute_test(test_name)` call used to follow;
        # `test_name` was undefined, so it always raised NameError.
        self.dut.gatt_refresh()

    def mmi_confirm_primary_service(self):
        """Placeholder verifier; always passes."""
        # TODO: Write verifier that 1800 and 1801 exists. For now just pass.
        return True

    def mmi_confirm_characteristic_service(self):
        """Placeholder verifier; always passes."""
        # TODO: Write verifier that no services exist. For now just pass.
        return True

    def mmi_confirm_include_service(self, uuid_description=None):
        """Placeholder verifier; always passes.

        Args:
            uuid_description: Currently unused. Optional so that
                process_next_action can call this handler with no arguments.
        """
        # TODO: Write verifier that input services exist. For now just pass.
        # Note: List comes in the form of a long string to parse:
        # Attribute Handle = '0002'O Included Service Attribute handle = '0080'O,End Group Handle = '0085'O,Service UUID = 'A00B'O
        # \n
        # Attribute Handle = '0021'O Included Service Attribute handle = '0001'O,End Group Handle = '0006'O,Service UUID = 'A00D'O
        # \n ...
        return True

    def mmi_iut_send_read_characteristic_handle(self):
        """Read the characteristic at the handle named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O to"),
                     16)
        self.dut.gatt_client_read_characteristic_by_handle(
            self.peer_identifier, handle)

    def mmi_iut_send_read_descriptor_handle(self):
        """Read the descriptor at the handle named by PTS."""
        handle = int(self._search_send_description(r"handle = '(.*)'O to"),
                     16)
        self.dut.gatt_client_descriptor_read_by_handle(self.peer_identifier,
                                                       handle)

    ### END GATT MMI Actions ###