#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2022 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import re import sys import time import traceback import warnings from functools import wraps from typing import Union from xdevice import convert_serial from xdevice import get_decode from xdevice import get_cst_time from xdevice import ConfigConst from xdevice import IDevice from xdevice import EnvPool from xdevice import is_env_pool_run_mode from xdevice import AgentMode from xdevice import Variables from devicetest.core.exception import BaseTestError from devicetest.core.exception import HdcCommandRejectedException from devicetest.core.exception import ShellCommandUnresponsiveException from devicetest.core.exception import DeviceNotFound from devicetest.core.exception import AppInstallError from devicetest.core.exception import RpcNotRunningError from devicetest.core.exception import TestFailure from devicetest.core.exception import TestError from devicetest.core.exception import TestSkip from devicetest.core.exception import TestTerminated from devicetest.core.exception import TestAbortManual from devicetest.core.exception import DeviceTestError from devicetest.core.exception import TestAssertionError from devicetest.core.constants import RunResult from devicetest.core.constants import RunSection from devicetest.core.constants import RunStatus from devicetest.core.variables import DeccVariable from devicetest.core.variables import CurCase from devicetest.error import ErrorMessage from devicetest.utils.time_util import TS from devicetest.utils.type_utils import T from devicetest.log.logger import DeviceTestLog as log from devicetest.controllers.tools.screen_agent import ScreenAgent RESULT_LINE_TEMPLATE = "[Test Step] %s %s" def validate_test_name(name): """Checks if a test name is not null. Args: name: name of a test case. Raises: BaseTestError is raised if the name is null. """ if name == "" or name is None or len(name) < 1: raise BaseTestError(ErrorMessage.TestCase.Code_0203004.format(name)) class DeviceRoot: is_root_device = False def __init__(self): pass @staticmethod def set_device_root(is_root): DeviceRoot.is_root_device = is_root @staticmethod def get_device_root(): return DeviceRoot.is_root_device class CoreCase: """Base class for all test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): self.cur_case = None self.devices = [] self.configs = configs self.result = RunResult.PASSED self.last_result = RunResult.PASSED self.test_method_result = RunResult.PASSED self.section = RunSection.SETUP self.error_msg = '' self.trace_info = '' self.start_time = get_cst_time() self.log = self.configs["log"] self.set_project(self.configs["project"]) self.con_fail_times = 0 self.fail_times = 0 self.step_flash_fail_msg = False self.pass_through = Variables.config.pass_through # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 self._is_loop_scenario = False # proxy function self.execption_callback = None # case end function self.case_end_callback = None def __enter__(self): return self def __exit__(self, *args): self._exec_func(self.clean_up) def _print_error(self, error: str, result: RunResult = RunResult.FAILED, refresh_method_result: bool = False, screenshot: bool = True): """打印异常信息 error : str, error message result: RunResult, RunResult class variables refresh_method_result: bool, refresh result flag screenshot: bool, 运行keyword/checkepr装饰的接口异常,默认是有失败截图,可将其设为False,可避免重复截图 """ self.log.error(error) self.error_msg = str(error) self.result = result if refresh_method_result: self.test_method_result = result if screenshot: # 在非keyword装饰的接口执行失败的时候,对每个设备进行截图 for device in self.devices: ScreenAgent.screen_take_picture([device], False, "shot_on_fail") trace_info = traceback.format_exc() index = self.cur_case.step_index if index == -1: self.log.error(self.error_msg) self.log.error(trace_info) return step_error_id = f'step_error_{index}' self.log.error(f'{self.error_msg}') self.log.error(trace_info) _error = f'{self.error_msg}' UpdateStep(index, error=_error) def setup(self): """Setup function that will be called before executing any test case in the test class. Implementation is optional. """ return True def process(self): """process function that will be called before setup function and after teardown function in the test class. Implementation is optional. """ pass def teardown(self): """Teardown function that will be called after all the selected test cases in the test class have been executed. Implementation is optional. """ pass def exec_one_testcase(self, test_name, test_func): """Executes one test case and update test results. Args: test_name: Name of the test. test_func: The test function. Returns: True if the test passes, False otherwise. """ if not self._exec_func(self.setup_test): self.log.error("Setup for {} failed, skipping.".format(test_name)) _result = None error = None try: verdict = test_func() except TestSkip: # Test skipped. _result = True self.log.debug("TestSkip") except (AppInstallError, BaseTestError, DeviceNotFound, DeviceTestError, TestAssertionError, TestError, TestFailure, TestTerminated, TestAbortManual) as exception: # 上述异常,已设置错误码,无需再次设置 error = str(exception) self._print_error(error, refresh_method_result=True, screenshot=False) except HdcCommandRejectedException as exception: error = ErrorMessage.Device.Code_0202302.format(exception) self._print_error(error, refresh_method_result=True) except ShellCommandUnresponsiveException as exception: error = ErrorMessage.Device.Code_0202304.format(exception) self._print_error(error, refresh_method_result=True) except RpcNotRunningError as exception: error = ErrorMessage.Device.Code_0202305.format(exception) self._print_error(error, refresh_method_result=True) except ConnectionRefusedError as exception: error = ErrorMessage.Device.Code_0202306.format(exception) self._print_error(error, refresh_method_result=True) except ImportError as exception: error = ErrorMessage.TestCase.Code_0203006.format(exception) self._print_error(error, refresh_method_result=True, screenshot=False) except Exception as exception: error = ErrorMessage.TestCase.Code_0203002.format(exception) self._print_error(error, refresh_method_result=True) else: # loop执行场景,由其方法内设置测试结果 if self._is_loop_scenario: _result = True return if (verdict is None or verdict is True) \ and self.test_method_result == RunResult.PASSED: # Test passed. self.print_case_result(test_name, RunResult.PASSED) _result = True # Test failed because it didn't return True. # This should be removed eventually. else: error_msg = "test func '{}' The actual input value of " \ "the checkpoint is inconsistent with the " \ "expected result.".format(test_func.__name__) self.log.error(error_msg) self.test_method_result = self.result = RunResult.FAILED self.error_msg = error_msg _result = False finally: if self.execption_callback is not None and error is not None: self.execption_callback(error) self._exec_func(self.teardown_test) def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occured. """ ret = False error = None try: func(*args) ret = True except (AppInstallError, DeviceNotFound, DeviceTestError, TestAssertionError, TestError, TestAbortManual, TestTerminated) as exception: # 上述异常,已设置错误码,无需再次设置 error = str(exception) self._print_error(error, screenshot=False) except HdcCommandRejectedException as exception: error = ErrorMessage.Device.Code_0202302.format(exception) self._print_error(error) except ShellCommandUnresponsiveException as exception: error = ErrorMessage.Device.Code_0202304.format(exception) self._print_error(error) except RpcNotRunningError as exception: error = ErrorMessage.Device.Code_0202305.format(exception) self._print_error(error) except ConnectionRefusedError as exception: error = ErrorMessage.Device.Code_0202306.format(exception) self._print_error(error) except Exception as exception: error = ErrorMessage.TestCase.Code_0203002.format(exception) self._print_error(error) finally: if self.execption_callback is not None and error is not None: self.execption_callback(error) return ret def _get_test_funcs(self): # All tests are selected if test_cases list is None. # Load functions based on test names. Also find the longest test name. test_funcs = [] for test_name in self.tests: try: validate_test_name(test_name) test_funcs.append((test_name, getattr(self, test_name))) except AttributeError: self.result = RunResult.FAILED self.error_msg = "{} does not have test step {}.".format( self.TAG, test_name) self.log.error(self.error_msg) except BaseTestError as exception: self.result = RunResult.FAILED self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc())) self.log.error(str(exception)) return test_funcs def run(self, test_names=None): """Runs test cases within a test class by the order they appear in the test list. Being in the test_names list makes the test case "requested". If its name passes validation, then it'll be executed, otherwise the name will be skipped. Args: test_names: A list of names of the requested test cases. If None, all test cases in the class are considered requested. Returns: A tuple of: The number of requested test cases, the number of test cases executed, and the number of test cases passed. """ if RunStatus.FINISHED == self.run_setup(): return self.run_tests(test_names) run_result = self.result self.run_teardown() # 避免teardown报错导致结果失败1/./ self.result = run_result if self.case_end_callback is not None: self.case_end_callback() def run_setup(self): self.section = RunSection.SETUP self.cur_case.set_run_section(self.section) self.run_setup_start() self.log.info("**********SetUp Starts!") ret = self._exec_func(self.setup) if not ret: self.log.info("**********SetUp Ends!") self.log.error("setup step fails") self.section = RunSection.TEARDOWN self.cur_case.set_run_section(self.section) self.log.info("**********TearDown Starts!") self._exec_func(self.teardown) if self.result != RunResult.BLOCKED: self.result = RunResult.FAILED self.log.info('**********TearDown Ends!') self.print_case_result(self.TAG, self.result) return RunStatus.FINISHED else: self.log.info("**********SetUp Ends!") self.run_setup_end() return RunStatus.RUNNING def run_tests(self, test_names): self.section = RunSection.TEST self.cur_case.set_run_section(self.section) if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list): tests = self._get_test_funcs() for test_name, test_func in tests: self.run_test(test_name, test_func) else: self.run_process() self.log.info("**********All test methods Ends!**********") def run_process(self): self._exec_func(self.process) def run_test(self, test_name, test_func): self.log.info("[Test Step] {}".format(test_name)) self.test_method_result = RunResult.PASSED self.exec_one_testcase(test_name, test_func) self.test_method_end(test_name) def run_teardown(self): self.section = RunSection.TEARDOWN self.cur_case.set_run_section(self.section) self.run_teardown_start() self.log.info("**********TearDown Starts!") self._exec_func(self.teardown) self.log.info("**********TearDown Ends!") self.run_teardown_end() self._exec_func(self.clean_up) def run_perf_models(self, models, fail_break=False): """ models: list, list of model object fail_break: bool, if this is set to True, break down the loop of model execution when it fails """ self._is_loop_scenario = True fail_models, pass_models, total = [], [], len(models) for model in models: model_name = model.__class__.__name__ # 预置model的测试结果pass,避免执行循环时测试结果受到干扰 self.test_method_result = RunResult.PASSED self.log.info("Executing test model {}".format(model_name)) # 执行jump_to_start_anchor成功后,再执行execute self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor) if self.test_method_result == RunResult.PASSED: self.exec_one_testcase("{}.execute".format(model_name), model.execute) if self.test_method_result == RunResult.PASSED: pass_models.append(model_name) continue fail_models.append(model_name) if fail_break: break fail_cnt, pass_cnt = len(fail_models), len(pass_models) self.log.info("Test models executed result with " "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models)) # 所有model执行通过,用例pass if pass_cnt == total: self.error_msg = "" self.result = RunResult.PASSED return self.result = RunResult.FAILED def test_method_end(self, test_name): """ case test method end event """ self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result)) self.log.info("TestMethod: {} End".format(test_name)) def clean_up(self): """A function that is executed upon completion of all tests cases selected in the test class. This function should clean up objects initialized in the constructor by user. """ pass def run_setup_start(self): """A function that is before all tests cases and before setup cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.setup_start() def setup_start(self): """A function that is before all tests cases selected in the test class. This function can be used to execute before running. """ pass def run_setup_end(self): """A function that is before all tests cases and after setup cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.setup_end() def setup_end(self): """A function that is after setup cases selected in the test class. This function can be used to execute after setup. """ pass @classmethod def setup_test(cls): """Setup function that will be called every time before executing each test case in the test class. Implementation is optional. """ return True def teardown_test(self): """Teardown function that will be called every time a test case has been executed. Implementation is optional. """ pass def run_teardown_start(self): """A function that is after all tests cases and before teardown cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.teardown_start() def teardown_start(self): """A function that is before teardown cases selected in the test class. This function can be used to execute before running. """ pass def run_teardown_end(self): """A function that is after all tests cases and teardown cases selected in the test class. This function can be customized to create different base classes or use cases. """ self.teardown_end() def teardown_end(self): """A function that is after teardown cases selected in the test class. This function can be used to execute before running. """ pass def print_case_result(self, case, result): self.log.info("****************************Test {} result is: {}" .format(case, result)) def generate_fail_msg(self, msg): if isinstance(self.error_msg, str) and self.error_msg != "": pass else: self.error_msg = msg return self.error_msg def set_log(self, _log): self.log = _log def set_project(self, project): self.project = project DeccVariable.set_project_obj(self.project) self.cur_case = DeccVariable.cur_case() self._init_case_var(self.TAG) self.log.info("init case variables success.") def _init_case_var(self, tag): if tag: self.cur_case.set_name(tag) else: self.cur_case.set_name(self.project.execute_case_name) if not hasattr(self, "tests"): setattr(self, "tests", ["process"]) request = self.configs.get("request", None) repeat, repeat_round = 1, 1 if request is not None: repeat, repeat_round = request.config.repeat, request.get_repeat_round() self.cur_case.set_step_total(1) self.cur_case.set_case_screenshot_dir( self.project.test_suite_path, self.project.task_report_dir, self.project.cur_case_full_path, repeat=repeat, repeat_round=repeat_round) @classmethod def step(cls, _ad, stepepr): result = stepepr if result is not None and isinstance(result, bool) and not result: _ad.log.error(_ad.device_id + " exec step is fail") elif _ad.screenshot: pass return result class BaseCase: """Base class for simple test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, controllers: Union[IDevice, dict]): self.log = None if isinstance(controllers, dict): if controllers.get("devices", None): if not isinstance(controllers["devices"], list): devices = list() devices.append(controllers["devices"]) self.devices = devices else: self.devices = controllers["devices"] else: devices = list() devices.append(controllers) self.devices = devices self.result = RunResult.PASSED self.error_msg = '' self.trace_info = '' self.start_time = get_cst_time() self.test_method_result = None self._set_device_attr(controllers) self._auto_record_steps = False self.configs = controllers self.configs.setdefault("report_path", EnvPool.report_path) # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 self._is_loop_scenario = False # proxy function self.execption_callback = None # case end function self.case_end_callback = None self.pass_through = self.configs.get(ConfigConst.testargs, {}).get(ConfigConst.pass_through, {}) self.cur_case = DeccVariable.cur_case() def __enter__(self): return self def __exit__(self, *args): pass def _set_device_attr(self, controllers): index = 1 for device in self.devices: setattr(self, "Phone{}".format(index), device) setattr(self, "device{}".format(index), device) if controllers.get("screenshot", None): setattr(device, "screenshot", controllers["screenshot"]) if controllers.get("screenshot_fail", None): setattr(device, "screenshot_fail", controllers["screenshot_fail"]) else: setattr(device, "screenshot_fail", True) # 控制agent的模式, 默认是bin模式 mode = controllers.get("agent_mode", AgentMode.bin) device.set_agent_mode(mode) def set_auto_record_steps(self, flag): """ flag: bool, A switch be used to disable record steps automatically """ self._auto_record_steps = flag def get_auto_record_steps_status(self): """ return if record steps automatically """ return self._auto_record_steps def get_case_result(self): return self.result class TestCase(CoreCase, BaseCase): """Base class for all test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): if is_env_pool_run_mode(): BaseCase.__init__(self, tag, configs) else: CoreCase.__init__(self, tag, configs) self.devices = [] self.device1 = None self.device2 = None self.set_devices(self.configs["devices"]) self.testLoop = 0 def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occured. """ return CoreCase._exec_func(self, func, *args) def loop_start(self): pass def loop_end(self, testResult): pass def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None, continues_fail=False): self.con_fail_times = 0 self.last_result = RunResult.PASSED self.fail_times = 0 self.testLoop = 0 for i in range(0, int(looptimes)): self.log.info("--- Loop in %s time ---" % str(i + 1)) self.testLoop += 1 if self.result != RunResult.PASSED: if fail_break and not fail_times: break if self.project.record.is_shutdown(): self.result = RunResult.FAILED self.error_msg = "Testcase is stopped by manual!" self.log.error(self.error_msg) break self.test_method_result = RunResult.PASSED self.error_msg = "" self.loop_start() self.exec_one_testcase(test_name, getattr(self, test_name)) self.loop_end(self.result) self.log.info("--- Loop in %s-loop%s time result is: %s ---" % (test_name, str(i + 1), self.test_method_result)) self.test_method_end(self.test_method_result) if DeccVariable.cur_case().test_method.func_ret: self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1))) DeccVariable.cur_case().test_method.func_ret.clear() if self.test_method_result != "Passed": self.fail_times += 1 if continues_fail: if self.last_result != "Passed": self.con_fail_times += 1 else: self.con_fail_times = 1 if cat_log_step is not None: self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step)) if reset_test is not None: self.exec_one_testcase(reset_test, getattr(self, reset_test)) self.last_result = self.test_method_result if continues_fail and fail_break and self.con_fail_times > fail_times: break if not continues_fail and fail_break and self.fail_times > fail_times: break if self.test_method_result != "Passed": self.test_method_result = RunResult.PASSED if not continues_fail and self.fail_times >= fail_times: self.error_msg += " -- Loop fail %d times" % self.fail_times self.log.error("{} fail {} times".format(test_name, self.fail_times)) elif continues_fail and self.con_fail_times >= fail_times: self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times self.log.error("{} continues fail {} times".format(test_name, self.con_fail_times)) else: self.result = RunResult.PASSED self.error_msg = "" def set_devices(self, devices): self.devices = devices if not devices: return try: for num, _ad in enumerate(self.devices, 1): if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"): setattr(_ad, "device_id", "device{}".format(num)) # 兼容release2 增加id、serial setattr(_ad, "id", _ad.device_id) setattr(_ad, "serial", _ad.device_sn) setattr(self, _ad.device_id, _ad) setattr(self, "device{}".format(num), _ad) except Exception as error: err_msg = ErrorMessage.TestCase.Code_0203007 log.error(err_msg, is_traceback=True) raise DeviceTestError(err_msg) from error def set_property(self, index): if isinstance(self.project.property_config, dict): get_devices = self.project.property_config.get("devices") if isinstance(get_devices, list) and len(get_devices) > index: propertys = get_devices[index] if isinstance(propertys, dict): self.log.debug("set propertys: {}".format(propertys)) return propertys self.log.debug("get propertys: {}".format(propertys)) self.log.warning("propertys not a dict!") return {} def get_property(self, property): if hasattr(self, "propertys"): get_value = self.propertys.get(property) log.debug("get property {}:{}".format(property, get_value)) return get_value else: log.warning("'Device' object has no attribute 'propertys'") return None def get_case_report_path(self): return self.configs.get("report_path") def get_case_result(self): return self.result def set_case_result_content(self, content: dict): """set result content for testcase(供用例拓展结果内容) content: dict, result content """ if not isinstance(content, dict) or not content: return self.log.info(f"setting result content: {content}") setattr(self, "result_content", content) def set_auto_record_steps(self, flag): """ flag: bool, A switch be used to disable record steps automatically """ warnings.warn("function is deprecated", DeprecationWarning) cur_case = DeccVariable.cur_case() if cur_case is None: self.log.warning("current case object is none, can not disable record step automatically") return DeccVariable.cur_case().auto_record_steps_info = flag class WindowsTestCase(CoreCase): """Base class for all windows test classes to inherit from. This class gets all the controller objects from test_runner and executes the test cases requested within itself. """ def __init__(self, tag, configs): super().__init__(tag, configs) def _exec_func(self, func, *args): """Executes a function with exception safeguard. Args: func: Function to be executed. args: Arguments to be passed to the function. Returns: Whatever the function returns, or False if unhandled exception occurred. """ return CoreCase._exec_func(self, func, *args) def clear_device_callback_method(self): pass def _log_info_aw_information(func, args, kwargs, is_checkepr=False): cur_case = DeccVariable.cur_case() if len(cur_case.test_method.func_ret) == 0: if is_checkepr: cur_case.set_checkepr(True) cur_case.cur_check_cmd.__init__() else: cur_case.set_checkepr(False) aw_level = "aw" elif len(cur_case.test_method.func_ret) == 1: aw_level = "aw1" else: aw_level = "aw2" aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) log.info("