• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21from enum import Enum
22from threading import RLock
23
24from _core.constants import CaseResult
25from _core.constants import ModeType
26from _core.logger import platform_logger
27from _core.report.encrypt import check_pub_key_exist
28from _core.report.reporter_helper import DataHelper
29from _core.report.reporter_helper import ReportConstant
30from _core.context.center import Context
31from _core.context.handler import get_case_result
32
33LOG = platform_logger("SuiteReporter")
34SUITE_REPORTER_LOCK = RLock()
35
36
class ResultCode(Enum):
    """Integer result codes that a case result's ``code`` is compared against.

    SuiteReporter.update_attributes checks ``case_result.code`` against the
    ``.value`` of PASSED, FAILED and SKIPPED; every other code (including
    UNKNOWN and BLOCKED) falls through to its "disabled" branch.
    """
    UNKNOWN = -1010
    BLOCKED = -1
    PASSED = 0
    FAILED = 1
    SKIPPED = 2
43
44
class SuiteReporter:
    """Build suite data reports and keep the shared, class-level result lists.

    An instance turns ``results`` (pairs of suite result and case results)
    into a tree of suites/suite/case elements through ``DataHelper`` and
    registers the serialized report in ``suite_report_result``.  The class
    methods manage the shared lists under ``SUITE_REPORTER_LOCK``.
    """

    # Shared by all instances; set/read through set_suite_list/get_suite_list.
    suite_list = []
    # (data_path, report_string) tuples, one entry per data report path.
    suite_report_result = []
    # Only cleared and returned by this class; filled elsewhere.
    failed_case_list = []
    # (module_name, report_path, report_result) tuples.
    history_report_result = []

    # Translation table deleting the control characters chr(0)..chr(31),
    # except chr(10) (LF) and chr(13) (CR), from case stack traces.
    _CONTROL_CHAR_TABLE = dict.fromkeys(
        code for code in range(32) if code not in (10, 13))

    def __init__(self, results, report_name, report_path=None, **kwargs):
        """
        Create suite report
        :param results: [(suite_result, [case_results]),
                        (suite_result, [case_results]), ...]
        :param report_name: suite report name
        :param report_path: suite report path; None is treated as an empty
                            directory component, so the data report path is
                            just the report file name
        :param kwargs: optional values looked up with ReportConstant keys
                       (message, module_name, result_kind)
        """
        self.results = results
        self.data_helper = DataHelper()
        self.report_name = report_name
        self.report_path = report_path
        # os.path.join rejects None, so fall back to "" for the default
        self.suite_data_path = os.path.join(
            report_path or "", "%s%s" % (
                report_name, self.data_helper.DATA_REPORT_SUFFIX))
        self.kwargs = kwargs
        if not check_pub_key_exist() and Context.session().mode != ModeType.decc:
            # the list is shared between threads, guard it like the
            # class-level accessors do
            with SUITE_REPORTER_LOCK:
                SuiteReporter.suite_report_result.clear()

    def create_empty_report(self):
        """
        Create a data report holding one suite element without any case.

        Only works when exactly one suite result was passed in. The suite is
        counted as "unavailable" when kwargs[result_kind] equals
        CaseResult.unavailable, otherwise as "disabled". The report file is
        not written when the session mode is ModeType.decc, but the
        serialized report is registered in both cases.
        """
        # create empty data report only for single suite
        if len(self.results) != 1:
            LOG.error("Can only create one empty data report once")
            return
        suite_result, _ = self.results[0]

        # the same counter is set on the suites and on the suite element
        if self.kwargs.get(ReportConstant.result_kind) == CaseResult.unavailable:
            counter_key = ReportConstant.unavailable
        else:
            counter_key = ReportConstant.disabled
        is_decc = Context.session().mode == ModeType.decc

        # initial test suites element
        test_suites_element, test_suites_attributes, _ = \
            self._initial_test_suites()
        test_suites_attributes[counter_key] = 1
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)

        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)
        test_suite_element.text = ""
        test_suite_element.tail = self.data_helper.LINE_BREAK
        test_suite_attributes[counter_key] = 1
        test_suite_attributes[ReportConstant.message] = suite_result.stacktrace
        if is_decc:
            test_suite_attributes[ReportConstant.result] = ReportConstant.false
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)

        # append test suite element
        test_suites_element.append(test_suite_element)

        # generate report; len() instead of truth-value testing, which is
        # deprecated for ElementTree elements (assumes DataHelper returns
        # ElementTree-style elements)
        if len(test_suites_element) > 0:
            if not is_decc:
                self.data_helper.generate_report(test_suites_element,
                                                 self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def generate_data_report(self):
        """
        Build the suites element from all results, write it to
        ``suite_data_path`` and register the serialized report.

        Nothing is written or registered when no suite element was built.
        """
        # construct test suites element
        test_suites_element = self._construct_test_suites()

        # generate report only when at least one suite element exists
        if len(test_suites_element) > 0:
            self.data_helper.generate_report(test_suites_element,
                                             self.suite_data_path)
            SuiteReporter.append_report_result((
                self.suite_data_path, self.data_helper.to_string(
                    test_suites_element)))

    def _construct_test_suites(self):
        """
        Build the suites element with one child per entry in ``results``.

        The counters listed by _initial_test_suites are summed over all
        suites; the accumulated time is rounded to 3 decimals.
        :return: the suites element (without children when results is empty)
        """
        # initial test suites element
        test_suites_element, test_suites_attributes, need_update_attributes = \
            self._initial_test_suites()

        # construct test suite element
        for suite_result, case_results in self.results:
            test_suite_element, test_suite_attributes = \
                self._construct_test_suite(suite_result, case_results)

            # add test suite element
            test_suites_element.append(test_suite_element)

            # accumulate the suite counters into the suites counters
            for need_update_attribute in need_update_attributes:
                test_suites_attributes[need_update_attribute] += \
                    test_suite_attributes.get(need_update_attribute, 0)
        test_suites_attributes[ReportConstant.time] = \
            round(test_suites_attributes.get(ReportConstant.time), 3)

        if len(test_suites_element) > 0:
            # the last child gets the tail placed before the closing tag
            test_suites_element[-1].tail = self.data_helper.LINE_BREAK
        else:
            LOG.error("%s no suite result exists", self.report_name)

        # set test suites element attributes
        self.data_helper.set_element_attributes(test_suites_element,
                                                test_suites_attributes)
        return test_suites_element

    def _initial_test_suites(self):
        """
        Create the suites element and its initial attributes.

        The name is ``report_name`` unless kwargs carries a non-empty module
        name; the time stamp is the current local time.
        :return: (suites element, attributes dict, names of the attributes
                 that are accumulated from the suite elements)
        """
        test_suites_element = self.data_helper.initial_suites_element()
        test_suites_attributes = {
            ReportConstant.name: self.report_name,
            ReportConstant.time_stamp: time.strftime(ReportConstant.time_format, time.localtime()),
            ReportConstant.time: 0,
            ReportConstant.errors: 0,
            ReportConstant.disabled: 0,
            ReportConstant.failures: 0,
            ReportConstant.tests: 0,
            ReportConstant.ignored: 0,
            ReportConstant.unavailable: 0,
            # module's failure message
            ReportConstant.message: self.kwargs.get(ReportConstant.message, "")
        }
        module_name = self.kwargs.get(ReportConstant.module_name, "")
        if module_name:
            test_suites_attributes[ReportConstant.name] = module_name
        need_update_attributes = [
            ReportConstant.time,
            ReportConstant.errors,
            ReportConstant.tests,
            ReportConstant.ignored,
            ReportConstant.disabled,
            ReportConstant.failures,
            ReportConstant.unavailable
        ]
        return test_suites_element, test_suites_attributes, need_update_attributes

    def _construct_test_suite(self, suite_result, case_results):
        """
        Build one suite element with a child element per case result.

        :param suite_result: result object of the suite
        :param case_results: case result objects of the suite
        :return: (suite element, suite attributes dict)
        """
        # initial test suite element
        test_suite_element, test_suite_attributes = self._initial_test_suite(
            suite_result)

        # get test case elements that are children of test suite element
        test_case_elements = []
        for case_result in case_results:
            # initial test case element
            test_case_element, test_case_attributes = self._initial_test_case(
                case_result)

            # update attributes according to case result
            self.update_attributes(case_result, test_case_attributes,
                                   test_suite_attributes)

            # set test case attributes and add to test_suite_element
            self.data_helper.set_element_attributes(test_case_element,
                                                    test_case_attributes)
            test_case_elements.append(test_case_element)

        # cases announced by the suite (tests) without a case result are
        # counted as disabled; never subtract when there are more results
        missing_cases = int(test_suite_attributes.get(ReportConstant.tests) -
                            len(test_case_elements))
        test_suite_attributes[ReportConstant.disabled] += max(missing_cases, 0)

        if test_case_elements:
            test_case_elements[-1].tail = self.data_helper.LINE_BREAK_INDENT
        else:
            LOG.debug("No case executed")
        test_suite_element.extend(test_case_elements)

        # set test suite attributes
        self.data_helper.set_element_attributes(test_suite_element,
                                                test_suite_attributes)
        return test_suite_element, test_suite_attributes

    @classmethod
    def update_attributes(cls, case_result, test_case_attributes,
                          test_suite_attributes):
        """
        Set status/result of a case and bump the matching suite counter.

        Both dicts are modified in place. A passed case also gets its
        message cleared; any code other than PASSED, FAILED or SKIPPED is
        reported as disabled.
        :param case_result: object providing the integer ``code``
        :param test_case_attributes: attributes dict of the case element
        :param test_suite_attributes: attributes dict of the suite element
        """
        if case_result.code == ResultCode.PASSED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.true
            test_case_attributes[ReportConstant.message] = ""
        elif case_result.code == ResultCode.FAILED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.run
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.failures] += 1
        elif case_result.code == ResultCode.SKIPPED.value:
            test_case_attributes[ReportConstant.status] = ReportConstant.skip
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.ignored] += 1
        else:  # ResultCode.UNKNOWN.value or other value
            test_case_attributes[ReportConstant.status] = \
                ReportConstant.disable
            test_case_attributes[ReportConstant.result] = ReportConstant.false
            test_suite_attributes[ReportConstant.disabled] += 1

    def _initial_test_suite(self, suite_result):
        """
        Create a suite element and its initial attributes.

        ``run_time`` is divided by 1000 and rounded to 3 decimals. The
        module name is only added when kwargs carries a non-empty one.
        :return: (suite element, attributes dict)
        """
        test_suite_element = self.data_helper.initial_suite_element()
        test_suite_attributes = {
            ReportConstant.name: suite_result.suite_name,
            ReportConstant.time: round(float(suite_result.run_time) / 1000, 3),
            ReportConstant.errors: 0,
            ReportConstant.disabled: 0,
            ReportConstant.failures: 0,
            ReportConstant.ignored: 0,
            ReportConstant.tests: suite_result.test_num,
            ReportConstant.message: suite_result.stacktrace,
            ReportConstant.report: suite_result.report
        }
        module_name = self.kwargs.get(ReportConstant.module_name, "")
        if module_name:
            test_suite_attributes[ReportConstant.module_name] = module_name
        return test_suite_element, test_suite_attributes

    def _initial_test_case(self, case_result):
        """
        Create a case element and its initial attributes.

        The stack trace is converted to str and stripped of the control
        characters chr(0)..chr(31) except LF and CR. ``result_content`` is
        only added when the case result has a non-empty one.
        :return: (case element, attributes dict)
        """
        test_case_element = self.data_helper.initial_case_element()
        # one pass with the translation table instead of one replace per
        # control character
        case_stacktrace = str(case_result.stacktrace).translate(
            self._CONTROL_CHAR_TABLE)
        test_case_attributes = {
            ReportConstant.name: case_result.test_name,
            ReportConstant.status: "",
            ReportConstant.time: round(float(case_result.run_time) / 1000, 3),
            ReportConstant.class_name: case_result.test_class,
            ReportConstant.result: "",
            ReportConstant.level: 1,
            ReportConstant.message: case_stacktrace,
            ReportConstant.report: case_result.report
        }
        result_content = getattr(case_result, ReportConstant.result_content, "")
        if result_content:
            test_case_attributes[ReportConstant.result_content] = result_content
        return test_case_element, test_case_attributes

    @classmethod
    def clear_report_result(cls):
        """Remove all entries from the shared report result list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear report result")
            cls.suite_report_result.clear()

    @classmethod
    def clear_failed_case_list(cls):
        """Remove all entries from the shared failed case list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear failed case list")
            cls.failed_case_list.clear()

    @classmethod
    def append_report_result(cls, report_result):
        """
        Register a report result, replacing an entry with the same path.

        :param report_result: (data_path, report_string) tuple; anything
                              else is rejected with an error log
        A newly added result is handed to _upload_case_result; a replaced
        one is not.
        """
        if not isinstance(report_result, tuple) or len(report_result) != 2:
            LOG.error("Report result should be a tuple with length 2")
            return
        data_path = report_result[0]
        with SUITE_REPORTER_LOCK:
            for index, exist_result in enumerate(cls.suite_report_result):
                if exist_result[0] == data_path:
                    LOG.debug("Data report %s generate again", data_path)
                    cls.suite_report_result[index] = report_result
                    return
            cls.suite_report_result.append(report_result)
            cls._upload_case_result(report_result[1])

    @classmethod
    def _upload_case_result(cls, result_str):
        """
        Upload the result of the first suite in ``result_str``.

        Does nothing unless the session mode is ModeType.decc. The parsed
        report's first child is evaluated with get_case_result and passed
        to ``agent.decc.Handler.upload_case_result``; a missing ``agent``
        module is logged as an error.
        :param result_str: serialized data report
        """
        if Context.session().mode != ModeType.decc:
            return
        element = DataHelper.parse_data_report(result_str)
        if len(element) == 0:
            LOG.debug("%s is error", result_str)
            return
        element = element[0]
        result, error_msg = get_case_result(element)
        case_name = element.get(ReportConstant.name, "")
        try:
            from agent.decc import Handler
            LOG.info("Upload case result to decc")
            Handler.upload_case_result(case_name, result, error_msg)
        except ModuleNotFoundError as error:
            # the mode is known to be decc here (see the guard above)
            LOG.error("Module not found %s", error.args)

    @classmethod
    def get_report_result(cls):
        """Return the shared report result list (the list itself, no copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get report result, length is %s",
                      len(cls.suite_report_result))
            return SuiteReporter.suite_report_result

    @classmethod
    def set_suite_list(cls, suite_list):
        """Replace the shared suite list with ``suite_list``."""
        # take the lock like the getter does
        with SUITE_REPORTER_LOCK:
            LOG.debug("Set suite list, length is %s", len(suite_list))
            cls.suite_list = suite_list

    @classmethod
    def get_suite_list(cls):
        """Return the shared suite list (the list itself, no copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get suite list, length is %s", len(cls.suite_list))
            return SuiteReporter.suite_list

    @classmethod
    def get_failed_case_list(cls):
        """Return the shared failed case list (the list itself, no copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get failed case list, length is %s",
                      len(cls.failed_case_list))
            return SuiteReporter.failed_case_list

    @classmethod
    def append_history_result(cls, suite_reports):
        """
        Append history results keyed by a name derived from the report path.

        :param suite_reports: iterable of (report_path, report_result)
        The module name is the first item returned by
        get_filename_extension(report_path).
        """
        from _core.utils import get_filename_extension
        with SUITE_REPORTER_LOCK:
            LOG.debug("Append history result,suite reports length is %s",
                      len(suite_reports))
            for report_path, report_result in suite_reports:
                module_name = get_filename_extension(report_path)[0]
                cls.history_report_result.append(
                    (module_name, report_path, report_result))

    @classmethod
    def clear_history_result(cls):
        """Remove all entries from the shared history result list."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Clear history result")
            cls.history_report_result.clear()

    @classmethod
    def get_history_result_by_module(cls, name):
        """
        Look up the first history result stored for module ``name``.

        :return: (report_path, report_result), or ("", "") when not found
        """
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get history result by module,module_name:%s", name)
            for module_name, report_path, report_result in \
                    cls.history_report_result:
                if name == module_name:
                    return report_path, report_result
            return "", ""

    @classmethod
    def get_history_result_list(cls):
        """Return the shared history result list (the list itself, no copy)."""
        with SUITE_REPORTER_LOCK:
            LOG.debug("Get history result list,length is %s",
                      len(cls.history_report_result))
            return cls.history_report_result
392