• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import os
19import re
20import sys
21import time
22import traceback
23import warnings
24from functools import wraps
25
26from xdevice import convert_serial
27from xdevice import get_decode
28from xdevice import get_cst_time
29from xdevice import ConfigConst
30
31from devicetest.core.error_message import ErrorMessage
32from devicetest.core.exception import BaseTestError
33from devicetest.core.exception import HdcCommandRejectedException
34from devicetest.core.exception import ShellCommandUnresponsiveException
35from devicetest.core.exception import DeviceNotFound
36from devicetest.core.exception import AppInstallError
37from devicetest.core.exception import RpcNotRunningError
38from devicetest.core.exception import TestFailure
39from devicetest.core.exception import TestError
40from devicetest.core.exception import TestSkip
41from devicetest.core.exception import TestTerminated
42from devicetest.core.exception import TestAbortManual
43from devicetest.core.exception import DeviceTestError
44from devicetest.core.exception import TestAssertionError
45from devicetest.core.constants import RunResult
46from devicetest.core.constants import RunSection
47from devicetest.core.constants import RunStatus
48from devicetest.core.variables import DeccVariable
49from devicetest.core.variables import CurCase
50from devicetest.utils.time_util import TS
51from devicetest.utils.type_utils import T
52from devicetest.log.logger import DeviceTestLog as log
53from devicetest.controllers.tools.screen_agent import ScreenAgent
54
55RESULT_LINE_TEMPLATE = "[Test Step] %s %s"
56
57
58def validate_test_name(name):
59    """Checks if a test name is not null.
60    Args:
61        name: name of a test case.
62    Raises:
63        BaseTestError is raised if the name is null.
64    """
65    if name == "" or name is None or len(name) < 1:
66        raise BaseTestError("Invalid test case name found: {}, "
67                            "test method couldn't be none.".format(name))
68
69
70class DeviceRoot:
71    is_root_device = False
72
73    def __init__(self):
74        pass
75
76    @staticmethod
77    def set_device_root(is_root):
78        DeviceRoot.is_root_device = is_root
79
80    @staticmethod
81    def get_device_root():
82        return DeviceRoot.is_root_device
83
84
85class BaseCase:
86    """Base class for all test classes to inherit from.
87    This class gets all the controller objects from test_runner and executes
88    the test cases requested within itself.
89    """
90
91    def __init__(self, tag, configs):
92        self.cur_case = None
93        self.devices = []
94        self.configs = configs
95        self.result = RunResult.PASSED
96        self.last_result = RunResult.PASSED
97        self.test_method_result = RunResult.PASSED
98        self.section = RunSection.SETUP
99        self.error_msg = ''
100        self.start_time = get_cst_time()
101        self.log = self.configs["log"]
102        self.set_project(self.configs["project"])
103        self.con_fail_times = 0
104        self.fail_times = 0
105        self.step_flash_fail_msg = False
106        self.pass_through = None
107        self._test_args_para_parse(self.configs.get("testargs", None))
108        # loop执行场景标记,避免exec_one_testcase覆写loop里设置的结果
109        self._is_loop_scenario = False
110        # proxy function
111        self.execption_callback = None
112        # case end function
113        self.case_end_callback = None
114
115    def __enter__(self):
116        return self
117
118    def __exit__(self, *args):
119        self._exec_func(self.clean_up)
120
121    def _test_args_para_parse(self, paras):
122        if not paras:
123            return
124        paras = dict(paras)
125        for para_name in paras.keys():
126            para_name = para_name.strip()
127            para_values = paras.get(para_name, [])
128            if para_name == ConfigConst.pass_through:
129                self.pass_through = para_values
130            else:
131                continue
132
133    def _print_error(self, exception, error=None, result=RunResult.FAILED, refresh_method_result=False):
134        self.result = result
135        if refresh_method_result:
136            self.test_method_result = result
137        if error is not None:
138            self.error_msg = self.generate_fail_msg("{}: {}".format(error.Topic, exception))
139            self.log.error(error.Message.en, error_no=error.Code)
140        else:
141            self.error_msg = str(exception)
142        trace_info = traceback.format_exc()
143        self.log.error(self.error_msg)
144        self.log.error(trace_info)
145        index = self.cur_case.step_index
146        if index == -1:
147            return
148        UpdateStep(index, result="fail\n" + _get_fail_line_from_exception(trace_info, self.TAG))
149
150    def setup(self):
151        """Setup function that will be called before executing any test case
152        in the test class. Implementation is optional.
153        """
154        return True
155
156    def process(self):
157        """process function that will be called before setup function and after
158        teardown function in the test class. Implementation is optional.
159        """
160        pass
161
162    def teardown(self):
163        """Teardown function that will be called after all the selected test
164        cases in the test class have been executed.
165        Implementation is optional.
166        """
167        pass
168
169    def exec_one_testcase(self, test_name, test_func):
170        """Executes one test case and update test results.
171        Args:
172            test_name: Name of the test.
173            test_func: The test function.
174        Returns:
175            True if the test passes, False otherwise.
176        """
177        if not self._exec_func(self.setup_test):
178            self.log.error("Setup for {} failed, skipping.".format(test_name))
179        _result = None
180        error = None
181        try:
182            verdict = test_func()
183        except TestSkip:
184            # Test skipped.
185            _result = True
186            self.log.debug("TestSkip")
187
188        except (DeviceNotFound, TestAssertionError, TestTerminated, TestAbortManual,
189                TestError, BaseTestError, DeviceTestError) as exception:
190            error = exception
191            self._print_error(exception, refresh_method_result=True)
192        except HdcCommandRejectedException as exception:
193            error = exception
194            self._print_error(exception, ErrorMessage.Error_01211, refresh_method_result=True)
195        except ShellCommandUnresponsiveException as exception:
196            error = exception
197            self._print_error(exception, ErrorMessage.Error_01212, refresh_method_result=True)
198        except AppInstallError as exception:
199            error = exception
200            self._print_error(exception, ErrorMessage.Error_01213, refresh_method_result=True)
201        except RpcNotRunningError as exception:
202            error = exception
203            self._print_error(exception, ErrorMessage.Error_01440, refresh_method_result=True)
204        except ConnectionRefusedError as exception:
205            error = exception
206            self._print_error(exception, ErrorMessage.Error_01217, refresh_method_result=True)
207        except ImportError as exception:
208            error = exception
209            self._print_error(exception, ErrorMessage.Error_01100, refresh_method_result=True)
210        except TestFailure as exception:
211            error = exception
212            self._print_error(exception, refresh_method_result=True)
213        except Exception as exception:
214            error = exception
215            self._print_error(exception, ErrorMessage.Error_01203, refresh_method_result=True)
216        else:
217            # loop执行场景,由其方法内设置测试结果
218            if self._is_loop_scenario:
219                _result = True
220                return
221            if (verdict is None or verdict is True) \
222                    and self.test_method_result == RunResult.PASSED:
223                # Test passed.
224                self.print_case_result(test_name, RunResult.PASSED)
225                _result = True
226            # Test failed because it didn't return True.
227            # This should be removed eventually.
228            else:
229                error_msg = "test func '{}' The actual input value of " \
230                            "the checkpoint is inconsistent with the " \
231                            "expected result.".format(test_func.__name__)
232                self.log.error(
233                    ErrorMessage.Error_01200.Message.en.format(error_msg),
234                    error_no=ErrorMessage.Error_01200.Code)
235                self.test_method_result = self.result = RunResult.FAILED
236                self.error_msg = "{}:{}".format(ErrorMessage.Error_01200.Topic, error_msg)
237                _result = False
238
239        finally:
240            if self.execption_callback is not None and error is not None:
241                self.execption_callback(error)
242            self._exec_func(self.teardown_test)
243
244    def _exec_func(self, func, *args):
245        """Executes a function with exception safeguard.
246        Args:
247            func: Function to be executed.
248            args: Arguments to be passed to the function.
249        Returns:
250            Whatever the function returns, or False if unhandled exception
251            occured.
252        """
253        ret = False
254        error = None
255        try:
256            func(*args)
257            ret = True
258        except (TestError, TestAbortManual, TestTerminated,
259                TestAssertionError, DeviceTestError, DeviceNotFound) as exception:
260            error = exception
261            self._print_error(exception)
262        except HdcCommandRejectedException as exception:
263            error = exception
264            self._print_error(exception, ErrorMessage.Error_01211)
265        except ShellCommandUnresponsiveException as exception:
266            error = exception
267            self._print_error(exception, ErrorMessage.Error_01212)
268        except AppInstallError as exception:
269            error = exception
270            self._print_error(exception, ErrorMessage.Error_01213)
271        except RpcNotRunningError as exception:
272            error = exception
273            self._print_error(exception, ErrorMessage.Error_01440)
274        except ConnectionRefusedError as exception:
275            error = exception
276            self._print_error(exception, ErrorMessage.Error_01217)
277        except Exception as exception:
278            error = exception
279            self._print_error(exception)
280        finally:
281            if self.execption_callback is not None and error is not None:
282                self.execption_callback(error)
283        return ret
284
285    def get_error_code(self):
286        if self.section == RunSection.SETUP:
287            self.log.error(ErrorMessage.Error_01202.Message.en,
288                           error_no=ErrorMessage.Error_01202.Code)
289            return ErrorMessage.Error_01202.Topic
290
291        elif self.section == RunSection.TEARDOWN:
292            self.log.error(ErrorMessage.Error_01204.Message.en,
293                           error_no=ErrorMessage.Error_01204.Code)
294            return ErrorMessage.Error_01204.Topic
295
296        else:
297            self.log.error(ErrorMessage.Error_01203.Message.en,
298                           error_no=ErrorMessage.Error_01203.Code)
299            return ErrorMessage.Error_01203.Topic
300
301    def _get_test_funcs(self):
302        # All tests are selected if test_cases list is None.
303        # Load functions based on test names. Also find the longest test name.
304        test_funcs = []
305        for test_name in self.tests:
306            try:
307                validate_test_name(test_name)
308                test_funcs.append((test_name, getattr(self, test_name)))
309            except AttributeError:
310                self.result = RunResult.FAILED
311                self.error_msg = "{} does not have test step {}.".format(
312                    self.TAG, test_name)
313                self.log.error(self.error_msg)
314
315            except BaseTestError as exception:
316                self.result = RunResult.FAILED
317                self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc()))
318                self.log.error(str(exception))
319
320        return test_funcs
321
322    def run(self, test_names=None):
323        """Runs test cases within a test class by the order they
324        appear in the test list.
325        Being in the test_names list makes the test case "requested". If its
326        name passes validation, then it'll be executed, otherwise the name will
327        be skipped.
328        Args:
329            test_names: A list of names of the requested test cases. If None,
330                all test cases in the class are considered requested.
331        Returns:
332            A tuple of: The number of requested test cases, the number of test
333            cases executed, and the number of test cases passed.
334        """
335        if RunStatus.FINISHED == self.run_setup():
336            return
337        self.run_tests(test_names)
338        self.run_teardown()
339        if self.case_end_callback is not None:
340            self.case_end_callback()
341
342    def run_setup(self):
343        self.section = RunSection.SETUP
344        self.cur_case.set_run_section(self.section)
345        self.run_setup_start()
346        self.log.info("**********SetUp Starts!")
347        ret = self._exec_func(self.setup)
348        if not ret:
349            self.log.info("**********SetUp Ends!")
350            self.log.error("setup step fails")
351            self.section = RunSection.TEARDOWN
352            self.cur_case.set_run_section(self.section)
353            self.log.info("**********TearDown Starts!")
354            self._exec_func(self.teardown)
355            if self.result != RunResult.BLOCKED:
356                self.result = RunResult.FAILED
357            self.log.info('**********TearDown Ends!')
358
359            self.print_case_result(self.TAG, self.result)
360
361            return RunStatus.FINISHED
362        else:
363            self.log.info("**********SetUp Ends!")
364            self.run_setup_end()
365
366        return RunStatus.RUNNING
367
368    def run_tests(self, test_names):
369        self.section = RunSection.TEST
370        self.cur_case.set_run_section(self.section)
371        if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list):
372            tests = self._get_test_funcs()
373            for test_name, test_func in tests:
374                self.run_test(test_name, test_func)
375        else:
376            self.run_process()
377        self.log.info("**********All test methods Ends!**********")
378
379    def run_process(self):
380        self._exec_func(self.process)
381
382    def run_test(self, test_name, test_func):
383        self.log.info("[Test Step] {}".format(test_name))
384        self.test_method_result = RunResult.PASSED
385        self.exec_one_testcase(test_name, test_func)
386        self.test_method_end(test_name)
387
388    def run_teardown(self):
389        self.section = RunSection.TEARDOWN
390        self.cur_case.set_run_section(self.section)
391        self.run_teardown_start()
392        self.log.info("**********TearDown Starts!")
393        self._exec_func(self.teardown)
394        self.log.info("**********TearDown Ends!")
395        self.run_teardown_end()
396        self._exec_func(self.clean_up)
397
398    def run_perf_models(self, models, fail_break=False):
399        """
400        models: list, list of model object
401        fail_break: bool, if this is set to True, break down the loop of model execution when it fails
402        """
403        self._is_loop_scenario = True
404        fail_models, pass_models, total = [], [], len(models)
405        for model in models:
406            model_name = model.__class__.__name__
407            # 预置model的测试结果pass,避免执行循环时测试结果受到干扰
408            self.test_method_result = RunResult.PASSED
409            self.log.info("Executing test model {}".format(model_name))
410            # 执行jump_to_start_anchor成功后,再执行execute
411            self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor)
412            if self.test_method_result == RunResult.PASSED:
413                self.exec_one_testcase("{}.execute".format(model_name), model.execute)
414            if self.test_method_result == RunResult.PASSED:
415                pass_models.append(model_name)
416                continue
417            fail_models.append(model_name)
418            if fail_break:
419                break
420        fail_cnt, pass_cnt = len(fail_models), len(pass_models)
421        self.log.info("Test models executed result with "
422                      "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models))
423        # 所有model执行通过,用例pass
424        if pass_cnt == total:
425            self.error_msg = ""
426            self.result = RunResult.PASSED
427            return
428        # 设置因fail退出,用例fail
429        if fail_break:
430            self.result = RunResult.FAILED
431            return
432        desc = ["(", ", ".join(fail_models[:4])]
433        if fail_cnt > 4:
434            desc.append(", etc.")
435        desc.append(")")
436        fail_desc = "".join(desc)
437        # 所有model执行失败
438        if fail_cnt == total:
439            self.error_msg = "All test models fail to be executed {}".format(fail_desc)
440            self.result = RunResult.FAILED
441            return
442        # 部分model通过
443        self.error_msg = "Some test models fail to be executed {}".format(fail_desc)
444        self.result = RunResult.FAILED
445
446    def test_method_end(self, test_name):
447        """
448        case test method end event
449        """
450        self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result))
451        self.log.info("TestMethod: {} End".format(test_name))
452
453    def clean_up(self):
454        """A function that is executed upon completion of all tests cases
455        selected in the test class.
456        This function should clean up objects initialized in the constructor
457        by user.
458        """
459        pass
460
461    def run_setup_start(self):
462        """A function that is before all tests cases and before setup cases
463        selected in the test class.
464        This function can be customized to create different base classes or use cases.
465        """
466        self.setup_start()
467
468    def setup_start(self):
469        """A function that is before all tests cases
470        selected in the test class.
471        This function can be used to execute before running.
472        """
473        pass
474
475    def run_setup_end(self):
476        """A function that is before all tests cases and after setup cases
477        selected in the test class.
478        This function can be customized to create different base classes or use cases.
479        """
480        self.setup_end()
481
482    def setup_end(self):
483        """A function that is after setup cases
484        selected in the test class.
485        This function can be used to execute after setup.
486        """
487        pass
488
489    @classmethod
490    def setup_test(cls):
491        """Setup function that will be called every time before executing each
492        test case in the test class.
493
494        Implementation is optional.
495        """
496        return True
497
498    def teardown_test(self):
499        """Teardown function that will be called every time a test case has
500        been executed.
501        Implementation is optional.
502        """
503        pass
504
505    def run_teardown_start(self):
506        """A function that is after all tests cases and before teardown cases
507        selected in the test class.
508        This function can be customized to create different base classes or use cases.
509        """
510        self.teardown_start()
511
512    def teardown_start(self):
513        """A function that is before teardown cases
514        selected in the test class.
515        This function can be used to execute before running.
516        """
517        pass
518
519    def run_teardown_end(self):
520        """A function that is after all tests cases and teardown cases
521        selected in the test class.
522        This function can be customized to create different base classes or use cases.
523        """
524        self.teardown_end()
525
526    def teardown_end(self):
527        """A function that is after teardown cases
528        selected in the test class.
529        This function can be used to execute before running.
530        """
531        pass
532
533    def print_case_result(self, case, result):
534        self.log.info("****************************Test {} result is: {}"
535                      .format(case, result))
536
537    def generate_fail_msg(self, msg):
538        if isinstance(self.error_msg, str) and self.error_msg != "":
539            pass
540        else:
541            self.error_msg = msg
542        return self.error_msg
543
544    def set_log(self, _log):
545        self.log = _log
546
547    def set_project(self, project):
548        self.project = project
549        DeccVariable.set_project_obj(self.project)
550        self.cur_case = DeccVariable.cur_case()
551        self._init_case_var(self.TAG)
552        self.log.info("init case variables success.")
553
554    def _init_case_var(self, tag):
555        if tag:
556            self.cur_case.set_name(tag)
557        else:
558            self.cur_case.set_name(self.project.execute_case_name)
559
560        if not hasattr(self, "tests"):
561            setattr(self, "tests", ["process"])
562        self.cur_case.set_step_total(1)
563        self.cur_case.set_case_screenshot_dir(self.project.test_suite_path,
564                                              self.project.task_report_dir,
565                                              self.project.cur_case_full_path)
566        self.cur_case.report_path = self.cur_case.case_screenshot_dir + ".html"
567
568    @classmethod
569    def step(cls, _ad, stepepr):
570        result = stepepr
571        if result is not None and isinstance(result, bool) and not result:
572            _ad.log.error(_ad.device_id + " exec step is fail")
573        elif _ad.screenshot:
574            pass
575        return result
576
577    def set_screenrecorder_and_screenshot(self, screenrecorder: bool, screenshot: bool = True):
578        """
579        Set whether to enable screen recording or screenshot for the device in the test case.
580        """
581        for device in self.devices:
582            setattr(device, "screenshot", screenshot)
583            if hasattr(device, "is_oh"):
584                setattr(device, "screenrecorder", screenrecorder)
585
586
587class TestCase(BaseCase):
588    """Base class for all test classes to inherit from.
589    This class gets all the controller objects from test_runner and executes
590    the test cases requested within itself.
591    """
592
593    def __init__(self, tag, configs):
594        super().__init__(tag, configs)
595        self.devices = []
596        self.device1 = None
597        self.device2 = None
598        self.set_devices(self.configs["devices"])
599        self.testLoop = 0
600
601    def _exec_func(self, func, *args):
602        """Executes a function with exception safeguard.
603        Args:
604            func: Function to be executed.
605            args: Arguments to be passed to the function.
606        Returns:
607            Whatever the function returns, or False if unhandled exception
608            occured.
609        """
610        return BaseCase._exec_func(self, func, *args)
611
612    def loop_start(self):
613        pass
614
615    def loop_end(self, testResult):
616        pass
617
618    def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None,
619             continues_fail=False):
620
621        self.con_fail_times = 0
622        self.last_result = RunResult.PASSED
623
624        self.fail_times = 0
625        self.testLoop = 0
626        for i in range(0, int(looptimes)):
627            self.log.info("--- Loop in %s time ---" % str(i + 1))
628            self.testLoop += 1
629            if self.result != RunResult.PASSED:
630                if fail_break and not fail_times:
631                    break
632
633            if self.project.record.is_shutdown():
634                self.result = RunResult.FAILED
635                self.error_msg = "Testcase is stopped by manual!"
636                self.log.error(self.error_msg)
637                break
638
639            self.test_method_result = RunResult.PASSED
640            self.error_msg = ""
641            self.loop_start()
642            self.exec_one_testcase(test_name, getattr(self, test_name))
643            self.loop_end(self.result)
644
645            self.log.info("--- Loop in %s-loop%s time result is: %s ---"
646                          % (test_name, str(i + 1), self.test_method_result))
647            self.test_method_end(self.test_method_result)
648
649            if DeccVariable.cur_case().test_method.func_ret:
650                self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1)))
651                DeccVariable.cur_case().test_method.func_ret.clear()
652            if self.test_method_result != "Passed":
653                self.fail_times += 1
654
655                if continues_fail:
656                    if self.last_result != "Passed":
657                        self.con_fail_times += 1
658                    else:
659                        self.con_fail_times = 1
660
661                if cat_log_step is not None:
662                    self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step))
663                if reset_test is not None:
664                    self.exec_one_testcase(reset_test, getattr(self, reset_test))
665
666            self.last_result = self.test_method_result
667            if continues_fail and fail_break and self.con_fail_times > fail_times:
668                break
669            if not continues_fail and fail_break and self.fail_times > fail_times:
670                break
671
672        if self.test_method_result != "Passed":
673            self.test_method_result = RunResult.PASSED
674
675        if not continues_fail and self.fail_times >= fail_times:
676            self.error_msg += " -- Loop fail %d times" % self.fail_times
677            self.log.error(
678                "DeviceTest-[{}] {} fail {} times".format(ErrorMessage.Error_01438.Code, test_name, self.fail_times))
679        elif continues_fail and self.con_fail_times >= fail_times:
680            self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times
681            self.log.error(
682                "DeviceTest-[{}] {} continues fail {} times".format(ErrorMessage.Error_01438.Code, test_name,
683                                                                    self.con_fail_times))
684        else:
685            self.result = RunResult.PASSED
686            self.error_msg = ""
687
688    def set_devices(self, devices):
689        self.devices = devices
690        if not devices:
691            return
692
693        try:
694            for num, _ad in enumerate(self.devices, 1):
695                if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"):
696                    setattr(_ad, "device_id", "device{}".format(num))
697                # 兼容release2 增加id、serial
698                setattr(_ad, "id", _ad.device_id)
699                setattr(_ad, "serial", _ad.device_sn)
700                setattr(self, _ad.device_id, _ad)
701                setattr(self, "device{}".format(num), _ad)
702        except Exception as error:
703            log.error(ErrorMessage.Error_01218.Message.en,
704                      error_no=ErrorMessage.Error_01218.Code,
705                      is_traceback=True)
706            raise DeviceTestError(ErrorMessage.Error_01218.Topic) from error
707
708    def set_property(self, index):
709        if isinstance(self.project.property_config, dict):
710            get_devices = self.project.property_config.get("devices")
711            if isinstance(get_devices, list) and len(get_devices) > index:
712                propertys = get_devices[index]
713                if isinstance(propertys, dict):
714                    self.log.debug("set propertys: {}".format(propertys))
715                    return propertys
716                self.log.debug("get propertys: {}".format(propertys))
717                self.log.warning("propertys not a dict!")
718        return {}
719
720    def get_property(self, property):
721        if hasattr(self, "propertys"):
722            get_value = self.propertys.get(property)
723            log.debug("get property {}:{}".format(property, get_value))
724            return get_value
725        else:
726            log.warning("'Device' bject has no attribute 'propertys'")
727            return None
728
729    def get_case_report_path(self):
730        report_path = self.configs.get("report_path")
731        temp_task = "{}temp{}task".format(os.sep, os.sep)
732        if report_path and isinstance(report_path, str) and temp_task in report_path:
733            report_dir_path = report_path.split(temp_task)[0]
734            return report_dir_path
735        return report_path
736
737    def get_case_result(self):
738        return self.result
739
740    def set_auto_record_steps(self, flag):
741        """
742        flag: bool, A switch be used to disable record steps automatically
743        """
744        warnings.warn("function is deprecated", DeprecationWarning)
745        cur_case = DeccVariable.cur_case()
746        if cur_case is None:
747            self.log.warning("current case object is none, can not disable record step automatically")
748            return
749        DeccVariable.cur_case().auto_record_steps_info = flag
750
751
752class WindowsTestCase(BaseCase):
753    """Base class for all windows test classes to inherit from.
754    This class gets all the controller objects from test_runner and executes
755    the test cases requested within itself.
756    """
757
758    def __init__(self, tag, configs):
759        super().__init__(tag, configs)
760
761    def _exec_func(self, func, *args):
762        """Executes a function with exception safeguard.
763        Args:
764            func: Function to be executed.
765            args: Arguments to be passed to the function.
766        Returns:
767            Whatever the function returns, or False if unhandled exception
768            occurred.
769        """
770        return BaseCase._exec_func(self, func, *args)
771
772    def clear_device_callback_method(self):
773        pass
774
775
776def _log_info_aw_information(func, args, kwargs, is_checkepr=False):
777    cur_case = DeccVariable.cur_case()
778    if len(cur_case.test_method.func_ret) == 0:
779        if is_checkepr:
780            cur_case.set_checkepr(True)
781            cur_case.cur_check_cmd.__init__()
782        else:
783            cur_case.set_checkepr(False)
784        aw_level = "aw"
785    elif len(cur_case.test_method.func_ret) == 1:
786        aw_level = "aw1"
787    else:
788        aw_level = "aw2"
789    aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs)
790    log.info("<div class='{}'>{}</div>".format(aw_level, aw_info))
791
792
793def _get_is_raise_exception(kwargs):
794    if "EXCEPTION" not in kwargs:
795        is_raise_exception = True
796    else:
797        is_raise_exception = kwargs.pop("EXCEPTION")
798    return is_raise_exception, kwargs
799
800
801def _get_msg_args(kwargs):
802    msg_args = None
803    if "failMsg" in kwargs:
804        msg_args = kwargs.pop("failMsg")
805        msg_args = '' if msg_args is None \
806            else ErrorMessage.Error_01500.Topic.format(msg_args)
807
808    return msg_args, kwargs
809
810
811def _get_ignore_fail():
812    return DeccVariable.cur_case().run_section == RunSection.TEARDOWN
813
814
815def _screenshot_and_flash_error_msg(ignore_fail, is_raise_exception, msg_args,
816                                    func_name, args, error_msg):
817    ScreenAgent.screen_take_picture(args, False, func_name, is_raise_exception=is_raise_exception)
818    if not ignore_fail:
819        # 非teardown阶段
820        if is_raise_exception:
821            DeccVariable.cur_case().test_method.func_ret.clear()
822            _flash_error_msg(msg_args, error_msg)
823            raise DeviceTestError(error_msg)
824        else:
825            # 忽略异常
826            log.info("Ignore current exception because parameter EXCEPTION is False.")
827    else:
828        # teardown阶段
829        _flash_error_msg(msg_args, error_msg)
830
831
832def _is_in_top_aw():
833    starts_num = DeccVariable.cur_case().test_method.func_ret.count("Starts")
834    ends_num = DeccVariable.cur_case().test_method.func_ret.count("Ends")
835    log.debug("Starts: {}, Ends: {}".format(starts_num, ends_num))
836    return True if starts_num == ends_num else False
837
838
839def _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time,
840                              ignore_fail, is_raise_exception, msg_args):
841    aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs)
842    result = False if isinstance(_ret, bool) and not _ret else True
843    cur_case = DeccVariable.cur_case()
844    if _is_in_top_aw():
845        cost = 0 if cost_time is None else round(cost_time / 1000, 3)
846        log.info("<div class='aw'>{} return: {}, cost: {}s</div>".format(aw_info, _ret, cost))
847        cur_case.test_method.func_ret.clear()
848        ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception)
849
850    if not cur_case.checkepr and not result:
851        if is_raise_exception and not ignore_fail:
852            _flash_error_msg(msg_args, ErrorMessage.Error_01200.Topic)
853            if msg_args:
854                raise DeviceTestError(msg_args)
855            raise TestFailure("{}: Step {} result TestError!".format(
856                ErrorMessage.Error_01200.Topic, aw_info))
857
858
859def _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail,
860                               is_raise_exception, msg_args):
861    cur_case = DeccVariable.cur_case()
862    if _is_in_top_aw():
863        cur_case.test_method.func_ret.clear()
864        result = False if isinstance(_ret, bool) and not _ret else True
865        ScreenAgent.screen_take_picture(args, result, func.__name__, is_raise_exception=is_raise_exception)
866        if not _ret:
867            if cur_case.cur_check_cmd.get_cur_check_status():
868                msg = cur_case.cur_check_cmd.get_cur_check_msg()
869            else:
870                msg = "Check Result: {} = {}!".format(
871                    _ret, _gen_aw_invoke_info_no_div(func, args, kwargs))
872                log.info("Return: {}".format(_ret))
873
874            if is_raise_exception and not ignore_fail:
875                _flash_error_msg(msg_args, ErrorMessage.Error_01200.Topic)
876                if msg_args:
877                    raise DeviceTestError(msg_args)
878                raise TestFailure("{}: Step {} result TestError!".format(
879                    ErrorMessage.Error_01200.Topic,
880                    _gen_aw_invoke_info_no_div(func, args, kwargs)))
881            else:
882                log.info(msg)
883            time.sleep(0.01)  # 避免日志顺序混乱
884
885        else:
886            log.info("Return: {}".format(_ret))
887
888
889def _check_exception(exception, in_method=False):
890    # 测试设备断连找不见, 直接抛出异常
891    find_result = re.search(r'device \w* not found|offline', str(exception))
892    if find_result is not None:
893        log.error(ErrorMessage.Error_01217.Message.en,
894                  error_no=ErrorMessage.Error_01217.Code)
895        if in_method:
896            return True
897        raise DeviceNotFound(ErrorMessage.Error_01217.Topic)
898    return False
899
900
901def checkepr(func: T) -> T:
902    @wraps(func)
903    def wrapper(*args, **kwargs):
904        # set default case obj
905        if DeccVariable.cur_case() is None:
906            cur_case = CurCase(log)
907            DeccVariable.set_cur_case_obj(cur_case)
908        DeccVariable.project.record.is_shutdown()
909        _res = run_checkepr(func, *args, **kwargs)
910        return _res
911
912    return wrapper
913
914
915def keyword(func: T) -> T:
916    @wraps(func)
917    def wrapper(*args, **kwargs):
918        # set default case obj
919        if DeccVariable.cur_case() is None:
920            cur_case = CurCase(log)
921            DeccVariable.set_cur_case_obj(cur_case)
922        DeccVariable.project.record.is_shutdown()
923        run_k = run_keyword(func, *args, **kwargs)
924        return run_k
925
926    return wrapper
927
928
929def run_keyword(func, *args, **kwargs):
930    _log_info_aw_information(func, args, kwargs)
931    DeccVariable.cur_case().test_method.func_ret.append("Starts")
932    is_raise_exception, kwargs = _get_is_raise_exception(kwargs)
933    msg_args, kwargs = _get_msg_args(kwargs)
934    ignore_fail = _get_ignore_fail()
935    is_exception = True
936    _ret = None
937    cost_time = 0
938    func_name = func.__name__
939    try:
940        TS.start()
941        _ret = func(*args, **kwargs)
942        log.debug("func {} ret: {}".format(func_name, _ret))
943        cost_time = TS.stop()
944        is_exception = False
945
946        if is_raise_exception and (not ignore_fail) and func_name == 'get_info_from_decc_svr':
947            if isinstance(_ret, dict):
948                if _ret['code'] != 200 and (_ret['success'] == 'false'
949                                            or _ret['success'] is False):
950                    raise TestAssertionError('result error.')
951
952    except (DeviceNotFound, DeviceTestError) as e:
953        raise e
954    except TestAssertionError as exception:
955        # 断言的自定义异常优先于aw自定义的failMsg
956        _screenshot_and_flash_error_msg(
957            ignore_fail, is_raise_exception, str(exception), func_name, args, '')
958
959    except TypeError:
960        log.error(ErrorMessage.Error_01209.Message.en,
961                  error_no=ErrorMessage.Error_01209.Code,
962                  is_traceback=True)
963        _screenshot_and_flash_error_msg(
964            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01209.Topic)
965
966    except HdcCommandRejectedException as exception:
967        _check_exception(exception)
968        log.error(ErrorMessage.Error_01211.Message.en,
969                  error_no=ErrorMessage.Error_01211.Code,
970                  is_traceback=True)
971        _screenshot_and_flash_error_msg(
972            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01211.Topic)
973
974    except ShellCommandUnresponsiveException as exception:
975        _check_exception(exception)
976        log.error(ErrorMessage.Error_01212.Message.en,
977                  error_no=ErrorMessage.Error_01212.Code,
978                  is_traceback=True)
979        _screenshot_and_flash_error_msg(
980            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01212.Topic)
981
982    except AppInstallError as exception:
983        _check_exception(exception)
984        log.error(ErrorMessage.Error_01213.Message.en,
985                  error_no=ErrorMessage.Error_01213.Code,
986                  is_traceback=True)
987        _screenshot_and_flash_error_msg(
988            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01213.Topic)
989
990    except RpcNotRunningError as exception:
991        _check_exception(exception)
992        log.error(ErrorMessage.Error_01440.Message.en,
993                  error_no=ErrorMessage.Error_01440.Code,
994                  is_traceback=True)
995        _screenshot_and_flash_error_msg(
996            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01440.Topic)
997
998    except ConnectionRefusedError as error:
999        # 设备掉线connector_clinet连接拒绝
1000        log.error(ErrorMessage.Error_01217.Message.en,
1001                  error_no=ErrorMessage.Error_01217.Code)
1002        raise DeviceNotFound(ErrorMessage.Error_01217.Topic) from error
1003
1004    except Exception as exception:
1005        _check_exception(exception)
1006        log.error(ErrorMessage.Error_01210.Message.en,
1007                  error_no=ErrorMessage.Error_01210.Code,
1008                  is_traceback=True)
1009        _screenshot_and_flash_error_msg(
1010            ignore_fail, is_raise_exception, msg_args, func_name, args,
1011            "{}: {}".format(ErrorMessage.Error_01210.Topic, exception))
1012    finally:
1013        DeccVariable.cur_case().test_method.func_ret.append("Ends")
1014    if is_exception:
1015        if _is_in_top_aw():
1016            DeccVariable.cur_case().test_method.func_ret.clear()
1017        return False
1018    _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time,
1019                              ignore_fail, is_raise_exception, msg_args)
1020    return _ret
1021
1022
1023def run_checkepr(func, *args, **kwargs):
1024    _log_info_aw_information(func, args, kwargs, is_checkepr=True)
1025    DeccVariable.cur_case().test_method.func_ret.append("Starts")
1026    is_raise_exception, kwargs = _get_is_raise_exception(kwargs)
1027    msg_args, kwargs = _get_msg_args(kwargs)
1028    ignore_fail = _get_ignore_fail()
1029    is_exception = True
1030    _ret = None
1031    func_name = func.__name__
1032    try:
1033        TS.start()
1034        # 执行当前函数
1035        _ret = func(*args, **kwargs)
1036        log.debug("step {} execute result: {}".format(func_name, _ret))
1037        TS.stop()
1038        is_exception = False
1039
1040    except (DeviceNotFound, DeviceTestError) as e:
1041        raise e
1042    except TestAssertionError as exception:
1043        _screenshot_and_flash_error_msg(
1044            ignore_fail, is_raise_exception, str(exception), func_name, args, '')
1045
1046    except TypeError:
1047        log.error(ErrorMessage.Error_01209.Message.en,
1048                  error_no=ErrorMessage.Error_01209.Code,
1049                  is_traceback=True)
1050        _screenshot_and_flash_error_msg(
1051            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01209.Topic)
1052
1053    except HdcCommandRejectedException as exception:
1054        _check_exception(exception)
1055        log.error(ErrorMessage.Error_01211.Message.en,
1056                  error_no=ErrorMessage.Error_01211.Code,
1057                  is_traceback=True)
1058        _screenshot_and_flash_error_msg(
1059            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01211.Topic)
1060
1061    except ShellCommandUnresponsiveException as exception:
1062        _check_exception(exception)
1063        log.error(ErrorMessage.Error_01212.Message.en,
1064                  error_no=ErrorMessage.Error_01212.Code,
1065                  is_traceback=True)
1066        _screenshot_and_flash_error_msg(
1067            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01212.Topic)
1068
1069    except AppInstallError as exception:
1070        _check_exception(exception)
1071        log.error(ErrorMessage.Error_01213.Message.en,
1072                  error_no=ErrorMessage.Error_01213.Code,
1073                  is_traceback=True)
1074        _screenshot_and_flash_error_msg(
1075            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01213.Topic)
1076
1077    except RpcNotRunningError as exception:
1078        _check_exception(exception)
1079        log.error(ErrorMessage.Error_01440.Message.en,
1080                  error_no=ErrorMessage.Error_01440.Code,
1081                  is_traceback=True)
1082        _screenshot_and_flash_error_msg(
1083            ignore_fail, is_raise_exception, msg_args, func_name, args, ErrorMessage.Error_01440.Topic)
1084
1085    except ConnectionRefusedError as error:
1086        # 设备掉线connector_clinet连接拒绝
1087        log.error(ErrorMessage.Error_01217.Message.en,
1088                  error_no=ErrorMessage.Error_01217.Code)
1089        raise DeviceNotFound(ErrorMessage.Error_01217.Topic) from error
1090
1091    except Exception as exception:
1092        _check_exception(exception)
1093        log.error(ErrorMessage.Error_01210.Message.en,
1094                  error_no=ErrorMessage.Error_01210.Code,
1095                  is_traceback=True)
1096        _screenshot_and_flash_error_msg(
1097            ignore_fail, is_raise_exception, msg_args, func_name, args,
1098            "{}: {}".format(ErrorMessage.Error_01210.Topic, exception))
1099    finally:
1100        DeccVariable.cur_case().test_method.func_ret.append("Ends")
1101    if is_exception:
1102        if _is_in_top_aw():
1103            DeccVariable.cur_case().test_method.func_ret.clear()
1104        return False
1105
1106    _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail,
1107                               is_raise_exception, msg_args)
1108    return _ret
1109
1110
1111def _flash_error_msg(msg_args, error_msg):
1112    log.info("flash error msg.")
1113    # 优先使用断言的自定义异常,然后再是failMsg,最后是捕获的异常
1114    if msg_args:
1115        if not DeccVariable.cur_case().test_method.error_msg or \
1116                not DeccVariable.cur_case().test_method.step_flash_fail_msg:
1117            DeccVariable.cur_case().test_method.set_error_msg(msg_args)
1118            DeccVariable.cur_case().test_method.step_flash_fail_msg = True
1119            if not DeccVariable.cur_case().is_upload_method_result:
1120                DeccVariable.cur_case().set_error_msg(msg_args)
1121    else:
1122        if not DeccVariable.cur_case().test_method.error_msg:
1123            # 更新当前步骤error_msg
1124            DeccVariable.cur_case().test_method.set_error_msg(error_msg)
1125            if not DeccVariable.cur_case().is_upload_method_result \
1126                    and DeccVariable.cur_case().error_msg:
1127                DeccVariable.cur_case().set_error_msg(msg_args)
1128
1129    DeccVariable.cur_case().test_method.set_result(RunResult.FAILED)
1130    if DeccVariable.cur_case().case_result == RunResult.PASSED:
1131        DeccVariable.cur_case().set_case_result(RunResult.FAILED)
1132
1133
1134def _gen_aw_invoke_info_no_div(func, args, kwargs):
1135    all_args = []
1136    name_id = None
1137    if args and getattr(args[0], "__module__", None):
1138        try:
1139            _ad = args[0]
1140            id_strings = []
1141            dev_id = getattr(_ad, "device_id", "")
1142            if dev_id:
1143                id_strings.append(dev_id)
1144            dev_sn = getattr(_ad, "device_sn", "")
1145            if dev_sn:
1146                id_strings.append(convert_serial(dev_sn))
1147            name_id = ".".join(id_strings).replace(" ", ".")
1148        except Exception as exception:
1149            log.error(exception)
1150        args = args[1:]
1151    if name_id is not None:
1152        all_args.append(name_id)
1153    if args:
1154        for arg in args:
1155            all_args.append(str(arg))
1156    if kwargs:
1157        for key, value in kwargs.items():
1158            all_args.append("{}={}".format(key, value))
1159    info_items = [
1160        func.__module__.split(".")[-1:][0], ".", func.__name__,
1161        "(", ", ".join(all_args), ")"
1162    ]
1163    return "".join(info_items)
1164
1165
1166def _get_fail_line_from_exception(trace_info, line_keyword):
1167    match, lines = -1, trace_info.split("\n")
1168    for index, line in enumerate(lines):
1169        if line_keyword not in line:
1170            continue
1171        match = index
1172    if match == -1:
1173        return trace_info
1174    return lines[match].strip() + "\n" + lines[match + 1].strip()
1175
1176
1177def GET_TRACEBACK(_trac=""):
1178    if _trac == "AW":
1179        return "".join(traceback.format_exception(*sys.exc_info())), \
1180            traceback.format_exception(*sys.exc_info())[-1].strip()
1181    return "".join(traceback.format_exception(*sys.exc_info()))
1182
1183
1184def ASSERT(expect, actual):
1185    if expect != actual:
1186        raise TestFailure("{}: ASSERT TestError, Expect: {}, Actual: {}".format(ErrorMessage.Error_01200.Topic,
1187                                                                                expect, actual))
1188
1189
1190def CHECK(message, expect, actual):
1191    if DeccVariable.cur_case() is None:
1192        cur_case = CurCase(log)
1193        DeccVariable.set_cur_case_obj(cur_case)
1194        return
1195    MESSAGE(message)
1196    EXPECT(expect)
1197    ACTUAL(actual)
1198
1199
1200def MESSAGE(arg):
1201    if DeccVariable.cur_case() is None:
1202        cur_case = CurCase(log)
1203        DeccVariable.set_cur_case_obj(cur_case)
1204        return
1205    DeccVariable.cur_case().cur_check_cmd.through = get_decode(arg)
1206    log.debug("Description: {}".format(
1207        DeccVariable.cur_case().cur_check_cmd.through))
1208
1209
1210def EXPECT(arg):
1211    if DeccVariable.cur_case() is None:
1212        cur_case = CurCase(log)
1213        DeccVariable.set_cur_case_obj(cur_case)
1214        return
1215    DeccVariable.cur_case().cur_check_cmd.expect = get_decode(arg)
1216    log.debug("Expected: {}".format(
1217        DeccVariable.cur_case().cur_check_cmd.expect))
1218
1219
1220def ACTUAL(arg):
1221    if DeccVariable.cur_case() is None:
1222        cur_case = CurCase(log)
1223        DeccVariable.set_cur_case_obj(cur_case)
1224        return
1225    DeccVariable.cur_case().cur_check_cmd.actual = get_decode(arg)
1226    log.debug("Actual: {}".format(
1227        DeccVariable.cur_case().cur_check_cmd.actual))
1228
1229
1230def Step(name, **kwargs):
1231    """记录用例操作步骤,并展示在用例报告里
1232    Args:
1233        name: str, step name
1234    Example:
1235        Step("11")
1236        Step("11", video="a video address")
1237    """
1238    cur_case = DeccVariable.cur_case()
1239    if cur_case is None:
1240        log.warning("current case object is none, recording step failed")
1241        return -1
1242    return cur_case.set_step_info(name, **kwargs)
1243
1244
1245def UpdateStep(index, **kwargs):
1246    """更新步骤记录信息
1247    Args:
1248        index: int, step index
1249    Example:
1250        index = Step("11")
1251        UpdateStep(index, video="a video address")
1252    """
1253    cur_case = DeccVariable.cur_case()
1254    if cur_case is None:
1255        log.warning("current case object is none, updating step failed")
1256        return
1257    cur_case.update_step_info(index, **kwargs)
1258
1259
1260def CheckPoint(checkpoint):
1261    Step(checkpoint)
1262
1263
1264def CONFIG():
1265    return DeccVariable.project.config_json
1266
1267
1268def get_report_dir(self=None):
1269    """
1270    get Path to the framework execution case log folder
1271    Returns: log_dir_path
1272    """
1273    if isinstance(self, TestCase):
1274        return self.project.task_report_dir
1275    return DeccVariable.project.task_report_dir
1276
1277
1278class Property:
1279
1280    def __init__(self):
1281        pass
1282
1283    def add_attributes(self, key, value):
1284        setattr(self, key, value)
1285        log.debug("Property setattr {}={}".format(key, value))
1286