#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from vts.proto import VtsReportMessage_pb2 as ReportMsg
from vts.runners.host import asserts
from vts.runners.host import errors
from vts.runners.host import keys
from vts.runners.host import logger
from vts.runners.host import records
from vts.runners.host import signals
from vts.runners.host import utils
from vts.runners.host import const
from vts.utils.python.common import list_utils
from vts.utils.python.coverage import coverage_utils
from vts.utils.python.profiling import profiling_utils
from vts.utils.python.reporting import log_uploading_utils
from vts.utils.python.systrace import systrace_utils
from vts.utils.python.web import feature_utils
from vts.utils.python.web import web_utils

# Macro strings for test result reporting
TEST_CASE_TOKEN = "[Test Case]"
RESULT_LINE_TEMPLATE = TEST_CASE_TOKEN + " %s %s"

# Name prefixes that mark a method as a test case / generated-test trigger.
STR_TEST = "test"
STR_GENERATE = "generate"


class BaseTestClass(object):
    """Base class for all test classes to inherit from.

    This class gets all the controller objects from test_runner and executes
    the test cases requested within itself.

    Most attributes of this class are set at runtime based on the configuration
    provided.

    Attributes:
        tests: A list of strings, each representing a test case name.
        TAG: A string used to refer to a test class. Default is the test class
             name.
        results: A records.TestResult object for aggregating test results from
                 the execution of test cases.
        currentTestName: A string that's the name of the test case currently
                         being executed. If no test is executing, this should
                         be None.
        include_filter: A list of string, each representing a test case name to
                        include. Stored under the attribute name given by
                        keys.ConfigKeys.KEY_INCLUDE_FILTER.
        exclude_filter: A list of string, each representing a test case name to
                        exclude. Has no effect if include_filter is not empty.
                        Stored under the attribute name given by
                        keys.ConfigKeys.KEY_EXCLUDE_FILTER.
        abi_name: String, name of abi in use
        abi_bitness: String, bitness of abi in use
        web: WebFeature, object storing web feature util for test run
        coverage: CoverageFeature, object storing coverage feature util for
                  test run
        profiling: ProfilingFeature, object storing profiling feature util for
                   test run
        systrace: SystraceFeature, object storing systrace feature util for
                  test run
        log_uploading: LogUploadingFeature, object storing log uploading
                       feature util for test run
        _skip_all_testcases: A boolean, can be set by a subclass in
                             setUpClass() to skip all test cases.
    """

    TAG = None

    def __init__(self, configs):
        """Initializes the test class from the runner-provided configuration.

        Args:
            configs: dict, every key/value pair is set as an attribute on this
                     object. It is expected to provide `user_params`, which is
                     read right after.
        """
        self.tests = []
        if not self.TAG:
            self.TAG = self.__class__.__name__
        # Set all the controller objects and params.
        for name, value in configs.items():
            setattr(self, name, value)
        self.results = records.TestResult()
        self.currentTestName = None

        # Setup test filters (optional)
        if keys.ConfigKeys.KEY_TEST_SUITE in self.user_params:
            test_suite = self.user_params[keys.ConfigKeys.KEY_TEST_SUITE]
            filter_names = (keys.ConfigKeys.KEY_INCLUDE_FILTER,
                            keys.ConfigKeys.KEY_EXCLUDE_FILTER)
            for filter_name in filter_names:
                if filter_name in test_suite:
                    setattr(self, filter_name, test_suite[filter_name])

        # TODO: get abi information differently for multi-device support.
        # Set other optional parameters
        opt_param_names = [
            keys.ConfigKeys.IKEY_ABI_NAME,
            keys.ConfigKeys.IKEY_ABI_BITNESS,
            keys.ConfigKeys.IKEY_SKIP_ON_32BIT_ABI,
            keys.ConfigKeys.IKEY_SKIP_ON_64BIT_ABI,
            keys.ConfigKeys.IKEY_RUN_32BIT_ON_64BIT_ABI,
        ]
        self.getUserParams(opt_param_names=opt_param_names)
        self.web = web_utils.WebFeature(self.user_params)
        self.coverage = coverage_utils.CoverageFeature(
            self.user_params, web=self.web)
        self.profiling = profiling_utils.ProfilingFeature(
            self.user_params, web=self.web)
        self.systrace = systrace_utils.SystraceFeature(
            self.user_params, web=self.web)
        self.log_uploading = log_uploading_utils.LogUploadingFeature(
            self.user_params, web=self.web)
        self._skip_all_testcases = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Run user cleanup under the exception safeguard so that leaving the
        # `with` block never raises because of cleanUp itself.
        self._exec_func(self.cleanUp)

    def getUserParams(self, req_param_names=None, opt_param_names=None,
                      **kwargs):
        """Unpacks user defined parameters in test config into individual
        variables.

        Instead of accessing the user param with self.user_params["xxx"], the
        variable can be directly accessed with self.xxx.

        A missing required param will raise an exception. If an optional param
        is missing, an INFO line will be logged.

        Args:
            req_param_names: A list of names of the required user params.
                             None (the default) means no required params.
            opt_param_names: A list of names of the optional user params.
                             None (the default) means no optional params.
            **kwargs: Arguments that provide default values.
                e.g. getUserParams(required_list, opt_list, arg_a="hello")
                     self.arg_a will be "hello" unless it is specified again in
                     required_list or opt_list.

        Raises:
            BaseTestError is raised if a required user params is missing from
            test config.
        """
        # None instead of a shared mutable list as the default value.
        req_param_names = req_param_names or ()
        opt_param_names = opt_param_names or ()

        # Defaults first, so that values from user_params override them below.
        for k, v in kwargs.items():
            setattr(self, k, v)
        for name in req_param_names:
            if name not in self.user_params:
                raise errors.BaseTestError(("Missing required user param '%s' "
                                            "in test configuration.") % name)
            setattr(self, name, self.user_params[name])
        for name in opt_param_names:
            if name not in self.user_params:
                logging.info(("Missing optional user param '%s' in "
                              "configuration, continue."), name)
            else:
                setattr(self, name, self.user_params[name])

    def getUserParam(self,
                     param_name,
                     error_if_not_found=False,
                     log_warning_and_continue_if_not_found=False,
                     default_value=None):
        """Get the value of a single user parameter.

        This method returns the value of specified user parameter.
        Note: this method will not automatically set attribute using the
              parameter name and value.

        Args:
            param_name: string or list of string, denoting user parameter
                        names. If provided a single string,
                        self.user_params[param_name] will be accessed.
                        If provided multiple strings,
                        self.user_params[name1][name2][name3]... will be
                        accessed.
            error_if_not_found: bool, whether to raise error if parameter not
                                exists. Default: False
            log_warning_and_continue_if_not_found: bool, log a warning message
                                                   if parameter value not
                                                   found.
            default_value: object, default value to return if not found.
                           If error_if_not_found is True, this parameter has no
                           effect. Default: None

        Returns:
            object, value of the specified parameter name chain if exists;
            default_value if not exists.

        Raises:
            BaseTestError if error_if_not_found is True and param_name is
            empty or any name in the chain is missing.
        """
        if not param_name:
            if error_if_not_found:
                raise errors.BaseTestError("empty param_name provided")
            logging.error("empty param_name")
            return default_value

        if not isinstance(param_name, list):
            param_name = [param_name]

        # Walk down the nested configuration one key at a time.
        curr_obj = self.user_params
        for param in param_name:
            if param not in curr_obj:
                msg = ("Missing user param '%s' in test configuration." %
                       param_name)
                if error_if_not_found:
                    raise errors.BaseTestError(msg)
                elif log_warning_and_continue_if_not_found:
                    logging.warning(msg)
                return default_value
            curr_obj = curr_obj[param]

        return curr_obj

    def _setUpClass(self):
        """Proxy function to guarantee the base implementation of setUpClass
        is called.
        """
        return self.setUpClass()

    def setUpClass(self):
        """Setup function that will be called before executing any test case in
        the test class.

        To signal setup failure, return False or raise an exception. If
        exceptions were raised, the stack trace would appear in log, but the
        exceptions would not propagate to upper levels.

        Implementation is optional.
        """
        pass

    def _tearDownClass(self):
        """Proxy function to guarantee the base implementation of tearDownClass
        is called.

        Returns:
            Whatever tearDownClass returns.
        """
        ret = self.tearDownClass()
        if self.log_uploading.enabled:
            self.log_uploading.UploadLogs()
        if self.web.enabled:
            self.web.Upload(self.results.requested, self.results.executed)
        return ret

    def tearDownClass(self):
        """Teardown function that will be called after all the selected test
        cases in the test class have been executed.

        Implementation is optional.
        """
        pass

    def _testEntry(self, test_name):
        """Internal function to be called upon entry of a test case."""
        self.currentTestName = test_name
        if self.web.enabled:
            self.web.AddTestReport(test_name)

    def _setUp(self, test_name):
        """Proxy function to guarantee the base implementation of setUp is
        called.
        """
        if self.systrace.enabled:
            self.systrace.StartSystrace()
        return self.setUp()

    def setUp(self):
        """Setup function that will be called every time before executing each
        test case in the test class.

        To signal setup failure, return False or raise an exception. If
        exceptions were raised, the stack trace would appear in log, but the
        exceptions would not propagate to upper levels.

        Implementation is optional.
        """

    def _testExit(self, test_name):
        """Internal function to be called upon exit of a test."""
        self.currentTestName = None

    def _tearDown(self, test_name):
        """Proxy function to guarantee the base implementation of tearDown is
        called.
        """
        if self.systrace.enabled:
            self.systrace.ProcessAndUploadSystrace(test_name)
        self.tearDown()

    def tearDown(self):
        """Teardown function that will be called every time a test case has
        been executed.

        Implementation is optional.
        """

    def _onFail(self, record):
        """Proxy function to guarantee the base implementation of onFail is
        called.

        Args:
            record: The records.TestResultRecord object for the failed test
                    case.
        """
        test_name = record.test_name
        logging.error(record.details)
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        logging.info(RESULT_LINE_TEMPLATE, test_name, record.result)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_FAIL)
        self.onFail(test_name, begin_time)

    def onFail(self, test_name, begin_time):
        """A function that is executed upon a test case failure.

        User implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onPass(self, record):
        """Proxy function to guarantee the base implementation of onPass is
        called.

        Args:
            record: The records.TestResultRecord object for the passed test
                    case.
        """
        test_name = record.test_name
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        msg = record.details
        if msg:
            logging.info(msg)
        logging.info(RESULT_LINE_TEMPLATE, test_name, record.result)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_PASS)
        self.onPass(test_name, begin_time)

    def onPass(self, test_name, begin_time):
        """A function that is executed upon a test case passing.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onSkip(self, record):
        """Proxy function to guarantee the base implementation of onSkip is
        called.

        Args:
            record: The records.TestResultRecord object for the skipped test
                    case.
        """
        test_name = record.test_name
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        logging.info(RESULT_LINE_TEMPLATE, test_name, record.result)
        logging.info("Reason to skip: %s", record.details)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_SKIP)
        self.onSkip(test_name, begin_time)

    def onSkip(self, test_name, begin_time):
        """A function that is executed upon a test case being skipped.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onSilent(self, record):
        """Proxy function to guarantee the base implementation of onSilent is
        called.

        Args:
            record: The records.TestResultRecord object for the silenced test
                    case.
        """
        test_name = record.test_name
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        if self.web.enabled:
            self.web.SetTestResult(None)
        self.onSilent(test_name, begin_time)

    def onSilent(self, test_name, begin_time):
        """A function that is executed upon a test case being marked as silent.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _onException(self, record):
        """Proxy function to guarantee the base implementation of onException
        is called.

        Args:
            record: The records.TestResultRecord object for the failed test
                    case.
        """
        test_name = record.test_name
        logging.exception(record.details)
        begin_time = logger.epochToLogLineTimestamp(record.begin_time)
        if self.web.enabled:
            self.web.SetTestResult(ReportMsg.TEST_CASE_RESULT_EXCEPTION)
        self.onException(test_name, begin_time)

    def onException(self, test_name, begin_time):
        """A function that is executed upon an unhandled exception from a test
        case.

        Implementation is optional.

        Args:
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """

    def _exec_procedure_func(self, func, tr_record):
        """Executes a procedure function like onPass, onFail etc.

        This function will alternate the 'Result' of the test's record if
        exceptions happened when executing the procedure function.

        This will let signals.TestAbortAll through so abortAll works in all
        procedure functions.

        Args:
            func: The procedure function to be executed.
            tr_record: The TestResultRecord object associated with the test
                       case executed.
        """
        try:
            func(tr_record)
        except signals.TestAbortAll:
            raise
        except Exception as e:
            logging.exception("Exception happened when executing %s for %s.",
                              func.__name__, self.currentTestName)
            tr_record.addError(func.__name__, e)

    def filterOneTest(self, test_name):
        """Check test filters for a test name.

        The first layer of filter is user defined test filters:
        if a include filter is not empty, only tests in include filter will
        be executed regardless whether they are also in exclude filter. Else
        if include filter is empty, only tests not in exclude filter will be
        executed.

        The second layer of filter is checking _skip_all_testcases flag:
        the subclass may set _skip_all_testcases to True in its implementation
        of setUpClass. If the flag is set, this method raises signals.TestSkip.

        The third layer of filter is checking abi bitness:
        if a test has a suffix indicating the intended architecture bitness,
        and the current abi bitness information is available, non matching
        tests will be skipped. By our convention, this function will look for
        bitness in suffix formated as "32bit", "32Bit", "32BIT", or 64 bit
        equivalents.

        This method assumes const.SUFFIX_32BIT and const.SUFFIX_64BIT are in
        lower cases.

        Args:
            test_name: string, name of a test

        Raises:
            signals.TestSilent if a test should not be executed
            signals.TestSkip if a test should be logged but not be executed
        """
        include_filter = getattr(self, keys.ConfigKeys.KEY_INCLUDE_FILTER,
                                 None)
        if include_filter:
            if test_name not in include_filter:
                logging.info("Test case '%s' not in include filter.",
                             test_name)
                raise signals.TestSilent(
                    "Test case '%s' not in include filter." % test_name)
        elif (hasattr(self, keys.ConfigKeys.KEY_EXCLUDE_FILTER) and
              test_name in getattr(self, keys.ConfigKeys.KEY_EXCLUDE_FILTER)):
            logging.info("Test case '%s' in exclude filter.", test_name)
            raise signals.TestSilent(
                "Test case '%s' in exclude filter." % test_name)

        if self._skip_all_testcases:
            raise signals.TestSkip("All test cases skipped.")

        if hasattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS):
            bitness = getattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS)
            run_32bit_on_64bit_abi = getattr(
                self, keys.ConfigKeys.IKEY_RUN_32BIT_ON_64BIT_ABI, False)
            skip_on_32bit_abi = getattr(
                self, keys.ConfigKeys.IKEY_SKIP_ON_32BIT_ABI, False)
            skip_on_64bit_abi = getattr(
                self, keys.ConfigKeys.IKEY_SKIP_ON_64BIT_ABI, False)

            # Suffix constants are lower case, so compare case-insensitively.
            lowered_name = test_name.lower()
            excluded = (
                ((skip_on_32bit_abi is True) and bitness == "32") or
                ((skip_on_64bit_abi is True) and bitness == "64") or
                (lowered_name.endswith(const.SUFFIX_32BIT) and
                 bitness != "32") or
                (lowered_name.endswith(const.SUFFIX_64BIT) and
                 bitness != "64" and not run_32bit_on_64bit_abi))
            asserts.skipIf(
                excluded,
                "Test case '{}' excluded as ABI bitness is {}.".format(
                    test_name, bitness))

    def execOneTest(self, test_name, test_func, args, **kwargs):
        """Executes one test case and update test results.

        Executes one test case, create a records.TestResultRecord object with
        the execution information, and add the record to the test class's test
        results.

        Args:
            test_name: Name of the test.
            test_func: The test function.
            args: A tuple of params, or None for no positional params.
            kwargs: Extra kwargs.

        Raises:
            signals.TestAbortClass, signals.TestAbortAll: passed along after
            the test is recorded as failed.
        """
        is_silenced = False
        tr_record = records.TestResultRecord(test_name, self.TAG)
        tr_record.testBegin()
        logging.info("%s %s", TEST_CASE_TOKEN, test_name)
        verdict = None
        try:
            ret = self._testEntry(test_name)
            asserts.assertTrue(ret is not False,
                               "Setup test entry for %s failed." % test_name)
            self.filterOneTest(test_name)
            try:
                ret = self._setUp(test_name)
                asserts.assertTrue(ret is not False,
                                   "Setup for %s failed." % test_name)

                # `args` may be None (e.g. when called from run()); guard it
                # so keyword-only invocations do not fail on `*None`.
                verdict = test_func(*(args or ()), **kwargs)
            finally:
                self._tearDown(test_name)
        except (signals.TestFailure, AssertionError) as e:
            tr_record.testFail(e)
            self._exec_procedure_func(self._onFail, tr_record)
        except signals.TestSkip as e:
            # Test skipped.
            tr_record.testSkip(e)
            self._exec_procedure_func(self._onSkip, tr_record)
        except (signals.TestAbortClass, signals.TestAbortAll) as e:
            # Abort signals, pass along.
            tr_record.testFail(e)
            # Bare raise keeps the original traceback.
            raise
        except signals.TestPass as e:
            # Explicit test pass.
            tr_record.testPass(e)
            self._exec_procedure_func(self._onPass, tr_record)
        except signals.TestSilent as e:
            # Suppress test reporting.
            is_silenced = True
            self._exec_procedure_func(self._onSilent, tr_record)
            # The name may be absent (e.g. execOneTest called directly by a
            # subclass); an unguarded remove() would raise ValueError and
            # abort the whole class.
            if test_name in self.results.requested:
                self.results.requested.remove(test_name)
        except Exception as e:
            # Exception happened during test.
            logging.exception(e)
            tr_record.testError(e)
            self._exec_procedure_func(self._onException, tr_record)
            self._exec_procedure_func(self._onFail, tr_record)
        else:
            # Keep supporting return False for now.
            # TODO(angli): Deprecate return False support.
            if verdict or (verdict is None):
                # Test passed.
                tr_record.testPass()
                self._exec_procedure_func(self._onPass, tr_record)
                return
            # Test failed because it didn't return True.
            # This should be removed eventually.
            tr_record.testFail()
            self._exec_procedure_func(self._onFail, tr_record)
        finally:
            if not is_silenced:
                self.results.addRecord(tr_record)
            self._testExit(test_name)

    def runGeneratedTests(self,
                          test_func,
                          settings,
                          args=None,
                          kwargs=None,
                          tag="",
                          name_func=None):
        """Runs generated test cases.

        Generated test cases are not written down as functions, but as a list
        of parameter sets. This way we reduce code repetition and improve
        test case scalability.

        Args:
            test_func: The common logic shared by all these generated test
                       cases. This function should take at least one argument,
                       which is a parameter set.
            settings: A list of strings representing parameter sets. These are
                      usually json strings that get loaded in the test_func.
            args: Iterable of additional position args to be passed to
                  test_func.
            kwargs: Dict of additional keyword args to be passed to test_func
            tag: Name of this group of generated test cases. Ignored if
                 name_func is provided and operates properly.
            name_func: A function that takes a test setting and generates a
                       proper test name. The test name should be shorter than
                       utils.MAX_FILENAME_LEN. Names over the limit will be
                       truncated.

        Returns:
            A list of settings that did not pass.
        """
        args = args or ()
        kwargs = kwargs or {}
        failed_settings = []
        for s in settings:
            test_name = "{} {}".format(tag, s)
            if name_func:
                try:
                    test_name = name_func(s, *args, **kwargs)
                except Exception:
                    logging.exception(("Failed to get test name from "
                                       "test_func. Fall back to default %s"),
                                      test_name)

            # Truncate before registering the name so that the requested name
            # is exactly the one the test is executed and recorded under.
            if len(test_name) > utils.MAX_FILENAME_LEN:
                test_name = test_name[:utils.MAX_FILENAME_LEN]
            self.results.requested.append(test_name)

            previous_success_cnt = len(self.results.passed)
            self.execOneTest(test_name, test_func, (s, ) + args, **kwargs)
            # Anything other than exactly one new pass counts as not passed.
            if len(self.results.passed) - previous_success_cnt != 1:
                failed_settings.append(s)
        return failed_settings

    def _exec_func(self, func, *args):
        """Executes a function with exception safeguard.

        This will let signals.TestAbortAll through so abortAll works in all
        procedure functions.

        Args:
            func: Function to be executed.
            args: Arguments to be passed to the function.

        Returns:
            Whatever the function returns, or False if unhandled exception
            occured.
        """
        try:
            return func(*args)
        except signals.TestAbortAll:
            raise
        except Exception:
            # `except Exception` (not bare except) so KeyboardInterrupt and
            # SystemExit are not swallowed, matching _exec_procedure_func.
            logging.exception("Exception happened when executing %s in %s.",
                              func.__name__, self.TAG)
            return False

    def _get_all_test_names(self):
        """Finds all the function names that match the test case naming
        convention in this class.

        Returns:
            A list of strings, each is a test case name.
        """
        test_names = []
        for name in dir(self):
            if name.startswith((STR_TEST, STR_GENERATE)):
                if callable(getattr(self, name)):
                    test_names.append(name)
        return test_names

    def _get_test_funcs(self, test_names):
        """Obtain the actual functions of test cases based on test names.

        Args:
            test_names: A list of strings, each string is a test case name.

        Returns:
            A list of tuples of (string, function). String is the test case
            name, function is the actual test case function.

        Raises:
            errors.USERError is raised if the test name does not follow
            naming convention "test_*". This can only be caused by user input
            here.
        """
        test_funcs = []
        for test_name in test_names:
            if not hasattr(self, test_name):
                logging.warning("%s does not have test case %s.", self.TAG,
                                test_name)
            elif test_name.startswith((STR_TEST, STR_GENERATE)):
                test_funcs.append((test_name, getattr(self, test_name)))
            else:
                msg = ("Test case name %s does not follow naming convention "
                       "test*, abort.") % test_name
                raise errors.USERError(msg)

        return test_funcs

    def run(self, test_names=None):
        """Runs test cases within a test class by the order they appear in the
        execution list.

        One of these test cases lists will be executed, shown here in priority
        order:
        1. The test_names list, which is passed from cmd line. Invalid names
           are guarded by cmd line arg parsing.
        2. The self.tests list defined in test class. Invalid names are
           ignored.
        3. All function that matches test case naming convention in the test
           class.

        Args:
            test_names: A list of string that are test case names requested in
                cmd line.

        Returns:
            The test results object of this class.
        """
        logging.info("==========> %s <==========", self.TAG)
        # Devise the actual test cases to run in the test class.
        if not test_names:
            if self.tests:
                # Specified by run list in class.
                test_names = list(self.tests)
            else:
                # No test case specified by user, execute all in the test class
                test_names = self._get_all_test_names()
        # Generated-test triggers register their own names when they run, so
        # only plain test cases are counted as requested here.
        self.results.requested = [
            test_name for test_name in test_names
            if test_name.startswith(STR_TEST)
        ]
        tests = self._get_test_funcs(test_names)

        # Setup for the class.
        try:
            if self._setUpClass() is False:
                raise signals.TestFailure("Failed to setup %s." % self.TAG)
        except Exception as e:
            logging.exception("Failed to setup %s.", self.TAG)
            self.results.failClass(self.TAG, e)
            self._exec_func(self._tearDownClass)
            return self.results

        # Run tests in order.
        try:
            for test_name, test_func in tests:
                if test_name.startswith(STR_GENERATE):
                    logging.info(
                        "Executing generated test trigger function '%s'",
                        test_name)
                    test_func()
                    logging.info("Finished '%s'", test_name)
                else:
                    self.execOneTest(test_name, test_func, None)
            if self._skip_all_testcases and not self.results.executed:
                self.results.skipClass(
                    self.TAG,
                    "All test cases skipped; unable to find any test case.")
            return self.results
        except signals.TestAbortClass:
            logging.info("Received TestAbortClass signal")
            return self.results
        except signals.TestAbortAll as e:
            logging.info("Received TestAbortAll signal")
            # Piggy-back test results on this exception object so we don't lose
            # results from this test class.
            setattr(e, "results", self.results)
            raise
        except Exception as e:
            # Exception happened during test.
            logging.exception(e)
            raise
        finally:
            self._exec_func(self._tearDownClass)
            logging.info("Summary for test class %s: %s", self.TAG,
                         self.results.summary())

    def cleanUp(self):
        """A function that is executed upon completion of all tests cases
        selected in the test class.

        This function should clean up objects initialized in the constructor by
        user.
        """