• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21from enum import Enum
22from threading import RLock
23
24from _core.constants import ModeType
25from _core.logger import platform_logger
26from _core.report.encrypt import check_pub_key_exist
27from _core.report.reporter_helper import DataHelper
28from _core.report.reporter_helper import ReportConstant
29
# module logger, created with the tag "SuiteReporter"
LOG = platform_logger("SuiteReporter")
# re-entrant lock taken by SuiteReporter classmethods around accesses to the
# shared class-level lists
SUITE_REPORTER_LOCK = RLock()
32
33
class ResultCode(Enum):
    """
    numeric result codes; SuiteReporter.update_attributes compares the
    ``code`` of a case result against the values of these members
    """
    # codes other than PASSED, FAILED and SKIPPED are reported as disabled
    UNKNOWN = -1010
    BLOCKED = -1
    PASSED = 0
    FAILED = 1
    SKIPPED = 2
40
41
class SuiteReporter:
    """
    build suite data reports and keep report state that is shared by all
    instances in class-level lists
    """
    # stored by set_suite_list, returned by get_suite_list
    suite_list = []
    # (data report path, data_helper.to_string output) tuples; an entry with
    # the same path is replaced by append_report_result
    suite_report_result = []
    # returned by get_failed_case_list, emptied by clear_failed_case_list
    failed_case_list = []
    # (module name, report path, report result) tuples added by
    # append_history_result
    history_report_result = []
47
48    def __init__(self, results, report_name, report_path=None, **kwargs):
49        """
50        create suite report
51        :param results: [(suite_result, [case_results]),
52                        (suite_result, [case_results]), ...]
53        :param report_name: suite report name
54        :param report_path: suite report path
55        """
56        self.results = results
57        self.data_helper = DataHelper()
58        self.report_name = report_name
59        self.report_path = report_path
60        self.suite_data_path = os.path.join(
61            self.report_path, "%s%s" % (
62                report_name, self.data_helper.DATA_REPORT_SUFFIX))
63        self.args = kwargs
64        from xdevice import Scheduler
65        if not check_pub_key_exist() and Scheduler.mode != ModeType.decc:
66            SuiteReporter.suite_report_result.clear()
67
68    def create_empty_report(self):
69        # create empty data report only for single suite
70        if len(self.results) != 1:
71            LOG.error("can only create one empty data report once")
72            return
73        suite_result, _ = self.results[0]
74
75        # initial test suites element
76        test_suites_element, test_suites_attributes, _ = \
77            self._initial_test_suites()
78        test_suites_attributes[ReportConstant.unavailable] = 1
79        self.data_helper.set_element_attributes(test_suites_element,
80                                                test_suites_attributes)
81
82        # initial test suite element
83        test_suite_element, test_suite_attributes = self._initial_test_suite(
84            suite_result)
85        test_suite_element.text, test_suite_element.tail = \
86            "", self.data_helper.LINE_BREAK
87        test_suite_attributes[ReportConstant.unavailable] = 1
88        test_suite_attributes[ReportConstant.message] = suite_result.stacktrace
89
90        from xdevice import Scheduler
91        if Scheduler.mode == ModeType.decc:
92            test_suite_attributes[ReportConstant.result] = ReportConstant.false
93        self.data_helper.set_element_attributes(test_suite_element,
94                                                test_suite_attributes)
95
96        # append test suite element
97        test_suites_element.append(test_suite_element)
98
99        # generate report
100        if test_suites_element:
101            from xdevice import Scheduler
102            if Scheduler.mode != ModeType.decc:
103                self.data_helper.generate_report(test_suites_element,
104                                                 self.suite_data_path)
105            SuiteReporter.append_report_result((
106                self.suite_data_path, self.data_helper.to_string(
107                    test_suites_element)))
108
109    def generate_data_report(self):
110        # construct test suites element
111        test_suites_element = self._construct_test_suites()
112
113        # generate report
114        if test_suites_element:
115            self.data_helper.generate_report(test_suites_element,
116                                             self.suite_data_path)
117            SuiteReporter.append_report_result((
118                self.suite_data_path, self.data_helper.to_string(
119                    test_suites_element)))
120
121    def _construct_test_suites(self):
122        # initial test suites element
123        test_suites_element, test_suites_attributes, need_update_attributes = \
124            self._initial_test_suites()
125
126        # construct test suite element
127        for suite_result, case_results in self.results:
128            test_suite_element, test_suite_attributes = \
129                self._construct_test_suite(suite_result, case_results)
130
131            # add test suite element
132            test_suites_element.append(test_suite_element)
133
134            # update and set test suites element attributes
135            for need_update_attribute in need_update_attributes:
136                test_suites_attributes[need_update_attribute] += \
137                    test_suite_attributes.get(need_update_attribute, 0)
138        test_suites_attributes[ReportConstant.time] = \
139            round(test_suites_attributes[ReportConstant.time], 3)
140
141        if test_suites_element:
142            test_suite_element = test_suites_element[-1]
143            test_suite_element.tail = self.data_helper.LINE_BREAK
144        else:
145            LOG.error("%s no suite result exists" % self.report_name)
146
147        # set test suites element attributes
148        self.data_helper.set_element_attributes(test_suites_element,
149                                                test_suites_attributes)
150        return test_suites_element
151
152    def _initial_test_suites(self):
153        test_suites_element = self.data_helper.initial_suites_element()
154        test_suites_attributes = {ReportConstant.name: self.report_name,
155                                  ReportConstant.time_stamp: time.strftime(
156                                      ReportConstant.time_format,
157                                      time.localtime()),
158                                  ReportConstant.time: 0,
159                                  ReportConstant.errors: 0,
160                                  ReportConstant.disabled: 0,
161                                  ReportConstant.failures: 0,
162                                  ReportConstant.tests: 0,
163                                  ReportConstant.ignored: 0,
164                                  ReportConstant.unavailable: 0,
165                                  ReportConstant.product_info: self.args.get(
166                                      ReportConstant.product_info_, "")}
167        if self.args.get(ReportConstant.module_name, ""):
168            test_suites_attributes[ReportConstant.name] = self.args.get(
169                ReportConstant.module_name, "")
170        need_update_attributes = [ReportConstant.time, ReportConstant.errors,
171                                  ReportConstant.tests, ReportConstant.ignored,
172                                  ReportConstant.disabled,
173                                  ReportConstant.failures,
174                                  ReportConstant.unavailable]
175        return test_suites_element, test_suites_attributes, \
176            need_update_attributes
177
178    def _construct_test_suite(self, suite_result, case_results):
179        # initial test suite element
180        test_suite_element, test_suite_attributes = self._initial_test_suite(
181            suite_result)
182
183        # get test case elements that are children of test suite element
184        test_case_elements = []
185        for case_result in case_results:
186            # initial test case element
187            test_case_element, test_case_attributes = self._initial_test_case(
188                case_result)
189
190            # update attributes according to case result
191            self.update_attributes(case_result, test_case_attributes,
192                                   test_suite_attributes)
193
194            # set test case attributes and add to test_suite_element
195            self.data_helper.set_element_attributes(test_case_element,
196                                                    test_case_attributes)
197            test_case_elements.append(test_case_element)
198        test_suite_attributes[ReportConstant.disabled] += max(int(
199            test_suite_attributes[ReportConstant.tests] -
200            len(test_case_elements)), 0)
201        if test_case_elements:
202            child = test_case_elements[-1]
203            child.tail = self.data_helper.LINE_BREAK_INDENT
204        else:
205            LOG.debug("no case executed")
206        test_suite_element.extend(test_case_elements)
207
208        # set test suite attributes
209        self.data_helper.set_element_attributes(test_suite_element,
210                                                test_suite_attributes)
211        return test_suite_element, test_suite_attributes
212
213    @classmethod
214    def update_attributes(cls, case_result, test_case_attributes,
215                          test_suite_attributes):
216        if case_result.code == ResultCode.PASSED.value:
217            test_case_attributes[ReportConstant.status] = ReportConstant.run
218            test_case_attributes[ReportConstant.result] = ReportConstant.true
219            test_case_attributes[ReportConstant.message] = ""
220        elif case_result.code == ResultCode.FAILED.value:
221            test_case_attributes[ReportConstant.status] = ReportConstant.run
222            test_case_attributes[ReportConstant.result] = ReportConstant.false
223            test_suite_attributes[ReportConstant.failures] = \
224                test_suite_attributes[ReportConstant.failures] + 1
225        elif case_result.code == ResultCode.SKIPPED.value:
226            test_case_attributes[ReportConstant.status] = ReportConstant.skip
227            test_case_attributes[ReportConstant.result] = ReportConstant.false
228            test_suite_attributes[ReportConstant.ignored] = \
229                test_suite_attributes[ReportConstant.ignored] + 1
230        else:  # ResultCode.UNKNOWN.value or other value
231            test_case_attributes[ReportConstant.status] = \
232                ReportConstant.disable
233            test_case_attributes[ReportConstant.result] = ReportConstant.false
234            test_suite_attributes[ReportConstant.disabled] = \
235                test_suite_attributes[ReportConstant.disabled] + 1
236
237    def _initial_test_suite(self, suite_result):
238        test_suite_element = self.data_helper.initial_suite_element()
239        test_suite_attributes = {ReportConstant.name: suite_result.suite_name,
240                                 ReportConstant.time: round(float(
241                                     suite_result.run_time) / 1000, 3),
242                                 ReportConstant.errors: 0,
243                                 ReportConstant.disabled: 0,
244                                 ReportConstant.failures: 0,
245                                 ReportConstant.ignored: 0,
246                                 ReportConstant.tests: suite_result.test_num,
247                                 ReportConstant.message:
248                                     suite_result.stacktrace
249                                 }
250        if self.args.get(ReportConstant.module_name, ""):
251            test_suite_attributes[ReportConstant.module_name] = self.args.get(
252                ReportConstant.module_name, "")
253        return test_suite_element, test_suite_attributes
254
255    def _initial_test_case(self, case_result):
256        test_case_element = self.data_helper.initial_case_element()
257        case_stacktrace = str(case_result.stacktrace)
258        for char_index in range(32):
259            if char_index in [10, 13]:  # chr(10): LF, chr(13): CR
260                continue
261            case_stacktrace = case_stacktrace.replace(chr(char_index), "")
262        test_case_attributes = {ReportConstant.name: case_result.test_name,
263                                ReportConstant.status: "",
264                                ReportConstant.time: float(
265                                    case_result.run_time) / 1000,
266                                ReportConstant.class_name:
267                                    case_result.test_class,
268                                ReportConstant.result: "",
269                                ReportConstant.level: 1,
270                                ReportConstant.message: case_stacktrace}
271        return test_case_element, test_case_attributes
272
273    @classmethod
274    def clear_report_result(cls):
275        with SUITE_REPORTER_LOCK:
276            LOG.debug("clear_report_result")
277            cls.suite_report_result.clear()
278
279    @classmethod
280    def clear_failed_case_list(cls):
281        with SUITE_REPORTER_LOCK:
282            LOG.debug("clear_failed_case_list")
283            cls.failed_case_list.clear()
284
285    @classmethod
286    def append_report_result(cls, report_result):
287        with SUITE_REPORTER_LOCK:
288            if not isinstance(report_result, tuple) or len(report_result) != 2:
289                LOG.error("report result should be a tuple with length 2")
290                return
291            data_path = report_result[0]
292            for index, exist_result in enumerate(cls.suite_report_result):
293                if exist_result[0] == data_path:
294                    LOG.debug("data report %s generate again", data_path)
295                    cls.suite_report_result[index] = report_result
296                    return
297            cls.suite_report_result.append(report_result)
298            cls._upload_case_result(report_result[1])
299
300    @classmethod
301    def _upload_case_result(cls, result_str):
302        from xdevice import Scheduler
303        if Scheduler.mode != ModeType.decc:
304            return
305        element = DataHelper.parse_data_report(result_str)
306        if len(element) == 0:
307            LOG.debug("%s is error", result_str)
308            return
309        element = element[0]
310        result, error_msg = Scheduler.get_script_result(element)
311        case_name = element.get(ReportConstant.name, "")
312        try:
313            from agent.decc import Handler
314            LOG.info("upload case result to decc")
315            Handler.upload_case_result(case_name, result, error_msg)
316        except ModuleNotFoundError as error:
317            from xdevice import Scheduler
318            if Scheduler.mode == ModeType.decc:
319                LOG.error("module not found %s", error.args)
320
321    @classmethod
322    def get_report_result(cls):
323        with SUITE_REPORTER_LOCK:
324            LOG.debug("get_report_result, length is {}".
325                      format(len(cls.suite_report_result)))
326            return SuiteReporter.suite_report_result
327
328    @classmethod
329    def set_suite_list(cls, suite_list):
330        LOG.debug("set_suite_list, length is {}".format(len(suite_list)))
331        cls.suite_list = suite_list
332
333    @classmethod
334    def get_suite_list(cls):
335        with SUITE_REPORTER_LOCK:
336            LOG.debug("get_suite_list, length is {}".
337                      format(len(cls.suite_list)))
338            return SuiteReporter.suite_list
339
340    @classmethod
341    def get_failed_case_list(cls):
342        with SUITE_REPORTER_LOCK:
343            LOG.debug("get_failed_case_list, length is {}".
344                      format(len(cls.failed_case_list)))
345            return SuiteReporter.failed_case_list
346
347    @classmethod
348    def append_history_result(cls, suite_reports):
349        from _core.utils import get_filename_extension
350        with SUITE_REPORTER_LOCK:
351            LOG.debug("append_history_result,suite_reports length is {}".
352                      format(len(suite_reports)))
353            for report_path, report_result in suite_reports:
354                module_name = get_filename_extension(report_path)[0]
355                cls.history_report_result. \
356                    append((module_name, report_path, report_result))
357
358    @classmethod
359    def clear_history_result(cls):
360        with SUITE_REPORTER_LOCK:
361            LOG.debug("clear_history_result")
362            cls.history_report_result.clear()
363
364    @classmethod
365    def get_history_result_by_module(cls, name):
366        with SUITE_REPORTER_LOCK:
367            LOG.debug("get_history_result_by_module,module_name:{}".
368                      format(name))
369            for module_name, report_path, report_result in \
370                    cls.history_report_result:
371                if name == module_name:
372                    return report_path, report_result
373            return "", ""
374
375    @classmethod
376    def get_history_result_list(cls):
377        with SUITE_REPORTER_LOCK:
378            LOG.debug("get_history_result_list,length is {}".
379                      format(len(cls.history_report_result)))
380            return cls.history_report_result
381