• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3.4
2# coding=utf-8
3#
4
5# Copyright (C) 2016 Huawei Technologies Co., HUTAF xDevice
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may not
8# use this file except in compliance with the License. You may obtain a copy of
9# the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations under
17# the License.
18
19import copy
20import sys
21import os
22import traceback
23
24from xdevice import ConfigConst
25from xdevice import get_cst_time
26from xdevice import get_file_absolute_path
27from xdevice import LifeStage
28from xdevice import Scheduler
29
30from devicetest.core.constants import RunStatus
31from devicetest.core.constants import RunResult
32from devicetest.core.constants import FileAttribute
33from devicetest.core.variables import CurCase
34from devicetest.core.variables import DeccVariable
35from devicetest.core.variables import ProjectVariables
36from devicetest.log.logger import DeviceTestLog
37from devicetest.report.generation import add_log_caching_handler
38from devicetest.report.generation import del_log_caching_handler
39from devicetest.report.generation import get_caching_logs
40from devicetest.report.generation import generate_report
41from devicetest.utils.util import calculate_execution_time
42from devicetest.utils.util import get_base_name
43from devicetest.utils.util import get_dir_path
44from devicetest.utils.util import import_from_file
45
# Module-level flag initialised to None. Nothing in the visible part of this
# file reads or reassigns it; presumably it is consumed by other modules --
# TODO confirm before removing.
suite_flag = None
47
48
49class TestSuite(object):
50    """Base class for all test classes to inherit from.
51
52    This class gets all the controller objects from test_runner and executes
53    the test cases requested within itself.
54
55    """
56
57    def __init__(self, configs, path):
58        self.configs = configs
59        self.devices = []
60        self.device1 = None
61        self.device2 = None
62        # 透传的参数
63        self.pass_through = None
64        self.set_devices(self.configs["devices"])
65        self.path = path
66        self.log = self.configs["log"]
67        self.error_msg = ''
68        self.case_list = []
69        self.case_result = dict()
70        self.suite_name = self.configs.get("suite_name")
71        # 白名单用例
72        self.white_case_list = []
73        # 黑名单用例
74        self.black_case_list = []
75        # 初始化透传参数的列表
76        self.arg_list = dict()
77        self.app_result_info = dict()
78        self._test_args_para_parse(self.configs["testargs"])
79        # 往DeviceTest的用例中注入logger并防止重复初始化测试套级别的变量
80        self.inject_logger = None
81        self.cur_case = None
82        # device log
83        self.device_log = dict()
84        self.hilog = dict()
85        self.log_proc = dict()
86        self.hilog_proc = dict()
87
88        self.suite_case_results = []
89        self.suite_report_path = ""
90        self._case_log_buffer_hdl = None
91
92        # device录屏截图属性
93        self.devices_media = dict()
94
95    def __enter__(self):
96        return self
97
98    def __exit__(self, *args):
99        pass
100
101    def _device_close(self):
102        self.log.debug("Start device close")
103        for device in self.devices:
104            device.close()
105        self.log.debug("Finish device close.")
106
107    def run(self):
108        self._init_devicetest()
109        start_time = get_cst_time()
110        # 开始收集测试套(setup和teardown)的运行日志
111        suite_log_buffer_hdl = add_log_caching_handler()
112        try:
113            self.cur_case.set_suite_instance(self)
114            # 1.先判断是否在json中指定,否则先收集当前文件夹下所有testcase得到run_list
115            self.case_list = self._get_case_list(self.path)
116            self.log.debug("Execute test case list: {}".format(self.case_list))
117            # 2.先执行self.setup
118            if RunStatus.FINISHED == self.run_setup():
119                return
120
121            # 在运行测试套子用例前,停止收集测试套(setup和teardown)的运行日志
122            del_log_caching_handler(suite_log_buffer_hdl)
123            # 3.依次执行所有的run_list
124            # 开始收集测试套子用例的运行日志
125            self._case_log_buffer_hdl = add_log_caching_handler()
126            total_case_num = len(self.case_list)
127            case_num = 1
128            for case in self.case_list:
129                self.log.info("[{} / {}] Executing suite case: {}".format(case_num, total_case_num, case))
130                self.run_one_test_case(case)
131                case_num = case_num + 1
132
133            # 停止收集测试套子用例的运行日志
134            del_log_caching_handler(self._case_log_buffer_hdl)
135            self._case_log_buffer_hdl = None
136            # 在运行测试套子用例前,重新开始收集测试套(setup和teardown)的运行日志
137            add_log_caching_handler(buffer_hdl=suite_log_buffer_hdl)
138        finally:
139            # 4.执行self.teardown
140            self.run_teardown()
141            self.cur_case.set_suite_instance(None)
142
143        steps = self.cur_case.get_steps_info()
144        self.log.info(f"test suite steps: {steps}")
145        # 停止收集测试套(setup和teardown)的运行日志
146        del_log_caching_handler(suite_log_buffer_hdl)
147        if suite_log_buffer_hdl is None:
148            return
149        # 生成测试套的报告
150        self.log.info("generate suite report")
151        end_time = get_cst_time()
152        suite_info = {
153            "name": self.suite_name,
154            "result": "",
155            "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
156            "end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
157            'elapsed': calculate_execution_time(start_time, end_time),
158            "error": "",
159            "logs": get_caching_logs(suite_log_buffer_hdl),
160            "cases": self.suite_case_results
161        }
162        report_path = os.path.join("details", self.suite_name, self.suite_name + ".html")
163        to_file = os.path.join(self.get_case_report_path(), report_path)
164        generate_report(to_file, template="suite.html", suite=suite_info)
165        del suite_log_buffer_hdl
166
167        # 往结果xml添加测试套的报告路径
168        self.suite_report_path = report_path
169        steps.clear()
170        DeccVariable.reset()
171
172    def setup(self):
173        """Setup function that will be called before executing any test suite.
174        Implementation is optional.
175        """
176        pass
177
178    def setup_start(self):
179        """
180        setup_start function that will be called after setup function.
181        Implementation is optional.
182        """
183        pass
184
185    def setup_end(self):
186        """
187        setup_end function that will be called after setup function.
188        Implementation is optional.
189        """
190        pass
191
192    def teardown(self):
193        """Teardown function that will be called after all the selected test
194        suite.
195        Implementation is optional.
196        """
197        pass
198
199    def teardown_start(self):
200        """
201        teardown_start function that will be called before Teardown function.
202        Implementation is optional.
203        """
204        pass
205
206    def teardown_end(self):
207        """
208        teardown_end function that will be called after Teardown function.
209        Implementation is optional.
210        """
211        pass
212
213    def get_params(self):
214        return self.arg_list
215
216    def set_devices(self, devices):
217        self.devices = devices
218        if not devices:
219            return
220
221        try:
222            num = 1
223
224            for _, _ad in enumerate(self.devices):
225                if not hasattr(_ad, "device_id"):
226                    setattr(_ad, "device_id", "device{}".format(num))
227                # 兼容release2 增加id、serial
228                setattr(_ad, "id", _ad.device_id)
229                setattr(_ad, "serial", _ad.device_sn)
230                setattr(self, _ad.device_id, _ad)
231                setattr(self, "device{}".format(num), _ad)
232                num += 1
233        except Exception as error:
234            self.log.error("Failed to initialize the device object in the "
235                           "TestCase.", error_no="01218")
236            raise error
237
238    def _get_case_list(self, path):
239        result = []
240        if len(self.configs["suitecases"]) > 0:
241            for _, case in enumerate(self.configs["suitecases"]):
242                if os.path.exists(case):
243                    case_path = case
244                else:
245                    case_path = get_file_absolute_path(case, [path,
246                                                              self.configs["resource_path"],
247                                                              self.configs["testcases_path"]])
248                result.append(case_path)
249        else:
250            all_file_list = os.listdir(path)
251            # 遍历该文件夹下的所有目录或者文件
252            for file in all_file_list:
253                filepath = os.path.join(path, file)
254                # 如果是文件夹,递归调用函数
255                if os.path.isdir(filepath):
256                    result.extend(self._get_case_list(filepath))
257                # 如果不是文件夹,保存文件路径及文件名
258                elif os.path.isfile(filepath) and \
259                        "__pycache__" not in filepath:
260                    if file.startswith(FileAttribute.TESTCASE_PREFIX) and \
261                            (file.endswith(FileAttribute.TESTCASE_POSFIX_PY) or
262                             file.endswith(FileAttribute.TESTCASE_POSFIX_PYC) or
263                             file.endswith(FileAttribute.TESTCASE_POSFIX_PYD)):
264                        result.append(filepath)
265        return result
266
267    def _exec_func(self, func, *args):
268        result = False
269        try:
270            func(*args)
271        except Exception as exception:
272            self.error_msg = "{}:{}".format(str(exception), traceback.format_exc())
273            self.log.error("run case error! Exception: {}".format(self.error_msg))
274        else:
275            result = True
276        return result
277
278    def run_setup(self):
279        self.setup_start()
280        self.log.info("**********SetUp Starts!")
281        ret = self._exec_func(self.setup)
282        self.log.info("**********SetUp Ends!")
283        if ret:
284            self.setup_end()
285            return RunStatus.RUNNING
286        self.log.info("SetUp Failed!")
287        return RunStatus.FINISHED
288
289    def run_one_test_case(self, case_path):
290        case_name = get_base_name(case_path)
291        if (self.black_case_list and case_name in self.black_case_list) \
292                or (self.white_case_list and case_name not in self.white_case_list):
293            self.log.warning("case name {} is in black list or not in white list, ignored".format(case_name))
294            return
295        start_time = get_cst_time()
296        case_result = RunResult.FAILED
297        error_msg = ""
298        test_cls_instance = None
299        try:
300            test_cls = import_from_file(get_dir_path(case_path), case_name)
301            self.log.info("Success to import {}.".format(case_name))
302            self._compatible_testcase(case_path, case_name)
303            with test_cls(self.configs) as test_cls_instance:
304                self.cur_case.set_case_instance(test_cls_instance)
305                test_cls_instance.run()
306            case_result, error_msg = test_cls_instance.result, test_cls_instance.error_msg
307        except Exception as e:
308            self.log.debug(traceback.format_exc())
309            self.log.error("run case error! Exception: {}".format(e))
310        finally:
311            if test_cls_instance is None:
312                case_result = RunResult.BLOCKED
313
314            Scheduler.call_life_stage_action(stage=LifeStage.case_end,
315                                             case_name=case_name,
316                                             case_result=case_result)
317
318            end_time = get_cst_time()
319            cost = int(round((end_time - start_time).total_seconds() * 1000))
320            self.case_result[case_name] = {
321                "result": case_result, "error": error_msg,
322                "run_time": cost, "report": ""}
323            self.log.info("Executed case: {}, result: {}, cost time: {}".format(
324                case_name, test_cls_instance.result, cost))
325            if test_cls_instance:
326                try:
327                    del test_cls_instance
328                    self.log.debug("del test case instance success.")
329                except Exception as e:
330                    self.log.debug(traceback.format_exc())
331                    self.log.warning("del test case instance exception."
332                                     " Exception: {}".format(e))
333            self._device_close()
334
335        if self._case_log_buffer_hdl is None:
336            return
337        # 生成子用例的报告
338        end_time = get_cst_time()
339        extra_head = self.cur_case.get_steps_extra_head()
340        steps = self.cur_case.get_steps_info()
341        base_info = {
342            "name": case_name,
343            "result": case_result,
344            "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
345            "end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
346            'elapsed': calculate_execution_time(start_time, end_time),
347            "error": error_msg,
348            "extra_head": extra_head,
349            "steps": steps
350        }
351        case_info = copy.copy(base_info)
352        case_info["logs"] = copy.copy(get_caching_logs(self._case_log_buffer_hdl))
353        case_html = case_name + ".html"
354        report_path = os.path.join("details", self.suite_name, case_html)
355        to_path = os.path.join(self.configs.get("report_path"), report_path)
356        generate_report(to_path, case=case_info)
357        base_info["report"] = case_html
358        self.suite_case_results.append(base_info)
359        # 清空日志缓存
360        self._case_log_buffer_hdl.buffer.clear()
361        steps.clear()
362        # 往结果xml添加子用例的报告路径
363        self.case_result[case_name]["report"] = report_path
364        # 将用例实例对象和用例名置为空
365        self.cur_case.set_case_instance(None)
366        self.cur_case.set_name("")
367
368    def run_teardown(self):
369        self.log.info("**********TearDown Starts!")
370        self.teardown_start()
371        self._exec_func(self.teardown)
372        self.teardown_end()
373        self.log.info("**********TearDown Ends!")
374
375    def _test_args_para_parse(self, paras):
376        paras = dict(paras)
377        for para_name in paras.keys():
378            para_name = para_name.strip()
379            para_values = paras.get(para_name, [])
380            if para_name == "class":
381                self.white_case_list.extend(para_values)
382            elif para_name == "notClass":
383                self.black_case_list.extend(para_values)
384            elif para_name == "para":
385                for arg in para_values:
386                    key, value = arg.split("#")
387                    self.arg_list[key] = value
388            elif para_name == "deveco_planet_info":
389                for app_info in para_values:
390                    key, value = app_info.split("#")
391                    if key == "task_type":
392                        setattr(sys, "category", value)
393                    else:
394                        self.app_result_info[key] = value
395                        setattr(sys, "app_result_info", self.app_result_info)
396            elif para_name == ConfigConst.pass_through:
397                self.pass_through = para_values
398            else:
399                continue
400
401        self.configs["pass_through"] = self.pass_through
402        self.configs["arg_list"] = self.arg_list
403
404    def get_case_report_path(self):
405        return self.configs["report_path"]
406
407    def _compatible_testcase(self, case_path, case_name):
408        DeccVariable.cur_case().set_name(case_name)
409        project_var = ProjectVariables(self.inject_logger)
410        project_var.execute_case_name = case_name
411        project_var.cur_case_full_path = case_path
412        project_var.task_report_dir = self.get_case_report_path()
413        self.configs["project"] = project_var
414
415    def _init_devicetest(self):
416        DeviceTestLog.set_log(self.log)
417        self.cur_case = CurCase(self.log)
418        self.cur_case.suite_name = self.suite_name
419        self.cur_case.set_case_screenshot_dir(None, self.get_case_report_path(), None)
420        DeccVariable.set_cur_case_obj(self.cur_case)
421