• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import sys
21import time
22import traceback
23import warnings
24from functools import wraps
25from typing import Union
26
27from xdevice import convert_serial
28from xdevice import get_decode
29from xdevice import get_cst_time
30from xdevice import ConfigConst
31from xdevice import IDevice
32from xdevice import EnvPool
33from xdevice import is_env_pool_run_mode
34from xdevice import AgentMode
35from xdevice import Variables
36
37from devicetest.core.exception import BaseTestError
38from devicetest.core.exception import HdcCommandRejectedException
39from devicetest.core.exception import ShellCommandUnresponsiveException
40from devicetest.core.exception import DeviceNotFound
41from devicetest.core.exception import AppInstallError
42from devicetest.core.exception import RpcNotRunningError
43from devicetest.core.exception import TestFailure
44from devicetest.core.exception import TestError
45from devicetest.core.exception import TestSkip
46from devicetest.core.exception import TestTerminated
47from devicetest.core.exception import TestAbortManual
48from devicetest.core.exception import DeviceTestError
49from devicetest.core.exception import TestAssertionError
50from devicetest.core.constants import RunResult
51from devicetest.core.constants import RunSection
52from devicetest.core.constants import RunStatus
53from devicetest.core.variables import DeccVariable
54from devicetest.core.variables import CurCase
55from devicetest.error import ErrorMessage
56from devicetest.utils.time_util import TS
57from devicetest.utils.type_utils import T
58from devicetest.log.logger import DeviceTestLog as log
59from devicetest.controllers.tools.screen_agent import ScreenAgent
60
61RESULT_LINE_TEMPLATE = "[Test Step] %s %s"
62
63
def validate_test_name(name):
    """Reject a missing or empty test name.

    Args:
        name: name of a test case.
    Raises:
        BaseTestError: if the name is None, equal to "" or has length 0.
    """
    is_blank = name == "" or name is None or len(name) < 1
    if not is_blank:
        return
    raise BaseTestError(ErrorMessage.TestCase.Code_0203004.format(name))
73
74
class DeviceRoot:
    """Holder for a single root flag shared through the class attribute.

    The flag is stored on the class itself, so every caller reads and writes
    the same value.
    """
    is_root_device = False

    @staticmethod
    def set_device_root(is_root):
        """Store ``is_root`` as the shared flag."""
        DeviceRoot.is_root_device = is_root

    @staticmethod
    def get_device_root():
        """Return the shared flag (False until it has been set)."""
        current = DeviceRoot.is_root_device
        return current
88
89
90class CoreCase:
91    """Base class for all test classes to inherit from.
92    This class gets all the controller objects from test_runner and executes
93    the test cases requested within itself.
94    """
95
    def __init__(self, tag, configs):
        """Initialise the per-case state from the supplied configs.

        Args:
            tag: not read in this constructor body.
            configs: mapping that must contain the "log" and "project"
                entries read below.
        """
        self.cur_case = None
        self.devices = []
        self.configs = configs
        self.result = RunResult.PASSED
        self.last_result = RunResult.PASSED
        self.test_method_result = RunResult.PASSED
        self.section = RunSection.SETUP
        self.error_msg = ''
        self.trace_info = ''
        self.start_time = get_cst_time()
        self.log = self.configs["log"]
        # set_project() logs through self.log and assigns self.cur_case, so it
        # has to run after self.log exists.
        self.set_project(self.configs["project"])
        self.con_fail_times = 0
        self.fail_times = 0
        self.step_flash_fail_msg = False
        self.pass_through = Variables.config.pass_through
        # Marks a loop-execution scenario so that exec_one_testcase does not
        # overwrite the result set inside the loop.
        self._is_loop_scenario = False
        # proxy function: when set, it is called with the error text each time
        # _exec_func / exec_one_testcase records an error
        self.execption_callback = None
        # case end function: when set, run() calls it after teardown
        self.case_end_callback = None
119
    def __enter__(self):
        """Enter the context manager; the case object itself is the target."""
        return self
122
    def __exit__(self, *args):
        """Run clean_up() through the exception safeguard on context exit.

        The exception details in ``args`` are not inspected and nothing is
        returned, so an exception raised inside the ``with`` body propagates.
        """
        self._exec_func(self.clean_up)
125
126    def _print_error(self, error: str, result: RunResult = RunResult.FAILED,
127                     refresh_method_result: bool = False, screenshot: bool = True):
128        """打印异常信息
129        error : str, error message
130        result: RunResult, RunResult class variables
131        refresh_method_result: bool, refresh result flag
132        screenshot: bool, 运行keyword/checkepr装饰的接口异常,默认是有失败截图,可将其设为False,可避免重复截图
133        """
134        self.log.error(error)
135        self.error_msg = str(error)
136        self.result = result
137        if refresh_method_result:
138            self.test_method_result = result
139        if screenshot:
140            # 在非keyword装饰的接口执行失败的时候,对每个设备进行截图
141            for device in self.devices:
142                ScreenAgent.screen_take_picture([device], False, "shot_on_fail")
143
144        trace_info = traceback.format_exc()
145        index = self.cur_case.step_index
146        if index == -1:
147            self.log.error(self.error_msg)
148            self.log.error(trace_info)
149            return
150        step_error_id = f'step_error_{index}'
151        self.log.error(f'<span id="{step_error_id}">{self.error_msg}</span>')
152        self.log.error(trace_info)
153        _error = f'<a href="javascript:" onclick="gotoStep(\'{step_error_id}\')">{self.error_msg}</a>'
154        UpdateStep(index, error=_error)
155
    def setup(self):
        """Setup hook run once by run_setup() before the test methods.

        Implementation is optional. It is invoked through _exec_func(), which
        discards the return value: a failed setup is signalled by raising an
        exception, not by returning False.
        """
        return True
161
    def process(self):
        """Default test body, run between setup and teardown.

        _init_case_var() registers it as the only test step when the class
        defines no ``tests`` attribute, and run_tests() calls it through
        run_process() when ``tests`` is not a list. Implementation is optional.
        """
        pass
167
    def teardown(self):
        """Teardown hook run after the selected test methods have executed.

        run_setup() also calls it when setup() failed.
        Implementation is optional.
        """
        pass
174
175    def exec_one_testcase(self, test_name, test_func):
176        """Executes one test case and update test results.
177        Args:
178            test_name: Name of the test.
179            test_func: The test function.
180        Returns:
181            True if the test passes, False otherwise.
182        """
183        if not self._exec_func(self.setup_test):
184            self.log.error("Setup for {} failed, skipping.".format(test_name))
185        _result = None
186        error = None
187        try:
188            verdict = test_func()
189        except TestSkip:
190            # Test skipped.
191            _result = True
192            self.log.debug("TestSkip")
193        except (AppInstallError, BaseTestError, DeviceNotFound, DeviceTestError,
194                TestAssertionError, TestError, TestFailure, TestTerminated, TestAbortManual) as exception:
195            # 上述异常,已设置错误码,无需再次设置
196            error = str(exception)
197            self._print_error(error, refresh_method_result=True, screenshot=False)
198        except HdcCommandRejectedException as exception:
199            error = ErrorMessage.Device.Code_0202302.format(exception)
200            self._print_error(error, refresh_method_result=True)
201        except ShellCommandUnresponsiveException as exception:
202            error = ErrorMessage.Device.Code_0202304.format(exception)
203            self._print_error(error, refresh_method_result=True)
204        except RpcNotRunningError as exception:
205            error = ErrorMessage.Device.Code_0202305.format(exception)
206            self._print_error(error, refresh_method_result=True)
207        except ConnectionRefusedError as exception:
208            error = ErrorMessage.Device.Code_0202306.format(exception)
209            self._print_error(error, refresh_method_result=True)
210        except ImportError as exception:
211            error = ErrorMessage.TestCase.Code_0203006.format(exception)
212            self._print_error(error, refresh_method_result=True, screenshot=False)
213        except Exception as exception:
214            error = ErrorMessage.TestCase.Code_0203002.format(exception)
215            self._print_error(error, refresh_method_result=True)
216        else:
217            # loop执行场景,由其方法内设置测试结果
218            if self._is_loop_scenario:
219                _result = True
220                return
221            if (verdict is None or verdict is True) \
222                    and self.test_method_result == RunResult.PASSED:
223                # Test passed.
224                self.print_case_result(test_name, RunResult.PASSED)
225                _result = True
226            # Test failed because it didn't return True.
227            # This should be removed eventually.
228            else:
229                error_msg = "test func '{}' The actual input value of " \
230                            "the checkpoint is inconsistent with the " \
231                            "expected result.".format(test_func.__name__)
232                self.log.error(error_msg)
233                self.test_method_result = self.result = RunResult.FAILED
234                self.error_msg = error_msg
235                _result = False
236
237        finally:
238            if self.execption_callback is not None and error is not None:
239                self.execption_callback(error)
240            self._exec_func(self.teardown_test)
241
    def _exec_func(self, func, *args):
        """Executes a function with exception safeguard.

        Every exception caught below is recorded through _print_error() and,
        when execption_callback is set, reported to that callback; none is
        re-raised.

        Args:
            func: Function to be executed.
            args: Arguments to be passed to the function.
        Returns:
            True if the function completed without raising, False otherwise.
            The function's own return value is discarded.
        """
        ret = False
        error = None
        try:
            func(*args)
            ret = True
        except (AppInstallError, DeviceNotFound, DeviceTestError,
                TestAssertionError, TestError, TestAbortManual, TestTerminated) as exception:
            # These exceptions already carry an error code; do not wrap again.
            # NOTE(review): unlike exec_one_testcase, BaseTestError and
            # TestFailure are not listed here - confirm whether intentional.
            error = str(exception)
            self._print_error(error, screenshot=False)
        except HdcCommandRejectedException as exception:
            error = ErrorMessage.Device.Code_0202302.format(exception)
            self._print_error(error)
        except ShellCommandUnresponsiveException as exception:
            error = ErrorMessage.Device.Code_0202304.format(exception)
            self._print_error(error)
        except RpcNotRunningError as exception:
            error = ErrorMessage.Device.Code_0202305.format(exception)
            self._print_error(error)
        except ConnectionRefusedError as exception:
            error = ErrorMessage.Device.Code_0202306.format(exception)
            self._print_error(error)
        except Exception as exception:
            error = ErrorMessage.TestCase.Code_0203002.format(exception)
            self._print_error(error)
        finally:
            if self.execption_callback is not None and error is not None:
                self.execption_callback(error)
        return ret
280
281    def _get_test_funcs(self):
282        # All tests are selected if test_cases list is None.
283        # Load functions based on test names. Also find the longest test name.
284        test_funcs = []
285        for test_name in self.tests:
286            try:
287                validate_test_name(test_name)
288                test_funcs.append((test_name, getattr(self, test_name)))
289            except AttributeError:
290                self.result = RunResult.FAILED
291                self.error_msg = "{} does not have test step {}.".format(
292                    self.TAG, test_name)
293                self.log.error(self.error_msg)
294
295            except BaseTestError as exception:
296                self.result = RunResult.FAILED
297                self.error_msg = self.generate_fail_msg("{}:{}".format(str(exception), traceback.format_exc()))
298                self.log.error(str(exception))
299
300        return test_funcs
301
302    def run(self, test_names=None):
303        """Runs test cases within a test class by the order they
304        appear in the test list.
305        Being in the test_names list makes the test case "requested". If its
306        name passes validation, then it'll be executed, otherwise the name will
307        be skipped.
308        Args:
309            test_names: A list of names of the requested test cases. If None,
310                all test cases in the class are considered requested.
311        Returns:
312            A tuple of: The number of requested test cases, the number of test
313            cases executed, and the number of test cases passed.
314        """
315        if RunStatus.FINISHED == self.run_setup():
316            return
317        self.run_tests(test_names)
318        run_result = self.result
319        self.run_teardown()
320        # 避免teardown报错导致结果失败1/./
321        self.result = run_result
322        if self.case_end_callback is not None:
323            self.case_end_callback()
324
325    def run_setup(self):
326        self.section = RunSection.SETUP
327        self.cur_case.set_run_section(self.section)
328        self.run_setup_start()
329        self.log.info("**********SetUp Starts!")
330        ret = self._exec_func(self.setup)
331        if not ret:
332            self.log.info("**********SetUp Ends!")
333            self.log.error("setup step fails")
334            self.section = RunSection.TEARDOWN
335            self.cur_case.set_run_section(self.section)
336            self.log.info("**********TearDown Starts!")
337            self._exec_func(self.teardown)
338            if self.result != RunResult.BLOCKED:
339                self.result = RunResult.FAILED
340            self.log.info('**********TearDown Ends!')
341
342            self.print_case_result(self.TAG, self.result)
343
344            return RunStatus.FINISHED
345        else:
346            self.log.info("**********SetUp Ends!")
347            self.run_setup_end()
348
349        return RunStatus.RUNNING
350
351    def run_tests(self, test_names):
352        self.section = RunSection.TEST
353        self.cur_case.set_run_section(self.section)
354        if hasattr(self, "tests") and isinstance(getattr(self, "tests", None), list):
355            tests = self._get_test_funcs()
356            for test_name, test_func in tests:
357                self.run_test(test_name, test_func)
358        else:
359            self.run_process()
360        self.log.info("**********All test methods Ends!**********")
361
    def run_process(self):
        """Run process() through the exception safeguard of _exec_func()."""
        self._exec_func(self.process)
364
    def run_test(self, test_name, test_func):
        """Run one test method and emit its start/end log lines.

        Args:
            test_name: name used in the log output.
            test_func: callable handed to exec_one_testcase().
        """
        self.log.info("[Test Step] {}".format(test_name))
        # Reset the per-method result so an earlier failure does not leak in.
        self.test_method_result = RunResult.PASSED
        self.exec_one_testcase(test_name, test_func)
        self.test_method_end(test_name)
370
    def run_teardown(self):
        """Run the teardown section: hooks, teardown() and finally clean_up().

        teardown() and clean_up() both go through _exec_func(), so exceptions
        raised by them are recorded rather than propagated.
        """
        self.section = RunSection.TEARDOWN
        self.cur_case.set_run_section(self.section)
        self.run_teardown_start()
        self.log.info("**********TearDown Starts!")
        self._exec_func(self.teardown)
        self.log.info("**********TearDown Ends!")
        self.run_teardown_end()
        self._exec_func(self.clean_up)
380
381    def run_perf_models(self, models, fail_break=False):
382        """
383        models: list, list of model object
384        fail_break: bool, if this is set to True, break down the loop of model execution when it fails
385        """
386        self._is_loop_scenario = True
387        fail_models, pass_models, total = [], [], len(models)
388        for model in models:
389            model_name = model.__class__.__name__
390            # 预置model的测试结果pass,避免执行循环时测试结果受到干扰
391            self.test_method_result = RunResult.PASSED
392            self.log.info("Executing test model {}".format(model_name))
393            # 执行jump_to_start_anchor成功后,再执行execute
394            self.exec_one_testcase("{}.jump_to_start_anchor".format(model_name), model.jump_to_start_anchor)
395            if self.test_method_result == RunResult.PASSED:
396                self.exec_one_testcase("{}.execute".format(model_name), model.execute)
397            if self.test_method_result == RunResult.PASSED:
398                pass_models.append(model_name)
399                continue
400            fail_models.append(model_name)
401            if fail_break:
402                break
403        fail_cnt, pass_cnt = len(fail_models), len(pass_models)
404        self.log.info("Test models executed result with "
405                      "{} fail({}), {} pass({})".format(fail_cnt, fail_models, pass_cnt, pass_models))
406        # 所有model执行通过,用例pass
407        if pass_cnt == total:
408            self.error_msg = ""
409            self.result = RunResult.PASSED
410            return
411        self.result = RunResult.FAILED
412
    def test_method_end(self, test_name):
        """Log the end of a test method together with its current result.

        Args:
            test_name: name of the test method, used only in the log lines.
        """
        self.log.info("TestMethod: {} result is {}".format(test_name, self.test_method_result))
        self.log.info("TestMethod: {} End".format(test_name))
419
    def clean_up(self):
        """Final clean-up hook; no-op by default.

        Called through _exec_func() at the end of run_teardown() and from
        __exit__(). Intended for releasing objects the user created in the
        constructor.
        """
        pass
427
    def run_setup_start(self):
        """Hook called by run_setup() before setup() is executed.

        The default implementation delegates to setup_start(). Derived base
        classes can override it to customise the pre-setup behaviour.
        """
        self.setup_start()
434
    def setup_start(self):
        """User hook invoked from run_setup_start(), i.e. before setup().

        No-op by default.
        """
        pass
441
    def run_setup_end(self):
        """Hook called by run_setup() after setup() succeeded.

        The default implementation delegates to setup_end(). It is not called
        when setup() failed.
        """
        self.setup_end()
448
    def setup_end(self):
        """User hook invoked from run_setup_end(), i.e. after a successful
        setup().

        No-op by default.
        """
        pass
455
    @classmethod
    def setup_test(cls):
        """Per-test setup hook, run by exec_one_testcase() before each test
        function.

        Implementation is optional. It is invoked through _exec_func(), which
        ignores the return value; raise an exception to signal a failure.
        """
        return True
464
    def teardown_test(self):
        """Per-test teardown hook, run by exec_one_testcase() in its
        ``finally`` clause after each test function.

        Implementation is optional.
        """
        pass
471
    def run_teardown_start(self):
        """Hook called by run_teardown() before teardown() is executed.

        The default implementation delegates to teardown_start(). Derived
        base classes can override it to customise the pre-teardown behaviour.
        """
        self.teardown_start()
478
    def teardown_start(self):
        """User hook invoked from run_teardown_start(), i.e. before
        teardown().

        No-op by default.
        """
        pass
485
    def run_teardown_end(self):
        """Hook called by run_teardown() after teardown() and before
        clean_up().

        The default implementation delegates to teardown_end().
        """
        self.teardown_end()
492
    def teardown_end(self):
        """User hook invoked from run_teardown_end(), i.e. after teardown().

        No-op by default.
        """
        pass
499
    def print_case_result(self, case, result):
        """Log the result banner line for ``case``.

        Args:
            case: name shown in the banner.
            result: result value shown in the banner.
        """
        self.log.info("****************************Test {} result is: {}"
                      .format(case, result))
503
504    def generate_fail_msg(self, msg):
505        if isinstance(self.error_msg, str) and self.error_msg != "":
506            pass
507        else:
508            self.error_msg = msg
509        return self.error_msg
510
    def set_log(self, _log):
        """Replace the logger used by this case with ``_log``."""
        self.log = _log
513
    def set_project(self, project):
        """Bind the case to ``project`` and initialise the case variables.

        Registers the project with DeccVariable, caches the current case
        object in ``self.cur_case`` and initialises it from ``self.TAG``.
        ``self.log`` must already be set because the method logs on success.
        """
        self.project = project
        DeccVariable.set_project_obj(self.project)
        self.cur_case = DeccVariable.cur_case()
        self._init_case_var(self.TAG)
        self.log.info("init case variables success.")
520
521    def _init_case_var(self, tag):
522        if tag:
523            self.cur_case.set_name(tag)
524        else:
525            self.cur_case.set_name(self.project.execute_case_name)
526
527        if not hasattr(self, "tests"):
528            setattr(self, "tests", ["process"])
529        request = self.configs.get("request", None)
530        repeat, repeat_round = 1, 1
531        if request is not None:
532            repeat, repeat_round = request.config.repeat, request.get_repeat_round()
533        self.cur_case.set_step_total(1)
534        self.cur_case.set_case_screenshot_dir(
535            self.project.test_suite_path,
536            self.project.task_report_dir,
537            self.project.cur_case_full_path,
538            repeat=repeat, repeat_round=repeat_round)
539
540    @classmethod
541    def step(cls, _ad, stepepr):
542        result = stepepr
543        if result is not None and isinstance(result, bool) and not result:
544            _ad.log.error(_ad.device_id + " exec step is fail")
545        elif _ad.screenshot:
546            pass
547        return result
548
549
class BaseCase:
    """Base class for simple test classes to inherit from.

    Holds the devices handed over by the runner and exposes each of them as
    ``Phone<N>`` / ``device<N>`` attributes, numbered from 1.
    """

    def __init__(self, tag, controllers: Union[IDevice, dict]):
        """Collect devices and per-case settings from ``controllers``.

        Args:
            tag: not used by this constructor.
            controllers: either a config dict (its "devices" entry may be a
                single device or a list of devices) or a single device.
        """
        self.log = None
        # Start from an empty list so that a dict without a usable "devices"
        # entry cannot leave self.devices undefined for _set_device_attr().
        self.devices = []
        if isinstance(controllers, dict):
            devices = controllers.get("devices", None)
            if devices:
                self.devices = devices if isinstance(devices, list) else [devices]
        else:
            self.devices = [controllers]
        self.result = RunResult.PASSED
        self.error_msg = ''
        self.trace_info = ''
        self.start_time = get_cst_time()
        self.test_method_result = None
        self._set_device_attr(controllers)
        self._auto_record_steps = False
        # NOTE(review): the calls below require ``controllers`` to offer
        # get()/setdefault(); whether a bare device does is not visible here.
        self.configs = controllers
        self.configs.setdefault("report_path", EnvPool.report_path)
        # Marks a loop-execution scenario so that exec_one_testcase does not
        # overwrite the result set inside the loop.
        self._is_loop_scenario = False
        # proxy function
        self.execption_callback = None
        # case end function
        self.case_end_callback = None
        self.pass_through = self.configs.get(ConfigConst.testargs, {}).get(ConfigConst.pass_through, {})
        self.cur_case = DeccVariable.cur_case()

    def __enter__(self):
        """Enter the context manager; the case object itself is the target."""
        return self

    def __exit__(self, *args):
        """Leave the context manager without any clean-up."""
        pass

    def _set_device_attr(self, controllers):
        """Expose each device as an attribute and apply per-device settings.

        Args:
            controllers: object whose ``get`` supplies the optional
                "screenshot", "screenshot_fail" and "agent_mode" settings.
        """
        # enumerate() gives every device its own 1-based index; the former
        # constant index made all devices overwrite Phone1/device1.
        for index, device in enumerate(self.devices, 1):
            setattr(self, "Phone{}".format(index), device)
            setattr(self, "device{}".format(index), device)
            if controllers.get("screenshot", None):
                setattr(device, "screenshot", controllers["screenshot"])
            # NOTE(review): a falsy "screenshot_fail" setting is replaced by
            # True here, so it cannot be switched off - confirm intent.
            if controllers.get("screenshot_fail", None):
                setattr(device, "screenshot_fail", controllers["screenshot_fail"])
            else:
                setattr(device, "screenshot_fail", True)
            # Select the agent mode; defaults to bin mode.
            mode = controllers.get("agent_mode", AgentMode.bin)
            device.set_agent_mode(mode)

    def set_auto_record_steps(self, flag):
        """
        flag: bool, A switch be used to disable record steps automatically
        """
        self._auto_record_steps = flag

    def get_auto_record_steps_status(self):
        """
        return if record steps automatically
        """
        return self._auto_record_steps

    def get_case_result(self):
        """Return the current result of the case."""
        return self.result
623
624
625class TestCase(CoreCase, BaseCase):
626    """Base class for all test classes to inherit from.
627    This class gets all the controller objects from test_runner and executes
628    the test cases requested within itself.
629    """
630
    def __init__(self, tag, configs):
        """Initialise through BaseCase in env-pool run mode, else CoreCase.

        Only one of the two parent constructors runs, selected by
        is_env_pool_run_mode().

        Args:
            tag: forwarded to the selected parent constructor.
            configs: forwarded to the selected parent constructor; outside
                env-pool mode its "devices" entry is passed to set_devices().
        """
        if is_env_pool_run_mode():
            BaseCase.__init__(self, tag, configs)
        else:
            CoreCase.__init__(self, tag, configs)
            self.devices = []
            self.device1 = None
            self.device2 = None
            self.set_devices(self.configs["devices"])
            # Iteration counter maintained by loop().
            self.testLoop = 0
641
    def _exec_func(self, func, *args):
        """Executes a function with exception safeguard.

        Delegates explicitly to CoreCase._exec_func().

        Args:
            func: Function to be executed.
            args: Arguments to be passed to the function.
        Returns:
            True if the function completed without raising, False otherwise.
        """
        return CoreCase._exec_func(self, func, *args)
652
    def loop_start(self):
        """Hook called by loop() before each iteration's test; no-op here."""
        pass
655
    def loop_end(self, testResult):
        """Hook called by loop() after each iteration's test; no-op here.

        Args:
            testResult: the case result (``self.result``) at that point.
        """
        pass
658
659    def loop(self, test_name, looptimes=0, fail_break=False, fail_times=0, reset_test=None, cat_log_step=None,
660             continues_fail=False):
661
662        self.con_fail_times = 0
663        self.last_result = RunResult.PASSED
664
665        self.fail_times = 0
666        self.testLoop = 0
667        for i in range(0, int(looptimes)):
668            self.log.info("--- Loop in %s time ---" % str(i + 1))
669            self.testLoop += 1
670            if self.result != RunResult.PASSED:
671                if fail_break and not fail_times:
672                    break
673
674            if self.project.record.is_shutdown():
675                self.result = RunResult.FAILED
676                self.error_msg = "Testcase is stopped by manual!"
677                self.log.error(self.error_msg)
678                break
679
680            self.test_method_result = RunResult.PASSED
681            self.error_msg = ""
682            self.loop_start()
683            self.exec_one_testcase(test_name, getattr(self, test_name))
684            self.loop_end(self.result)
685
686            self.log.info("--- Loop in %s-loop%s time result is: %s ---"
687                          % (test_name, str(i + 1), self.test_method_result))
688            self.test_method_end(self.test_method_result)
689
690            if DeccVariable.cur_case().test_method.func_ret:
691                self.log.warning("{} time loop end, the FUNCRET has error, clear FUNCRET again.".format(str(i + 1)))
692                DeccVariable.cur_case().test_method.func_ret.clear()
693            if self.test_method_result != "Passed":
694                self.fail_times += 1
695
696                if continues_fail:
697                    if self.last_result != "Passed":
698                        self.con_fail_times += 1
699                    else:
700                        self.con_fail_times = 1
701
702                if cat_log_step is not None:
703                    self.exec_one_testcase(cat_log_step, getattr(self, cat_log_step))
704                if reset_test is not None:
705                    self.exec_one_testcase(reset_test, getattr(self, reset_test))
706
707            self.last_result = self.test_method_result
708            if continues_fail and fail_break and self.con_fail_times > fail_times:
709                break
710            if not continues_fail and fail_break and self.fail_times > fail_times:
711                break
712
713        if self.test_method_result != "Passed":
714            self.test_method_result = RunResult.PASSED
715
716        if not continues_fail and self.fail_times >= fail_times:
717            self.error_msg += " -- Loop fail %d times" % self.fail_times
718            self.log.error("{} fail {} times".format(test_name, self.fail_times))
719        elif continues_fail and self.con_fail_times >= fail_times:
720            self.error_msg += " -- Loop continues fail %d times" % self.con_fail_times
721            self.log.error("{} continues fail {} times".format(test_name, self.con_fail_times))
722        else:
723            self.result = RunResult.PASSED
724            self.error_msg = ""
725
726    def set_devices(self, devices):
727        self.devices = devices
728        if not devices:
729            return
730
731        try:
732            for num, _ad in enumerate(self.devices, 1):
733                if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"):
734                    setattr(_ad, "device_id", "device{}".format(num))
735                # 兼容release2 增加id、serial
736                setattr(_ad, "id", _ad.device_id)
737                setattr(_ad, "serial", _ad.device_sn)
738                setattr(self, _ad.device_id, _ad)
739                setattr(self, "device{}".format(num), _ad)
740        except Exception as error:
741            err_msg = ErrorMessage.TestCase.Code_0203007
742            log.error(err_msg, is_traceback=True)
743            raise DeviceTestError(err_msg) from error
744
745    def set_property(self, index):
746        if isinstance(self.project.property_config, dict):
747            get_devices = self.project.property_config.get("devices")
748            if isinstance(get_devices, list) and len(get_devices) > index:
749                propertys = get_devices[index]
750                if isinstance(propertys, dict):
751                    self.log.debug("set propertys: {}".format(propertys))
752                    return propertys
753                self.log.debug("get propertys: {}".format(propertys))
754                self.log.warning("propertys not a dict!")
755        return {}
756
757    def get_property(self, property):
758        if hasattr(self, "propertys"):
759            get_value = self.propertys.get(property)
760            log.debug("get property {}:{}".format(property, get_value))
761            return get_value
762        else:
763            log.warning("'Device' object has no attribute 'propertys'")
764            return None
765
    def get_case_report_path(self):
        """Return the "report_path" entry of the configs, or None if absent."""
        return self.configs.get("report_path")
768
    def get_case_result(self):
        """Return the current result of the case."""
        return self.result
771
772    def set_case_result_content(self, content: dict):
773        """set result content for testcase(供用例拓展结果内容)
774        content: dict, result content
775        """
776        if not isinstance(content, dict) or not content:
777            return
778        self.log.info(f"setting result content: {content}")
779        setattr(self, "result_content", content)
780
781    def set_auto_record_steps(self, flag):
782        """
783        flag: bool, A switch be used to disable record steps automatically
784        """
785        warnings.warn("function is deprecated", DeprecationWarning)
786        cur_case = DeccVariable.cur_case()
787        if cur_case is None:
788            self.log.warning("current case object is none, can not disable record step automatically")
789            return
790        DeccVariable.cur_case().auto_record_steps_info = flag
791
792
class WindowsTestCase(CoreCase):
    """Base class that Windows test classes inherit from.

    Construction and ``_exec_func`` are forwarded to CoreCase unchanged;
    ``clear_device_callback_method`` does nothing in this class.
    """

    def __init__(self, tag, configs):
        """Initialise through the parent class with the same arguments."""
        super(WindowsTestCase, self).__init__(tag, configs)

    def _exec_func(self, func, *args):
        """Run ``func`` through ``CoreCase._exec_func``.

        Args:
            func: Function to be executed.
            args: Arguments to be passed to the function.
        Returns:
            Whatever ``CoreCase._exec_func`` returns.
        """
        core_exec = CoreCase._exec_func
        return core_exec(self, func, *args)

    def clear_device_callback_method(self):
        """No-op in this class."""
        return None
815
816
def _log_info_aw_information(func, args, kwargs, is_checkepr=False):
    """Log the call signature of an AW, tagged with its nesting level.

    The level comes from the length of the current test method's
    ``func_ret`` list: empty -> "aw", one entry -> "aw1", anything longer ->
    "aw2". At the outermost level the checkepr flag of the current case is
    updated too, and for a checkepr call ``cur_check_cmd`` is re-initialised.

    Args:
        func: the AW function about to be invoked.
        args: positional arguments of the call.
        kwargs: keyword arguments of the call.
        is_checkepr: True when the AW is run through ``run_checkepr``.
    """
    cur_case = DeccVariable.cur_case()
    depth = len(cur_case.test_method.func_ret)
    if depth == 0:
        if is_checkepr:
            cur_case.set_checkepr(True)
            # Calling __init__ again resets the existing check command in place.
            cur_case.cur_check_cmd.__init__()
        else:
            cur_case.set_checkepr(False)
    css_class = {0: "aw", 1: "aw1"}.get(depth, "aw2")
    invoke_text = _gen_aw_invoke_info_no_div(func, args, kwargs)
    log.info("<div class='{}'>{}</div>".format(css_class, invoke_text))
832
833
834def _get_is_raise_exception(kwargs):
835    if "EXCEPTION" not in kwargs:
836        is_raise_exception = True
837    else:
838        is_raise_exception = kwargs.pop("EXCEPTION")
839    return is_raise_exception, kwargs
840
841
842def _get_msg_args(kwargs):
843    msg_args = None
844    if "failMsg" in kwargs:
845        msg_args = kwargs.pop("failMsg")
846        msg_args = '' if msg_args is None else ErrorMessage.TestCase.Code_0203002.format(msg_args)
847
848    return msg_args, kwargs
849
850
def _get_ignore_fail():
    """Tell whether the current case is running its teardown section.

    Run-time errors of interfaces called in teardown are ignored so that
    every interface in teardown still gets executed.
    """
    current_section = DeccVariable.cur_case().run_section
    return current_section == RunSection.TEARDOWN
854
855
def _screenshot_and_flash_error_msg(ignore_fail, is_raise_exception, msg_args,
                                    func_name, args, error_msg):
    """Handle a failed AW: request a screenshot, record the error, maybe raise.

    Args:
        ignore_fail: True in the teardown section; the error is recorded but
            never raised.
        is_raise_exception: value of the AW's ``EXCEPTION`` switch.
        msg_args: custom failure message, passed on to ``_flash_error_msg``.
        func_name: name of the AW that failed.
        args: positional arguments of the AW call.
        error_msg: message built from the caught exception.

    Raises:
        DeviceTestError: outside teardown when ``is_raise_exception`` is truthy.
    """
    ScreenAgent.screen_take_picture(args, False, func_name, is_raise_exception=is_raise_exception)
    if ignore_fail:
        # Teardown section: record the failure and let execution continue.
        _flash_error_msg(msg_args, error_msg)
        return
    if not is_raise_exception:
        # Outside teardown, but the caller asked for the exception to be ignored.
        log.info("Ignore current exception because parameter EXCEPTION is False.")
        return
    _flash_error_msg(msg_args, error_msg)
    raise DeviceTestError(error_msg)
870
871
def _is_in_top_aw():
    """Report whether the "Starts" and "Ends" markers are balanced.

    Both markers are counted in the current test method's ``func_ret`` list;
    equal counts mean no AW call is still open.
    """
    markers = DeccVariable.cur_case().test_method.func_ret
    starts_num = markers.count("Starts")
    ends_num = markers.count("Ends")
    log.debug("Starts: {}, Ends: {}".format(starts_num, ends_num))
    return starts_num == ends_num
877
878
def _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time,
                              ignore_fail, is_raise_exception, msg_args):
    """Evaluate the return value of a keyword AW that finished without raising.

    Back at the top-level AW, the call and its cost are logged, the
    Starts/Ends markers are cleared and ``ScreenAgent.screen_take_picture`` is
    called. A literal ``False`` return is then treated as a failure unless the
    current case is in checkepr mode.

    Args:
        func, args, kwargs: the AW and the arguments it was called with.
        _ret: value returned by the AW.
        cost_time: value returned by ``TS.stop()``; divided by 1000 before
            being logged with an "s" suffix (presumably milliseconds - confirm).
        ignore_fail: True in the teardown section.
        is_raise_exception: value of the AW's ``EXCEPTION`` switch.
        msg_args: custom failure message, may be empty.

    Raises:
        DeviceTestError: on failure when ``msg_args`` is set.
        TestFailure: on failure without ``msg_args``.
    """
    aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs)
    # Only the bool False marks a failure; None, 0, '' ... count as success.
    succeeded = _ret is not False
    cur_case = DeccVariable.cur_case()
    if _is_in_top_aw():
        seconds = round(cost_time / 1000, 3) if cost_time is not None else 0
        log.info("<div class='aw'>{} cost: {}s</div>".format(aw_info, seconds))
        cur_case.test_method.func_ret.clear()
        ScreenAgent.screen_take_picture(args, succeeded, func.__name__, is_raise_exception=is_raise_exception)

    if cur_case.checkepr or succeeded:
        return
    if not is_raise_exception or ignore_fail:
        return
    _flash_error_msg(msg_args, "测试用例执行失败")
    if msg_args:
        raise DeviceTestError(msg_args)
    raise TestFailure(ErrorMessage.TestCase.Code_0203003.format(aw_info))
896
897
def _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail,
                               is_raise_exception, msg_args):
    """Evaluate the return value of a checkepr AW that finished without raising.

    Nothing happens unless execution is back at the top-level AW. There the
    Starts/Ends markers are cleared, ``ScreenAgent.screen_take_picture`` is
    called, and any falsy return value is handled as a failed check.

    Args:
        func, args, kwargs: the AW and the arguments it was called with.
        _ret: value returned by the AW.
        ignore_fail: True in the teardown section.
        is_raise_exception: value of the AW's ``EXCEPTION`` switch.
        msg_args: custom failure message, may be empty.

    Raises:
        DeviceTestError: on a failed check when ``msg_args`` is set.
        TestFailure: on a failed check without ``msg_args``.
    """
    cur_case = DeccVariable.cur_case()
    if not _is_in_top_aw():
        return
    cur_case.test_method.func_ret.clear()
    # The screenshot result only turns False for a literal False return.
    shot_result = _ret is not False
    ScreenAgent.screen_take_picture(args, shot_result, func.__name__, is_raise_exception=is_raise_exception)
    if _ret:
        log.info("Return: {}".format(_ret))
        return

    aw_info = _gen_aw_invoke_info_no_div(func, args, kwargs)
    check_cmd = cur_case.cur_check_cmd
    if check_cmd.get_cur_check_status():
        msg = cur_case.cur_check_cmd.get_cur_check_msg()
    else:
        msg = "Check Result: {} = {}!".format(_ret, aw_info)
        log.info("Return: {}".format(_ret))

    if is_raise_exception and not ignore_fail:
        _flash_error_msg(msg_args, "测试用例执行失败")
        if msg_args:
            raise DeviceTestError(msg_args)
        raise TestFailure(ErrorMessage.TestCase.Code_0203003.format(aw_info))
    log.info(msg)
    time.sleep(0.01)  # keeps the log lines from getting out of order
924
925
926def _check_exception(exception, in_method=False):
927    # 测试设备断连找不见, 直接抛出异常
928    find_result = re.search(r'device \w* not found|offline', str(exception))
929    if find_result is not None:
930        err_msg = ErrorMessage.Device.Code_0202306
931        log.error(err_msg)
932        if in_method:
933            return True
934        raise DeviceNotFound(err_msg)
935    return False
936
937
def checkepr(func: T) -> T:
    """Decorator that executes ``func`` through ``run_checkepr``.

    Before every call it installs a default ``CurCase`` if no current case
    exists and calls ``DeccVariable.project.record.is_shutdown()``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Make sure a current-case object exists before the AW runs.
        if DeccVariable.cur_case() is None:
            DeccVariable.set_cur_case_obj(CurCase(log))
        DeccVariable.project.record.is_shutdown()
        return run_checkepr(func, *args, **kwargs)

    return wrapper
950
951
def keyword(func: T) -> T:
    """Decorator that executes ``func`` through ``run_keyword``.

    Before every call it installs a default ``CurCase`` if no current case
    exists and calls ``DeccVariable.project.record.is_shutdown()``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Make sure a current-case object exists before the AW runs.
        if DeccVariable.cur_case() is None:
            DeccVariable.set_cur_case_obj(CurCase(log))
        DeccVariable.project.record.is_shutdown()
        return run_keyword(func, *args, **kwargs)

    return wrapper
964
965
def run_keyword(func, *args, **kwargs):
    """Execute a keyword AW with logging, screenshot and failure handling.

    The optional keyword arguments ``EXCEPTION`` and ``failMsg`` are removed
    from ``kwargs`` before ``func`` is called. A "Starts" marker is appended
    to the current test method's ``func_ret`` list before the call and an
    "Ends" marker afterwards, whatever happens.

    Args:
        func: the AW to execute.
        *args, **kwargs: arguments for ``func``.

    Returns:
        The return value of ``func``, or False when ``func`` raised an
        exception that was handled here without being re-raised.

    Raises:
        DeviceNotFound, DeviceTestError: re-raised unchanged when ``func``
            raises them; DeviceNotFound is also raised for
            ConnectionRefusedError and by ``_check_exception``.
        DeviceTestError: from ``_screenshot_and_flash_error_msg`` for other
            failures outside teardown when ``EXCEPTION`` is not False.
    """
    _log_info_aw_information(func, args, kwargs)
    DeccVariable.cur_case().test_method.func_ret.append("Starts")
    is_raise_exception, kwargs = _get_is_raise_exception(kwargs)
    msg_args, kwargs = _get_msg_args(kwargs)
    ignore_fail = _get_ignore_fail()
    failed = True  # flipped to False once func has returned normally
    _ret = None
    cost_time = 0
    func_name = func.__name__
    try:
        TS.start()
        _ret = func(*args, **kwargs)
        log.debug("func {} ret: {}".format(func_name, _ret))
        cost_time = TS.stop()
        failed = False

        verify_reply = is_raise_exception and (not ignore_fail) and func_name == 'get_info_from_decc_svr'
        if verify_reply and isinstance(_ret, dict):
            if _ret['code'] != 200 and (_ret['success'] == 'false'
                                        or _ret['success'] is False):
                raise TestAssertionError('result error.')
    except (DeviceNotFound, DeviceTestError) as device_err:
        # Device problems and DeviceTestError are passed on untouched, which
        # avoids adding the error code to the message a second time.
        raise device_err
    except TestAssertionError as err:
        # The message of an assertion takes precedence over the AW's failMsg.
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, str(err), func_name, args, '')
    except TypeError as err:
        failure_text = ErrorMessage.TestCase.Code_0203008.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    except (HdcCommandRejectedException, ShellCommandUnresponsiveException,
            AppInstallError, RpcNotRunningError) as err:
        _check_exception(err)
        # Same order as separate except clauses would be tried in.
        if isinstance(err, HdcCommandRejectedException):
            template = ErrorMessage.Device.Code_0202302
        elif isinstance(err, ShellCommandUnresponsiveException):
            template = ErrorMessage.Device.Code_0202304
        elif isinstance(err, AppInstallError):
            template = ErrorMessage.Device.Code_0202307
        else:
            template = ErrorMessage.Device.Code_0202305
        failure_text = template.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    except ConnectionRefusedError as err:
        # Device went offline: the connector client refused the connection.
        failure_text = ErrorMessage.Device.Code_0202306.format(err)
        log.error(failure_text)
        raise DeviceNotFound(failure_text) from err
    except Exception as err:
        _check_exception(err)
        failure_text = ErrorMessage.TestCase.Code_0203002.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    finally:
        DeccVariable.cur_case().test_method.func_ret.append("Ends")
        # 1. If the interface raised, _screenshot_and_flash_error_msg was entered.
        # 2. If the interface is not part of teardown and EXCEPTION=False was
        #    not passed, that helper raises DeviceTestError again.
        # 3. If the interface is not part of teardown, raised, and execution is
        #    back at the top-level aw, the Starts/Ends counters are reset.
        if failed and not ignore_fail and _is_in_top_aw():
            log.debug('----------finally clear Starts/Ends--------')
            DeccVariable.cur_case().test_method.func_ret.clear()

    if not failed:
        _check_ret_in_run_keyword(func, args, kwargs, _ret, cost_time,
                                  ignore_fail, is_raise_exception, msg_args)
        return _ret
    if _is_in_top_aw():
        DeccVariable.cur_case().test_method.func_ret.clear()
    return False
1052
1053
def run_checkepr(func, *args, **kwargs):
    """Execute a checkepr AW with logging, screenshot and failure handling.

    The optional keyword arguments ``EXCEPTION`` and ``failMsg`` are removed
    from ``kwargs`` before ``func`` is called. A "Starts" marker is appended
    to the current test method's ``func_ret`` list before the call and an
    "Ends" marker afterwards, whatever happens.

    Args:
        func: the AW to execute.
        *args, **kwargs: arguments for ``func``.

    Returns:
        The return value of ``func``, or False when ``func`` raised an
        exception that was handled here without being re-raised.

    Raises:
        DeviceNotFound, DeviceTestError: re-raised unchanged when ``func``
            raises them; DeviceNotFound is also raised for
            ConnectionRefusedError and by ``_check_exception``.
        DeviceTestError: from ``_screenshot_and_flash_error_msg`` for other
            failures outside teardown when ``EXCEPTION`` is not False.
    """
    _log_info_aw_information(func, args, kwargs, is_checkepr=True)
    DeccVariable.cur_case().test_method.func_ret.append("Starts")
    is_raise_exception, kwargs = _get_is_raise_exception(kwargs)
    msg_args, kwargs = _get_msg_args(kwargs)
    ignore_fail = _get_ignore_fail()
    failed = True  # flipped to False once func has returned normally
    _ret = None
    func_name = func.__name__
    try:
        TS.start()
        # Run the wrapped function.
        _ret = func(*args, **kwargs)
        log.debug("step {} execute result: {}".format(func_name, _ret))
        TS.stop()
        failed = False
    except (DeviceNotFound, DeviceTestError) as device_err:
        # Device problems and DeviceTestError are passed on untouched, which
        # avoids adding the error code to the message a second time.
        raise device_err
    except TestAssertionError as err:
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, str(err), func_name, args, '')
    except TypeError as err:
        failure_text = ErrorMessage.TestCase.Code_0203008.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    except (HdcCommandRejectedException, ShellCommandUnresponsiveException,
            AppInstallError, RpcNotRunningError) as err:
        _check_exception(err)
        # Same order as separate except clauses would be tried in.
        if isinstance(err, HdcCommandRejectedException):
            template = ErrorMessage.Device.Code_0202302
        elif isinstance(err, ShellCommandUnresponsiveException):
            template = ErrorMessage.Device.Code_0202304
        elif isinstance(err, AppInstallError):
            template = ErrorMessage.Device.Code_0202307
        else:
            template = ErrorMessage.Device.Code_0202305
        failure_text = template.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    except ConnectionRefusedError as err:
        # Device went offline: the connector client refused the connection.
        failure_text = ErrorMessage.Device.Code_0202306.format(err)
        log.error(failure_text, is_traceback=True)
        raise DeviceNotFound(failure_text) from err
    except Exception as err:
        _check_exception(err)
        failure_text = ErrorMessage.TestCase.Code_0203002.format(err)
        log.error(failure_text, is_traceback=True)
        _screenshot_and_flash_error_msg(
            ignore_fail, is_raise_exception, msg_args, func_name, args, failure_text)
    finally:
        DeccVariable.cur_case().test_method.func_ret.append("Ends")
        # 1. If the interface raised, _screenshot_and_flash_error_msg was entered.
        # 2. If the interface does not run in the teardown section and
        #    EXCEPTION=False was not passed, that helper raises DeviceTestError
        #    again.
        # 3. If a non-teardown interface raised and execution is back at the
        #    top-level aw, the Starts/Ends counters are reset.
        if failed and not ignore_fail and _is_in_top_aw():
            log.debug('----------finally clear Starts/Ends--------')
            DeccVariable.cur_case().test_method.func_ret.clear()

    if not failed:
        _check_ret_in_run_checkepr(func, args, kwargs, _ret, ignore_fail,
                                   is_raise_exception, msg_args)
        return _ret
    if _is_in_top_aw():
        DeccVariable.cur_case().test_method.func_ret.clear()
    return False
1134
1135
def _flash_error_msg(msg_args, error_msg):
    """Record a failure on the current test method and on the current case.

    Message priority: the custom message of an assertion first, then the AW's
    failMsg, and last the message of the caught exception.

    Args:
        msg_args: custom message (assertion text or formatted failMsg); may be
            None or empty.
        error_msg: message built from the caught exception; used for the test
            method only when ``msg_args`` is empty.
    """
    log.info("flash error msg.")
    cur_case = DeccVariable.cur_case()
    step = cur_case.test_method
    if msg_args:
        if not (step.error_msg and step.step_flash_fail_msg):
            step.set_error_msg(msg_args)
            step.step_flash_fail_msg = True
            if not cur_case.is_upload_method_result:
                cur_case.set_error_msg(msg_args)
    elif not step.error_msg:
        # Update the error message of the current step.
        step.set_error_msg(error_msg)
        if not cur_case.is_upload_method_result and cur_case.error_msg:
            # NOTE(review): msg_args is always falsy in this branch, so this
            # replaces an existing case-level message with None/''. Possibly
            # error_msg was intended - confirm before changing.
            cur_case.set_error_msg(msg_args)

    step.set_result(RunResult.FAILED)
    if cur_case.case_result == RunResult.PASSED:
        cur_case.set_case_result(RunResult.FAILED)
1157
1158
1159def _gen_aw_invoke_info_no_div(func, args, kwargs):
1160    all_args = []
1161    name_id = None
1162    if args and getattr(args[0], "__module__", None):
1163        try:
1164            _ad = args[0]
1165            id_strings = []
1166            _device = getattr(_ad, "_device", None)
1167            if _device:
1168                dev_alias = getattr(_device, "device_id", "")
1169                if dev_alias:
1170                    id_strings.append(dev_alias)
1171            dev_sn = getattr(_ad, "device_sn", "")
1172            if dev_sn:
1173                id_strings.append(convert_serial(dev_sn))
1174            name_id = ".".join(id_strings).replace(" ", ".")
1175        except Exception as exception:
1176            log.error(exception)
1177        args = args[1:]
1178    if name_id is not None:
1179        all_args.append(name_id)
1180    if args:
1181        for arg in args:
1182            all_args.append(str(arg))
1183    if kwargs:
1184        for key, value in kwargs.items():
1185            all_args.append("{}={}".format(key, value))
1186    info_items = [
1187        func.__module__.split(".")[-1:][0], ".", func.__name__,
1188        "(", ", ".join(all_args), ")"
1189    ]
1190    return "".join(info_items)
1191
1192
def GET_TRACEBACK(_trac=""):
    """Format the exception reported by ``sys.exc_info()``.

    Args:
        _trac: pass "AW" to additionally get the last line of the traceback.

    Returns:
        The formatted traceback as one string; for ``_trac == "AW"`` a tuple
        ``(formatted traceback, stripped last line)``.
    """
    formatted = traceback.format_exception(*sys.exc_info())
    full_text = "".join(formatted)
    if _trac == "AW":
        return full_text, formatted[-1].strip()
    return full_text
1198
1199
def ASSERT(expect, actual):
    """Raise TestFailure when ``expect != actual``.

    The failure message is ``ErrorMessage.Assertion.Code_0204026`` formatted
    with both values.
    """
    if expect != actual:
        failure_text = ErrorMessage.Assertion.Code_0204026.format(expect, actual)
        raise TestFailure(failure_text)
1203
1204
def CHECK(message, expect, actual):
    """Record description, expected value and actual value of a check.

    If no current case exists, a default ``CurCase`` is installed and nothing
    is recorded for this call.
    """
    if DeccVariable.cur_case() is not None:
        MESSAGE(message)
        EXPECT(expect)
        ACTUAL(actual)
        return
    DeccVariable.set_cur_case_obj(CurCase(log))
1213
1214
def MESSAGE(arg):
    """Store the decoded description of the current check and log it.

    The value is written to ``cur_check_cmd.through`` of the current case.
    If no current case exists, a default ``CurCase`` is installed and nothing
    is stored for this call.
    """
    if DeccVariable.cur_case() is None:
        DeccVariable.set_cur_case_obj(CurCase(log))
        return
    decoded = get_decode(arg)
    DeccVariable.cur_case().cur_check_cmd.through = decoded
    description = DeccVariable.cur_case().cur_check_cmd.through
    log.debug("Description: {}".format(description))
1223
1224
def EXPECT(arg):
    """Store the decoded expected value of the current check and log it.

    The value is written to ``cur_check_cmd.expect`` of the current case.
    If no current case exists, a default ``CurCase`` is installed and nothing
    is stored for this call.
    """
    if DeccVariable.cur_case() is None:
        DeccVariable.set_cur_case_obj(CurCase(log))
        return
    decoded = get_decode(arg)
    DeccVariable.cur_case().cur_check_cmd.expect = decoded
    expected = DeccVariable.cur_case().cur_check_cmd.expect
    log.debug("Expected: {}".format(expected))
1233
1234
def ACTUAL(arg):
    """Store the decoded actual value of the current check and log it.

    The value is written to ``cur_check_cmd.actual`` of the current case.
    If no current case exists, a default ``CurCase`` is installed and nothing
    is stored for this call.
    """
    if DeccVariable.cur_case() is None:
        DeccVariable.set_cur_case_obj(CurCase(log))
        return
    decoded = get_decode(arg)
    DeccVariable.cur_case().cur_check_cmd.actual = decoded
    observed = DeccVariable.cur_case().cur_check_cmd.actual
    log.debug("Actual: {}".format(observed))
1243
1244
def Step(name, **kwargs):
    """Record a test-case step so that it is shown in the case report.
    Args:
        name: str, step name
        kwargs: passed on to ``set_step_info`` of the current case
    Returns:
        Whatever ``set_step_info`` returns, or -1 when there is no current
        case object.
    Example:
        Step("11")
        Step("11", video="a video address")
    """
    cur_case = DeccVariable.cur_case()
    if cur_case is not None:
        return cur_case.set_step_info(name, **kwargs)
    log.warning("current case object is none, recording step failed")
    return -1
1258
1259
def UpdateStep(index, **kwargs):
    """Update the recorded information of a step.
    Args:
        index: int, step index
        kwargs: passed on to ``update_step_info`` of the current case
    Example:
        index = Step("11")
        UpdateStep(index, video="a video address")
    """
    cur_case = DeccVariable.cur_case()
    if cur_case is not None:
        cur_case.update_step_info(index, **kwargs)
        return
    log.warning("current case object is none, updating step failed")
1273
1274
def CheckPoint(checkpoint):
    """Record ``checkpoint`` as a step by calling ``Step``; returns None."""
    Step(name=checkpoint)
1277
1278
def CONFIG():
    """Return ``config_json`` of the project held by ``DeccVariable``."""
    project = DeccVariable.project
    return project.config_json
1281
1282
def get_report_dir(self=None):
    """
    Get the path of the folder that holds the framework's case execution logs.

    The path is read from ``self.project`` when ``self`` is a TestCase, and
    from ``DeccVariable.project`` otherwise.
    Returns: log_dir_path
    """
    owner = self if isinstance(self, TestCase) else DeccVariable
    return owner.project.task_report_dir
1291
1292
def RecordStep(name, **kwargs):
    """Deprecated, use ``Step``: record a test-case step for the case report.

    Args:
        name: str, step name; serves as the unique key for updating the record
        kwargs: passed on to ``set_step_info`` of the current case
    Example:
        RecordStep("11", result=True)
        RecordStep("11", result=True, video="a video address")
    """
    # stacklevel=2 attributes the warning to the code that called RecordStep,
    # so users see which of their lines still uses the deprecated API.
    warnings.warn("RecordStep is deprecated, use Step instead", DeprecationWarning, stacklevel=2)
    cur_case = DeccVariable.cur_case()
    if cur_case is None:
        log.warning("current case object is none, recording step failed")
        return
    cur_case.set_step_info(name, **kwargs)
1306
1307
class Property:
    """Plain container object; attributes are attached through ``add_attributes``."""

    def __init__(self):
        """Create the container without any attributes."""

    def add_attributes(self, key, value):
        """Set attribute ``key`` to ``value`` on this object and log the assignment."""
        setattr(self, key, value)
        assignment = "Property setattr {}={}".format(key, value)
        log.debug(assignment)
1316