• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import stat
22import uuid
23from abc import ABC
24from abc import abstractmethod
25
26from _core.plugin import Plugin
27from _core.plugin import get_plugin
28from _core.constants import TestType
29from _core.executor.bean import CaseResult
30from _core.executor.bean import SuiteResult
31from _core.executor.bean import SuitesResult
32from _core.interface import LifeCycle
33from _core.interface import IListener
34from _core.report.suite_reporter import SuiteReporter
35from _core.report.suite_reporter import ResultCode
36
37
# Names exported by ``from <this module> import *``.
__all__ = ["UniversalReportListener", "PlusReportListener"]
39
40
class AbsReportListener(IListener, ABC):
    """Abstract listener that routes lifecycle events to ``_handle_*`` hooks.

    Each public callback compares the incoming ``lifecycle`` with the
    ``LifeCycle`` members it knows about and forwards the payload to the
    matching hook. A lifecycle with no matching branch is ignored.
    """

    # ------------------------------------------------------------------
    # Event entry points
    # ------------------------------------------------------------------
    def __started__(self, lifecycle, result):
        """Forward a "started" event to the start hook for ``lifecycle``."""
        if lifecycle == LifeCycle.TestSuites:
            self._handle_test_suites_start(result)
            return
        if lifecycle == LifeCycle.TestSuite:
            self._handle_testsuite_start(result)
            return
        if lifecycle == LifeCycle.TestCase:
            self._handle_case_start(result)

    def __ended__(self, lifecycle, test_result, **kwargs):
        """Forward an "ended" event to the end hook for ``lifecycle``.

        The keyword arguments are handed to the suite, suites and task hooks
        as a single dict; the case hook receives only ``test_result``.
        """
        if lifecycle == LifeCycle.TestSuite:
            self._handle_testsuite_end(test_result, kwargs)
            return
        if lifecycle == LifeCycle.TestSuites:
            self._handle_test_suites_end(test_result, kwargs)
            return
        if lifecycle == LifeCycle.TestCase:
            self._handle_case_end(test_result)
            return
        if lifecycle == LifeCycle.TestTask:
            self._handle_task_end(test_result, kwargs)

    def __skipped__(self, lifecycle, result):
        """Forward a "skipped" event; only test cases are handled."""
        if lifecycle != LifeCycle.TestCase:
            return
        self._handle_case_skip(result)

    def __failed__(self, lifecycle, test_result):
        """Forward a "failed" event for a test suite or a test case."""
        if lifecycle == LifeCycle.TestSuite:
            self._handle_suite_fail(test_result)
            return
        if lifecycle == LifeCycle.TestCase:
            self._handle_case_fail(test_result)

    # ------------------------------------------------------------------
    # Result lookup, to be provided by subclasses
    # ------------------------------------------------------------------
    @abstractmethod
    def _get_suite_result(self, test_result, create=False):
        """Return the suite record associated with ``test_result``."""

    @abstractmethod
    def _get_test_result(self, test_result, create=False):
        """Return the case record associated with ``test_result``."""

    # ------------------------------------------------------------------
    # Per-event hooks, to be provided by subclasses
    # ------------------------------------------------------------------
    @abstractmethod
    def _handle_test_suites_start(self, test_result):
        """Called when a ``LifeCycle.TestSuites`` event starts."""

    @abstractmethod
    def _handle_testsuite_start(self, test_result):
        """Called when a ``LifeCycle.TestSuite`` event starts."""

    @abstractmethod
    def _handle_case_start(self, test_result):
        """Called when a ``LifeCycle.TestCase`` event starts."""

    @abstractmethod
    def _handle_testsuite_end(self, test_result, kwargs):
        """Called when a ``LifeCycle.TestSuite`` event ends."""

    @abstractmethod
    def _handle_test_suites_end(self, test_result, kwargs):
        """Called when a ``LifeCycle.TestSuites`` event ends."""

    @abstractmethod
    def _handle_case_end(self, test_result):
        """Called when a ``LifeCycle.TestCase`` event ends."""

    @abstractmethod
    def _handle_task_end(self, test_result, kwargs):
        """Called when a ``LifeCycle.TestTask`` event ends."""

    @abstractmethod
    def _handle_case_skip(self, test_result):
        """Called when a ``LifeCycle.TestCase`` event is skipped."""

    @abstractmethod
    def _handle_case_fail(self, test_result):
        """Called when a ``LifeCycle.TestCase`` event fails."""

    @abstractmethod
    def _handle_suite_fail(self, test_result):
        """Called when a ``LifeCycle.TestSuite`` event fails."""
118
119
class ReportEventListener(AbsReportListener, ABC):
    """Listener that keeps suite and case results in memory.

    Suite and case records are stored in dicts keyed by their ``index``.
    Handlers copy fields from the incoming event payload onto the stored
    record. Subclasses must still implement ``_generate_data_report`` and
    ``_handle_suite_end_data``.

    Handlers that look a record up with ``create=False`` return silently
    when no record is found, so an event for an unknown suite or case does
    not raise ``AttributeError``.
    """

    def __init__(self):
        # Passed to ``_generate_data_report`` as ``results``. This class
        # never appends to it; presumably subclasses fill it in
        # ``_handle_suite_end_data`` -- TODO confirm.
        self.result = list()
        # SuiteResult records keyed by ``index``.
        self.suites = dict()
        # CaseResult records keyed by ``index``.
        self.tests = dict()
        # Index of the most recently started suite / case; used as the
        # fallback lookup key when an event's own index is unknown.
        self.current_suite_id = 0
        self.current_test_id = 0
        # Base directory under which the "result" folder is created.
        self.report_path = ""

    def _get_suite_result(self, test_result, create=False):
        """Return the stored suite record for ``test_result``.

        Lookup order:
          1. the record stored under ``test_result.index``;
          2. when ``create`` is true, a new ``SuiteResult`` registered under
             that index (a random UUID hex is used when the index is "");
          3. otherwise the record of the current suite, or ``None`` when
             there is none.
        """
        if test_result.index in self.suites:
            return self.suites.get(test_result.index)
        if not create:
            return self.suites.get(self.current_suite_id)
        suite = SuiteResult()
        rid = uuid.uuid4().hex if test_result.index == "" else \
            test_result.index
        suite.index = rid
        return self.suites.setdefault(rid, suite)

    def _get_test_result(self, test_result, create=False):
        """Return the stored case record for ``test_result``.

        Same lookup order as ``_get_suite_result`` but against
        ``self.tests`` and ``self.current_test_id``; may return ``None``
        when ``create`` is false.
        """
        if test_result.index in self.tests:
            return self.tests.get(test_result.index)
        if not create:
            return self.tests.get(self.current_test_id)
        test = CaseResult()
        rid = uuid.uuid4().hex if test_result.index == "" else \
            test_result.index
        test.index = rid
        return self.tests.setdefault(rid, test)

    def _handle_case_start(self, test_result):
        """Create (or fetch) the case record and make it the current case."""
        test = self._get_test_result(test_result=test_result, create=True)
        test.test_name = test_result.test_name
        test.test_class = test_result.test_class
        self.current_test_id = test.index

    def _handle_testsuite_start(self, test_result):
        """Create (or fetch) the suite record and make it the current suite."""
        suite = self._get_suite_result(test_result=test_result,
                                       create=True)
        suite.suite_name = test_result.suite_name
        suite.test_num = test_result.test_num
        self.current_suite_id = suite.index

    def _handle_test_suites_start(self, test_result):
        """Create (or fetch) the record for a suites run and make it current.

        The record is stored in ``self.suites`` alongside single-suite
        records.
        """
        suites = self._get_suite_result(test_result=test_result,
                                        create=True)
        suites.suites_name = test_result.suites_name
        suites.test_num = test_result.test_num
        self.current_suite_id = suites.index

    def _handle_testsuite_end(self, test_result, kwargs):
        """Copy the final suite data onto the stored record.

        Does nothing when no suite record can be found. Otherwise the
        record is passed on to ``_handle_suite_end_data`` with ``kwargs``.
        """
        suite = self._get_suite_result(test_result=test_result,
                                       create=False)
        if not suite:
            return
        suite.run_time = test_result.run_time
        suite.code = test_result.code
        suite.report = test_result.report
        # Use the larger of the reported count and the number of case
        # records seen so far.
        suite.test_num = max(test_result.test_num, len(self.tests))
        self._handle_suite_end_data(suite, kwargs)

    def _handle_task_end(self, test_result, kwargs):
        """Generate the task reports through a reporter plugin.

        The reporter registered for ``kwargs["test_type"]`` is preferred;
        when none is registered, the reporter for ``TestType.all`` is used.
        """
        test_type = str(kwargs.get("test_type", TestType.all))
        reporters = get_plugin(plugin_type=Plugin.REPORTER,
                               plugin_id=test_type)
        if not reporters:
            reporters = get_plugin(plugin_type=Plugin.REPORTER,
                                   plugin_id=TestType.all)
        reporters[0].__generate_reports__(self.report_path,
                                          task_info=test_result)

    def _handle_case_end(self, test_result):
        """Copy the final case data onto the stored record, if any."""
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            # No matching or current case record; nothing to update.
            return
        test.run_time = test_result.run_time
        test.stacktrace = test_result.stacktrace
        test.code = test_result.code
        test.report = test_result.report
        test.is_completed = test_result.is_completed

    def _handle_test_suites_end(self, test_result, kwargs):
        """Write the data report unless ``kwargs["suite_report"]`` is truthy.

        The report goes to ``<report_path>/result``, which is created when
        missing.
        """
        if kwargs.get("suite_report", False):
            return
        result_dir = os.path.join(self.report_path, "result")
        os.makedirs(result_dir, exist_ok=True)
        message = kwargs.get("message", "")
        # Some callers pass a SuiteResult here instead of a SuitesResult,
        # which used to cause an error, hence the instance check.
        if isinstance(test_result, SuitesResult):
            message = test_result.stacktrace
            suites_name = test_result.suites_name
        else:
            suites_name = kwargs.get("suites_name", "")
        self._generate_data_report(result_dir, self.result, suites_name,
                                   message=message)

    def _handle_case_skip(self, test_result):
        """Mark the stored case record, if any, as skipped."""
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            return
        test.stacktrace = test_result.stacktrace
        test.code = ResultCode.SKIPPED.value

    def _handle_case_fail(self, test_result):
        """Mark the stored case record, if any, as failed."""
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            return
        test.stacktrace = test_result.stacktrace
        test.code = ResultCode.FAILED.value

    def _handle_suite_fail(self, test_result):
        """Mark the stored suite record, if any, as failed."""
        suite = self._get_suite_result(test_result=test_result,
                                       create=False)
        if suite is None:
            return
        suite.stacktrace = test_result.stacktrace
        suite.code = ResultCode.FAILED.value

    @abstractmethod
    def _generate_data_report(self, result_dir, results, name, **kwargs):
        """Write the data report for ``results`` into ``result_dir``."""

    @abstractmethod
    def _handle_suite_end_data(self, suite, kwargs):
        """Process the finished ``suite`` record."""
241
242
class UniversalReportListener(ReportEventListener, ABC):
    """Report listener that writes data reports through ``SuiteReporter``.

    Still abstract: ``_handle_suite_end_data`` is not implemented here.
    """

    def __init__(self):
        super().__init__()
        # Not used within this class; starts out as an empty mapping.
        self.suite_distributions = {}

    def _handle_task_end(self, test_result, kwargs):
        """Do nothing at task end (overrides the inherited handler)."""
        return None

    @classmethod
    def _generate_data_report(cls, result_dir, results, name, **kwargs):
        """Build a ``SuiteReporter`` for ``results`` and emit its data report.

        Extra keyword arguments are forwarded to ``SuiteReporter``.
        """
        reporter = SuiteReporter(results, name, result_dir, **kwargs)
        reporter.generate_data_report()
256
257
class PlusReportListener(ReportEventListener, ABC):
    """Report listener that enriches case records with extra artefacts.

    On case end it can additionally attach screenshot path prefixes (when
    the instance has a truthy ``report_plus`` attribute) and data parsed
    from a device log (when the instance has a
    ``device_log_back_fill_path`` attribute). Neither attribute is set by
    this class.

    Still abstract: ``_handle_suite_end_data`` is not implemented here.
    """

    def __init__(self):
        super().__init__()
        # Device serial number; sanitised and used as a screenshot folder.
        self.device_sn = ""
        # Suite name; used in screenshot paths and to match log lines.
        self.suite_name = ""

    @classmethod
    def _generate_data_report(cls, result_dir, results, name, **kwargs):
        """Build a ``SuiteReporter`` for ``results`` and emit its data report.

        Extra keyword arguments are forwarded to ``SuiteReporter``.
        """
        suite_report = SuiteReporter(results, name, result_dir, **kwargs)
        suite_report.generate_data_report()

    def _handle_case_end(self, test_result):
        """Copy the final case data and optional extras onto the record.

        Returns silently when no case record can be found for
        ``test_result``.
        """
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            # No matching or current case record; nothing to update.
            return
        test.run_time = test_result.run_time
        test.stacktrace = test_result.stacktrace
        test.code = test_result.code
        test.report = test_result.report
        if getattr(test_result, "result_content", ""):
            test.result_content = test_result.result_content
        if getattr(self, "report_plus", None):
            self._update_result(test_result)
            test.normal_screen_urls = test_result.normal_screen_urls
            test.failure_screen_urls = test_result.failure_screen_urls
        if hasattr(self, "device_log_back_fill_path"):
            test_record, result_info = self._update_test_record(
                self.device_log_back_fill_path, test_result)
            test.test_record = test_record
            # A non-empty result_info on the event wins over the parsed one.
            test.result_info = \
                getattr(test_result, "result_info", None) or result_info

    def _update_result(self, result):
        """Set the screenshot path prefixes on ``result``.

        Both prefixes live under
        ``<report_path>/screenshot/<device_sn>/<suite_name>``, where every
        non-alphanumeric character of the serial number is replaced by "_".
        """
        serial_re = re.sub(r'[^A-Za-z0-9]', '_', self.device_sn)
        screenshot_base_path = os.path.join(
            self.report_path, "screenshot", serial_re, self.suite_name)
        temp = "{}_{}=".format(result.test_class, result.test_name)
        result.normal_screen_urls = os.path.join(
            screenshot_base_path, "normal", temp)
        result.failure_screen_urls = os.path.join(
            screenshot_base_path, "failure", temp)

    def _update_test_record(self, device_log_back_fill_path, result):
        """Scan the device log for data belonging to ``result``.

        Args:
            device_log_back_fill_path: path of the UTF-8 log file to read.
            result: object providing ``test_class`` and ``test_name``.

        Returns:
            A ``(test_record, result_info)`` tuple of strings, each "" when
            no matching line was found. When several lines match, the last
            one wins.

        Raises:
            OSError: if the log file cannot be opened or read (this
                includes ``FileNotFoundError``).
        """
        split_str = "{}_{}".format(result.test_class, result.test_name)
        test_class = '"{}"'.format(result.test_class)
        test_name = '"{}"'.format(result.test_name)
        # The suite name does not change while scanning, so read it once.
        suite_name = getattr(self, "suite_name", "")
        test_record = ""
        result_info = ""

        fd = os.open(device_log_back_fill_path, os.O_RDONLY,
                     stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(fd, "r", encoding="utf-8") as file_content:
            # Iterate lazily instead of loading the whole log into memory.
            for line in file_content:
                if suite_name and suite_name in line and split_str in line:
                    # Text after "<class>_<name>" is the test record.
                    str_arr = line.split(split_str)
                    if str_arr[1]:
                        test_record = str_arr[1].strip()
                elif test_class in line and test_name in line:
                    # Text after a single "HtsIgnoredTest" marker is the
                    # result info.
                    json_arr = line.split("HtsIgnoredTest")
                    if len(json_arr) == 2:
                        result_info = json_arr[1].strip()
        return test_record, result_info
323