1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import os 19import re 20import sys 21import time 22import traceback 23import warnings 24from functools import wraps 25 26from xdevice import convert_serial 27from xdevice import get_decode 28from xdevice import get_cst_time 29from xdevice import ConfigConst 30 31from devicetest.core.error_message import ErrorMessage 32from devicetest.core.exception import BaseTestError 33from devicetest.core.exception import HdcCommandRejectedException 34from devicetest.core.exception import ShellCommandUnresponsiveException 35from devicetest.core.exception import DeviceNotFound 36from devicetest.core.exception import AppInstallError 37from devicetest.core.exception import RpcNotRunningError 38from devicetest.core.exception import TestFailure 39from devicetest.core.exception import TestError 40from devicetest.core.exception import TestSkip 41from devicetest.core.exception import TestTerminated 42from devicetest.core.exception import TestAbortManual 43from devicetest.core.exception import DeviceTestError 44from devicetest.core.exception import TestAssertionError 45from devicetest.core.constants import RunResult 46from devicetest.core.constants import RunSection 47from devicetest.core.constants import RunStatus 48from devicetest.core.variables import DeccVariable 49from devicetest.core.variables import CurCase 50from devicetest.utils.time_util import TS 51from devicetest.utils.type_utils import T 52from devicetest.log.logger import DeviceTestLog as log 53from devicetest.controllers.tools.screen_agent import ScreenAgent 54 55RESULT_LINE_TEMPLATE = "[Test Step] %s %s" 56 57 58def validate_test_name(name): 59 """Checks if a test name is not null. 60 Args: 61 name: name of a test case. 62 Raises: 63 BaseTestError is raised if the name is null. 64 """ 65 if name == "" or name is None or len(name) < 1: 66 raise BaseTestError("Invalid test case name found: {}, " 67 "test method couldn't be none.".format(name)) 68 69 70class DeviceRoot: 71 is_root_device = False 72 73 def __init__(self): 74 pass 75 76 @staticmethod 77 def set_device_root(is_root): 78 DeviceRoot.is_root_device = is_root 79 80 @staticmethod 81 def get_device_root(): 82 return DeviceRoot.is_root_device 83 84 85class BaseCase: 86 """Base class for all test classes to inherit from. 87 This class gets all the controller objects from test_runner and executes 88 the test cases requested within itself. 89 """ 90 91 def __init__(self, tag, configs): 92 self.cur_case = None 93 self.devices = [] 94 self.configs = configs 95 self.result = RunResult.PASSED 96 self.last_result = RunResult.PASSED 97 self.test_method_result = RunResult.PASSED 98 self.section = RunSection.SETUP 99 self.error_msg = '' 100 self.start_time = get_cst_time() 101 self.log = self.configs["log"] 102 self.set_project(self.configs["project"]) 103 self.con_fail_times = 0 104 self.fail_times = 0 105 self.step_flash_fail_msg = False 106 self.pass_through = None 107 self._test_args_para_parse(self.configs.get("testargs", None)) 108 # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果 109 self._is_loop_scenario = False 110 # proxy function 111 self.execption_callback = None 112 # case end function 113 self.case_end_callback = None 114 115 def __enter__(self): 116 return self 117 118 def __exit__(self, *args): 119 self._exec_func(self.clean_up) 120 121 def _test_args_para_parse(self, paras): 122 if not paras: 123 return 124 paras = dict(paras) 125 for para_name in paras.keys(): 126 para_name = para_name.strip() 127 para_values = paras.get(para_name, []) 128 if para_name == ConfigConst.pass_through: 129 self.pass_through = para_values 130 else: 131 continue 132 133 def _print_error(self, exception, error=None, result=RunResult.FAILED, refresh_method_result=False): 134 self.result = result 135 if refresh_method_result: 136 self.test_method_result = result 137 if error is not None: 138 self.error_msg = self.generate_fail_msg("{}: {}".format(error.Topic, exception)) 139 self.log.error(error.Message.en, error_no=error.Code) 140 else: 141 self.error_msg = str(exception) 142 trace_info = traceback.format_exc() 143 self.log.error(self.error_msg) 144 self.log.error(trace_info) 145 index = self.cur_case.step_index 146 if index == -1: 147 return 148 UpdateStep(index, result="fail\n" + _get_fail_line_from_exception(trace_info, self.TAG)) 149 150 def setup(self): 151 """Setup function that will be called before executing any test case 152 in the test class. Implementation is optional. 153 """ 154 return True 155 156 def process(self): 157 """process function that will be called before setup function and after 158 teardown function in the test class. Implementation is optional. 159 """ 160 pass 161 162 def teardown(self): 163 """Teardown function that will be called after all the selected test 164 cases in the test class have been executed. 165 Implementation is optional. 166 """ 167 pass 168 169 def exec_one_testcase(self, test_name, test_func): 170 """Executes one test case and update test results. 171 Args: 172 test_name: Name of the test. 173 test_func: The test function. 174 Returns: 175 True if the test passes, False otherwise. 176 """ 177 if not self._exec_func(self.setup_test): 178 self.log.error("Setup for {} failed, skipping.".format(test_name)) 179 _result = None 180 error = None 181 try: 182 verdict = test_func() 183 except TestSkip: 184 # Test skipped. 185 _result = True 186 self.log.debug("TestSkip") 187 188 except (DeviceNotFound, TestAssertionError, TestTerminated, TestAbortManual, 189 TestError, BaseTestError, DeviceTestError) as exception: 190 error = exception 191 self._print_error(exception, refresh_method_result=True) 192 except HdcCommandRejectedException as exception: 193 error = exception 194 self._print_error(exception, ErrorMessage.Error_01211, refresh_method_result=True) 195 except ShellCommandUnresponsiveException as exception: 196 error = exception 197 self._print_error(exception, ErrorMessage.Error_01212, refresh_method_result=True) 198 except AppInstallError as exception: 199 error = exception 200 self._print_error(exception, ErrorMessage.Error_01213, refresh_method_result=True) 201 except RpcNotRunningError as exception: 202 error = exception 203 self._print_error(exception, ErrorMessage.Error_01440, refresh_method_result=True) 204 except ConnectionRefusedError as exception: 205 error = exception 206 self._print_error(exception, ErrorMessage.Error_01217, refresh_method_result=True) 207 except ImportError as exception: 208 error = exception 209 self._print_error(exception, ErrorMessage.Error_01100, refresh_method_result=True) 210 except TestFailure as exception: 211 error = exception 212 self._print_error(exception, refresh_method_result=True) 213 except Exception as exception: 214 error = exception 215 self._print_error(exception, ErrorMessage.Error_01203, refresh_method_result=True) 216 else: 217 # loop执行场景,由其方法内设置测试结果 218 if self._is_loop_scenario: 219 _result = True 220 return 221 if (verdict is None or verdict is True) \ 222 and self.test_method_result == RunResult.PASSED: 223 # Test passed. 224 self.print_case_result(test_name, RunResult.PASSED) 225 _result = True 226 # Test failed because it didn't return True. 227 # This should be removed eventually. 228 else: 229 error_msg = "test func '{}' The actual input value of " \ 230 "the checkpoint is inconsistent with the " \ 231 "expected result.".format(test_func.__name__) 232 self.log.error( 233 ErrorMessage.Error_01200.Message.en.format(error_msg), 234 error_no=ErrorMessage.Error_01200.Code) 235 self.test_method_result = self.result = RunResult.FAILED 236 self.error_msg = "{}:{}".format(ErrorMessage.Error_01200.Topic, error_msg) 237 _result = False 238 239 finally: 240 if self.execption_callback is not None and error is not None: 241 self.execption_callback(error) 242 self._exec_func(self.teardown_test) 243 244 def _exec_func(self, func, *args): 245 """Executes a function with exception safeguard. 246 Args: 247 func: Function to be executed. 248 args: Arguments to be passed to the function. 249 Returns: 250 Whatever the function returns, or False if unhandled exception 251 occured. 252 """ 253 ret = False 254 error = None 255 try: 256 func(*args) 257 ret = True 258 except (TestError, TestAbortManual, TestTerminated, 259 TestAssertionError, DeviceTestError, DeviceNotFound) as exception: 260 error = exception 261 self._print_error(exception) 262 except HdcCommandRejectedException as exception: 263 error = exception 264 self._print_error(exception, ErrorMessage.Error_01211) 265 except ShellCommandUnresponsiveException as exception: 266 error = exception 267 self._print_error(exception, ErrorMessage.Error_01212) 268 except AppInstallError as exception: 269 error = exception 270 self._print_error(exception, ErrorMessage.Error_01213) 271 except RpcNotRunningError as exception: 272 error = exception 273 self._print_error(exception, ErrorMessage.Error_01440) 274 except ConnectionRefusedError as exception: 275 error = exception 276 self._print_error(exception, ErrorMessage.Error_01217) 277 except Exception as exception: 278 error = exception 279 self._print_error(exception) 280 finally: 281 if self.execption_callback is not None and error is not None: 282 self.execption_callback(error) 283 return ret 284 285 def get_error_code(self): 286 if self.section == RunSection.SETUP: 287 self.log.error(ErrorMessage.Error_01202.Message.en, 288 error_no=ErrorMessage.Error_01202.Code) 289 return ErrorMessage.Error_01202.Topic 290 291 elif self.section == RunSection.TEARDOWN: 292 self.log.error(ErrorMessage.Error_01204.Message.en, 293 error_no=ErrorMessage.Error_01204.Code) 294 return ErrorMessage.Error_01204.Topic 295 296 else: 297 self.log.error(ErrorMessage.Error_01203.Message.en, 298 error_no=ErrorMessage.Error_01203.Code) 299 return ErrorMessage.Error_01203.Topic 300 301 def _get_test_funcs(self): 302 # All tests are selected if test_cases list is None. 303 # Load functions based on test names. Also find the longest test name. 304 test_funcs = [] 305 for test_name in self.tests: 306 try: 307 validate_test_name(test_name) 308 test_funcs.append((test_name, getattr(self, test_name))) 309 except AttributeError: 310 self.result = RunResult.FAILED 311 self.error_msg = "{} does not have test step {}.".format( 312 self.TAG, test_name) 313 self.log.error(self.error_msg) 314 315 except BaseTestError as exception: 316 self.result = RunResult.FAILED 317 self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc())) 318 self.log.error(str(exception)) 319 320 return test_funcs 321 322 def run(self, test_names=None): 323 """Runs test cases within a test class by the order they 324 appear in the test list. 325 Being in the test_names list makes the test case "requested". If its 326 name passes validation, then it'll be executed, otherwise the name will 327 be skipped. 328 Args: 329 test_names: A list of names of the requested test cases. If None, 330 all test cases in the class are considered requested. 331 Returns: 332 A tuple of: The number of requested test cases, the number of test 333 cases executed, and the number of test cases passed. 334 """ 335 if RunStatus.FINISHED == self.run_setup(): 336 return 337 self.run_tests(test_names) 338 self.run_teardown() 339 if self.case_end_callback is not None: 340 self.case_end_callback() 341 342 def run_setup(self): 343 self.section = RunSection.SETUP 344 self.cur_case.set_run_section(self.section) 345 self.run_setup_start() 346 self.log.info("**********SetUp Starts!") 347 ret = self._exec_func(self.setup) 348 if not ret: 349 self.log.info("**********SetUp Ends!") 350 self.log.error("setup step fails") 351 self.section = RunSection.TEARDOWN 352 self.cur_case.set_run_section(self.section) 353 self.log.info("**********TearDown Starts!") 354 self._exec_func(self.teardown) 355 if self.result != RunResult.BLOCKED: 356 self.result = RunResult.FAILED 357 self.log.info('**********TearDown Ends!') 358 359 self.print_case_result(self.TAG, self.result) 360 361 return RunStatus.FINISHED 362 else: 363 self.log.info("**********SetUp Ends!") 364 self.run_setup_end() 365 366 return RunStatus.RUNNING 367 368 def run_tests(self, test_names): 369 self.section = RunSection.TEST 370 self.cur_case.set_run_section(self.section) 371 if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list): 372 tests = self._get_test_funcs() 373 for test_name, test_func in tests: 374 self.run_test(test_name, test_func) 375 else: 376 self.run_process() 377 self.log.info("**********All test methods Ends!**********") 378 379 def run_process(self): 380 self._exec_func(self.process) 381 382 def run_test(self, test_name, test_func): 383 self.log.info("[Test Step] {}".format(test_name)) 384 self.test_method_result = RunResult.PASSED 385 self.exec_one_testcase(test_name, test_func) 386 self.test_method_end(test_name) 387 388 def run_teardown(self): 389 self.section = RunSection.TEARDOWN 390 self.cur_case.set_run_section(self.section) 391 self.run_teardown_start() 392 self.log.info("**********TearDown Starts!") 393 self._exec_func(self.teardown) 394 self.log.info("**********TearDown Ends!") 395 self.run_teardown_end() 396 self._exec_func(self.clean_up) 397 398 def run_perf_models(self, models, fail_break=False): 399 """ 400 models: list, list of model object 401 fail_break: bool, if this is set to True, break down the loop of model execution when it fails 402 """ 403 self._is_loop_scenario = True 404 fail_models, pass_models, total = [], [], len(models) 405 for model in models: 406 model_name = model.__class__.__name__ 407 # 预置model的测试结果pass,避免执行循环时测试结果受到干扰 408 self.test_method_result = RunResult.PASSED 409 self.log.info("Executing test model {}".format(model_name)) 410 # 执行jump_to_start_anchor成功后,再执行execute 411 self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor) 412 if self.test_method_result == RunResult.PASSED: 413 self.exec_one_testcase("{}.execute".format(model_name), model.execute) 414 if self.test_method_result == RunResult.PASSED: 415 pass_models.append(model_name) 416 continue 417 fail_models.append(model_name) 418 if fail_break: 419 break 420 fail_cnt, pass_cnt = len(fail_models), len(pass_models) 421 self.log.info("Test models executed result with " 422 "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models)) 423 # 所有model执行通过,用例pass 424 if pass_cnt == total: 425 self.error_msg = "" 426 self.result = RunResult.PASSED 427 return 428 # 设置因fail退出,用例fail 429 if fail_break: 430 self.result = RunResult.FAILED 431 return 432 desc = ["(", ", ".join(fail_models[:4])] 433 if fail_cnt > 4: 434 desc.append(", etc.") 435 desc.append(")") 436 fail_desc = "".join(desc) 437 # 所有model执行失败 438 if fail_cnt == total: 439 self.error_msg = "All test models fail to be executed {}".format(fail_desc) 440 self.result = RunResult.FAILED 441 return 442 # 部分model通过 443 self.error_msg = "Some test models fail to be executed {}".format(fail_desc) 444 self.result = RunResult.FAILED 445 446 def test_method_end(self, test_name): 447 """ 448 case test method end event 449 """ 450 self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result)) 451 self.log.info("TestMethod: {} End".format(test_name)) 452 453 def clean_up(self): 454 """A function that is executed upon completion of all tests cases 455 selected in the test class. 456 This function should clean up objects initialized in the constructor 457 by user. 458 """ 459 pass 460 461 def run_setup_start(self): 462 """A function that is before all tests cases and before setup cases 463 selected in the test class. 464 This function can be customized to create different base classes or use cases. 465 """ 466 self.setup_start() 467 468 def setup_start(self): 469 """A function that is before all tests cases 470 selected in the test class. 471 This function can be used to execute before running. 472 """ 473 pass 474 475 def run_setup_end(self): 476 """A function that is before all tests cases and after setup cases 477 selected in the test class. 478 This function can be customized to create different base classes or use cases. 479 """ 480 self.setup_end() 481 482 def setup_end(self): 483 """A function that is after setup cases 484 selected in the test class. 485 This function can be used to execute after setup. 486 """ 487 pass 488 489 @classmethod 490 def setup_test(cls): 491 """Setup function that will be called every time before executing each 492 test case in the test class. 493 494 Implementation is optional. 495 """ 496 return True 497 498 def teardown_test(self): 499 """Teardown function that will be called every time a test case has 500 been executed. 501 Implementation is optional. 502 """ 503 pass 504 505 def run_teardown_start(self): 506 """A function that is after all tests cases and before teardown cases 507 selected in the test class. 508 This function can be customized to create different base classes or use cases. 509 """ 510 self.teardown_start() 511 512 def teardown_start(self): 513 """A function that is before teardown cases 514 selected in the test class. 515 This function can be used to execute before running. 516 """ 517 pass 518 519 def run_teardown_end(self): 520 """A function that is after all tests cases and teardown cases 521 selected in the test class. 522 This function can be customized to create different base classes or use cases. 523 """ 524 self.teardown_end() 525 526 def teardown_end(self): 527 """A function that is after teardown cases 528 selected in the test class. 529 This function can be used to execute before running. 530 """ 531 pass 532 533 def print_case_result(self, case, result): 534 self.log.info("****************************Test {} result is: {}" 535 .format(case, result)) 536 537 def generate_fail_msg(self, msg): 538 if isinstance(self.error_msg, str) and self.error_msg != "": 539 pass 540 else: 541 self.error_msg = msg 542 return self.error_msg 543 544 def set_log(self, _log): 545 self.log = _log 546 547 def set_project(self, project): 548 self.project = project 549 DeccVariable.set_project_obj(self.project) 550 self.cur_case = DeccVariable.cur_case() 551 self._init_case_var(self.TAG) 552 self.log.info("init case variables success.") 553 554 def _init_case_var(self, tag): 555 if tag: 556 self.cur_case.set_name(tag) 557 else: 558 self.cur_case.set_name(self.project.execute_case_name) 559 560 if not hasattr(self, "tests"): 561 setattr(self, "tests", ["process"]) 562 self.cur_case.set_step_total(1) 563 self.cur_case.set_case_screenshot_dir(self.project.test_suite_path, 564 self.project.task_report_dir, 565 self.project.cur_case_full_path) 566 self.cur_case.report_path = self.cur_case.case_screenshot_dir + ".html" 567 568 @classmethod 569 def step(cls, _ad, stepepr): 570 result = stepepr 571 if result is not None and isinstance(result, bool) and not result: 572 _ad.log.error(_ad.device_id + " exec step is fail") 573 elif _ad.screenshot: 574 pass 575 return result 576 577 def set_screenrecorder_and_screenshot(self, screenrecorder: bool, screenshot: bool = True): 578 """ 579 Set whether to enable screen recording or screenshot for the device in the test case. 580 """ 581 for device in self.devices: 582 setattr(device, "screenshot", screenshot) 583 if hasattr(device, "is_oh"): 584 setattr(device, "screenrecorder", screenrecorder) 585 586 587class TestCase(BaseCase): 588 """Base class for all test classes to inherit from. 589 This class gets all the controller objects from test_runner and executes 590 the test cases requested within itself. 591 """ 592 593 def __init__(self, tag, configs): 594 super().__init__(tag, configs) 595 self.devices = [] 596 self.device1 = None 597 self.device2 = None 598 self.set_devices(self.configs["devices"]) 599 self.testLoop = 0 600 601 def _exec_func(self, func, *args): 602 """Executes a function with exception safeguard. 603 Args: 604 func: Function to be executed. 605 args: Arguments to be passed to the function. 606 Returns: 607 Whatever the function returns, or False if unhandled exception 608 occured. 609 """ 610 return BaseCase._exec_func(self, func, *args) 611 612 def loop_start(self): 613 pass 614 615 def loop_end(self, testResult): 616 pass 617 618 def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None, 619 continues_fail=False): 620 621 self.con_fail_times = 0 622 self.last_result = RunResult.PASSED 623 624 self.fail_times = 0 625 self.testLoop = 0 626 for i in range(0, int(looptimes)): 627 self.log.info("--- Loop in %s time ---" % str(i + 1)) 628 self.testLoop += 1 629 if self.result != RunResult.PASSED: 630 if fail_break and not fail_times: 631 break 632 633 if self.project.record.is_shutdown(): 634 self.result = RunResult.FAILED 635 self.error_msg = "Testcase is stopped by manual!" 636 self.log.error(self.error_msg) 637 break 638 639 self.test_method_result = RunResult.PASSED 640 self.error_msg = "" 641 self.loop_start() 642 self.exec_one_testcase(test_name, getattr(self, test_name)) 643 self.loop_end(self.result) 644 645 self.log.info("--- Loop in %s-loop%s time result is: %s ---" 646 % (test_name, str(i + 1), self.test_method_result)) 647 self.test_method_end(self.test_method_result) 648 649 if DeccVariable.cur_case().test_method.func_ret: 650 self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1))) 651 DeccVariable.cur_case().test_method.func_ret.clear() 652 if self.test_method_result != "Passed": 653 self.fail_times += 1 654 655 if continues_fail: 656 if self.last_result != "Passed": 657 self.con_fail_times += 1 658 else: 659 self.con_fail_times = 1 660 661 if cat_log_step is not None: 662 self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step)) 663 if reset_test is not None: 664 self.exec_one_testcase(reset_test, getattr(self, reset_test)) 665 666 self.last_result = self.test_method_result 667 if continues_fail and fail_break and self.con_fail_times > fail_times: 668 break 669 if not continues_fail and fail_break and self.fail_times > fail_times: 670 break 671 672 if self.test_method_result != "Passed": 673 self.test_method_result = RunResult.PASSED 674 675 if not continues_fail and self.fail_times >= fail_times: 676 self.error_msg += " -- Loop fail %d times" % self.fail_times 677 self.log.error( 678 "DeviceTest-[{}] {} fail {} times".format(ErrorMessage.Error_01438.Code, test_name, self.fail_times)) 679 elif continues_fail and self.con_fail_times >= fail_times: 680 self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times 681 self.log.error( 682 "DeviceTest-[{}] {} continues fail {} times".format(ErrorMessage.Error_01438.Code, test_name, 683 self.con_fail_times)) 684 else: 685 self.result = RunResult.PASSED 686 self.error_msg = "" 687 688 def set_devices(self, devices): 689 self.devices = devices 690 if not devices: 691 return 692 693 try: 694 for num, _ad in enumerate(self.devices, 1): 695 if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"): 696 setattr(_ad, "device_id", "device{}".format(num)) 697 # 兼容release2 增加id、serial 698 setattr(_ad, "id", _ad.device_id) 699 setattr(_ad, "serial", _ad.device_sn) 700 setattr(self, _ad.device_id, _ad) 701 setattr(self, "device{}".format(num), _ad) 702 except Exception as error: 703 log.error(ErrorMessage.Error_01218.Message.en, 704 error_no=ErrorMessage.Error_01218.Code, 705 is_traceback=True) 706 raise DeviceTestError(ErrorMessage.Error_01218.Topic) from error 707 708 def set_property(self, index): 709 if isinstance(self.project.property_config, dict): 710 get_devices = self.project.property_config.get("devices") 711 if isinstance(get_devices, list) and len(get_devices) > index: 712 propertys = get_devices[index] 713 if isinstance(propertys, dict): 714 self.log.debug("set propertys: {}".format(propertys)) 715 return propertys 716 self.log.debug("get propertys: {}".format(propertys)) 717 self.log.warning("propertys not a dict!") 718 return {} 719 720 def get_property(self, property): 721 if hasattr(self, "propertys"): 722 get_value = self.propertys.get(property) 723 log.debug("get property {}:{}".format(property, get_value)) 724 return get_value 725 else: 726 log.warning("'Device' bject has no attribute 'propertys'") 727 return None 728 729 def get_case_report_path(self): 730 report_path = self.configs.get("report_path") 731 temp_task = "{}temp{}task".format(os.sep, os.sep) 732 if report_path and isinstance(report_path, str) and temp_task in report_path: 733 report_dir_path = report_path.split(temp_task)[0] 734 return report_dir_path 735 return report_path 736 737 def get_case_result(self): 738 return self.result 739 740 def set_auto_record_steps(self, flag): 741 """ 742 flag: bool, A switch be used to disable record steps automatically 743 """ 744 warnings.warn("function is deprecated", DeprecationWarning) 745 cur_case = DeccVariable.cur_case() 746 if cur_case is None: 747 self.log.warning("current case object is none, can not disable record step automatically") 748 return 749 DeccVariable.cur_case().auto_record_steps_info = flag 750 751 752class WindowsTestCase(BaseCase): 753 """Base class for all windows test classes to inherit from. 754 This class gets all the controller objects from test_runner and executes 755 the test cases requested within itself. 756 """ 757 758 def __init__(self, tag, configs): 759 super().__init__(tag, configs) 760 761 def _exec_func(self, func, *args): 762 """Executes a function with exception safeguard. 763 Args: 764 func: Function to be executed. 765 args: Arguments to be passed to the function. 766 Returns: 767 Whatever the function returns, or False if unhandled exception 768 occurred. 769 """ 770 return BaseCase._exec_func(self, func, *args) 771 772 def clear_device_callback_method(self): 773 pass 774 775 776def _log_info_aw_information(func, args, kwargs, is_checkepr=False): 777 cur_case = DeccVariable.cur_case() 778 if len(cur_case.test_method.func_ret) == 0: 779 if is_checkepr: 780 cur_case.set_checkepr(True) 781 cur_case.cur_check_cmd.__init__() 782 else: 783 cur_case.set_checkepr(False) 784 aw_level = "aw" 785 elif len(cur_case.test_method.func_ret) == 1: 786 aw_level = "aw1" 787 else: 788 aw_level = "aw2" 789 aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) 790 log.info("<div class='{}'>{}</div>".format(aw_level, aw_info)) 791 792 793def _get_is_raise_exception(kwargs): 794 if "EXCEPTION" not in kwargs: 795 is_raise_exception = True 796 else: 797 is_raise_exception = kwargs.pop("EXCEPTION") 798 return is_raise_exception, kwargs 799 800 801def _get_msg_args(kwargs): 802 msg_args = None 803 if "failMsg" in kwargs: 804 msg_args = kwargs.pop("failMsg") 805 msg_args = '' if msg_args is None \ 806 else ErrorMessage.Error_01500.Topic.format(msg_args) 807 808 return msg_args, kwargs 809 810 811def _get_ignore_fail(): 812 return DeccVariable.cur_case().run_section == RunSection.TEARDOWN 813 814 815def _screenshot_and_flash_error_msg(ignore_fail, is_raise_exception, msg_args, 816 func_name, args, error_msg): 817 ScreenAgent.screen_take_picture(args, False, func_name, is_raise_exception=is_raise_exception) 818 if not ignore_fail: 819 # 非teardown阶段 820 if is_raise_exception: 821 DeccVariable.cur_case().test_method.func_ret.clear() 822 _flash_error_msg(msg_args, error_msg) 823 raise DeviceTestError(error_msg) 824 else: 825 # 忽略异常 826 log.info("Ignore current exception because parameter EXCEPTION is False.") 827 else: 828 # teardown阶段 829 _flash_error_msg(msg_args, error_msg) 830 831 832def _is_in_top_aw(): 833 starts_num = DeccVariable.cur_case().test_method.func_ret.count("Starts") 834 ends_num = DeccVariable.cur_case().test_method.func_ret.count("Ends") 835 log.debug("Starts: {}, Ends: {}".format(starts_num, ends_num)) 836 return True if starts_num == ends_num else False 837 838 839def _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time, 840 ignore_fail, is_raise_exception, msg_args): 841 aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs) 842 result = False if isinstance(_ret, bool) and not _ret else True 843 cur_case = DeccVariable.cur_case() 844 if _is_in_top_aw(): 845 cost = 0 if cost_time is None else round(cost_time / 1000, 3) 846 log.info("<div class='aw'>{} return: {}, cost: {}s</div>".format(aw_info, _ret, cost)) 847 cur_case.test_method.func_ret.clear() 848 ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception) 849 850 if not cur_case.checkepr and not result: 851 if is_raise_exception and not ignore_fail: 852 _flash_error_msg(msg_args, ErrorMessage.Error_01200.Topic) 853 if msg_args: 854 raise DeviceTestError(msg_args) 855 raise TestFailure("{}: Step {} result TestError!".format( 856 ErrorMessage.Error_01200.Topic, aw_info)) 857 858 859def _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail, 860 is_raise_exception, msg_args): 861 cur_case = DeccVariable.cur_case() 862 if _is_in_top_aw(): 863 cur_case.test_method.func_ret.clear() 864 result = False if isinstance(_ret, bool) and not _ret else True 865 ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception) 866 if not _ret: 867 if cur_case.cur_check_cmd.get_cur_check_status(): 868 msg = cur_case.cur_check_cmd.get_cur_check_msg() 869 else: 870 msg = "Check Result: {} = {}!".format( 871 _ret, _gen_aw_invoke_info_no_div(func, args, kwargs)) 872 log.info("Return: {}".format(_ret)) 873 874 if is_raise_exception and not ignore_fail: 875 _flash_error_msg(msg_args, ErrorMessage.Error_01200.Topic) 876 if msg_args: 877 raise DeviceTestError(msg_args) 878 raise TestFailure("{}: Step {} result TestError!".format( 879 ErrorMessage.Error_01200.Topic, 880 _gen_aw_invoke_info_no_div(func, args, kwargs))) 881 else: 882 log.info(msg) 883 time.sleep(0.01) # 避免日志顺序混乱 884 885 else: 886 log.info("Return: {}".format(_ret)) 887 888 889def _check_exception(exception, in_method=False): 890 # 测试设备断连找不见, 直接抛出异常 891 find_result = re.search(r'device \w* not found|offline', str(exception)) 892 if find_result is not None: 893 log.error(ErrorMessage.Error_01217.Message.en, 894 error_no=ErrorMessage.Error_01217.Code) 895 if in_method: 896 return True 897 raise DeviceNotFound(ErrorMessage.Error_01217.Topic) 898 return False 899 900 901def checkepr(func: T) -> T: 902 @wraps(func) 903 def wrapper(*args, **kwargs): 904 # set default case obj 905 if DeccVariable.cur_case() is None: 906 cur_case = CurCase(log) 907 DeccVariable.set_cur_case_obj(cur_case) 908 DeccVariable.project.record.is_shutdown() 909 _res = run_checkepr(func, *args, **kwargs) 910 return _res 911 912 return wrapper 913 914 915def keyword(func: T) -> T: 916 @wraps(func) 917 def wrapper(*args, **kwargs): 918 # set default case obj 919 if DeccVariable.cur_case() is None: 920 cur_case = CurCase(log) 921 DeccVariable.set_cur_case_obj(cur_case) 922 DeccVariable.project.record.is_shutdown() 923 run_k = run_keyword(func, *args, **kwargs) 924 return run_k 925 926 return wrapper 927 928 929def run_keyword(func, *args, **kwargs): 930 _log_info_aw_information(func, args, kwargs) 931 DeccVariable.cur_case().test_method.func_ret.append("Starts") 932 is_raise_exception, kwargs = _get_is_raise_exception(kwargs) 933 msg_args, kwargs = _get_msg_args(kwargs) 934 ignore_fail = _get_ignore_fail() 935 is_exception = True 936 _ret = None 937 cost_time = 0 938 func_name = func.__name__ 939 try: 940 TS.start() 941 _ret = func(*args, **kwargs) 942 log.debug("func {} ret: {}".format(func_name, _ret)) 943 cost_time = TS.stop() 944 is_exception = False 945 946 if is_raise_exception and (not ignore_fail) and func_name == 'get_info_from_decc_svr': 947 if isinstance(_ret, dict): 948 if _ret['code'] != 200 and (_ret['success'] == 'false' 949 or _ret['success'] is False): 950 raise TestAssertionError('result error.') 951 952 except (DeviceNotFound, DeviceTestError) as e: 953 raise e 954 except TestAssertionError as exception: 955 # 断言的自定义异常优先于aw自定义的failMsg 956 _screenshot_and_flash_error_msg( 957 ignore_fail, is_raise_exception, str(exception), func_name, args, '') 958 959 except TypeError: 960 log.error(ErrorMessage.Error_01209.Message.en, 961 error_no=ErrorMessage.Error_01209.Code, 962 is_traceback=True) 963 _screenshot_and_flash_error_msg( 964 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01209.Topic) 965 966 except HdcCommandRejectedException as exception: 967 _check_exception(exception) 968 log.error(ErrorMessage.Error_01211.Message.en, 969 error_no=ErrorMessage.Error_01211.Code, 970 is_traceback=True) 971 _screenshot_and_flash_error_msg( 972 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01211.Topic) 973 974 except ShellCommandUnresponsiveException as exception: 975 _check_exception(exception) 976 log.error(ErrorMessage.Error_01212.Message.en, 977 error_no=ErrorMessage.Error_01212.Code, 978 is_traceback=True) 979 _screenshot_and_flash_error_msg( 980 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01212.Topic) 981 982 except AppInstallError as exception: 983 _check_exception(exception) 984 log.error(ErrorMessage.Error_01213.Message.en, 985 error_no=ErrorMessage.Error_01213.Code, 986 is_traceback=True) 987 _screenshot_and_flash_error_msg( 988 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01213.Topic) 989 990 except RpcNotRunningError as exception: 991 _check_exception(exception) 992 log.error(ErrorMessage.Error_01440.Message.en, 993 error_no=ErrorMessage.Error_01440.Code, 994 is_traceback=True) 995 _screenshot_and_flash_error_msg( 996 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01440.Topic) 997 998 except ConnectionRefusedError as error: 999 # 设备掉线connector_clinet连接拒绝 1000 log.error(ErrorMessage.Error_01217.Message.en, 1001 error_no=ErrorMessage.Error_01217.Code) 1002 raise DeviceNotFound(ErrorMessage.Error_01217.Topic) from error 1003 1004 except Exception as exception: 1005 _check_exception(exception) 1006 log.error(ErrorMessage.Error_01210.Message.en, 1007 error_no=ErrorMessage.Error_01210.Code, 1008 is_traceback=True) 1009 _screenshot_and_flash_error_msg( 1010 ignore_fail, is_raise_exception, msg_args, func_name, args, 1011 "{}: {}".format(ErrorMessage.Error_01210.Topic, exception)) 1012 finally: 1013 DeccVariable.cur_case().test_method.func_ret.append("Ends") 1014 if is_exception: 1015 if _is_in_top_aw(): 1016 DeccVariable.cur_case().test_method.func_ret.clear() 1017 return False 1018 _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time, 1019 ignore_fail, is_raise_exception, msg_args) 1020 return _ret 1021 1022 1023def run_checkepr(func, *args, **kwargs): 1024 _log_info_aw_information(func, args, kwargs, is_checkepr=True) 1025 DeccVariable.cur_case().test_method.func_ret.append("Starts") 1026 is_raise_exception, kwargs = _get_is_raise_exception(kwargs) 1027 msg_args, kwargs = _get_msg_args(kwargs) 1028 ignore_fail = _get_ignore_fail() 1029 is_exception = True 1030 _ret = None 1031 func_name = func.__name__ 1032 try: 1033 TS.start() 1034 # 执行当前函数 1035 _ret = func(*args, **kwargs) 1036 log.debug("step {} execute result: {}".format(func_name, _ret)) 1037 TS.stop() 1038 is_exception = False 1039 1040 except (DeviceNotFound, DeviceTestError) as e: 1041 raise e 1042 except TestAssertionError as exception: 1043 _screenshot_and_flash_error_msg( 1044 ignore_fail, is_raise_exception, str(exception), func_name, args, '') 1045 1046 except TypeError: 1047 log.error(ErrorMessage.Error_01209.Message.en, 1048 error_no=ErrorMessage.Error_01209.Code, 1049 is_traceback=True) 1050 _screenshot_and_flash_error_msg( 1051 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01209.Topic) 1052 1053 except HdcCommandRejectedException as exception: 1054 _check_exception(exception) 1055 log.error(ErrorMessage.Error_01211.Message.en, 1056 error_no=ErrorMessage.Error_01211.Code, 1057 is_traceback=True) 1058 _screenshot_and_flash_error_msg( 1059 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01211.Topic) 1060 1061 except ShellCommandUnresponsiveException as exception: 1062 _check_exception(exception) 1063 log.error(ErrorMessage.Error_01212.Message.en, 1064 error_no=ErrorMessage.Error_01212.Code, 1065 is_traceback=True) 1066 _screenshot_and_flash_error_msg( 1067 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01212.Topic) 1068 1069 except AppInstallError as exception: 1070 _check_exception(exception) 1071 log.error(ErrorMessage.Error_01213.Message.en, 1072 error_no=ErrorMessage.Error_01213.Code, 1073 is_traceback=True) 1074 _screenshot_and_flash_error_msg( 1075 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01213.Topic) 1076 1077 except RpcNotRunningError as exception: 1078 _check_exception(exception) 1079 log.error(ErrorMessage.Error_01440.Message.en, 1080 error_no=ErrorMessage.Error_01440.Code, 1081 is_traceback=True) 1082 _screenshot_and_flash_error_msg( 1083 ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01440.Topic) 1084 1085 except ConnectionRefusedError as error: 1086 # 设备掉线connector_clinet连接拒绝 1087 log.error(ErrorMessage.Error_01217.Message.en, 1088 error_no=ErrorMessage.Error_01217.Code) 1089 raise DeviceNotFound(ErrorMessage.Error_01217.Topic) from error 1090 1091 except Exception as exception: 1092 _check_exception(exception) 1093 log.error(ErrorMessage.Error_01210.Message.en, 1094 error_no=ErrorMessage.Error_01210.Code, 1095 is_traceback=True) 1096 _screenshot_and_flash_error_msg( 1097 ignore_fail, is_raise_exception, msg_args, func_name, args, 1098 "{}: {}".format(ErrorMessage.Error_01210.Topic, exception)) 1099 finally: 1100 DeccVariable.cur_case().test_method.func_ret.append("Ends") 1101 if is_exception: 1102 if _is_in_top_aw(): 1103 DeccVariable.cur_case().test_method.func_ret.clear() 1104 return False 1105 1106 _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail, 1107 is_raise_exception, msg_args) 1108 return _ret 1109 1110 1111def _flash_error_msg(msg_args, error_msg): 1112 log.info("flash error msg.") 1113 # 优先使用断言的自定义异常,然后再是failMsg,最后是捕获的异常 1114 if msg_args: 1115 if not DeccVariable.cur_case().test_method.error_msg or \ 1116 not DeccVariable.cur_case().test_method.step_flash_fail_msg: 1117 DeccVariable.cur_case().test_method.set_error_msg(msg_args) 1118 DeccVariable.cur_case().test_method.step_flash_fail_msg = True 1119 if not DeccVariable.cur_case().is_upload_method_result: 1120 DeccVariable.cur_case().set_error_msg(msg_args) 1121 else: 1122 if not DeccVariable.cur_case().test_method.error_msg: 1123 # 更新当前步骤error_msg 1124 DeccVariable.cur_case().test_method.set_error_msg(error_msg) 1125 if not DeccVariable.cur_case().is_upload_method_result \ 1126 and DeccVariable.cur_case().error_msg: 1127 DeccVariable.cur_case().set_error_msg(msg_args) 1128 1129 DeccVariable.cur_case().test_method.set_result(RunResult.FAILED) 1130 if DeccVariable.cur_case().case_result == RunResult.PASSED: 1131 DeccVariable.cur_case().set_case_result(RunResult.FAILED) 1132 1133 1134def _gen_aw_invoke_info_no_div(func, args, kwargs): 1135 all_args = [] 1136 name_id = None 1137 if args and getattr(args[0], "__module__", None): 1138 try: 1139 _ad = args[0] 1140 id_strings = [] 1141 dev_id = getattr(_ad, "device_id", "") 1142 if dev_id: 1143 id_strings.append(dev_id) 1144 dev_sn = getattr(_ad, "device_sn", "") 1145 if dev_sn: 1146 id_strings.append(convert_serial(dev_sn)) 1147 name_id = ".".join(id_strings).replace(" ", ".") 1148 except Exception as exception: 1149 log.error(exception) 1150 args = args[1:] 1151 if name_id is not None: 1152 all_args.append(name_id) 1153 if args: 1154 for arg in args: 1155 all_args.append(str(arg)) 1156 if kwargs: 1157 for key, value in kwargs.items(): 1158 all_args.append("{}={}".format(key, value)) 1159 info_items = [ 1160 func.__module__.split(".")[-1:][0], ".", func.__name__, 1161 "(", ", ".join(all_args), ")" 1162 ] 1163 return "".join(info_items) 1164 1165 1166def _get_fail_line_from_exception(trace_info, line_keyword): 1167 match, lines = -1, trace_info.split("\n") 1168 for index, line in enumerate(lines): 1169 if line_keyword not in line: 1170 continue 1171 match = index 1172 if match == -1: 1173 return trace_info 1174 return lines[match].strip() + "\n" + lines[match + 1].strip() 1175 1176 1177def GET_TRACEBACK(_trac=""): 1178 if _trac == "AW": 1179 return "".join(traceback.format_exception(*sys.exc_info())), \ 1180 traceback.format_exception(*sys.exc_info())[-1].strip() 1181 return "".join(traceback.format_exception(*sys.exc_info())) 1182 1183 1184def ASSERT(expect, actual): 1185 if expect != actual: 1186 raise TestFailure("{}: ASSERT TestError, Expect: {}, Actual: {}".format(ErrorMessage.Error_01200.Topic, 1187 expect, actual)) 1188 1189 1190def CHECK(message, expect, actual): 1191 if DeccVariable.cur_case() is None: 1192 cur_case = CurCase(log) 1193 DeccVariable.set_cur_case_obj(cur_case) 1194 return 1195 MESSAGE(message) 1196 EXPECT(expect) 1197 ACTUAL(actual) 1198 1199 1200def MESSAGE(arg): 1201 if DeccVariable.cur_case() is None: 1202 cur_case = CurCase(log) 1203 DeccVariable.set_cur_case_obj(cur_case) 1204 return 1205 DeccVariable.cur_case().cur_check_cmd.through = get_decode(arg) 1206 log.debug("Description: {}".format( 1207 DeccVariable.cur_case().cur_check_cmd.through)) 1208 1209 1210def EXPECT(arg): 1211 if DeccVariable.cur_case() is None: 1212 cur_case = CurCase(log) 1213 DeccVariable.set_cur_case_obj(cur_case) 1214 return 1215 DeccVariable.cur_case().cur_check_cmd.expect = get_decode(arg) 1216 log.debug("Expected: {}".format( 1217 DeccVariable.cur_case().cur_check_cmd.expect)) 1218 1219 1220def ACTUAL(arg): 1221 if DeccVariable.cur_case() is None: 1222 cur_case = CurCase(log) 1223 DeccVariable.set_cur_case_obj(cur_case) 1224 return 1225 DeccVariable.cur_case().cur_check_cmd.actual = get_decode(arg) 1226 log.debug("Actual: {}".format( 1227 DeccVariable.cur_case().cur_check_cmd.actual)) 1228 1229 1230def Step(name, **kwargs): 1231 """记录用例操作步骤,并展示在用例报告里 1232 Args: 1233 name: str, step name 1234 Example: 1235 Step("11") 1236 Step("11", video="a video address") 1237 """ 1238 cur_case = DeccVariable.cur_case() 1239 if cur_case is None: 1240 log.warning("current case object is none, recording step failed") 1241 return -1 1242 return cur_case.set_step_info(name, **kwargs) 1243 1244 1245def UpdateStep(index, **kwargs): 1246 """更新步骤记录信息 1247 Args: 1248 index: int, step index 1249 Example: 1250 index = Step("11") 1251 UpdateStep(index, video="a video address") 1252 """ 1253 cur_case = DeccVariable.cur_case() 1254 if cur_case is None: 1255 log.warning("current case object is none, updating step failed") 1256 return 1257 cur_case.update_step_info(index, **kwargs) 1258 1259 1260def CheckPoint(checkpoint): 1261 Step(checkpoint) 1262 1263 1264def CONFIG(): 1265 return DeccVariable.project.config_json 1266 1267 1268def get_report_dir(self=None): 1269 """ 1270 get Path to the framework execution case log folder 1271 Returns: log_dir_path 1272 """ 1273 if isinstance(self, TestCase): 1274 return self.project.task_report_dir 1275 return DeccVariable.project.task_report_dir 1276 1277 1278class Property: 1279 1280 def __init__(self): 1281 pass 1282 1283 def add_attributes(self, key, value): 1284 setattr(self, key, value) 1285 log.debug("Property setattr {}={}".format(key, value)) 1286