1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import re 20import sys 21import time 22import traceback 23import warnings 24from functools import wraps 25from typing import Union 26 27from xdevice import convert_serial 28from xdevice import get_decode 29from xdevice import get_cst_time 30from xdevice import ConfigConst 31from xdevice import IDevice 32from xdevice import EnvPool 33from xdevice import is_env_pool_run_mode 34from xdevice import AgentMode 35from xdevice import Variables 36 37from devicetest.core.exception import BaseTestError 38from devicetest.core.exception import HdcCommandRejectedException 39from devicetest.core.exception import ShellCommandUnresponsiveException 40from devicetest.core.exception import DeviceNotFound 41from devicetest.core.exception import AppInstallError 42from devicetest.core.exception import RpcNotRunningError 43from devicetest.core.exception import TestFailure 44from devicetest.core.exception import TestError 45from devicetest.core.exception import TestSkip 46from devicetest.core.exception import TestTerminated 47from devicetest.core.exception import TestAbortManual 48from devicetest.core.exception import DeviceTestError 49from devicetest.core.exception import TestAssertionError 50from devicetest.core.constants import RunResult 51from devicetest.core.constants import RunSection 52from devicetest.core.constants import RunStatus 53from devicetest.core.variables import DeccVariable 54from devicetest.core.variables import CurCase 55from devicetest.error import ErrorMessage 56from devicetest.utils.time_util import TS 57from devicetest.utils.type_utils import T 58from devicetest.log.logger import DeviceTestLog as log 59from devicetest.controllers.tools.screen_agent import ScreenAgent 60 61RESULT_LINE_TEMPLATE = "[Test Step] %s %s" 62 63 64def validate_test_name(name): 65 """Checks if a test name is not null. 66 Args: 67 name: name of a test case. 68 Raises: 69 BaseTestError is raised if the name is null. 70 """ 71 if name == "" or name is None or len(name) < 1: 72 raise BaseTestError(ErrorMessage.TestCase.Code_0203004.format(name)) 73 74 75class DeviceRoot: 76 is_root_device = False 77 78 def __init__(self): 79 pass 80 81 @staticmethod 82 def set_device_root(is_root): 83 DeviceRoot.is_root_device = is_root 84 85 @staticmethod 86 def get_device_root(): 87 return DeviceRoot.is_root_device 88 89 90class CoreCase: 91 """Base class for all test classes to inherit from. 92 This class gets all the controller objects from test_runner and executes 93 the test cases requested within itself. 94 """ 95 96 def __init__(self, tag, configs): 97 self.cur_case = None 98 self.devices = [] 99 self.configs = configs 100 self.result = RunResult.PASSED 101 self.last_result = RunResult.PASSED 102 self.test_method_result = RunResult.PASSED 103 self.section = RunSection.SETUP 104 self.error_msg = '' 105 self.trace_info = '' 106 self.start_time = get_cst_time() 107 self.log = self.configs["log"] 108 self.set_project(self.configs["project"]) 109 self.con_fail_times = 0 110 self.fail_times = 0 111 self.step_flash_fail_msg = False 112 self.pass_through = Variables.config.pass_through 113 # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 114 self._is_loop_scenario = False 115 # proxy function 116 self.execption_callback = None 117 # case end function 118 self.case_end_callback = None 119 120 def __enter__(self): 121 return self 122 123 def __exit__(self, *args): 124 self._exec_func(self.clean_up) 125 126 def _print_error(self, error: str, result: RunResult = RunResult.FAILED, 127 refresh_method_result: bool = False, screenshot: bool = True): 128 """打印异常信息 129 error : str, error message 130 result: RunResult, RunResult class variables 131 refresh_method_result: bool, refresh result flag 132 screenshot: bool, 运行keyword/checkepr装饰的接口异常,默认是有失败截图,可将其设为False,可避免重复截图 133 """ 134 self.log.error(error) 135 self.error_msg = str(error) 136 self.result = result 137 if refresh_method_result: 138 self.test_method_result = result 139 if screenshot: 140 # 在非keyword装饰的接口执行失败的时候,对每个设备进行截图 141 for device in self.devices: 142 ScreenAgent.screen_take_picture([device], False, "shot_on_fail") 143 144 trace_info = traceback.format_exc() 145 index = self.cur_case.step_index 146 if index == -1: 147 self.log.error(self.error_msg) 148 self.log.error(trace_info) 149 return 150 step_error_id = f'step_error_{index}' 151 self.log.error(f'<span id="{step_error_id}">{self.error_msg}</span>') 152 self.log.error(trace_info) 153 _error = f'<a href="javascript:" onclick="gotoStep(\'{step_error_id}\')">{self.error_msg}</a>' 154 UpdateStep(index, error=_error) 155 156 def setup(self): 157 """Setup function that will be called before executing any test case 158 in the test class. Implementation is optional. 159 """ 160 return True 161 162 def process(self): 163 """process function that will be called before setup function and after 164 teardown function in the test class. Implementation is optional. 165 """ 166 pass 167 168 def teardown(self): 169 """Teardown function that will be called after all the selected test 170 cases in the test class have been executed. 171 Implementation is optional. 172 """ 173 pass 174 175 def exec_one_testcase(self, test_name, test_func): 176 """Executes one test case and update test results. 177 Args: 178 test_name: Name of the test. 179 test_func: The test function. 180 Returns: 181 True if the test passes, False otherwise. 182 """ 183 if not self._exec_func(self.setup_test): 184 self.log.error("Setup for {} failed, skipping.".format(test_name)) 185 _result = None 186 error = None 187 try: 188 verdict = test_func() 189 except TestSkip: 190 # Test skipped. 191 _result = True 192 self.log.debug("TestSkip") 193 except (AppInstallError, BaseTestError, DeviceNotFound, DeviceTestError, 194 TestAssertionError, TestError, TestFailure, TestTerminated, TestAbortManual) as exception: 195 # 上述异常,已设置错误码,无需再次设置 196 error = str(exception) 197 self._print_error(error, refresh_method_result=True, screenshot=False) 198 except HdcCommandRejectedException as exception: 199 error = ErrorMessage.Device.Code_0202302.format(exception) 200 self._print_error(error, refresh_method_result=True) 201 except ShellCommandUnresponsiveException as exception: 202 error = ErrorMessage.Device.Code_0202304.format(exception) 203 self._print_error(error, refresh_method_result=True) 204 except RpcNotRunningError as exception: 205 error = ErrorMessage.Device.Code_0202305.format(exception) 206 self._print_error(error, refresh_method_result=True) 207 except ConnectionRefusedError as exception: 208 error = ErrorMessage.Device.Code_0202306.format(exception) 209 self._print_error(error, refresh_method_result=True) 210 except ImportError as exception: 211 error = ErrorMessage.TestCase.Code_0203006.format(exception) 212 self._print_error(error, refresh_method_result=True, screenshot=False) 213 except Exception as exception: 214 error = ErrorMessage.TestCase.Code_0203002.format(exception) 215 self._print_error(error, refresh_method_result=True) 216 else: 217 # loop执行场景,由其方法内设置测试结果 218 if self._is_loop_scenario: 219 _result = True 220 return 221 if (verdict is None or verdict is True) \ 222 and self.test_method_result == RunResult.PASSED: 223 # Test passed. 224 self.print_case_result(test_name, RunResult.PASSED) 225 _result = True 226 # Test failed because it didn't return True. 227 # This should be removed eventually. 228 else: 229 error_msg = "test func '{}' The actual input value of " \ 230 "the checkpoint is inconsistent with the " \ 231 "expected result.".format(test_func.__name__) 232 self.log.error(error_msg) 233 self.test_method_result = self.result = RunResult.FAILED 234 self.error_msg = error_msg 235 _result = False 236 237 finally: 238 if self.execption_callback is not None and error is not None: 239 self.execption_callback(error) 240 self._exec_func(self.teardown_test) 241 242 def _exec_func(self, func, *args): 243 """Executes a function with exception safeguard. 244 Args: 245 func: Function to be executed. 246 args: Arguments to be passed to the function. 247 Returns: 248 Whatever the function returns, or False if unhandled exception 249 occured. 250 """ 251 ret = False 252 error = None 253 try: 254 func(*args) 255 ret = True 256 except (AppInstallError, DeviceNotFound, DeviceTestError, 257 TestAssertionError, TestError, TestAbortManual, TestTerminated) as exception: 258 # 上述异常,已设置错误码,无需再次设置 259 error = str(exception) 260 self._print_error(error, screenshot=False) 261 except HdcCommandRejectedException as exception: 262 error = ErrorMessage.Device.Code_0202302.format(exception) 263 self._print_error(error) 264 except ShellCommandUnresponsiveException as exception: 265 error = ErrorMessage.Device.Code_0202304.format(exception) 266 self._print_error(error) 267 except RpcNotRunningError as exception: 268 error = ErrorMessage.Device.Code_0202305.format(exception) 269 self._print_error(error) 270 except ConnectionRefusedError as exception: 271 error = ErrorMessage.Device.Code_0202306.format(exception) 272 self._print_error(error) 273 except Exception as exception: 274 error = ErrorMessage.TestCase.Code_0203002.format(exception) 275 self._print_error(error) 276 finally: 277 if self.execption_callback is not None and error is not None: 278 self.execption_callback(error) 279 return ret 280 281 def _get_test_funcs(self): 282 # All tests are selected if test_cases list is None. 283 # Load functions based on test names. Also find the longest test name. 284 test_funcs = [] 285 for test_name in self.tests: 286 try: 287 validate_test_name(test_name) 288 test_funcs.append((test_name, getattr(self, test_name))) 289 except AttributeError: 290 self.result = RunResult.FAILED 291 self.error_msg = "{} does not have test step {}.".format( 292 self.TAG, test_name) 293 self.log.error(self.error_msg) 294 295 except BaseTestError as exception: 296 self.result = RunResult.FAILED 297 self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc())) 298 self.log.error(str(exception)) 299 300 return test_funcs 301 302 def run(self, test_names=None): 303 """Runs test cases within a test class by the order they 304 appear in the test list. 305 Being in the test_names list makes the test case "requested". If its 306 name passes validation, then it'll be executed, otherwise the name will 307 be skipped. 308 Args: 309 test_names: A list of names of the requested test cases. If None, 310 all test cases in the class are considered requested. 311 Returns: 312 A tuple of: The number of requested test cases, the number of test 313 cases executed, and the number of test cases passed. 314 """ 315 if RunStatus.FINISHED == self.run_setup(): 316 return 317 self.run_tests(test_names) 318 run_result = self.result 319 self.run_teardown() 320 # 避免teardown报错导致结果失败1/./ 321 self.result = run_result 322 if self.case_end_callback is not None: 323 self.case_end_callback() 324 325 def run_setup(self): 326 self.section = RunSection.SETUP 327 self.cur_case.set_run_section(self.section) 328 self.run_setup_start() 329 self.log.info("**********SetUp Starts!") 330 ret = self._exec_func(self.setup) 331 if not ret: 332 self.log.info("**********SetUp Ends!") 333 self.log.error("setup step fails") 334 self.section = RunSection.TEARDOWN 335 self.cur_case.set_run_section(self.section) 336 self.log.info("**********TearDown Starts!") 337 self._exec_func(self.teardown) 338 if self.result != RunResult.BLOCKED: 339 self.result = RunResult.FAILED 340 self.log.info('**********TearDown Ends!') 341 342 self.print_case_result(self.TAG, self.result) 343 344 return RunStatus.FINISHED 345 else: 346 self.log.info("**********SetUp Ends!") 347 self.run_setup_end() 348 349 return RunStatus.RUNNING 350 351 def run_tests(self, test_names): 352 self.section = RunSection.TEST 353 self.cur_case.set_run_section(self.section) 354 if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list): 355 tests = self._get_test_funcs() 356 for test_name, test_func in tests: 357 self.run_test(test_name, test_func) 358 else: 359 self.run_process() 360 self.log.info("**********All test methods Ends!**********") 361 362 def run_process(self): 363 self._exec_func(self.process) 364 365 def run_test(self, test_name, test_func): 366 self.log.info("[Test Step] {}".format(test_name)) 367 self.test_method_result = RunResult.PASSED 368 self.exec_one_testcase(test_name, test_func) 369 self.test_method_end(test_name) 370 371 def run_teardown(self): 372 self.section = RunSection.TEARDOWN 373 self.cur_case.set_run_section(self.section) 374 self.run_teardown_start() 375 self.log.info("**********TearDown Starts!") 376 self._exec_func(self.teardown) 377 self.log.info("**********TearDown Ends!") 378 self.run_teardown_end() 379 self._exec_func(self.clean_up) 380 381 def run_perf_models(self, models, fail_break=False): 382 """ 383 models: list, list of model object 384 fail_break: bool, if this is set to True, break down the loop of model execution when it fails 385 """ 386 self._is_loop_scenario = True 387 fail_models, pass_models, total = [], [], len(models) 388 for model in models: 389 model_name = model.__class__.__name__ 390 # 预置model的测试结果pass,避免执行循环时测试结果受到干扰 391 self.test_method_result = RunResult.PASSED 392 self.log.info("Executing test model {}".format(model_name)) 393 # 执行jump_to_start_anchor成功后,再执行execute 394 self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor) 395 if self.test_method_result == RunResult.PASSED: 396 self.exec_one_testcase("{}.execute".format(model_name), model.execute) 397 if self.test_method_result == RunResult.PASSED: 398 pass_models.append(model_name) 399 continue 400 fail_models.append(model_name) 401 if fail_break: 402 break 403 fail_cnt, pass_cnt = len(fail_models), len(pass_models) 404 self.log.info("Test models executed result with " 405 "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models)) 406 # 所有model执行通过,用例pass 407 if pass_cnt == total: 408 self.error_msg = "" 409 self.result = RunResult.PASSED 410 return 411 self.result = RunResult.FAILED 412 413 def test_method_end(self, test_name): 414 """ 415 case test method end event 416 """ 417 self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result)) 418 self.log.info("TestMethod: {} End".format(test_name)) 419 420 def clean_up(self): 421 """A function that is executed upon completion of all tests cases 422 selected in the test class. 423 This function should clean up objects initialized in the constructor 424 by user. 425 """ 426 pass 427 428 def run_setup_start(self): 429 """A function that is before all tests cases and before setup cases 430 selected in the test class. 431 This function can be customized to create different base classes or use cases. 432 """ 433 self.setup_start() 434 435 def setup_start(self): 436 """A function that is before all tests cases 437 selected in the test class. 438 This function can be used to execute before running. 439 """ 440 pass 441 442 def run_setup_end(self): 443 """A function that is before all tests cases and after setup cases 444 selected in the test class. 445 This function can be customized to create different base classes or use cases. 446 """ 447 self.setup_end() 448 449 def setup_end(self): 450 """A function that is after setup cases 451 selected in the test class. 452 This function can be used to execute after setup. 453 """ 454 pass 455 456 @classmethod 457 def setup_test(cls): 458 """Setup function that will be called every time before executing each 459 test case in the test class. 460 461 Implementation is optional. 462 """ 463 return True 464 465 def teardown_test(self): 466 """Teardown function that will be called every time a test case has 467 been executed. 468 Implementation is optional. 469 """ 470 pass 471 472 def run_teardown_start(self): 473 """A function that is after all tests cases and before teardown cases 474 selected in the test class. 475 This function can be customized to create different base classes or use cases. 476 """ 477 self.teardown_start() 478 479 def teardown_start(self): 480 """A function that is before teardown cases 481 selected in the test class. 482 This function can be used to execute before running. 483 """ 484 pass 485 486 def run_teardown_end(self): 487 """A function that is after all tests cases and teardown cases 488 selected in the test class. 489 This function can be customized to create different base classes or use cases. 490 """ 491 self.teardown_end() 492 493 def teardown_end(self): 494 """A function that is after teardown cases 495 selected in the test class. 496 This function can be used to execute before running. 497 """ 498 pass 499 500 def print_case_result(self, case, result): 501 self.log.info("****************************Test {} result is: {}" 502 .format(case, result)) 503 504 def generate_fail_msg(self, msg): 505 if isinstance(self.error_msg, str) and self.error_msg != "": 506 pass 507 else: 508 self.error_msg = msg 509 return self.error_msg 510 511 def set_log(self, _log): 512 self.log = _log 513 514 def set_project(self, project): 515 self.project = project 516 DeccVariable.set_project_obj(self.project) 517 self.cur_case = DeccVariable.cur_case() 518 self._init_case_var(self.TAG) 519 self.log.info("init case variables success.") 520 521 def _init_case_var(self, tag): 522 if tag: 523 self.cur_case.set_name(tag) 524 else: 525 self.cur_case.set_name(self.project.execute_case_name) 526 527 if not hasattr(self, "tests"): 528 setattr(self, "tests", ["process"]) 529 request = self.configs.get("request", None) 530 repeat, repeat_round = 1, 1 531 if request is not None: 532 repeat, repeat_round = request.config.repeat, request.get_repeat_round() 533 self.cur_case.set_step_total(1) 534 self.cur_case.set_case_screenshot_dir( 535 self.project.test_suite_path, 536 self.project.task_report_dir, 537 self.project.cur_case_full_path, 538 repeat=repeat, repeat_round=repeat_round) 539 540 @classmethod 541 def step(cls, _ad, stepepr): 542 result = stepepr 543 if result is not None and isinstance(result, bool) and not result: 544 _ad.log.error(_ad.device_id + " exec step is fail") 545 elif _ad.screenshot: 546 pass 547 return result 548 549 550class BaseCase: 551 """Base class for simple test classes to inherit from. 552 This class gets all the controller objects from test_runner and executes 553 the test cases requested within itself. 554 """ 555 556 def __init__(self, tag, controllers: Union[IDevice, dict]): 557 self.log = None 558 if isinstance(controllers, dict): 559 if controllers.get("devices", None): 560 if not isinstance(controllers["devices"], list): 561 devices = list() 562 devices.append(controllers["devices"]) 563 self.devices = devices 564 else: 565 self.devices = controllers["devices"] 566 else: 567 devices = list() 568 devices.append(controllers) 569 self.devices = devices 570 self.result = RunResult.PASSED 571 self.error_msg = '' 572 self.trace_info = '' 573 self.start_time = get_cst_time() 574 self.test_method_result = None 575 self._set_device_attr(controllers) 576 self._auto_record_steps = False 577 self.configs = controllers 578 self.configs.setdefault("report_path", EnvPool.report_path) 579 # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 580 self._is_loop_scenario = False 581 # proxy function 582 self.execption_callback = None 583 # case end function 584 self.case_end_callback = None 585 self.pass_through = self.configs.get(ConfigConst.testargs, {}).get(ConfigConst.pass_through, {}) 586 self.cur_case = DeccVariable.cur_case() 587 588 def __enter__(self): 589 return self 590 591 def __exit__(self, *args): 592 pass 593 594 def _set_device_attr(self, controllers): 595 index = 1 596 for device in self.devices: 597 setattr(self, "Phone{}".format(index), device) 598 setattr(self, "device{}".format(index), device) 599 if controllers.get("screenshot", None): 600 setattr(device, "screenshot", controllers["screenshot"]) 601 if controllers.get("screenshot_fail", None): 602 setattr(device, "screenshot_fail", controllers["screenshot_fail"]) 603 else: 604 setattr(device, "screenshot_fail", True) 605 # 控制agent的模式, 默认是bin模式 606 mode = controllers.get("agent_mode", AgentMode.bin) 607 device.set_agent_mode(mode) 608 609 def set_auto_record_steps(self, flag): 610 """ 611 flag: bool, A switch be used to disable record steps automatically 612 """ 613 self._auto_record_steps = flag 614 615 def get_auto_record_steps_status(self): 616 """ 617 return if record steps automatically 618 """ 619 return self._auto_record_steps 620 621 def get_case_result(self): 622 return self.result 623 624 625class TestCase(CoreCase, BaseCase): 626 """Base class for all test classes to inherit from. 627 This class gets all the controller objects from test_runner and executes 628 the test cases requested within itself. 629 """ 630 631 def __init__(self, tag, configs): 632 if is_env_pool_run_mode(): 633 BaseCase.__init__(self, tag, configs) 634 else: 635 CoreCase.__init__(self, tag, configs) 636 self.devices = [] 637 self.device1 = None 638 self.device2 = None 639 self.set_devices(self.configs["devices"]) 640 self.testLoop = 0 641 642 def _exec_func(self, func, *args): 643 """Executes a function with exception safeguard. 644 Args: 645 func: Function to be executed. 646 args: Arguments to be passed to the function. 647 Returns: 648 Whatever the function returns, or False if unhandled exception 649 occured. 650 """ 651 return CoreCase._exec_func(self, func, *args) 652 653 def loop_start(self): 654 pass 655 656 def loop_end(self, testResult): 657 pass 658 659 def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None, 660 continues_fail=False): 661 662 self.con_fail_times = 0 663 self.last_result = RunResult.PASSED 664 665 self.fail_times = 0 666 self.testLoop = 0 667 for i in range(0, int(looptimes)): 668 self.log.info("--- Loop in %s time ---" % str(i + 1)) 669 self.testLoop += 1 670 if self.result != RunResult.PASSED: 671 if fail_break and not fail_times: 672 break 673 674 if self.project.record.is_shutdown(): 675 self.result = RunResult.FAILED 676 self.error_msg = "Testcase is stopped by manual!" 677 self.log.error(self.error_msg) 678 break 679 680 self.test_method_result = RunResult.PASSED 681 self.error_msg = "" 682 self.loop_start() 683 self.exec_one_testcase(test_name, getattr(self, test_name)) 684 self.loop_end(self.result) 685 686 self.log.info("--- Loop in %s-loop%s time result is: %s ---" 687 % (test_name, str(i + 1), self.test_method_result)) 688 self.test_method_end(self.test_method_result) 689 690 if DeccVariable.cur_case().test_method.func_ret: 691 self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1))) 692 DeccVariable.cur_case().test_method.func_ret.clear() 693 if self.test_method_result != "Passed": 694 self.fail_times += 1 695 696 if continues_fail: 697 if self.last_result != "Passed": 698 self.con_fail_times += 1 699 else: 700 self.con_fail_times = 1 701 702 if cat_log_step is not None: 703 self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step)) 704 if reset_test is not None: 705 self.exec_one_testcase(reset_test, getattr(self, reset_test)) 706 707 self.last_result = self.test_method_result 708 if continues_fail and fail_break and self.con_fail_times > fail_times: 709 break 710 if not continues_fail and fail_break and self.fail_times > fail_times: 711 break 712 713 if self.test_method_result != "Passed": 714 self.test_method_result = RunResult.PASSED 715 716 if not continues_fail and self.fail_times >= fail_times: 717 self.error_msg += " -- Loop fail %d times" % self.fail_times 718 self.log.error("{} fail {} times".format(test_name, self.fail_times)) 719 elif continues_fail and self.con_fail_times >= fail_times: 720 self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times 721 self.log.error("{} continues fail {} times".format(test_name, self.con_fail_times)) 722 else: 723 self.result = RunResult.PASSED 724 self.error_msg = "" 725 726 def set_devices(self, devices): 727 self.devices = devices 728 if not devices: 729 return 730 731 try: 732 for num, _ad in enumerate(self.devices, 1): 733 if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"): 734 setattr(_ad, "device_id", "device{}".format(num)) 735 # 兼容release2 增加id、serial 736 setattr(_ad, "id", _ad.device_id) 737 setattr(_ad, "serial", _ad.device_sn) 738 setattr(self, _ad.device_id, _ad) 739 setattr(self, "device{}".format(num), _ad) 740 except Exception as error: 741 err_msg = ErrorMessage.TestCase.Code_0203007 742 log.error(err_msg, is_traceback=True) 743 raise DeviceTestError(err_msg) from error 744 745 def set_property(self, index): 746 if isinstance(self.project.property_config, dict): 747 get_devices = self.project.property_config.get("devices") 748 if isinstance(get_devices, list) and len(get_devices) > index: 749 propertys = get_devices[index] 750 if isinstance(propertys, dict): 751 self.log.debug("set propertys: {}".format(propertys)) 752 return propertys 753 self.log.debug("get propertys: {}".format(propertys)) 754 self.log.warning("propertys not a dict!") 755 return {} 756 757 def get_property(self, property): 758 if hasattr(self, "propertys"): 759 get_value = self.propertys.get(property) 760 log.debug("get property {}:{}".format(property, get_value)) 761 return get_value 762 else: 763 log.warning("'Device' object has no attribute 'propertys'") 764 return None 765 766 def get_case_report_path(self): 767 return self.configs.get("report_path") 768 769 def get_case_result(self): 770 return self.result 771 772 def set_case_result_content(self, content: dict): 773 """set result content for testcase(供用例拓展结果内容) 774 content: dict, result content 775 """ 776 if not isinstance(content, dict) or not content: 777 return 778 self.log.info(f"setting result content: {content}") 779 setattr(self, "result_content", content) 780 781 def set_auto_record_steps(self, flag): 782 """ 783 flag: bool, A switch be used to disable record steps automatically 784 """ 785 warnings.warn("function is deprecated", DeprecationWarning) 786 cur_case = DeccVariable.cur_case() 787 if cur_case is None: 788 self.log.warning("current case object is none, can not disable record step automatically") 789 return 790 DeccVariable.cur_case().auto_record_steps_info = flag 791 792 793class WindowsTestCase(CoreCase): 794 """Base class for all windows test classes to inherit from. 795 This class gets all the controller objects from test_runner and executes 796 the test cases requested within itself. 797 """ 798 799 def __init__(self, tag, configs): 800 super().__init__(tag, configs) 801 802 def _exec_func(self, func, *args): 803 """Executes a function with exception safeguard. 804 Args: 805 func: Function to be executed. 806 args: Arguments to be passed to the function. 807 Returns: 808 Whatever the function returns, or False if unhandled exception 809 occurred. 810 """ 811 return CoreCase._exec_func(self, func, *args) 812 813 def clear_device_callback_method(self): 814 pass 815 816 817def _log_info_aw_information(func, args, kwargs, is_checkepr=False): 818 cur_case = DeccVariable.cur_case() 819 if len(cur_case.test_method.func_ret) == 0: 820 if is_checkepr: 821 cur_case.set_checkepr(True) 822 cur_case.cur_check_cmd.__init__() 823 else: 824 cur_case.set_checkepr(False) 825 aw_level = "aw" 826 elif len(cur_case.test_method.func_ret) == 1: 827 aw_level = "aw1" 828 else: 829 aw_level = "aw2" 830 aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) 831 log.info("<div class='{}'>{}</div>".format(aw_level, aw_info)) 832 833 834def _get_is_raise_exception(kwargs): 835 if "EXCEPTION" not in kwargs: 836 is_raise_exception = True 837 else: 838 is_raise_exception = kwargs.pop("EXCEPTION") 839 return is_raise_exception, kwargs 840 841 842def _get_msg_args(kwargs): 843 msg_args = None 844 if "failMsg" in kwargs: 845 msg_args = kwargs.pop("failMsg") 846 msg_args = '' if msg_args is None else ErrorMessage.TestCase.Code_0203002.format(msg_args) 847 848 return msg_args, kwargs 849 850 851def _get_ignore_fail(): 852 # 忽略定义在teardown里的接口的运行报错,使得teardown里的每个接口都能被执行 853 return DeccVariable.cur_case().run_section == RunSection.TEARDOWN 854 855 856def _screenshot_and_flash_error_msg(ignore_fail, is_raise_exception, msg_args, 857 func_name, args, error_msg): 858 ScreenAgent.screen_take_picture(args, False, func_name, is_raise_exception=is_raise_exception) 859 if not ignore_fail: 860 # 非teardown阶段 861 if is_raise_exception: 862 _flash_error_msg(msg_args, error_msg) 863 raise DeviceTestError(error_msg) 864 else: 865 # 忽略异常 866 log.info("Ignore current exception because parameter EXCEPTION is False.") 867 else: 868 # teardown阶段 869 _flash_error_msg(msg_args, error_msg) 870 871 872def _is_in_top_aw(): 873 starts_num = DeccVariable.cur_case().test_method.func_ret.count("Starts") 874 ends_num = DeccVariable.cur_case().test_method.func_ret.count("Ends") 875 log.debug("Starts: {}, Ends: {}".format(starts_num, ends_num)) 876 return True if starts_num == ends_num else False 877 878 879def _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time, 880 ignore_fail, is_raise_exception, msg_args): 881 aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) 882 result = False if isinstance(_ret, bool) and not _ret else True 883 cur_case = DeccVariable.cur_case() 884 if _is_in_top_aw(): 885 cost = 0 if cost_time is None else round(cost_time / 1000, 3) 886 log.info("<div class='aw'>{} cost: {}s</div>".format(aw_info, cost)) 887 cur_case.test_method.func_ret.clear() 888 ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception) 889 890 if not cur_case.checkepr and not result: 891 if is_raise_exception and not ignore_fail: 892 _flash_error_msg(msg_args, "测试用例执行失败") 893 if msg_args: 894 raise DeviceTestError(msg_args) 895 raise TestFailure(ErrorMessage.TestCase.Code_0203003.format(aw_info)) 896 897 898def _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail, 899 is_raise_exception, msg_args): 900 cur_case = DeccVariable.cur_case() 901 if _is_in_top_aw(): 902 cur_case.test_method.func_ret.clear() 903 result = False if isinstance(_ret, bool) and not _ret else True 904 ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception) 905 if not _ret: 906 aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) 907 if cur_case.cur_check_cmd.get_cur_check_status(): 908 msg = cur_case.cur_check_cmd.get_cur_check_msg() 909 else: 910 msg = "Check Result: {} = {}!".format(_ret, aw_info) 911 log.info("Return: {}".format(_ret)) 912 913 if is_raise_exception and not ignore_fail: 914 _flash_error_msg(msg_args, "测试用例执行失败") 915 if msg_args: 916 raise DeviceTestError(msg_args) 917 raise TestFailure(ErrorMessage.TestCase.Code_0203003.format(aw_info)) 918 else: 919 log.info(msg) 920 time.sleep(0.01) # 避免日志顺序混乱 921 922 else: 923 log.info("Return: {}".format(_ret)) 924 925 926def _check_exception(exception, in_method=False): 927 # 测试设备断连找不见, 直接抛出异常 928 find_result = re.search(r'device \w* not found|offline', str(exception)) 929 if find_result is not None: 930 err_msg = ErrorMessage.Device.Code_0202306 931 log.error(err_msg) 932 if in_method: 933 return True 934 raise DeviceNotFound(err_msg) 935 return False 936 937 938def checkepr(func: T) -> T: 939 @wraps(func) 940 def wrapper(*args, **kwargs): 941 # set default case obj 942 if DeccVariable.cur_case() is None: 943 cur_case = CurCase(log) 944 DeccVariable.set_cur_case_obj(cur_case) 945 DeccVariable.project.record.is_shutdown() 946 _res = run_checkepr(func, *args, **kwargs) 947 return _res 948 949 return wrapper 950 951 952def keyword(func: T) -> T: 953 @wraps(func) 954 def wrapper(*args, **kwargs): 955 # set default case obj 956 if DeccVariable.cur_case() is None: 957 cur_case = CurCase(log) 958 DeccVariable.set_cur_case_obj(cur_case) 959 DeccVariable.project.record.is_shutdown() 960 run_k = run_keyword(func, *args, **kwargs) 961 return run_k 962 963 return wrapper 964 965 966def run_keyword(func, *args, **kwargs): 967 _log_info_aw_information(func, args, kwargs) 968 DeccVariable.cur_case().test_method.func_ret.append("Starts") 969 is_raise_exception, kwargs = _get_is_raise_exception(kwargs) 970 msg_args, kwargs = _get_msg_args(kwargs) 971 ignore_fail = _get_ignore_fail() 972 is_exception = True 973 _ret = None 974 cost_time = 0 975 func_name = func.__name__ 976 try: 977 TS.start() 978 _ret = func(*args, **kwargs) 979 log.debug("func {} ret: {}".format(func_name, _ret)) 980 cost_time = TS.stop() 981 is_exception = False 982 983 if is_raise_exception and (not ignore_fail) and func_name == 'get_info_from_decc_svr': 984 if isinstance(_ret, dict): 985 if _ret['code'] != 200 and (_ret['success'] == 'false' 986 or _ret['success'] is False): 987 raise TestAssertionError('result error.') 988 except (DeviceNotFound, DeviceTestError) as e: 989 # 设备异常或DeviceTestError,直接将异常抛出,可解决错误码重复添加的问题 990 raise e 991 except TestAssertionError as exception: 992 # 断言的自定义异常优先于aw自定义的failMsg 993 _screenshot_and_flash_error_msg( 994 ignore_fail, is_raise_exception, str(exception), func_name, args, '') 995 except TypeError as exception: 996 error_msg = ErrorMessage.TestCase.Code_0203008.format(exception) 997 log.error(error_msg, is_traceback=True) 998 _screenshot_and_flash_error_msg( 999 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1000 except HdcCommandRejectedException as exception: 1001 _check_exception(exception) 1002 error_msg = ErrorMessage.Device.Code_0202302.format(exception) 1003 log.error(error_msg, is_traceback=True) 1004 _screenshot_and_flash_error_msg( 1005 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1006 except ShellCommandUnresponsiveException as exception: 1007 _check_exception(exception) 1008 error_msg = ErrorMessage.Device.Code_0202304.format(exception) 1009 log.error(error_msg, is_traceback=True) 1010 _screenshot_and_flash_error_msg( 1011 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1012 except AppInstallError as exception: 1013 _check_exception(exception) 1014 error_msg = ErrorMessage.Device.Code_0202307.format(exception) 1015 log.error(error_msg, is_traceback=True) 1016 _screenshot_and_flash_error_msg( 1017 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1018 except RpcNotRunningError as exception: 1019 _check_exception(exception) 1020 error_msg = ErrorMessage.Device.Code_0202305.format(exception) 1021 log.error(error_msg, is_traceback=True) 1022 _screenshot_and_flash_error_msg( 1023 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1024 except ConnectionRefusedError as exception: 1025 # 设备掉线connector client连接拒绝 1026 error_msg = ErrorMessage.Device.Code_0202306.format(exception) 1027 log.error(error_msg) 1028 raise DeviceNotFound(error_msg) from exception 1029 except Exception as exception: 1030 _check_exception(exception) 1031 error_msg = ErrorMessage.TestCase.Code_0203002.format(exception) 1032 log.error(error_msg, is_traceback=True) 1033 _screenshot_and_flash_error_msg( 1034 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1035 finally: 1036 DeccVariable.cur_case().test_method.func_ret.append("Ends") 1037 # 1.若执行接口出现异常,会进入_screenshot_and_flash_error_msg方法 1038 # 2.当接口不是定义在teardown里,且没有接口参数没有设置EXCEPTION=False, 1039 # 那么_screenshot_and_flash_error_msg方法会重新抛出DeviceTestError异常 1040 # 3.若接口不是定义在teardown里,执行出现异常,且回到了顶层aw,将Starts/Ends计数器清零 1041 if is_exception and not ignore_fail and _is_in_top_aw(): 1042 log.debug('----------finally clear Starts/Ends--------') 1043 DeccVariable.cur_case().test_method.func_ret.clear() 1044 1045 if is_exception: 1046 if _is_in_top_aw(): 1047 DeccVariable.cur_case().test_method.func_ret.clear() 1048 return False 1049 _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time, 1050 ignore_fail, is_raise_exception, msg_args) 1051 return _ret 1052 1053 1054def run_checkepr(func, *args, **kwargs): 1055 _log_info_aw_information(func, args, kwargs, is_checkepr=True) 1056 DeccVariable.cur_case().test_method.func_ret.append("Starts") 1057 is_raise_exception, kwargs = _get_is_raise_exception(kwargs) 1058 msg_args, kwargs = _get_msg_args(kwargs) 1059 ignore_fail = _get_ignore_fail() 1060 is_exception = True 1061 _ret = None 1062 func_name = func.__name__ 1063 try: 1064 TS.start() 1065 # 执行当前函数 1066 _ret = func(*args, **kwargs) 1067 log.debug("step {} execute result: {}".format(func_name, _ret)) 1068 TS.stop() 1069 is_exception = False 1070 except (DeviceNotFound, DeviceTestError) as e: 1071 # 设备异常或DeviceTestError,直接将异常抛出,可解决错误码重复添加的问题 1072 raise e 1073 except TestAssertionError as exception: 1074 _screenshot_and_flash_error_msg( 1075 ignore_fail, is_raise_exception, str(exception), func_name, args, '') 1076 except TypeError as exception: 1077 error_msg = ErrorMessage.TestCase.Code_0203008.format(exception) 1078 log.error(error_msg, is_traceback=True) 1079 _screenshot_and_flash_error_msg( 1080 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1081 except HdcCommandRejectedException as exception: 1082 _check_exception(exception) 1083 error_msg = ErrorMessage.Device.Code_0202302.format(exception) 1084 log.error(error_msg, is_traceback=True) 1085 _screenshot_and_flash_error_msg( 1086 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1087 except ShellCommandUnresponsiveException as exception: 1088 _check_exception(exception) 1089 error_msg = ErrorMessage.Device.Code_0202304.format(exception) 1090 log.error(error_msg, is_traceback=True) 1091 _screenshot_and_flash_error_msg( 1092 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1093 except AppInstallError as exception: 1094 _check_exception(exception) 1095 error_msg = ErrorMessage.Device.Code_0202307.format(exception) 1096 log.error(error_msg, is_traceback=True) 1097 _screenshot_and_flash_error_msg( 1098 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1099 except RpcNotRunningError as exception: 1100 _check_exception(exception) 1101 error_msg = ErrorMessage.Device.Code_0202305.format(exception) 1102 log.error(error_msg, is_traceback=True) 1103 _screenshot_and_flash_error_msg( 1104 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1105 except ConnectionRefusedError as exception: 1106 # 设备掉线connector client连接拒绝 1107 error_msg = ErrorMessage.Device.Code_0202306.format(exception) 1108 log.error(error_msg, is_traceback=True) 1109 raise DeviceNotFound(error_msg) from exception 1110 except Exception as exception: 1111 _check_exception(exception) 1112 error_msg = ErrorMessage.TestCase.Code_0203002.format(exception) 1113 log.error(error_msg, is_traceback=True) 1114 _screenshot_and_flash_error_msg( 1115 ignore_fail, is_raise_exception, msg_args, func_name, args, error_msg) 1116 finally: 1117 DeccVariable.cur_case().test_method.func_ret.append("Ends") 1118 # 1.若执行接口出现异常,会进入_screenshot_and_flash_error_msg方法 1119 # 2.当被执行的接口不在teardown阶段里运行,且没有接口参数没有设置EXCEPTION=False, 1120 # _screenshot_and_flash_error_msg方法会重新抛出DeviceTestError异常 1121 # 3.若执行非teardown的接口出现异常,且回到了顶层aw,将Starts/Ends计数器清零 1122 if is_exception and not ignore_fail and _is_in_top_aw(): 1123 log.debug('----------finally clear Starts/Ends--------') 1124 DeccVariable.cur_case().test_method.func_ret.clear() 1125 1126 if is_exception: 1127 if _is_in_top_aw(): 1128 DeccVariable.cur_case().test_method.func_ret.clear() 1129 return False 1130 1131 _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail, 1132 is_raise_exception, msg_args) 1133 return _ret 1134 1135 1136def _flash_error_msg(msg_args, error_msg): 1137 log.info("flash error msg.") 1138 # 优先使用断言的自定义异常,然后再是failMsg,最后是捕获的异常 1139 if msg_args: 1140 if not DeccVariable.cur_case().test_method.error_msg or \ 1141 not DeccVariable.cur_case().test_method.step_flash_fail_msg: 1142 DeccVariable.cur_case().test_method.set_error_msg(msg_args) 1143 DeccVariable.cur_case().test_method.step_flash_fail_msg = True 1144 if not DeccVariable.cur_case().is_upload_method_result: 1145 DeccVariable.cur_case().set_error_msg(msg_args) 1146 else: 1147 if not DeccVariable.cur_case().test_method.error_msg: 1148 # 更新当前步骤error_msg 1149 DeccVariable.cur_case().test_method.set_error_msg(error_msg) 1150 if not DeccVariable.cur_case().is_upload_method_result \ 1151 and DeccVariable.cur_case().error_msg: 1152 DeccVariable.cur_case().set_error_msg(msg_args) 1153 1154 DeccVariable.cur_case().test_method.set_result(RunResult.FAILED) 1155 if DeccVariable.cur_case().case_result == RunResult.PASSED: 1156 DeccVariable.cur_case().set_case_result(RunResult.FAILED) 1157 1158 1159def _gen_aw_invoke_info_no_div(func, args, kwargs): 1160 all_args = [] 1161 name_id = None 1162 if args and getattr(args[0], "__module__", None): 1163 try: 1164 _ad = args[0] 1165 id_strings = [] 1166 _device = getattr(_ad, "_device", None) 1167 if _device: 1168 dev_alias = getattr(_device, "device_id", "") 1169 if dev_alias: 1170 id_strings.append(dev_alias) 1171 dev_sn = getattr(_ad, "device_sn", "") 1172 if dev_sn: 1173 id_strings.append(convert_serial(dev_sn)) 1174 name_id = ".".join(id_strings).replace(" ", ".") 1175 except Exception as exception: 1176 log.error(exception) 1177 args = args[1:] 1178 if name_id is not None: 1179 all_args.append(name_id) 1180 if args: 1181 for arg in args: 1182 all_args.append(str(arg)) 1183 if kwargs: 1184 for key, value in kwargs.items(): 1185 all_args.append("{}={}".format(key, value)) 1186 info_items = [ 1187 func.__module__.split(".")[-1:][0], ".", func.__name__, 1188 "(", ", ".join(all_args), ")" 1189 ] 1190 return "".join(info_items) 1191 1192 1193def GET_TRACEBACK(_trac=""): 1194 if _trac == "AW": 1195 return "".join(traceback.format_exception(*sys.exc_info())), \ 1196 traceback.format_exception(*sys.exc_info())[-1].strip() 1197 return "".join(traceback.format_exception(*sys.exc_info())) 1198 1199 1200def ASSERT(expect, actual): 1201 if expect != actual: 1202 raise TestFailure(ErrorMessage.Assertion.Code_0204026.format(expect, actual)) 1203 1204 1205def CHECK(message, expect, actual): 1206 if DeccVariable.cur_case() is None: 1207 cur_case = CurCase(log) 1208 DeccVariable.set_cur_case_obj(cur_case) 1209 return 1210 MESSAGE(message) 1211 EXPECT(expect) 1212 ACTUAL(actual) 1213 1214 1215def MESSAGE(arg): 1216 if DeccVariable.cur_case() is None: 1217 cur_case = CurCase(log) 1218 DeccVariable.set_cur_case_obj(cur_case) 1219 return 1220 DeccVariable.cur_case().cur_check_cmd.through = get_decode(arg) 1221 log.debug("Description: {}".format( 1222 DeccVariable.cur_case().cur_check_cmd.through)) 1223 1224 1225def EXPECT(arg): 1226 if DeccVariable.cur_case() is None: 1227 cur_case = CurCase(log) 1228 DeccVariable.set_cur_case_obj(cur_case) 1229 return 1230 DeccVariable.cur_case().cur_check_cmd.expect = get_decode(arg) 1231 log.debug("Expected: {}".format( 1232 DeccVariable.cur_case().cur_check_cmd.expect)) 1233 1234 1235def ACTUAL(arg): 1236 if DeccVariable.cur_case() is None: 1237 cur_case = CurCase(log) 1238 DeccVariable.set_cur_case_obj(cur_case) 1239 return 1240 DeccVariable.cur_case().cur_check_cmd.actual = get_decode(arg) 1241 log.debug("Actual: {}".format( 1242 DeccVariable.cur_case().cur_check_cmd.actual)) 1243 1244 1245def Step(name, **kwargs): 1246 """记录用例操作步骤,并展示在用例报告里 1247 Args: 1248 name: str, step name 1249 Example: 1250 Step("11") 1251 Step("11", video="a video address") 1252 """ 1253 cur_case = DeccVariable.cur_case() 1254 if cur_case is None: 1255 log.warning("current case object is none, recording step failed") 1256 return -1 1257 return cur_case.set_step_info(name, **kwargs) 1258 1259 1260def UpdateStep(index, **kwargs): 1261 """更新步骤记录信息 1262 Args: 1263 index: int, step index 1264 Example: 1265 index = Step("11") 1266 UpdateStep(index, video="a video address") 1267 """ 1268 cur_case = DeccVariable.cur_case() 1269 if cur_case is None: 1270 log.warning("current case object is none, updating step failed") 1271 return 1272 cur_case.update_step_info(index, **kwargs) 1273 1274 1275def CheckPoint(checkpoint): 1276 Step(checkpoint) 1277 1278 1279def CONFIG(): 1280 return DeccVariable.project.config_json 1281 1282 1283def get_report_dir(self=None): 1284 """ 1285 get Path to the framework execution case log folder 1286 Returns: log_dir_path 1287 """ 1288 if isinstance(self, TestCase): 1289 return self.project.task_report_dir 1290 return DeccVariable.project.task_report_dir 1291 1292 1293def RecordStep(name, **kwargs): 1294 """记录用例操作步骤,并展示在用例报告里 1295 name: str, step name, 作为唯一标识,用于更新记录 1296 Example: 1297 RecordStep("11", result=True) 1298 RecordStep("11", result=True, video="a video address") 1299 """ 1300 warnings.warn("RecordStep is deprecated, use Step instead", DeprecationWarning) 1301 cur_case = DeccVariable.cur_case() 1302 if cur_case is None: 1303 log.warning("current case object is none, recording step failed") 1304 return 1305 cur_case.set_step_info(name, **kwargs) 1306 1307 1308class Property: 1309 1310 def __init__(self): 1311 pass 1312 1313 def add_attributes(self, key, value): 1314 setattr(self, key, value) 1315 log.debug("Property setattr {}={}".format(key, value)) 1316