• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21from enum import Enum
22from threading import RLock
23
24from _core.constants import ModeType
25from _core.logger import platform_logger
26from _core.report.encrypt import check_pub_key_exist
27from _core.report.reporter_helper import DataHelper
28from _core.report.reporter_helper import ReportConstant
29
30LOG = platform_logger("SuiteReporter")
31SUITE_REPORTER_LOCK = RLock()
32
33
34class ResultCode(Enum):
35    UNKNOWN = -1010
36    BLOCKED = -1
37    PASSED = 0
38    FAILED = 1
39    SKIPPED = 2
40
41
42class SuiteReporter:
43    suite_list = []
44    suite_report_result = []
45    failed_case_list = []
46    history_report_result = []
47
48    def __init__(self, results, report_name, report_path=None, **kwargs):
49        """
50        Create suite report
51        :param results: [(suite_result, [case_results]),
52                        (suite_result, [case_results]), ...]
53        :param report_name: suite report name
54        :param report_path: suite report path
55        """
56        self.results = results
57        self.data_helper = DataHelper()
58        self.report_name = report_name
59        self.report_path = report_path
60        self.suite_data_path = os.path.join(
61            self.report_path, "%s%s" % (
62                report_name, self.data_helper.DATA_REPORT_SUFFIX))
63        self.kwargs = kwargs
64        from xdevice import Scheduler
65        if not check_pub_key_exist() and Scheduler.mode != ModeType.decc:
66            SuiteReporter.suite_report_result.clear()
67
68    def create_empty_report(self):
69        # create empty data report only for single suite
70        if len(self.results) != 1:
71            LOG.error("Can only create one empty data report once")
72            return
73        suite_result, _ = self.results[0]
74
75        # initial test suites element
76        test_suites_element, test_suites_attributes, _ = \
77            self._initial_test_suites()
78        test_suites_attributes[ReportConstant.unavailable] = 1
79        self.data_helper.set_element_attributes(test_suites_element,
80                                                test_suites_attributes)
81
82        # initial test suite element
83        test_suite_element, test_suite_attributes = self._initial_test_suite(
84            suite_result)
85        test_suite_element.text, test_suite_element.tail = \
86            "", self.data_helper.LINE_BREAK
87        test_suite_attributes[ReportConstant.unavailable] = 1
88        test_suite_attributes[ReportConstant.message] = suite_result.stacktrace
89
90        from xdevice import Scheduler
91        if Scheduler.mode == ModeType.decc:
92            test_suite_attributes[ReportConstant.result] = ReportConstant.false
93        self.data_helper.set_element_attributes(test_suite_element,
94                                                test_suite_attributes)
95
96        # append test suite element
97        test_suites_element.append(test_suite_element)
98
99        # generate report
100        if test_suites_element:
101            from xdevice import Scheduler
102            if Scheduler.mode != ModeType.decc:
103                self.data_helper.generate_report(test_suites_element,
104                                                 self.suite_data_path)
105            SuiteReporter.append_report_result((
106                self.suite_data_path, self.data_helper.to_string(
107                    test_suites_element)))
108
109    def generate_data_report(self):
110        # construct test suites element
111        test_suites_element = self._construct_test_suites()
112
113        # generate report
114        if test_suites_element:
115            self.data_helper.generate_report(test_suites_element,
116                                             self.suite_data_path)
117            SuiteReporter.append_report_result((
118                self.suite_data_path, self.data_helper.to_string(
119                    test_suites_element)))
120
121    def _construct_test_suites(self):
122        # initial test suites element
123        test_suites_element, test_suites_attributes, need_update_attributes = \
124            self._initial_test_suites()
125
126        # construct test suite element
127        for suite_result, case_results in self.results:
128            test_suite_element, test_suite_attributes = \
129                self._construct_test_suite(suite_result, case_results)
130
131            # add test suite element
132            test_suites_element.append(test_suite_element)
133
134            # update and set test suites element attributes
135            for need_update_attribute in need_update_attributes:
136                test_suites_attributes[need_update_attribute] += \
137                    test_suite_attributes.get(need_update_attribute, 0)
138        test_suites_attributes[ReportConstant.time] = \
139            round(test_suites_attributes.get(ReportConstant.time), 3)
140
141        if test_suites_element:
142            test_suite_element = test_suites_element[-1]
143            test_suite_element.tail = self.data_helper.LINE_BREAK
144        else:
145            LOG.error("%s no suite result exists" % self.report_name)
146
147        # set test suites element attributes
148        self.data_helper.set_element_attributes(test_suites_element,
149                                                test_suites_attributes)
150        return test_suites_element
151
152    def _initial_test_suites(self):
153        test_suites_element = self.data_helper.initial_suites_element()
154        test_suites_attributes = {
155            ReportConstant.name: self.report_name,
156            ReportConstant.time_stamp: time.strftime(ReportConstant.time_format, time.localtime()),
157            ReportConstant.time: 0,
158            ReportConstant.errors: 0,
159            ReportConstant.disabled: 0,
160            ReportConstant.failures: 0,
161            ReportConstant.tests: 0,
162            ReportConstant.ignored: 0,
163            ReportConstant.unavailable: 0,
164            ReportConstant.product_info: self.kwargs.get(ReportConstant.product_info_, ""),
165            # module's failure message
166            ReportConstant.message: self.kwargs.get(ReportConstant.message, "")
167        }
168        module_name = self.kwargs.get(ReportConstant.module_name, "")
169        if module_name:
170            test_suites_attributes[ReportConstant.name] = module_name
171        need_update_attributes = [
172            ReportConstant.time,
173            ReportConstant.errors,
174            ReportConstant.tests,
175            ReportConstant.ignored,
176            ReportConstant.disabled,
177            ReportConstant.failures,
178            ReportConstant.unavailable
179        ]
180        return test_suites_element, test_suites_attributes, need_update_attributes
181
182    def _construct_test_suite(self, suite_result, case_results):
183        # initial test suite element
184        test_suite_element, test_suite_attributes = self._initial_test_suite(
185            suite_result)
186
187        # get test case elements that are children of test suite element
188        test_case_elements = []
189        for case_result in case_results:
190            # initial test case element
191            test_case_element, test_case_attributes = self._initial_test_case(
192                case_result)
193
194            # update attributes according to case result
195            self.update_attributes(case_result, test_case_attributes,
196                                   test_suite_attributes)
197
198            # set test case attributes and add to test_suite_element
199            self.data_helper.set_element_attributes(test_case_element,
200                                                    test_case_attributes)
201            test_case_elements.append(test_case_element)
202        test_suite_attributes[ReportConstant.disabled] += max(int(
203            test_suite_attributes.get(ReportConstant.tests) -
204            len(test_case_elements)), 0)
205        if test_case_elements:
206            child = test_case_elements[-1]
207            child.tail = self.data_helper.LINE_BREAK_INDENT
208        else:
209            LOG.debug("No case executed")
210        test_suite_element.extend(test_case_elements)
211
212        # set test suite attributes
213        self.data_helper.set_element_attributes(test_suite_element,
214                                                test_suite_attributes)
215        return test_suite_element, test_suite_attributes
216
217    @classmethod
218    def update_attributes(cls, case_result, test_case_attributes,
219                          test_suite_attributes):
220        if case_result.code == ResultCode.PASSED.value:
221            test_case_attributes[ReportConstant.status] = ReportConstant.run
222            test_case_attributes[ReportConstant.result] = ReportConstant.true
223            test_case_attributes[ReportConstant.message] = ""
224        elif case_result.code == ResultCode.FAILED.value:
225            test_case_attributes[ReportConstant.status] = ReportConstant.run
226            test_case_attributes[ReportConstant.result] = ReportConstant.false
227            test_suite_attributes[ReportConstant.failures] = \
228                test_suite_attributes[ReportConstant.failures] + 1
229        elif case_result.code == ResultCode.SKIPPED.value:
230            test_case_attributes[ReportConstant.status] = ReportConstant.skip
231            test_case_attributes[ReportConstant.result] = ReportConstant.false
232            test_suite_attributes[ReportConstant.ignored] = \
233                test_suite_attributes[ReportConstant.ignored] + 1
234        else:  # ResultCode.UNKNOWN.value or other value
235            test_case_attributes[ReportConstant.status] = \
236                ReportConstant.disable
237            test_case_attributes[ReportConstant.result] = ReportConstant.false
238            test_suite_attributes[ReportConstant.disabled] = \
239                test_suite_attributes[ReportConstant.disabled] + 1
240
241    def _initial_test_suite(self, suite_result):
242        test_suite_element = self.data_helper.initial_suite_element()
243        test_suite_attributes = {
244            ReportConstant.name: suite_result.suite_name,
245            ReportConstant.time: round(float(suite_result.run_time) / 1000, 3),
246            ReportConstant.errors: 0,
247            ReportConstant.disabled: 0,
248            ReportConstant.failures: 0,
249            ReportConstant.ignored: 0,
250            ReportConstant.tests: suite_result.test_num,
251            ReportConstant.message: suite_result.stacktrace
252        }
253        if self.kwargs.get(ReportConstant.module_name, ""):
254            test_suite_attributes[ReportConstant.module_name] = self.kwargs.get(
255                ReportConstant.module_name, "")
256        return test_suite_element, test_suite_attributes
257
258    def _initial_test_case(self, case_result):
259        test_case_element = self.data_helper.initial_case_element()
260        case_stacktrace = str(case_result.stacktrace)
261        for char_index in range(32):
262            if char_index in [10, 13]:  # chr(10): LF, chr(13): CR
263                continue
264            case_stacktrace = case_stacktrace.replace(chr(char_index), "")
265        test_case_attributes = {ReportConstant.name: case_result.test_name,
266                                ReportConstant.status: "",
267                                ReportConstant.time: round(float(
268                                     case_result.run_time) / 1000, 3),
269                                ReportConstant.class_name:
270                                    case_result.test_class,
271                                ReportConstant.result: "",
272                                ReportConstant.level: 1,
273                                ReportConstant.message: case_stacktrace}
274        return test_case_element, test_case_attributes
275
276    @classmethod
277    def clear_report_result(cls):
278        with SUITE_REPORTER_LOCK:
279            LOG.debug("Clear report result")
280            cls.suite_report_result.clear()
281
282    @classmethod
283    def clear_failed_case_list(cls):
284        with SUITE_REPORTER_LOCK:
285            LOG.debug("Clear failed case list")
286            cls.failed_case_list.clear()
287
288    @classmethod
289    def append_report_result(cls, report_result):
290        with SUITE_REPORTER_LOCK:
291            if not isinstance(report_result, tuple) or len(report_result) != 2:
292                LOG.error("Report result should be a tuple with length 2")
293                return
294            data_path = report_result[0]
295            for index, exist_result in enumerate(cls.suite_report_result):
296                if exist_result[0] == data_path:
297                    LOG.debug("Data report %s generate again", data_path)
298                    cls.suite_report_result[index] = report_result
299                    return
300            cls.suite_report_result.append(report_result)
301            cls._upload_case_result(report_result[1])
302
303    @classmethod
304    def _upload_case_result(cls, result_str):
305        from xdevice import Scheduler
306        if Scheduler.mode != ModeType.decc:
307            return
308        element = DataHelper.parse_data_report(result_str)
309        if len(element) == 0:
310            LOG.debug("%s is error", result_str)
311            return
312        element = element[0]
313        result, error_msg = Scheduler.get_script_result(element)
314        case_name = element.get(ReportConstant.name, "")
315        try:
316            from agent.decc import Handler
317            LOG.info("Upload case result to decc")
318            Handler.upload_case_result(case_name, result, error_msg)
319        except ModuleNotFoundError as error:
320            from xdevice import Scheduler
321            if Scheduler.mode == ModeType.decc:
322                LOG.error("Module not found %s", error.args)
323
324    @classmethod
325    def get_report_result(cls):
326        with SUITE_REPORTER_LOCK:
327            LOG.debug("Get report result, length is {}".
328                      format(len(cls.suite_report_result)))
329            return SuiteReporter.suite_report_result
330
331    @classmethod
332    def set_suite_list(cls, suite_list):
333        LOG.debug("Set suite list, length is {}".format(len(suite_list)))
334        cls.suite_list = suite_list
335
336    @classmethod
337    def get_suite_list(cls):
338        with SUITE_REPORTER_LOCK:
339            LOG.debug("Get suite list, length is {}".
340                      format(len(cls.suite_list)))
341            return SuiteReporter.suite_list
342
343    @classmethod
344    def get_failed_case_list(cls):
345        with SUITE_REPORTER_LOCK:
346            LOG.debug("Get failed case list, length is {}".
347                      format(len(cls.failed_case_list)))
348            return SuiteReporter.failed_case_list
349
350    @classmethod
351    def append_history_result(cls, suite_reports):
352        from _core.utils import get_filename_extension
353        with SUITE_REPORTER_LOCK:
354            LOG.debug("Append history result,suite reports length is {}".
355                      format(len(suite_reports)))
356            for report_path, report_result in suite_reports:
357                module_name = get_filename_extension(report_path)[0]
358                cls.history_report_result. \
359                    append((module_name, report_path, report_result))
360
361    @classmethod
362    def clear_history_result(cls):
363        with SUITE_REPORTER_LOCK:
364            LOG.debug("Clear history result")
365            cls.history_report_result.clear()
366
367    @classmethod
368    def get_history_result_by_module(cls, name):
369        with SUITE_REPORTER_LOCK:
370            LOG.debug("Get history result by module,module_name:{}".
371                      format(name))
372            for module_name, report_path, report_result in \
373                    cls.history_report_result:
374                if name == module_name:
375                    return report_path, report_result
376            return "", ""
377
378    @classmethod
379    def get_history_result_list(cls):
380        with SUITE_REPORTER_LOCK:
381            LOG.debug("Get history result list,length is {}".
382                      format(len(cls.history_report_result)))
383            return cls.history_report_result
384