#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from future import standard_library
standard_library.install_aliases()

import copy
import importlib
import inspect
import logging
import os
import pkgutil
import signal
import sys
import thread
import threading

from vts.runners.host import base_test
from vts.runners.host import config_parser
from vts.runners.host import keys
from vts.runners.host import logger
from vts.runners.host import records
from vts.runners.host import signals
from vts.runners.host import utils


class USERError(Exception):
    """Raised when a test run cannot proceed because of a user-side problem.

    Used in this module when a test class on the run list cannot be imported
    or cannot be located under the configured test paths.
    """


def main():
    """Execute the test class in a test module.

    This is to be used in a test script's main so the script can be executed
    directly. It will discover all the classes that inherit from
    BaseTestClass and execute them. All the test results will be aggregated
    into one.

    A VTS host-driven test case has three args:
       1st arg: the path of a test case config file.
       2nd arg: the serial ID of a target device (device config).
       3rd arg: the path of a test case data dir.

    Exits the process with status 1 unless exactly one test class is found
    in the `__main__` module.

    Returns:
        The TestResult object that holds the results of the test run.
    """
    test_classes = []
    main_module_members = sys.modules["__main__"]
    for _, module_member in main_module_members.__dict__.items():
        if inspect.isclass(module_member):
            if issubclass(module_member, base_test.BaseTestClass):
                test_classes.append(module_member)
    # TODO(angli): Need to handle the case where more than one test class is
    # in a test script. The challenge is to handle multiple configs and how
    # to do default config in this case.
    if len(test_classes) != 1:
        logging.error("Expected 1 test class per file, found %s.",
                      len(test_classes))
        sys.exit(1)
    test_result = runTestClass(test_classes[0])
    return test_result


def runTestClass(test_class):
    """Execute one test class.

    This will create a TestRunner and execute one test run with one test
    class. The configuration is read from the file named by sys.argv[1];
    when no argument is given, the default config for the class is used.

    A watchdog timer interrupts the main thread when the run exceeds the
    configured maximum timeout (keys.ConfigKeys.KEY_TEST_MAX_TIMEOUT, which
    is divided by 1000 to obtain seconds) or three hours when unspecified.

    Args:
        test_class: The test class to instantiate and execute.

    Returns:
        The TestResult object that holds the results of the test run. The
        function returns after the first loaded configuration has been run.

    Raises:
        Exception: any unexpected error raised while loading the config file
            is logged and re-raised.
    """
    test_cls_name = test_class.__name__
    if len(sys.argv) < 2:
        logging.warning("Missing a configuration file. Using the default.")
        test_configs = [config_parser.GetDefaultConfig(test_cls_name)]
    else:
        try:
            config_path = sys.argv[1]
            baseline_config = config_parser.GetDefaultConfig(test_cls_name)
            baseline_config[keys.ConfigKeys.KEY_TESTBED] = [
                baseline_config[keys.ConfigKeys.KEY_TESTBED]
            ]
            test_configs = config_parser.load_test_config_file(
                config_path, baseline_config=baseline_config)
        except IndexError:
            logging.error("No valid config file found.")
            sys.exit(1)
        except Exception as e:
            logging.error("Unexpected exception")
            logging.exception(e)
            # Without a loaded config there is nothing to run; propagate the
            # real error instead of failing later on an unbound variable.
            raise

    test_identifiers = [(test_cls_name, None)]

    for config in test_configs:
        if keys.ConfigKeys.KEY_TEST_MAX_TIMEOUT in config:
            timeout_sec = int(config[
                keys.ConfigKeys.KEY_TEST_MAX_TIMEOUT]) / 1000.0
        else:
            timeout_sec = 60 * 60 * 3
            logging.warning("%s unspecified. Set timeout to %s seconds.",
                            keys.ConfigKeys.KEY_TEST_MAX_TIMEOUT, timeout_sec)
        # The default SIGINT handler sends KeyboardInterrupt to main thread.
        # On Windows, raising CTRL_C_EVENT, which is received as SIGINT,
        # has no effect on non-console process. interrupt_main() works but
        # does not unblock main thread's IO immediately.
        timeout_func = (raiseSigint if not utils.is_on_windows() else
                        thread.interrupt_main)
        sig_timer = threading.Timer(timeout_sec, timeout_func)

        tr = TestRunner(config, test_identifiers)
        tr.parseTestConfig(config)
        try:
            sig_timer.start()
            tr.runTestClass(test_class, None)
        except KeyboardInterrupt as e:
            logging.exception("Aborted by timeout or ctrl+C: %s", e)
        except Exception as e:
            logging.error("Unexpected exception")
            logging.exception(e)
        finally:
            # Always disarm the watchdog and finalize the run, even on error.
            sig_timer.cancel()
            tr.stop()
        return tr.results


def raiseSigint():
    """Raises SIGINT by sending the signal to the current process."""
    os.kill(os.getpid(), signal.SIGINT)


class TestRunner(object):
    """The class that instantiates test classes, executes test cases, and
    report results.

    Attributes:
        test_run_info: A dictionary containing the information needed by
                       test classes for this test run, including params,
                       controllers, and other objects. All of these will
                       be passed to test classes.
        test_configs: A dictionary that is the original test configuration
                      passed in by user.
        id: A string that is the unique identifier of this test run.
        log_path: A string representing the path of the dir under which
                  all logs from this test run should be written.
        controller_registry: A dictionary that holds the controller
                             objects used in a test run.
        controller_destructors: A dictionary that holds the controller
                                destructors. Keys are controllers' names.
        run_list: A list of tuples specifying what tests to run.
        results: The test result object used to record the results of
                 this test run.
        running: A boolean signifies whether this test run is ongoing or
                 not.
        test_cls_instances: list of test class instances that were executed
                            or scheduled to be executed.
    """

    def __init__(self, test_configs, run_list):
        """Sets up identifiers, the log directory and the test logger.

        Args:
            test_configs: The test configuration for this run; it is
                          subscripted with keys.ConfigKeys.KEY_TESTBED and
                          keys.ConfigKeys.KEY_LOG_PATH.
            run_list: A list of (test class name, test case names) tuples.
        """
        self.test_run_info = {}
        # NOTE(review): getattr() looks up an attribute, not a key. If
        # test_configs is a plain dict (it is subscripted below) this always
        # yields "./" -- confirm whether test_configs.get() was intended.
        self.test_run_info[keys.ConfigKeys.IKEY_DATA_FILE_PATH] = getattr(
            test_configs, keys.ConfigKeys.IKEY_DATA_FILE_PATH, "./")
        self.test_configs = test_configs
        self.testbed_configs = self.test_configs[keys.ConfigKeys.KEY_TESTBED]
        self.testbed_name = self.testbed_configs[
            keys.ConfigKeys.KEY_TESTBED_NAME]
        start_time = logger.getLogFileTimestamp()
        self.id = "{}@{}".format(self.testbed_name, start_time)
        # log_path should be set before parsing configs.
        l_path = os.path.join(self.test_configs[keys.ConfigKeys.KEY_LOG_PATH],
                              self.testbed_name, start_time)
        self.log_path = os.path.abspath(l_path)
        logger.setupTestLogger(self.log_path, self.testbed_name)
        self.controller_registry = {}
        self.controller_destructors = {}
        self.run_list = run_list
        self.results = records.TestResult()
        self.running = False
        self.test_cls_instances = []

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.stop()

    def importTestModules(self, test_paths):
        """Imports test classes from test scripts.

        1. Locate all .py files under test paths.
        2. Import the .py files as modules.
        3. Find the module members that are test classes.
        4. Categorize the test classes by name.

        Each directory containing a test file is appended to sys.path.

        Args:
            test_paths: A list of directory paths where the test files
                        reside.

        Returns:
            A dictionary where keys are test class name strings, values are
            actual test classes that can be instantiated.

        Raises:
            USERError: if a module whose name matches a test class on the
                       run list fails to import.
        """

        def is_testfile_name(name, ext):
            if ext == ".py":
                if name.endswith("Test") or name.endswith("_test"):
                    return True
            return False

        file_list = utils.find_files(test_paths, is_testfile_name)
        test_classes = {}
        for path, name, _ in file_list:
            sys.path.append(path)
            try:
                module = importlib.import_module(name)
            except:
                # Broad catch: any failure while importing a candidate file
                # is handled the same way, whatever the module raised.
                for test_cls_name, _ in self.run_list:
                    alt_name = name.replace('_', '').lower()
                    alt_cls_name = test_cls_name.lower()
                    # Only block if a test class on the run list causes an
                    # import error. We need to check against both naming
                    # conventions: AaaBbb and aaa_bbb.
                    if name == test_cls_name or alt_name == alt_cls_name:
                        msg = ("Encountered error importing test class %s, "
                               "abort.") % test_cls_name
                        # This exception is logged here to help with debugging
                        # under py2, because "raise X from Y" syntax is only
                        # supported under py3.
                        logging.exception(msg)
                        raise USERError(msg)
                # Modules that are not on the run list are skipped silently.
                continue
            for member_name in dir(module):
                if not member_name.startswith("__"):
                    if member_name.endswith("Test"):
                        test_class = getattr(module, member_name)
                        if inspect.isclass(test_class):
                            test_classes[member_name] = test_class
        return test_classes

    def verifyControllerModule(self, module):
        """Verifies a module object follows the required interface for
        controllers.

        Args:
            module: An object that is a controller module. This is usually
                    imported with import statements or loaded by importlib.

        Raises:
            ControllerError is raised if the module does not match the
            vts.runners.host controller interface, or one of the required
            members is null.
        """
        required_attributes = ("create", "destroy",
                               "VTS_CONTROLLER_CONFIG_NAME")
        for attr in required_attributes:
            if not hasattr(module, attr):
                raise signals.ControllerError(
                    ("Module %s missing required "
                     "controller module attribute %s.") % (module.__name__,
                                                           attr))
            if not getattr(module, attr):
                raise signals.ControllerError(
                    ("Controller interface %s in %s "
                     "cannot be null.") % (attr, module.__name__))

    def registerController(self, module, start_services=True):
        """Registers a controller module for a test run.

        This declares a controller dependency of this test class. If the
        target module exists and matches the controller interface, the
        controller module will be instantiated with corresponding configs in
        the test config file. The module should be imported first.

        Params:
            module: A module that follows the controller module interface.
            start_services: boolean, controls whether services (e.g VTS
                            agent) are started on the target. When the
                            testbed config contains "use_vts_agent", that
                            value is passed to the module's create()
                            instead.

        Returns:
            A list of controller objects instantiated from controller_module.

        Raises:
            ControllerError is raised if no corresponding config can be
            found, if the controller module has already been registered, or
            if the module's create() does not return a list.
        """
        logging.info("cwd: %s", os.getcwd())
        # list_adb_devices is not part of the interface checked by
        # verifyControllerModule, so only log it when the module offers it.
        list_adb_devices = getattr(module, "list_adb_devices", None)
        if list_adb_devices is not None:
            logging.info("adb devices: %s", list_adb_devices())
        self.verifyControllerModule(module)
        module_ref_name = module.__name__.split('.')[-1]
        if module_ref_name in self.controller_registry:
            raise signals.ControllerError(
                ("Controller module %s has already "
                 "been registered. It can not be "
                 "registered again.") % module_ref_name)
        # Create controller objects.
        create = module.create
        module_config_name = module.VTS_CONTROLLER_CONFIG_NAME
        if module_config_name not in self.testbed_configs:
            raise signals.ControllerError(("No corresponding config found for"
                                           " %s") % module_config_name)
        try:
            # Make a deep copy of the config to pass to the controller module,
            # in case the controller module modifies the config internally.
            original_config = self.testbed_configs[module_config_name]
            controller_config = copy.deepcopy(original_config)
            logging.info("controller_config: %s", controller_config)
            if "use_vts_agent" not in self.testbed_configs:
                objects = create(controller_config, start_services)
            else:
                objects = create(controller_config,
                                 self.testbed_configs["use_vts_agent"])
        except:
            # Log with context, then let the original error propagate.
            logging.exception(("Failed to initialize objects for controller "
                               "%s, abort!"), module_config_name)
            raise
        if not isinstance(objects, list):
            raise signals.ControllerError(
                ("Controller module %s did not return a list"
                 " of objects, abort.") % module_ref_name)
        self.controller_registry[module_ref_name] = objects
        logging.debug("Found %d objects for controller %s", len(objects),
                      module_config_name)
        destroy_func = module.destroy
        self.controller_destructors[module_ref_name] = destroy_func
        return objects

    def unregisterControllers(self):
        """Destroy controller objects and clear internal registry.

        This will be called at the end of each TestRunner.run call. A
        failure while destroying one controller is logged and does not
        prevent the remaining controllers from being destroyed.
        """
        for name, destroy in self.controller_destructors.items():
            try:
                logging.debug("Destroying %s.", name)
                destroy(self.controller_registry[name])
            except:
                # Best-effort cleanup: log and move on to the next one.
                logging.exception("Exception occurred destroying %s.", name)
        self.controller_registry = {}
        self.controller_destructors = {}

    def parseTestConfig(self, test_configs):
        """Parses the test configuration and unpacks objects and parameters
        into a dictionary to be passed to test classes.

        Entries whose key is not in keys.ConfigKeys.RESERVED_KEYS are deep
        copied and stored as user params.

        Args:
            test_configs: A json object representing the test configurations.
        """
        self.test_run_info[
            keys.ConfigKeys.IKEY_TESTBED_NAME] = self.testbed_name
        # Unpack other params.
        self.test_run_info["registerController"] = self.registerController
        self.test_run_info[keys.ConfigKeys.IKEY_LOG_PATH] = self.log_path
        user_param_pairs = []
        for item in test_configs.items():
            if item[0] not in keys.ConfigKeys.RESERVED_KEYS:
                user_param_pairs.append(item)
        self.test_run_info[keys.ConfigKeys.IKEY_USER_PARAM] = copy.deepcopy(
            dict(user_param_pairs))

    def runTestClass(self, test_cls, test_cases=None):
        """Instantiates and executes a test class.

        The test class is instantiated with self.test_run_info and used as a
        context manager. The instance is recorded in
        self.test_cls_instances so that stop() can collect its results.

        Args:
            test_cls: The test class to be instantiated and executed.
            test_cases: List of test case names to execute within the class;
                        passed through to the instance's run() method.

        Raises:
            signals.TestAbortAll: propagated unchanged from the test class
                                  so the caller can stop the whole run.
        """
        self.running = True
        with test_cls(self.test_run_info) as test_cls_instance:
            if test_cls_instance not in self.test_cls_instances:
                self.test_cls_instances.append(test_cls_instance)
            test_cls_instance.run(test_cases)

    def run(self):
        """Executes test cases.

        This will instantiate controller and test classes, and execute test
        classes. This can be called multiple times to repeatedly execute the
        requested test cases.

        The test classes are imported from the paths listed under
        keys.ConfigKeys.KEY_TEST_PATHS in the test config, and the entries
        of self.run_list are executed in order. Exceptions raised while
        executing the run list are logged rather than propagated, and the
        registered controllers are always unregistered afterwards.

        A call to TestRunner.stop should eventually happen to conclude the
        life cycle of a TestRunner.
        """
        self.running = True
        # Initialize controller objects and pack appropriate objects/params
        # to be passed to test class.
        self.parseTestConfig(self.test_configs)
        t_configs = self.test_configs[keys.ConfigKeys.KEY_TEST_PATHS]
        test_classes = self.importTestModules(t_configs)
        logging.debug("Executing run list %s.", self.run_list)
        try:
            for test_cls_name, test_case_names in self.run_list:
                if not self.running:
                    break
                if test_case_names:
                    logging.debug("Executing test cases %s in test class %s.",
                                  test_case_names, test_cls_name)
                else:
                    logging.debug("Executing test class %s", test_cls_name)
                try:
                    test_cls = test_classes[test_cls_name]
                except KeyError:
                    raise USERError(
                        ("Unable to locate class %s in any of the test "
                         "paths specified.") % test_cls_name)
                try:
                    self.runTestClass(test_cls, test_case_names)
                except signals.TestAbortAll as e:
                    logging.warning(
                        ("Abort all subsequent test classes. Reason: "
                         "%s"), e)
                    raise
        except Exception as e:
            logging.error("Unexpected exception")
            logging.exception(e)
        finally:
            self.unregisterControllers()

    def stop(self):
        """Releases resources from test run. Should always be called after
        TestRunner.run finishes.

        This function concludes a test run and writes out a test report. It
        does nothing when the runner is not marked as running.
        """
        if self.running:
            for test_cls_instance in self.test_cls_instances:
                self.results += test_cls_instance.results
            msg = "\nSummary for test run %s: %s\n" % (self.id,
                                                       self.results.summary())
            self._writeResultsJsonString()
            logging.info(msg.strip())
            logger.killTestLogger(logging.getLogger())
            self.running = False

    def _writeResultsJsonString(self):
        """Writes out a json file with the test result info for easy parsing.

        The file is named test_run_summary.json and is placed under
        self.log_path.
        """
        path = os.path.join(self.log_path, "test_run_summary.json")
        with open(path, 'w') as f:
            f.write(self.results.jsonString())