• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import uuid
19
20from xdevice import Plugin
21from xdevice import IListener
22from xdevice import LifeCycle
23from xdevice import platform_logger
24from xdevice import ListenerType
25from xdevice import TestDescription
26from xdevice import ResultCode
27from xdevice import UniversalReportListener
28
29from ohos.executor.bean import StackCaseResult
30
31
32__all__ = ["CollectingLiteGTestListener", "CollectingPassListener"]
33
34LOG = platform_logger("Listener")
35
36
37@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
38class CollectingLiteGTestListener(IListener):
39    """
40    Listener test status information to the console
41    """
42
43    def __init__(self):
44        self.tests = []
45
46    def __started__(self, lifecycle, test_result):
47        if lifecycle == LifeCycle.TestCase:
48            if not test_result.test_class or not test_result.test_name:
49                return
50            test = TestDescription(test_result.test_class,
51                                   test_result.test_name)
52            if test not in self.tests:
53                self.tests.append(test)
54
55    def __ended__(self, lifecycle, test_result=None, **kwargs):
56        pass
57
58    def __skipped__(self, lifecycle, test_result):
59        pass
60
61    def __failed__(self, lifecycle, test_result):
62        if lifecycle == LifeCycle.TestCase:
63            if not test_result.test_class or not test_result.test_name:
64                return
65            test = TestDescription(test_result.test_class,
66                                   test_result.test_name)
67            if test not in self.tests:
68                self.tests.append(test)
69
70    def get_current_run_results(self):
71        return self.tests
72
73
74@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_pass)
75class CollectingPassListener(IListener):
76    """
77    listener test status information to the console
78    """
79
80    def __init__(self):
81        self.tests = []
82
83    def __started__(self, lifecycle, test_result):
84        pass
85
86    def __ended__(self, lifecycle, test_result=None, **kwargs):
87        if lifecycle == LifeCycle.TestCase:
88            if not test_result.test_class or not test_result.test_name:
89                return
90            if test_result.code != ResultCode.PASSED.value:
91                return
92            test = TestDescription(test_result.test_class,
93                                   test_result.test_name)
94            if test not in self.tests:
95                self.tests.append(test)
96            else:
97                LOG.warning("Duplicate testcase: %s#%s" % (
98                    test_result.test_class, test_result.test_name))
99
100    def __skipped__(self, lifecycle, test_result):
101        pass
102
103    def __failed__(self, lifecycle, test_result):
104        pass
105
106    def get_current_run_results(self):
107        return self.tests
108
109
110@Plugin(type=Plugin.LISTENER, id=ListenerType.stack_report)
111class StackReportListener(UniversalReportListener):
112    """
113    Listener suite event
114    """
115
116    def __init__(self):
117        super().__init__()
118        self._suites_index_stack = list()
119        self._suite_name_mapping = dict()
120        self.cycle = 0
121
122    def _get_test_result(self, test_result, create=False):
123        if test_result.index in self.tests:
124            return self.tests.get(test_result.index)
125        elif create:
126            test = StackCaseResult()
127            rid = uuid.uuid4().hex if test_result.index == "" else \
128                test_result.index
129            test.index = rid
130            return self.tests.setdefault(rid, test)
131        else:
132            return self.tests.get(self.current_test_id)
133
134    def _handle_case_start(self, test_result):
135        test = self._get_test_result(test_result=test_result, create=True)
136        test.test_name = test_result.test_name
137        test.test_class = test_result.test_class
138        if len(self._suites_index_stack) > 0:
139            test.parent_index = self._suites_index_stack[-1]
140        self.current_test_id = test.index
141
142    def _handle_testsuite_start(self, test_result):
143        suite = self._get_suite_result(test_result=test_result,
144                                       create=True)
145        suite.suite_name = test_result.suite_name
146        suite.test_num = test_result.test_num
147        self.current_suite_id = suite.index
148        self._suites_index_stack.append(suite.index)
149
150    def _handle_testsuite_end(self, test_result, kwargs):
151        suite = self._get_suite_result(test_result=test_result,
152                                       create=False)
153        if not suite:
154            return
155        suite.run_time = test_result.run_time
156        suite.code = test_result.code
157        suite.report = test_result.report
158        suite.test_num = max(test_result.test_num, len(self.tests))
159        self._handle_suite_end_data(suite, kwargs)
160        if len(self._suites_index_stack) > 0:
161            self._suites_index_stack.pop(-1)
162
163    def _handle_suite_end_data(self, suite, kwargs):
164        if not kwargs.get("suite_report", False):
165            results_of_same_suite = list()
166            test_values = list(self.tests.values())
167            have_marked_list = list()
168            for index in range(len(test_values)-1, -1, -1):
169                cur_test = test_values[index]
170                if cur_test.parent_index == suite.index:
171                    results_of_same_suite.insert(0, cur_test)
172                    have_marked_list.append(cur_test.index)
173            for have_marked in have_marked_list:
174                self.tests.pop(have_marked)
175            if suite.suite_name not in self._suite_name_mapping.keys():
176                self._suite_name_mapping.update({suite.suite_name: suite.index})
177            if self.cycle > 0:
178                suite_index = self._suite_name_mapping.get(suite.suite_name, "")
179                if suite_index not in self.suite_distributions.keys():
180                    for suite_item, result_list in self.result:
181                        if suite_item.index != suite_index:
182                            continue
183                        self.suites.pop(suite.index)
184                        if result_list and result_list[-1].is_completed is not True:
185                            result_list.pop(-1)
186                        result_list.extend(results_of_same_suite)
187                        break
188                else:
189                    self.suite_distributions.update({suite.index: len(self.result)})
190                    self.result.append((self.suites.get(suite.index), results_of_same_suite))
191            else:
192                self.suite_distributions.update({suite.index:  len(self.result)})
193                self.result.append((self.suites.get(suite.index), results_of_same_suite))
194