• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20
21from _core.plugin import Plugin
22from _core.constants import ListenerType
23from _core.executor.abs import PlusReportListener
24from _core.interface import LifeCycle
25from _core.interface import IListener
26from _core.logger import platform_logger
27from _core.report.suite_reporter import ResultCode
28from _core.report.encrypt import check_pub_key_exist
29
# Names exported by ``from <this module> import *``.
__all__ = [
    "LogListener",
    "ReportListener",
    "UploadListener",
    "CollectingTestListener",
    "CollectingLiteGTestListener",
    "TestDescription",
]

# Logger shared by every listener defined in this module.
LOG = platform_logger("Listener")
35
36
class TestDescription(object):
    """Identify one test case by its class name and its test name.

    Two descriptions are equal when both names are equal, which is what the
    ``in`` / ``list.remove`` operations used on lists of descriptions rely on.
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        """Return True when *other* carries the same class and test names.

        Objects without ``class_name`` / ``test_name`` attributes yield
        ``NotImplemented`` (so ``==`` evaluates to False) instead of raising
        ``AttributeError``.
        """
        try:
            return self.class_name == other.class_name and \
                   self.test_name == other.test_name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable; keep the hash
        # consistent with equality so descriptions can live in sets/dicts.
        return hash((self.class_name, self.test_name))

    def __repr__(self):
        return "{}({!r}, {!r})".format(
            type(self).__name__, self.class_name, self.test_name)

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """Remove every executed test from *tests* in place.

        Args:
            tests: mutable list of descriptions; it is modified in place.
            execute_tests: iterable of descriptions to remove. For each one,
                the first equal entry of *tests* is removed if present.

        Returns:
            The same *tests* list object, after removal.
        """
        # Iterate over a snapshot: if the caller passes the same list for
        # both arguments, removing while iterating would skip elements.
        for execute_test in list(execute_tests):
            if execute_test in tests:
                tests.remove(execute_test)
        return tests
52
53
@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    Write test suite / test case progress to the module logger.

    The started, ended and skipped hooks return without logging anything
    when check_pub_key_exist() is true.
    """
    # Number of tests announced by the most recently started suite.
    test_num = 0
    # Device serial shown (through convert_serial) in per-case result lines.
    device_sn = ""

    def __started__(self, lifecycle, test_result):
        """Log the start of a suite or of a single test case."""
        if check_pub_key_exist():
            return

        if lifecycle == LifeCycle.TestSuite:
            suite_message = "Start test suite [{}] with {} tests".format(
                test_result.suite_name, test_result.test_num)
            LOG.debug(suite_message)
            # Kept so that __ended__ can print "current/total" progress.
            self.test_num = test_result.test_num
            return

        if lifecycle == LifeCycle.TestCase:
            case_message = "TestStarted({}#{})".format(
                test_result.test_class, test_result.test_name)
            LOG.debug(case_message)

    def __ended__(self, lifecycle, test_result, **kwargs):
        """Log the end of a suite, or the result line of a finished case."""
        if check_pub_key_exist():
            return

        # Imported here rather than at module level, as in the original code.
        from _core.utils import convert_serial

        if lifecycle == LifeCycle.TestSuite:
            cost_message = "End test suite cost {}ms.".format(
                test_result.run_time)
            LOG.debug(cost_message)
            end_message = "End test suite [{}].".format(
                test_result.suite_name)
            LOG.info(end_message)
            return

        if lifecycle == LifeCycle.TestCase:
            LOG.debug("TestEnded({}#{})".format(
                test_result.test_class, test_result.test_name))
            outcome = ResultCode(test_result.code).name
            if self.test_num:
                summary = "[{}/{} {}] {}#{} {}".format(
                    test_result.current, self.test_num,
                    convert_serial(self.device_sn),
                    test_result.test_class, test_result.test_name, outcome)
            else:
                # Total is unknown (zero), so "-" stands in for it.
                summary = "[{}/- {}] {}#{} {}".format(
                    test_result.current, convert_serial(self.device_sn),
                    test_result.test_class, test_result.test_name, outcome)
            LOG.info(summary)

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """Log a skipped suite or a skipped test case at debug level."""
        if check_pub_key_exist():
            return

        if lifecycle == LifeCycle.TestSuite:
            skipped_suite = "Test suite [{}] skipped".format(
                test_result.suite_name)
            LOG.debug(skipped_suite)
            return

        if lifecycle == LifeCycle.TestCase:
            outcome = ResultCode(test_result.code).name
            skipped_case = "[{}] {}#{}".format(
                outcome, test_result.test_class, test_result.test_name)
            LOG.debug(skipped_case)

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """Do nothing: this listener does not log failure events."""
        pass
113
114
@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(PlusReportListener):
    """
    Report listener that accumulates (suite, test results) pairs in
    self.result and can hand a single suite to _generate_data_report.
    """

    def handle_half_break(self, suites_name, error_message=''):
        """Handle a test suite that ran abnormally.

        The results of the cases that already ran are recorded to the result
        file by ending the current suite and then the whole suites run.
        """
        test_result = self.suites.get(self.current_suite_id)
        if test_result is None:
            # No suite is registered under the current id: nothing to close.
            return

        # Close the interrupted suite first ...
        self.__ended__(lifecycle=LifeCycle.TestSuite, test_result=test_result)
        LOG.warning(f"Testsuite({test_result.suite_name}) is running abnormally")
        # ... then close the enclosing run, forwarding the error message.
        self.__ended__(
            lifecycle=LifeCycle.TestSuites,
            test_result=test_result,
            suites_name=suites_name,
            message=error_message,
        )

    def _handle_suite_end_data(self, suite, kwargs):
        """Store the finished suite's results and optionally report them.

        Reads two optional keys from *kwargs*: "suite_report" (pass this
        suite to _generate_data_report) and "is_clear" (empty self.tests
        afterwards). Both default to False.
        """
        if kwargs.get("suite_report", False):
            # generate suite report
            result_dir = os.path.join(self.report_path, "result")
            os.makedirs(result_dir, exist_ok=True)
            stored_entry = (self.suites.get(suite.index),
                            list(self.tests.values()))
            self.result.append(stored_entry)
            suite_results = [(suite, list(self.tests.values()))]
            self._generate_data_report(result_dir, suite_results,
                                       suite.suite_name)
        elif len(self.result) > 0 and self.result[-1][0].suite_name == \
                self.suites.get(suite.index).suite_name:
            # Same suite name as the last stored entry: merge into it
            # instead of appending a second entry.
            latest = self.result[-1]
            latest[1].extend(list(self.tests.values()))
            latest[0].test_num = max(suite.test_num, len(latest[1]))
        else:
            self.result.append((self.suites.get(suite.index),
                                list(self.tests.values())))

        if kwargs.get("is_clear", False):
            self.tests.clear()
152
153
@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    Listener registered under ListenerType.upload; every hook is a no-op.
    """

    def __started__(self, lifecycle, test_result):
        """Do nothing when a suite or case starts."""
        pass

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        """Discard the arguments; no action is taken for any lifecycle."""
        del kwargs
        del test_result
        if lifecycle == LifeCycle.TestCase:
            # Placeholder branch: nothing happens at test-case end.
            return

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """Do nothing when a suite or case is skipped."""
        pass

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """Do nothing when a suite or case fails."""
        pass
172
173
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    Collect a TestDescription for every test case reported as started.

    Descriptions are kept in self.tests in first-seen order, without
    duplicates. Nothing is logged or reported by this listener.
    """

    def __init__(self):
        # Unique descriptions of the test cases seen so far.
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """Record the started test case; other lifecycles are ignored."""
        if lifecycle == LifeCycle.TestCase:
            self._record_test(test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """Do nothing on end events."""
        pass

    # **kwargs is accepted for parity with the other listeners in this
    # module, whose __skipped__/__failed__ hooks take extra keyword arguments.
    def __skipped__(self, lifecycle, test_result, **kwargs):
        """Do nothing on skip events."""
        pass

    def __failed__(self, lifecycle, test_result, **kwargs):
        """Do nothing on failure events."""
        pass

    def get_current_run_results(self):
        """Return the list of collected descriptions (not a copy)."""
        return self.tests

    def _record_test(self, test_result):
        """Append the case described by *test_result* if it is new.

        Results with an empty test class or an empty test name are ignored.
        """
        if not test_result.test_class or not test_result.test_name:
            return
        test = TestDescription(test_result.test_class,
                               test_result.test_name)
        if test not in self.tests:
            self.tests.append(test)
203
204
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Collect a TestDescription for every test case reported as started or
    as failed.

    Descriptions are kept in self.tests in first-seen order, without
    duplicates. Nothing is logged or reported by this listener.
    """

    def __init__(self):
        # Unique descriptions of the test cases seen so far.
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """Record the started test case; other lifecycles are ignored."""
        if lifecycle == LifeCycle.TestCase:
            self._record_test(test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """Do nothing on end events."""
        pass

    # **kwargs is accepted for parity with the other listeners in this
    # module, whose __skipped__/__failed__ hooks take extra keyword arguments.
    def __skipped__(self, lifecycle, test_result, **kwargs):
        """Do nothing on skip events."""
        pass

    def __failed__(self, lifecycle, test_result, **kwargs):
        """Record the failed test case; other lifecycles are ignored."""
        if lifecycle == LifeCycle.TestCase:
            self._record_test(test_result)

    def get_current_run_results(self):
        """Return the list of collected descriptions (not a copy)."""
        return self.tests

    def _record_test(self, test_result):
        """Append the case described by *test_result* if it is new.

        Shared by __started__ and __failed__. Results with an empty test
        class or an empty test name are ignored.
        """
        if not test_result.test_class or not test_result.test_name:
            return
        test = TestDescription(test_result.test_class,
                               test_result.test_name)
        if test not in self.tests:
            self.tests.append(test)
240