#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2022 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import re import sys import time import traceback import warnings from functools import wraps from xdevice import convert_serial from xdevice import get_decode from xdevice import get_cst_time from xdevice import ConfigConst from devicetest.core.error_message import ErrorMessage from devicetest.core.exception import BaseTestError from devicetest.core.exception import HdcCommandRejectedException from devicetest.core.exception import ShellCommandUnresponsiveException from devicetest.core.exception import DeviceNotFound from devicetest.core.exception import AppInstallError from devicetest.core.exception import RpcNotRunningError from devicetest.core.exception import TestFailure from devicetest.core.exception import TestError from devicetest.core.exception import TestSkip from devicetest.core.exception import TestTerminated from devicetest.core.exception import TestAbortManual from devicetest.core.exception import DeviceTestError from devicetest.core.exception import TestAssertionError from devicetest.core.constants import RunResult from devicetest.core.constants import RunSection from devicetest.core.constants import RunStatus from devicetest.core.variables import DeccVariable from devicetest.core.variables import CurCase from devicetest.utils.time_util import TS from devicetest.utils.type_utils import T from devicetest.log.logger import DeviceTestLog as log from devicetest.controllers.tools.screen_agent import ScreenAgent RESULT_LINE_TEMPLATE = "[Test Step] %s %s" def validate_test_name(name): """Checks if a test name is not null. Args: name: name of a test case. Raises: BaseTestError is raised if the name is null. """ if name == "" or name is None or len(name) < 1: raise BaseTestError("Invalid test case name found: {}, " "test method couldn't be none.".format(name)) class DeviceRoot: is_root_device = False def __init__(self): pass @staticmethod def set_device_root(is_root): DeviceRoot.is_root_device = is_root @staticmethod def get_device_root(): return DeviceRoot.is_root_device class BaseCase: """Base class for all test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): self.cur_case = None self.devices = [] self.configs = configs self.result = RunResult.PASSED self.last_result = RunResult.PASSED self.test_method_result = RunResult.PASSED self.section = RunSection.SETUP self.error_msg = '' self.start_time = get_cst_time() self.log = self.configs["log"] self.set_project(self.configs["project"]) self.con_fail_times = 0 self.fail_times = 0 self.step_flash_fail_msg = False self.pass_through = None self._test_args_para_parse(self.configs.get("testargs", None)) # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 self._is_loop_scenario = False # proxy function self.execption_callback = None # case end function self.case_end_callback = None def __enter__(self): return self def __exit__(self, *args): self._exec_func(self.clean_up) def _test_args_para_parse(self, paras): if not paras: return paras = dict(paras) for para_name in paras.keys(): para_name = para_name.strip() para_values = paras.get(para_name, []) if para_name == ConfigConst.pass_through: self.pass_through = para_values else: continue def _print_error(self, exception, error=None, result=RunResult.FAILED, refresh_method_result=False): self.result = result if refresh_method_result: self.test_method_result = result if error is not None: self.error_msg = self.generate_fail_msg("{}: {}".format(error.Topic, exception)) self.log.error(error.Message.en, error_no=error.Code) else: self.error_msg = str(exception) trace_info = traceback.format_exc() self.log.error(self.error_msg) self.log.error(trace_info) index = self.cur_case.step_index if index == -1: return UpdateStep(index, result="fail\n" + _get_fail_line_from_exception(trace_info, self.TAG)) def setup(self): """Setup function that will be called before executing any test case in the test class. Implementation is optional. """ return True def process(self): """process function that will be called before setup function and after teardown function in the test class. Implementation is optional. """ pass def teardown(self): """Teardown function that will be called after all the selected test cases in the test class have been executed. Implementation is optional. """ pass def exec_one_testcase(self, test_name, test_func): """Executes one test case and update test results. Args: test_name: Name of the test. test_func: The test function. Returns: True if the test passes, False otherwise. """ if not self._exec_func(self.setup_test): self.log.error("Setup for {} failed, skipping.".format(test_name)) _result = None error = None try: verdict = test_func() except TestSkip: # Test skipped. _result = True self.log.debug("TestSkip") except (DeviceNotFound, TestAssertionError, TestTerminated, TestAbortManual, TestError, BaseTestError, DeviceTestError) as exception: error = exception self._print_error(exception, refresh_method_result=True) except HdcCommandRejectedException as exception: error = exception self._print_error(exception, ErrorMessage.Error_01211, refresh_method_result=True) except ShellCommandUnresponsiveException as exception: error = exception self._print_error(exception, ErrorMessage.Error_01212, refresh_method_result=True) except AppInstallError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01213, refresh_method_result=True) except RpcNotRunningError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01440, refresh_method_result=True) except ConnectionRefusedError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01217, refresh_method_result=True) except ImportError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01100, refresh_method_result=True) except TestFailure as exception: error = exception self._print_error(exception, refresh_method_result=True) except Exception as exception: error = exception self._print_error(exception, ErrorMessage.Error_01203, refresh_method_result=True) else: # loop执行场景,由其方法内设置测试结果 if self._is_loop_scenario: _result = True return if (verdict is None or verdict is True) \ and self.test_method_result == RunResult.PASSED: # Test passed. self.print_case_result(test_name, RunResult.PASSED) _result = True # Test failed because it didn't return True. # This should be removed eventually. else: error_msg = "test func '{}' The actual input value of " \ "the checkpoint is inconsistent with the " \ "expected result.".format(test_func.__name__) self.log.error( ErrorMessage.Error_01200.Message.en.format(error_msg), error_no=ErrorMessage.Error_01200.Code) self.test_method_result = self.result = RunResult.FAILED self.error_msg = "{}:{}".format(ErrorMessage.Error_01200.Topic, error_msg) _result = False finally: if self.execption_callback is not None and error is not None: self.execption_callback(error) self._exec_func(self.teardown_test) def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occured. """ ret = False error = None try: func(*args) ret = True except (TestError, TestAbortManual, TestTerminated, TestAssertionError, DeviceTestError, DeviceNotFound) as exception: error = exception self._print_error(exception) except HdcCommandRejectedException as exception: error = exception self._print_error(exception, ErrorMessage.Error_01211) except ShellCommandUnresponsiveException as exception: error = exception self._print_error(exception, ErrorMessage.Error_01212) except AppInstallError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01213) except RpcNotRunningError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01440) except ConnectionRefusedError as exception: error = exception self._print_error(exception, ErrorMessage.Error_01217) except Exception as exception: error = exception self._print_error(exception) finally: if self.execption_callback is not None and error is not None: self.execption_callback(error) return ret def get_error_code(self): if self.section == RunSection.SETUP: self.log.error(ErrorMessage.Error_01202.Message.en, error_no=ErrorMessage.Error_01202.Code) return ErrorMessage.Error_01202.Topic elif self.section == RunSection.TEARDOWN: self.log.error(ErrorMessage.Error_01204.Message.en, error_no=ErrorMessage.Error_01204.Code) return ErrorMessage.Error_01204.Topic else: self.log.error(ErrorMessage.Error_01203.Message.en, error_no=ErrorMessage.Error_01203.Code) return ErrorMessage.Error_01203.Topic def _get_test_funcs(self): # All tests are selected if test_cases list is None. # Load functions based on test names. Also find the longest test name. test_funcs = [] for test_name in self.tests: try: validate_test_name(test_name) test_funcs.append((test_name, getattr(self, test_name))) except AttributeError: self.result = RunResult.FAILED self.error_msg = "{} does not have test step {}.".format( self.TAG, test_name) self.log.error(self.error_msg) except BaseTestError as exception: self.result = RunResult.FAILED self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc())) self.log.error(str(exception)) return test_funcs def run(self, test_names=None): """Runs test cases within a test class by the order they appear in the test list. Being in the test_names list makes the test case "requested". If its name passes validation, then it'll be executed, otherwise the name will be skipped. Args: test_names: A list of names of the requested test cases. If None, all test cases in the class are considered requested. Returns: A tuple of: The number of requested test cases, the number of test cases executed, and the number of test cases passed. """ if RunStatus.FINISHED == self.run_setup(): return self.run_tests(test_names) self.run_teardown() if self.case_end_callback is not None: self.case_end_callback() def run_setup(self): self.section = RunSection.SETUP self.cur_case.set_run_section(self.section) self.run_setup_start() self.log.info("**********SetUp Starts!") ret = self._exec_func(self.setup) if not ret: self.log.info("**********SetUp Ends!") self.log.error("setup step fails") self.section = RunSection.TEARDOWN self.cur_case.set_run_section(self.section) self.log.info("**********TearDown Starts!") self._exec_func(self.teardown) if self.result != RunResult.BLOCKED: self.result = RunResult.FAILED self.log.info('**********TearDown Ends!') self.print_case_result(self.TAG, self.result) return RunStatus.FINISHED else: self.log.info("**********SetUp Ends!") self.run_setup_end() return RunStatus.RUNNING def run_tests(self, test_names): self.section = RunSection.TEST self.cur_case.set_run_section(self.section) if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list): tests = self._get_test_funcs() for test_name, test_func in tests: self.run_test(test_name, test_func) else: self.run_process() self.log.info("**********All test methods Ends!**********") def run_process(self): self._exec_func(self.process) def run_test(self, test_name, test_func): self.log.info("[Test Step] {}".format(test_name)) self.test_method_result = RunResult.PASSED self.exec_one_testcase(test_name, test_func) self.test_method_end(test_name) def run_teardown(self): self.section = RunSection.TEARDOWN self.cur_case.set_run_section(self.section) self.run_teardown_start() self.log.info("**********TearDown Starts!") self._exec_func(self.teardown) self.log.info("**********TearDown Ends!") self.run_teardown_end() self._exec_func(self.clean_up) def run_perf_models(self, models, fail_break=False): """ models: list, list of model object fail_break: bool, if this is set to True, break down the loop of model execution when it fails """ self._is_loop_scenario = True fail_models, pass_models, total = [], [], len(models) for model in models: model_name = model.__class__.__name__ # 预置model的测试结果pass,避免执行循环时测试结果受到干扰 self.test_method_result = RunResult.PASSED self.log.info("Executing test model {}".format(model_name)) # 执行jump_to_start_anchor成功后,再执行execute self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor) if self.test_method_result == RunResult.PASSED: self.exec_one_testcase("{}.execute".format(model_name), model.execute) if self.test_method_result == RunResult.PASSED: pass_models.append(model_name) continue fail_models.append(model_name) if fail_break: break fail_cnt, pass_cnt = len(fail_models), len(pass_models) self.log.info("Test models executed result with " "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models)) # 所有model执行通过,用例pass if pass_cnt == total: self.error_msg = "" self.result = RunResult.PASSED return # 设置因fail退出,用例fail if fail_break: self.result = RunResult.FAILED return desc = ["(", ", ".join(fail_models[:4])] if fail_cnt > 4: desc.append(", etc.") desc.append(")") fail_desc = "".join(desc) # 所有model执行失败 if fail_cnt == total: self.error_msg = "All test models fail to be executed {}".format(fail_desc) self.result = RunResult.FAILED return # 部分model通过 self.error_msg = "Some test models fail to be executed {}".format(fail_desc) self.result = RunResult.FAILED def test_method_end(self, test_name): """ case test method end event """ self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result)) self.log.info("TestMethod: {} End".format(test_name)) def clean_up(self): """A function that is executed upon completion of all tests cases selected in the test class. This function should clean up objects initialized in the constructor by user. """ pass def run_setup_start(self): """A function that is before all tests cases and before setup cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.setup_start() def setup_start(self): """A function that is before all tests cases selected in the test class. This function can be used to execute before running. """ pass def run_setup_end(self): """A function that is before all tests cases and after setup cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.setup_end() def setup_end(self): """A function that is after setup cases selected in the test class. This function can be used to execute after setup. """ pass @classmethod def setup_test(cls): """Setup function that will be called every time before executing each test case in the test class. Implementation is optional. """ return True def teardown_test(self): """Teardown function that will be called every time a test case has been executed. Implementation is optional. """ pass def run_teardown_start(self): """A function that is after all tests cases and before teardown cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.teardown_start() def teardown_start(self): """A function that is before teardown cases selected in the test class. This function can be used to execute before running. """ pass def run_teardown_end(self): """A function that is after all tests cases and teardown cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.teardown_end() def teardown_end(self): """A function that is after teardown cases selected in the test class. This function can be used to execute before running. """ pass def print_case_result(self, case, result): self.log.info("****************************Test {} result is: {}" .format(case, result)) def generate_fail_msg(self, msg): if isinstance(self.error_msg, str) and self.error_msg != "": pass else: self.error_msg = msg return self.error_msg def set_log(self, _log): self.log = _log def set_project(self, project): self.project = project DeccVariable.set_project_obj(self.project) self.cur_case = DeccVariable.cur_case() self._init_case_var(self.TAG) self.log.info("init case variables success.") def _init_case_var(self, tag): if tag: self.cur_case.set_name(tag) else: self.cur_case.set_name(self.project.execute_case_name) if not hasattr(self, "tests"): setattr(self, "tests", ["process"]) self.cur_case.set_step_total(1) self.cur_case.set_case_screenshot_dir(self.project.test_suite_path, self.project.task_report_dir, self.project.cur_case_full_path) self.cur_case.report_path = self.cur_case.case_screenshot_dir + ".html" @classmethod def step(cls, _ad, stepepr): result = stepepr if result is not None and isinstance(result, bool) and not result: _ad.log.error(_ad.device_id + " exec step is fail") elif _ad.screenshot: pass return result def set_screenrecorder_and_screenshot(self, screenrecorder: bool, screenshot: bool = True): """ Set whether to enable screen recording or screenshot for the device in the test case. """ for device in self.devices: setattr(device, "screenshot", screenshot) if hasattr(device, "is_oh"): setattr(device, "screenrecorder", screenrecorder) class TestCase(BaseCase): """Base class for all test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): super().__init__(tag, configs) self.devices = [] self.device1 = None self.device2 = None self.set_devices(self.configs["devices"]) self.testLoop = 0 def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occured. """ return BaseCase._exec_func(self, func, *args) def loop_start(self): pass def loop_end(self, testResult): pass def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None, continues_fail=False): self.con_fail_times = 0 self.last_result = RunResult.PASSED self.fail_times = 0 self.testLoop = 0 for i in range(0, int(looptimes)): self.log.info("--- Loop in %s time ---" % str(i + 1)) self.testLoop += 1 if self.result != RunResult.PASSED: if fail_break and not fail_times: break if self.project.record.is_shutdown(): self.result = RunResult.FAILED self.error_msg = "Testcase is stopped by manual!" self.log.error(self.error_msg) break self.test_method_result = RunResult.PASSED self.error_msg = "" self.loop_start() self.exec_one_testcase(test_name, getattr(self, test_name)) self.loop_end(self.result) self.log.info("--- Loop in %s-loop%s time result is: %s ---" % (test_name, str(i + 1), self.test_method_result)) self.test_method_end(self.test_method_result) if DeccVariable.cur_case().test_method.func_ret: self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1))) DeccVariable.cur_case().test_method.func_ret.clear() if self.test_method_result != "Passed": self.fail_times += 1 if continues_fail: if self.last_result != "Passed": self.con_fail_times += 1 else: self.con_fail_times = 1 if cat_log_step is not None: self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step)) if reset_test is not None: self.exec_one_testcase(reset_test, getattr(self, reset_test)) self.last_result = self.test_method_result if continues_fail and fail_break and self.con_fail_times > fail_times: break if not continues_fail and fail_break and self.fail_times > fail_times: break if self.test_method_result != "Passed": self.test_method_result = RunResult.PASSED if not continues_fail and self.fail_times >= fail_times: self.error_msg += " -- Loop fail %d times" % self.fail_times self.log.error( "DeviceTest-[{}] {} fail {} times".format(ErrorMessage.Error_01438.Code, test_name, self.fail_times)) elif continues_fail and self.con_fail_times >= fail_times: self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times self.log.error( "DeviceTest-[{}] {} continues fail {} times".format(ErrorMessage.Error_01438.Code, test_name, self.con_fail_times)) else: self.result = RunResult.PASSED self.error_msg = "" def set_devices(self, devices): self.devices = devices if not devices: return try: for num, _ad in enumerate(self.devices, 1): if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"): setattr(_ad, "device_id", "device{}".format(num)) # 兼容release2 增加id、serial setattr(_ad, "id", _ad.device_id) setattr(_ad, "serial", _ad.device_sn) setattr(self, _ad.device_id, _ad) setattr(self, "device{}".format(num), _ad) except Exception as error: log.error(ErrorMessage.Error_01218.Message.en, error_no=ErrorMessage.Error_01218.Code, is_traceback=True) raise DeviceTestError(ErrorMessage.Error_01218.Topic) from error def set_property(self, index): if isinstance(self.project.property_config, dict): get_devices = self.project.property_config.get("devices") if isinstance(get_devices, list) and len(get_devices) > index: propertys = get_devices[index] if isinstance(propertys, dict): self.log.debug("set propertys: {}".format(propertys)) return propertys self.log.debug("get propertys: {}".format(propertys)) self.log.warning("propertys not a dict!") return {} def get_property(self, property): if hasattr(self, "propertys"): get_value = self.propertys.get(property) log.debug("get property {}:{}".format(property, get_value)) return get_value else: log.warning("'Device' bject has no attribute 'propertys'") return None def get_case_report_path(self): report_path = self.configs.get("report_path") temp_task = "{}temp{}task".format(os.sep, os.sep) if report_path and isinstance(report_path, str) and temp_task in report_path: report_dir_path = report_path.split(temp_task)[0] return report_dir_path return report_path def get_case_result(self): return self.result def set_auto_record_steps(self, flag): """ flag: bool, A switch be used to disable record steps automatically """ warnings.warn("function is deprecated", DeprecationWarning) cur_case = DeccVariable.cur_case() if cur_case is None: self.log.warning("current case object is none, can not disable record step automatically") return DeccVariable.cur_case().auto_record_steps_info = flag class WindowsTestCase(BaseCase): """Base class for all windows test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): super().__init__(tag, configs) def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occurred. """ return BaseCase._exec_func(self, func, *args) def clear_device_callback_method(self): pass def _log_info_aw_information(func, args, kwargs, is_checkepr=False): cur_case = DeccVariable.cur_case() if len(cur_case.test_method.func_ret) == 0: if is_checkepr: cur_case.set_checkepr(True) cur_case.cur_check_cmd.__init__() else: cur_case.set_checkepr(False) aw_level = "aw" elif len(cur_case.test_method.func_ret) == 1: aw_level = "aw1" else: aw_level = "aw2" aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) log.info("