1# Lint as: python2, python3 2# Copyright 2019 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7This class provides wrapper functions for Bluetooth quick health test 8batches or packages 9""" 10 11from __future__ import absolute_import 12from __future__ import division 13from __future__ import print_function 14 15import functools 16import logging 17import tempfile 18import threading 19import time 20 21import common 22from autotest_lib.client.common_lib import error 23from autotest_lib.server import site_utils 24from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests 25from autotest_lib.server.cros.multimedia import remote_facade_factory 26from autotest_lib.client.bin import utils 27from six.moves import range 28 29class BluetoothAdapterQuickTests(bluetooth_adapter_tests.BluetoothAdapterTests): 30 """This class provide wrapper function for Bluetooth quick health test 31 batches or packages. 32 The Bluetooth quick test infrastructure provides a way to quickly run a set 33 of tests. As for today, auto-test ramp up time per test is about 90-120 34 seconds, where a typical Bluetooth test may take ~30-60 seconds to run. 35 36 The quick test infra, implemented in this class, saves this huge overhead 37 by running only the minimal reset and cleanup operations required between 38 each set of tests (takes a few seconds). 39 40 This class provides wrapper functions to start and end a test, a batch or a 41 package. A batch is defined as a set of tests, preferably with a common 42 subject. A package is a set of batches. 43 This class takes care of tests, batches, and packages test results, and 44 prints out summaries to results. The class also resets and cleans up 45 required states between tests, batches and packages. 46 47 A batch can also run as a separate auto-test. There is a place holder to 48 add a way to run a specific test of a batch autonomously. 49 50 A batch can be implemented by inheriting from this class, and using its 51 wrapper functions. A package can be implemented by inheriting from a set of 52 batches. 53 54 Adding a test to one of the batches is as easy as adding a method to the 55 class of the batch. 56 """ 57 58 # Some delay is needed between tests. TODO(yshavit): investigate and remove 59 TEST_SLEEP_SECS = 3 60 GCS_MTBF_BUCKET = 'gs://chromeos-mtbf-bt-results/' 61 62 63 def restart_peers(self): 64 """Restart and clear peer devices""" 65 # Restart the link to device 66 logging.info('Restarting peer devices...') 67 68 # Grab current device list for initialization 69 connected_devices = self.devices 70 self.cleanup(test_state='MID') 71 72 for device_type, device_list in connected_devices.items(): 73 for device in device_list: 74 if device is not None: 75 logging.info('Restarting %s', device_type) 76 self.get_device(device_type, on_start=False) 77 78 79 def start_peers(self, devices): 80 """Start peer devices""" 81 # Start the link to devices 82 if self.use_btpeer: 83 logging.info('Starting peer devices...') 84 self.get_device_rasp(devices) 85 86 # Grab all the addresses to verify RSSI 87 addresses = [] 88 for device_type, device_list in self.devices.items(): 89 # Skip bluetooth_tester since it won't be discoverable 90 if 'TESTER' in device_type: 91 continue 92 93 for device in device_list: 94 addresses.append(device.address) 95 self.start_agent(device) 96 97 # Make sure device RSSI is sufficient 98 self.verify_device_rssi(addresses) 99 100 def _print_delimiter(self): 101 logging.info('=======================================================') 102 103 @staticmethod 104 def _get_update_btpeers_arguments(args_dict=None): 105 """Parse the update_btpeers argument""" 106 key = 'update_btpeers' 107 if args_dict is not None and key in args_dict: 108 return args_dict[key].lower() != 'false' 109 return True 110 111 def quick_test_init(self, 112 host, 113 use_btpeer=True, 114 flag='Quick Health', 115 args_dict=None, 116 start_browser=False): 117 """Inits the test batch""" 118 self.host = host 119 self.start_browser = start_browser 120 self.use_btpeer = use_btpeer 121 update_btpeers = self._get_update_btpeers_arguments(args_dict) 122 btpeer_args = [] 123 if args_dict is not None: 124 btpeer_args = self.host.get_btpeer_arguments(args_dict) 125 #factory can not be declared as local variable, otherwise 126 #factory._proxy.__del__ will be invoked, which shutdown the xmlrpc 127 # server, which log out the user. 128 129 self.factory = remote_facade_factory.RemoteFacadeFactory( 130 host, no_chrome=not self.start_browser, disable_arc=True) 131 try: 132 self.bluetooth_facade = self.factory.create_bluetooth_facade() 133 except Exception as e: 134 logging.error('Exception %s while creating bluetooth_facade', 135 str(e)) 136 raise error.TestFail('Unable to create bluetooth_facade') 137 138 139 if self.use_btpeer: 140 self.input_facade = self.factory.create_input_facade() 141 142 self.host.initialize_btpeer(btpeer_args=btpeer_args) 143 logging.info('%s Bluetooth peers found', 144 len(self.host.btpeer_list)) 145 logging.info('labels: %s', self.host.get_labels()) 146 147 if len(self.host.btpeer_list) == 0: 148 raise error.TestFail('Unable to find a Bluetooth peer') 149 150 # Check the chameleond version on the peer and update if necessary 151 if update_btpeers: 152 if not self.update_btpeer(): 153 logging.error('Updating btpeers failed. Ignored') 154 else: 155 logging.info('No attempting peer update.') 156 157 # Query connected devices on our btpeer at init time 158 self.available_devices = self.list_devices_available() 159 160 for btpeer in self.host.btpeer_list: 161 btpeer.register_raspPi_log(self.outputdir) 162 163 self.btpeer_group = dict() 164 # Create copy of btpeer_group 165 self.btpeer_group_copy = dict() 166 self.group_btpeers_type() 167 168 169 # Clear the active devices for this test 170 self.active_test_devices = {} 171 172 self.enable_disable_debug_log(enable=True) 173 174 # Disable cellular services, as they can sometimes interfere with 175 # suspend/resume, i.e. b/161920740 176 self.enable_disable_cellular(enable=False) 177 178 self.enable_disable_ui(enable=False) 179 180 # Delete files created in previous run 181 self.host.run('[ ! -d {0} ] || rm -rf {0} || true'.format( 182 self.BTMON_DIR_LOG_PATH)) 183 self.host.run('[ ! -d {0} ] || rm -rf {0} || true'.format( 184 self.USBMON_DIR_LOG_PATH)) 185 186 self.start_new_btmon() 187 self.start_new_usbmon() 188 189 self.flag = flag 190 self.test_iter = None 191 192 self.bat_tests_results = [] 193 self.bat_pass_count = 0 194 self.bat_fail_count = 0 195 self.bat_testna_count = 0 196 self.bat_warn_count = 0 197 self.bat_name = None 198 self.bat_iter = None 199 200 self.pkg_tests_results = [] 201 self.pkg_pass_count = 0 202 self.pkg_fail_count = 0 203 self.pkg_testna_count = 0 204 self.pkg_warn_count = 0 205 self.pkg_name = None 206 self.pkg_iter = None 207 self.pkg_is_running = False 208 self.mtbf_end = False 209 self.mtbf_end_lock = threading.Lock() 210 211 212 @staticmethod 213 def quick_test_test_decorator(test_name, devices={}, flags=['All'], 214 model_testNA=[], 215 model_testWarn=[], 216 skip_models=[], 217 skip_chipsets=[], 218 shared_devices_count=0): 219 """A decorator providing a wrapper to a quick test. 220 Using the decorator a test method can implement only the core 221 test and let the decorator handle the quick test wrapper methods 222 (test_start and test_end). 223 224 @param test_name: the name of the test to log. 225 @param devices: list of device names which are going to be used 226 in the following test. 227 @param flags: list of string to describe who should run the 228 test. The string could be one of the following: 229 ['AVL', 'Quick Health', 'All']. 230 @param model_testNA: If the current platform is in this list, 231 failures are emitted as TestNAError. 232 @param model_testWarn: If the current platform is in this list, 233 failures are emitted as TestWarn. 234 @param skip_models: Raises TestNA on these models and doesn't attempt 235 to run the tests. 236 @param skip_chipsets: Raises TestNA on these chipset and doesn't 237 attempt to run the tests. 238 239 """ 240 241 def decorator(test_method): 242 """A decorator wrapper of the decorated test_method. 243 @param test_method: the test method being decorated. 244 @returns the wrapper of the test method. 245 """ 246 247 def _check_runnable(self): 248 """Check if the test could be run""" 249 250 # Check that the test is runnable in current setting 251 if not(self.flag in flags or 'All' in flags): 252 logging.info('SKIPPING TEST %s', test_name) 253 logging.info('flag %s not in %s', self.flag, flags) 254 self._print_delimiter() 255 return False 256 return True 257 258 259 def _is_enough_peers_present(self): 260 """Check if enough peer devices are available.""" 261 262 # Check that btpeer has all required devices before running 263 for device_type, number in devices.items(): 264 if self.available_devices.get(device_type, 0) < number: 265 logging.info('SKIPPING TEST %s', test_name) 266 logging.info('%s not available', device_type) 267 self._print_delimiter() 268 return False 269 270 # Check if there are enough peers 271 total_num_devices = sum(devices.values()) + shared_devices_count 272 if total_num_devices > len(self.host.btpeer_list): 273 logging.info('SKIPPING TEST %s', test_name) 274 logging.info( 275 'Number of devices required %s is greater' 276 'than number of peers available %d', 277 total_num_devices, len(self.host.btpeer_list)) 278 self._print_delimiter() 279 return False 280 return True 281 282 @functools.wraps(test_method) 283 def wrapper(self): 284 """A wrapper of the decorated method.""" 285 # Set test name before exiting so batches correctly identify 286 # failing tests 287 self.test_name = test_name 288 289 if not _check_runnable(self): 290 return 291 292 try: 293 if not _is_enough_peers_present(self): 294 logging.info('Not enough peer available') 295 raise error.TestNAError('Not enough peer available') 296 297 model = self.get_base_platform_name() 298 if model in skip_models: 299 logging.info('SKIPPING TEST %s', test_name) 300 raise error.TestNAError( 301 'Test not supported on this model') 302 303 chipset = self.get_chipset_name() 304 logging.debug('Bluetooth module name is %s', chipset) 305 if chipset in skip_chipsets: 306 logging.info('SKIPPING TEST %s on chipset %s', 307 test_name, chipset) 308 raise error.TestNAError( 309 'Test not supported on this chipset') 310 311 self.quick_test_test_start(test_name, devices, 312 shared_devices_count) 313 314 test_method(self) 315 except error.TestError as e: 316 self.fails.append('[--- error {} ({})]'.format( 317 test_method.__name__, str(e))) 318 except error.TestFail as e: 319 if not bool(self.fails): 320 self.fails.append('[--- failed {} ({})]'.format( 321 test_method.__name__, str(e))) 322 except error.TestNAError as e: 323 self.fails.append('[--- SKIPPED: {}]'.format(str(e))) 324 except Exception as e: 325 self.fails.append('[--- unknown error {} ({})]'.format( 326 test_method.__name__, str(e))) 327 328 self.quick_test_test_end(model_testNA=model_testNA, 329 model_testWarn=model_testWarn) 330 return wrapper 331 332 return decorator 333 334 335 def quick_test_test_start( 336 self, test_name=None, devices={}, shared_devices_count=0): 337 """Start a quick test. The method clears and restarts adapter on DUT 338 as well as peer devices. In addition the methods prints test start 339 traces. 340 """ 341 # Bluetoothd could have crashed behind the scenes; check to see if 342 # everything is still ok and recover if needed. 343 self.test_is_facade_valid() 344 self.test_is_adapter_valid() 345 346 # Reset the adapter 347 self.test_reset_on_adapter() 348 # Initialize bluetooth_adapter_tests class (also clears self.fails) 349 self.initialize() 350 # Start and peer HID devices 351 self.start_peers(devices) 352 self.shared_peers = self.host.btpeer_list[-shared_devices_count:] 353 354 if test_name is not None: 355 time.sleep(self.TEST_SLEEP_SECS) 356 self._print_delimiter() 357 logging.info('Starting test: %s', test_name) 358 self.log_message('Starting test: %s'% test_name) 359 360 def quick_test_test_end(self, model_testNA=[], model_testWarn=[]): 361 """Log and track the test results""" 362 result_msgs = [] 363 model = self.get_base_platform_name() 364 365 if self.test_iter is not None: 366 result_msgs += ['Test Iter: ' + str(self.test_iter)] 367 368 if self.bat_iter is not None: 369 result_msgs += ['Batch Iter: ' + str(self.bat_iter)] 370 371 if self.pkg_is_running is True: 372 result_msgs += ['Package iter: ' + str(self.pkg_iter)] 373 374 if self.bat_name is not None: 375 result_msgs += ['Batch Name: ' + self.bat_name] 376 377 if self.test_name is not None: 378 result_msgs += ['Test Name: ' + self.test_name] 379 380 result_msg = ", ".join(result_msgs) 381 382 if not bool(self.fails): 383 result_msg = 'PASSED | ' + result_msg 384 self.bat_pass_count += 1 385 self.pkg_pass_count += 1 386 # The test should be marked as TESTNA if any of the test expressions 387 # were SKIPPED (they threw their own TESTNA error) or the model is in 388 # the list of NA models (so any failure is considered NA instead) 389 elif model in model_testNA or any(['SKIPPED' in x 390 for x in self.fails]): 391 result_msg = 'TESTNA | ' + result_msg 392 self.bat_testna_count += 1 393 self.pkg_testna_count += 1 394 elif model in model_testWarn: 395 result_msg = 'WARN | ' + result_msg 396 self.bat_warn_count += 1 397 self.pkg_warn_count += 1 398 else: 399 result_msg = 'FAIL | ' + result_msg 400 self.bat_fail_count += 1 401 self.pkg_fail_count += 1 402 403 logging.info(result_msg) 404 self.log_message(result_msg) 405 self._print_delimiter() 406 self.bat_tests_results.append(result_msg) 407 self.pkg_tests_results.append(result_msg) 408 409 if self.test_name is not None: 410 logging.info('Cleanning up and restarting towards next test...') 411 412 413 # Bluetoothd could have crashed behind the scenes; check if everything 414 # is ok and recover if needed. This is done as part of clean-up as well 415 # to make sure we can fully remove pairing info between tests 416 self.test_is_facade_valid() 417 418 self.bluetooth_facade.stop_discovery() 419 420 # Store a copy of active devices for raspi reset in the final step 421 self.active_test_devices = self.devices 422 423 # Disconnect devices used in the test, and remove the pairing. 424 for device_list in self.devices.values(): 425 for device in device_list: 426 if device is not None: 427 self.stop_agent(device) 428 logging.info('Clear device %s from DUT', device.name) 429 self.bluetooth_facade.disconnect_device(device.address) 430 device_is_paired = self.bluetooth_facade.device_is_paired( 431 device.address) 432 if device_is_paired: 433 self.bluetooth_facade.remove_device_object( 434 device.address) 435 436 # Also remove pairing on Peer 437 logging.info('Clearing DUT from %s', device.name) 438 try: 439 device.RemoveDevice(self.bluetooth_facade.address) 440 except Exception as e: 441 # If peer doesn't expose RemoveDevice, ignore failure 442 if not (e.__class__.__name__ == 'Fault' and 443 'is not supported' in str(e)): 444 logging.info('Couldn\'t Remove: {}'.format(e)) 445 raise 446 447 448 # Repopulate btpeer_group for next tests 449 # Clear previous tets's leftover entries. Don't delete the 450 # btpeer_group dictionary though, it'll be used as it is. 451 if self.use_btpeer: 452 for device_type in self.btpeer_group: 453 if len(self.btpeer_group[device_type]) > 0: 454 del self.btpeer_group[device_type][:] 455 456 # Repopulate 457 self.group_btpeers_type() 458 459 # Close the connection between peers 460 self.cleanup(test_state='NEW') 461 462 @staticmethod 463 def quick_test_batch_decorator(batch_name): 464 """A decorator providing a wrapper to a batch. 465 Using the decorator a test batch method can implement only its 466 core tests invocations and let the decorator handle the wrapper, 467 which is taking care for whether to run a specific test or the 468 batch as a whole and and running the batch in iterations 469 470 @param batch_name: the name of the batch to log 471 """ 472 473 def decorator(batch_method): 474 """A decorator wrapper of the decorated test_method. 475 @param test_method: the test method being decorated. 476 @returns the wrapper of the test method. 477 """ 478 479 @functools.wraps(batch_method) 480 def wrapper(self, num_iterations=1, test_name=None): 481 """A wrapper of the decorated method. 482 @param num_iterations: how many interations to run 483 @param test_name: specifc test to run otherwise None to run 484 the whole batch 485 """ 486 if test_name is not None: 487 single_test_method = getattr(self, test_name) 488 for iter in range(1,num_iterations+1): 489 self.test_iter = iter 490 single_test_method() 491 492 if self.fails: 493 # If failure is marked as TESTNA, prioritize that over 494 # a failure. Same with WARN. 495 if self.bat_testna_count > 0: 496 raise error.TestNAError(self.fails) 497 elif self.bat_warn_count > 0: 498 raise error.TestWarn(self.fails) 499 else: 500 raise error.TestFail(self.fails) 501 else: 502 for iter in range(1,num_iterations+1): 503 self.quick_test_batch_start(batch_name, iter) 504 batch_method(self, num_iterations, test_name) 505 self.quick_test_batch_end() 506 return wrapper 507 508 return decorator 509 510 511 def quick_test_batch_start(self, bat_name, iteration=1): 512 """Start a test batch. The method clears and set batch variables""" 513 self.bat_tests_results = [] 514 self.bat_pass_count = 0 515 self.bat_fail_count = 0 516 self.bat_testna_count = 0 517 self.bat_warn_count = 0 518 self.bat_name = bat_name 519 self.bat_iter = iteration 520 521 522 def quick_test_batch_end(self): 523 """Print results summary of a test batch """ 524 logging.info( 525 '%s Test Batch Summary: total pass %d, total fail %d, ' 526 'warn %d, NA %d', self.bat_name, self.bat_pass_count, 527 self.bat_fail_count, self.bat_warn_count, 528 self.bat_testna_count) 529 for result in self.bat_tests_results: 530 logging.info(result) 531 self._print_delimiter(); 532 if self.bat_fail_count > 0: 533 logging.error('===> Test Batch Failed! More than one failure') 534 self._print_delimiter(); 535 if self.pkg_is_running is False: 536 raise error.TestFail(self.bat_tests_results) 537 elif self.bat_testna_count > 0: 538 logging.error('===> Test Batch Passed! Some TestNA results') 539 self._print_delimiter(); 540 if self.pkg_is_running is False: 541 raise error.TestNAError(self.bat_tests_results) 542 elif self.bat_warn_count > 0: 543 logging.error('===> Test Batch Passed! Some WARN results') 544 self._print_delimiter(); 545 if self.pkg_is_running is False: 546 raise error.TestWarn(self.bat_tests_results) 547 else: 548 logging.info('===> Test Batch Passed! zero failures') 549 self._print_delimiter(); 550 551 552 def quick_test_package_start(self, pkg_name): 553 """Start a test package. The method clears and set batch variables""" 554 self.pkg_tests_results = [] 555 self.pkg_pass_count = 0 556 self.pkg_fail_count = 0 557 self.pkg_name = pkg_name 558 self.pkg_is_running = True 559 560 561 def quick_test_print_summary(self): 562 """Print results summary of a test package""" 563 logging.info( 564 '%s Test Package Summary: total pass %d, total fail %d, ' 565 'Warn %d, NA %d', self.pkg_name, self.pkg_pass_count, 566 self.pkg_fail_count, self.pkg_warn_count, 567 self.pkg_testna_count) 568 for result in self.pkg_tests_results: 569 logging.info(result) 570 self._print_delimiter(); 571 572 573 def quick_test_package_update_iteration(self, iteration): 574 """Update state and print log per package iteration. 575 Must be called to have a proper package test result tracking. 576 """ 577 self.pkg_iter = iteration 578 if self.pkg_name is None: 579 logging.error('Error: no quick package is running') 580 raise error.TestFail('Error: no quick package is running') 581 logging.info('Starting %s Test Package iteration %d', 582 self.pkg_name, iteration) 583 584 585 def quick_test_package_end(self): 586 """Print final result of a test package""" 587 if self.pkg_fail_count > 0: 588 logging.error('===> Test Package Failed! More than one failure') 589 self._print_delimiter(); 590 raise error.TestFail(self.bat_tests_results) 591 elif self.pkg_testna_count > 0: 592 logging.error('===> Test Package Passed! Some TestNA results') 593 self._print_delimiter(); 594 raise error.TestNAError(self.bat_tests_results) 595 elif self.pkg_warn_count > 0: 596 logging.error('===> Test Package Passed! Some WARN results') 597 self._print_delimiter(); 598 raise error.TestWarn(self.bat_tests_results) 599 else: 600 logging.info('===> Test Package Passed! zero failures') 601 self._print_delimiter(); 602 self.pkg_is_running = False 603 604 605 def quick_test_cleanup(self): 606 """ Cleanup any state test server and all device""" 607 608 # Clear any raspi devices at very end of test 609 for device_list in self.active_test_devices.values(): 610 for device in device_list: 611 if device is not None: 612 self.clear_raspi_device(device) 613 self.device_set_powered(device, False) 614 615 # Reset the adapter 616 self.test_reset_on_adapter() 617 # Initialize bluetooth_adapter_tests class (also clears self.fails) 618 self.initialize() 619 620 621 @staticmethod 622 def quick_test_mtbf_decorator(timeout_mins, test_name): 623 """A decorator enabling a test to be run as a MTBF test, it will run 624 the underlying test in a infinite loop until it fails or timeout is 625 reached, in both cases the time elapsed time will be reported. 626 627 @param timeout_mins: the max execution time of the test, once the 628 time is up the test will report success and exit 629 @param test_name: the MTBF test name to be output to the dashboard 630 """ 631 632 def decorator(batch_method): 633 """A decorator wrapper of the decorated batch_method. 634 @param batch_method: the batch method being decorated. 635 @returns the wrapper of the batch method. 636 """ 637 638 @functools.wraps(batch_method) 639 def wrapper(self, *args, **kwargs): 640 """A wrapper of the decorated method""" 641 self.mtbf_end = False 642 mtbf_timer = threading.Timer( 643 timeout_mins * 60, self.mtbf_timeout) 644 mtbf_timer.start() 645 start_time = time.time() 646 board = self.host.get_board().split(':')[1] 647 model = self.host.get_model_from_cros_config() 648 build = self.host.get_release_version() 649 milestone = 'M' + self.host.get_chromeos_release_milestone() 650 in_lab = site_utils.host_in_lab(self.host.hostname) 651 while True: 652 with self.mtbf_end_lock: 653 # The test ran the full duration without failure 654 if self.mtbf_end: 655 self.report_mtbf_result( 656 True, start_time, test_name, model, build, 657 milestone, board, in_lab) 658 break 659 try: 660 batch_method(self, *args, **kwargs) 661 except Exception as e: 662 logging.info("Caught a failure: %r", e) 663 self.report_mtbf_result( 664 False, start_time, test_name, model, build, 665 milestone, board, in_lab) 666 # Don't report the test run as failed for MTBF 667 self.fails = [] 668 break 669 670 mtbf_timer.cancel() 671 672 return wrapper 673 674 return decorator 675 676 677 def mtbf_timeout(self): 678 """Handle time out event of a MTBF test""" 679 with self.mtbf_end_lock: 680 self.mtbf_end = True 681 682 683 def report_mtbf_result(self, success, start_time, test_name, model, build, 684 milestone, board, in_lab): 685 """Report MTBF result by uploading it to GCS""" 686 duration_secs = int(time.time() - start_time) 687 start_time = int(start_time) 688 gm_time_struct = time.localtime(start_time) 689 output_file_name = self.GCS_MTBF_BUCKET + \ 690 time.strftime('%Y-%m-%d/', gm_time_struct) + \ 691 time.strftime('%H-%M-%S.csv', gm_time_struct) 692 693 mtbf_result = '{0},{1},{2},{3},{4},{5},{6},{7}'.format( 694 model, build, milestone, start_time * 1000000, duration_secs, 695 success, test_name, board) 696 with tempfile.NamedTemporaryFile() as tmp_file: 697 tmp_file.write(mtbf_result.encode('utf-8')) 698 tmp_file.flush() 699 cmd = 'gsutil cp {0} {1}'.format(tmp_file.name, output_file_name) 700 logging.info('Result to upload %s %s', mtbf_result, cmd) 701 # Only upload the result when running in the lab. 702 if in_lab: 703 logging.info('Uploading result') 704 utils.run(cmd) 705