• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
import os
import uuid
from dataclasses import dataclass
from dataclasses import field
from typing import Optional

from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.constants import ListenerType
from _core.constants import TestType
from _core.interface import LifeCycle
from _core.interface import IListener
from _core.logger import platform_logger
from _core.logger import LogQueue
from _core.report.suite_reporter import SuiteReporter
from _core.report.suite_reporter import ResultCode
from _core.report.encrypt import check_pub_key_exist
34
# Public names exported by a star-import of this module.
__all__ = [
    "LogListener",
    "ReportListener",
    "UploadListener",
    "CollectingTestListener",
    "CollectingLiteGTestListener",
    "CaseResult",
    "SuiteResult",
    "SuitesResult",
    "StateRecorder",
    "TestDescription",
]

# Module-level logger shared by the listeners defined below.
LOG = platform_logger("Listener")
41
42
@dataclass
class CaseResult:
    """
    Result record of a single test case

    The attributes are annotated so that ``@dataclass`` really treats them
    as fields: every instance gets its own values through the generated
    ``__init__`` (all arguments optional), ``__repr__`` shows the content
    and ``__eq__`` compares field values instead of considering every
    instance equal.
    """
    # Identifier of the record; StateRecorder.test() fills it with the given
    # test index or a random uuid hex string.
    index: str = ""
    # A ResultCode value; a case counts as failed until reported otherwise.
    # NOTE(review): presumably an int - confirm against ResultCode.
    code: int = ResultCode.FAILED.value
    test_name: Optional[str] = None
    test_class: Optional[str] = None
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    num_tests: int = 0
    # Printed by LogListener as the progress counter of the case.
    current: int = 0

    def is_running(self) -> bool:
        """
        Return True when the case has a name and is not completed yet
        """
        return self.test_name is not None and not self.is_completed
57
58
@dataclass
class SuiteResult:
    """
    Result record of a single test suite

    The attributes are annotated so that ``@dataclass`` really treats them
    as fields: every instance gets its own values through the generated
    ``__init__`` (all arguments optional), ``__repr__`` shows the content
    and ``__eq__`` compares field values instead of considering every
    instance equal.
    """
    # Identifier of the record; StateRecorder.suite() fills it with a random
    # uuid hex string.
    index: str = ""
    # A ResultCode value.
    # NOTE(review): presumably an int - confirm against ResultCode.
    code: int = ResultCode.UNKNOWN.value
    suite_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    is_started: bool = False
    suite_num: int = 0
70
71
@dataclass
class SuitesResult:
    """
    Result record of a group of test suites

    The attributes are annotated so that ``@dataclass`` really treats them
    as fields. In particular ``product_info`` is created per instance via a
    default factory; as a plain class attribute the single ``{}`` was shared
    by every SuitesResult, so an update made through one record leaked into
    all others.
    """
    # Identifier of the record; StateRecorder.get_suites() fills it with a
    # random uuid hex string.
    index: str = ""
    # A ResultCode value.
    # NOTE(review): presumably an int - confirm against ResultCode.
    code: int = ResultCode.UNKNOWN.value
    suites_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    # One fresh dict per instance (never shared between records).
    product_info: dict = field(default_factory=dict)
82
83
@dataclass
class StateRecorder:
    """
    Holder of the suites/suite/case records that are currently in progress

    The attributes are annotated so that ``@dataclass`` really treats them
    as fields. ``trace_logs`` is created per instance via a default factory;
    as a plain class attribute the single ``[]`` was shared by every
    recorder, so log lines appended through one recorder showed up in all
    others.
    """
    current_suite: Optional["SuiteResult"] = None
    current_suites: Optional["SuitesResult"] = None
    current_test: Optional["CaseResult"] = None
    # One fresh list per recorder (never shared between recorders).
    trace_logs: list = field(default_factory=list)
    running_test_index: int = 0

    def is_started(self) -> bool:
        """
        Return True when a suite record exists
        """
        return self.current_suite is not None

    def suites_is_started(self) -> bool:
        """
        Return True when a suites record exists
        """
        return self.current_suites is not None

    def suite_is_running(self) -> bool:
        """
        Return True when the current suite is named and not completed
        """
        suite = self.current_suite
        return suite is not None and suite.suite_name is not None and \
            not suite.is_completed

    def suites_is_running(self) -> bool:
        """
        Return True when the current suites record is named and not completed
        """
        suites = self.current_suites
        return suites is not None and suites.suites_name is not None and \
            not suites.is_completed

    def test_is_running(self) -> bool:
        """
        Return True when a case record exists and reports itself as running
        """
        test = self.current_test
        return test is not None and test.is_running()

    def suite(self, reset=False):
        """
        Return the current SuiteResult, creating one when needed

        Args:
            reset: when true, discard the current record and start a new one
        Returns:
            the current (possibly just created) SuiteResult; a new record
            gets a random uuid hex string as index
        """
        if reset or not self.current_suite:
            self.current_suite = SuiteResult()
            self.current_suite.index = uuid.uuid4().hex
        return self.current_suite

    def get_suites(self, reset=False):
        """
        Return the current SuitesResult, creating one when needed

        Args:
            reset: when true, discard the current record and start a new one
        Returns:
            the current (possibly just created) SuitesResult; a new record
            gets a random uuid hex string as index
        """
        if reset or not self.current_suites:
            self.current_suites = SuitesResult()
            self.current_suites.index = uuid.uuid4().hex
        return self.current_suites

    def test(self, reset=False, test_index=None):
        """
        Return the current CaseResult, creating one when needed

        Args:
            reset: when true, discard the current record and start a new one
            test_index: index for a newly created record; a falsy value
                (None, "" or 0) selects a random uuid hex string instead
        Returns:
            the current (possibly just created) CaseResult
        """
        if reset or not self.current_test:
            self.current_test = CaseResult()
            if test_index:
                self.current_test.index = test_index
            else:
                self.current_test.index = uuid.uuid4().hex
        return self.current_test
132
133
class TestDescription(object):
    """
    Identity of a test case: the class name plus the test name
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        """
        Compare by class name and test name

        Objects that lack either attribute are reported as "not comparable"
        (NotImplemented) instead of raising AttributeError, so membership
        tests against lists holding unrelated objects work.
        """
        try:
            return self.class_name == other.class_name and \
                self.test_name == other.test_name
        except AttributeError:
            return NotImplemented

    # Instances are mutable and compare by value, so they stay unhashable
    # (this is also what Python does implicitly when only __eq__ is defined).
    __hash__ = None

    def __repr__(self):
        return "{}(class_name={!r}, test_name={!r})".format(
            type(self).__name__, self.class_name, self.test_name)

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """
        Remove the executed tests from a list of tests

        Args:
            tests: list that is modified in place
            execute_tests: iterable of tests to drop; for each one, at most
                one equal entry (the first) is removed from ``tests``
        Returns:
            the same ``tests`` list object
        """
        for execute_test in execute_tests:
            if execute_test in tests:
                tests.remove(execute_test)
        return tests
149
150
@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    Listener that writes test progress messages to the log

    Nothing is logged while check_pub_key_exist() returns a true value.
    """
    test_num = 0
    device_sn = ""
    log_queue = LogQueue(log=LOG)

    def __started__(self, lifecycle, test_result):
        """
        Log the start of a suite or of a case

        For a suite the announced test count is remembered in ``test_num``
        and later used as the total of the per-case progress message.
        """
        if check_pub_key_exist():
            return
        if lifecycle == LifeCycle.TestSuite:
            message = "Start test suite [{}] with {} tests".format(
                test_result.suite_name, test_result.test_num)
            self.log_queue.debug(log_data=message)
            self.test_num = test_result.test_num
        elif lifecycle == LifeCycle.TestCase:
            message = "TestStarted({}#{})".format(
                test_result.test_class, test_result.test_name)
            self.log_queue.debug(log_data=message)

    def __ended__(self, lifecycle, test_result, **kwargs):
        """
        Log the end of a suite or of a case

        Ending a suite also clears the log queue. Ending a case logs a
        progress line containing the case counter, the total (or "-" when
        no total is known), the converted device serial and the result name.
        """
        if check_pub_key_exist():
            return

        # Imported here rather than at module level, as in the original code.
        from _core.utils import convert_serial
        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            message = "End test suite [{}] and cost {}ms.".format(
                test_result.suite_name, test_result.run_time)
            self.log_queue.debug(log_data=message)
            self.log_queue.clear()
        elif lifecycle == LifeCycle.TestCase:
            message = "TestEnded({}#{})".format(
                test_result.test_class, test_result.test_name)
            self.log_queue.debug(log_data=message)
            ret = ResultCode(test_result.code).name
            if self.test_num:
                progress = "[{}/{} {}] {}#{} {}".format(
                    test_result.current, self.test_num,
                    convert_serial(self.device_sn),
                    test_result.test_class, test_result.test_name, ret)
            else:
                progress = "[{}/- {}] {}#{} {}".format(
                    test_result.current, convert_serial(self.device_sn),
                    test_result.test_class, test_result.test_name, ret)
            self.log_queue.debug(log_data=progress)

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """
        Log a skipped suite or a skipped case directly through LOG
        """
        if check_pub_key_exist():
            return

        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            message = "Test suite [{}] skipped".format(test_result.suite_name)
            LOG.debug(message)
        elif lifecycle == LifeCycle.TestCase:
            ret = ResultCode(test_result.code).name
            message = "[{}] {}#{}".format(ret, test_result.test_class,
                                          test_result.test_name)
            LOG.debug(message)

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """
        Failures are not logged by this listener
        """
        return None
210
211
@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(IListener):
    """
    Listener that collects suite and case results and generates reports

    ``result`` holds ``(suite, [cases])`` tuples, ``suites`` and ``tests``
    map a record index to the record being tracked, ``current_suite_id`` and
    ``current_test_id`` name the most recently started records, and
    ``report_path`` is the directory under which the "result" directory is
    created.
    """

    def __init__(self):
        self.result = list()
        self.suites = dict()
        self.tests = dict()
        self.current_suite_id = 0
        self.current_test_id = 0
        self.report_path = ""

    @staticmethod
    def _find_record(records, test_result, create, factory, current_id):
        """
        Shared lookup used for both the suite and the case stores

        Returns the record stored under ``test_result.index`` when present.
        Otherwise, with ``create`` set, a new record built by ``factory`` is
        stored (under ``test_result.index``, or a random uuid hex string
        when that index is "") and returned. Otherwise the record stored
        under ``current_id`` is returned, which is None when there is none.
        """
        if test_result.index in records:
            return records.get(test_result.index)
        if create:
            record = factory()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            record.index = rid
            return records.setdefault(rid, record)
        return records.get(current_id)

    def _get_suite_result(self, test_result, create=False):
        """
        Return the SuiteResult tracked for test_result (may be None)
        """
        return self._find_record(self.suites, test_result, create,
                                 SuiteResult, self.current_suite_id)

    def _get_test_result(self, test_result, create=False):
        """
        Return the CaseResult tracked for test_result (may be None)
        """
        return self._find_record(self.tests, test_result, create,
                                 CaseResult, self.current_test_id)

    def _remove_current_test_result(self):
        """
        Forget the most recently started case, if it is still tracked
        """
        if self.current_test_id in self.tests:
            del self.tests[self.current_test_id]

    def _make_result_dir(self):
        """
        Create (if needed) and return the "result" directory of the report
        """
        result_dir = os.path.join(self.report_path, "result")
        os.makedirs(result_dir, exist_ok=True)
        return result_dir

    def __started__(self, lifecycle, test_result):
        """
        Start tracking a suites group, a suite or a case

        The matching record is created when missing, receives the name(s)
        and test count from test_result and becomes the current record.
        """
        if lifecycle == LifeCycle.TestSuites:
            suites = self._get_suite_result(test_result=test_result,
                                            create=True)
            suites.suites_name = test_result.suites_name
            suites.test_num = test_result.test_num
            self.current_suite_id = suites.index
        elif lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=True)
            suite.suite_name = test_result.suite_name
            suite.test_num = test_result.test_num
            self.current_suite_id = suite.index
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=True)
            test.test_name = test_result.test_name
            test.test_class = test_result.test_class
            self.current_test_id = test.index

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        Dispatch an "ended" event to the handler of its lifecycle

        Keyword arguments read by the handlers: ``is_clear`` and
        ``suite_report`` (suite), ``suite_report``, ``suites_name`` and
        ``product_info`` (suites), ``test_type`` (task).
        """
        if lifecycle == LifeCycle.TestSuite:
            self._end_suite(test_result, kwargs)
        elif lifecycle == LifeCycle.TestSuites:
            self._end_suites(kwargs)
        elif lifecycle == LifeCycle.TestCase:
            self._end_case(test_result)
        elif lifecycle == LifeCycle.TestTask:
            self._end_task(test_result, kwargs)

    def _end_suite(self, test_result, kwargs):
        """
        Finish a suite: store its result and optionally write its report
        """
        suite = self._get_suite_result(test_result=test_result,
                                       create=False)
        if not suite:
            return
        suite.run_time = test_result.run_time
        suite.code = test_result.code
        suite.test_num = max(test_result.test_num, len(self.tests))
        stored_suite = self.suites[suite.index]
        if not kwargs.get("suite_report", False):
            # Consecutive endings of a suite with the same name are merged
            # into the previous result entry instead of adding a new one.
            if len(self.result) > 0 and \
                    self.result[-1][0].suite_name == stored_suite.suite_name:
                self.result[-1][1].extend(list(self.tests.values()))
                self.result[-1][0].test_num = max(suite.test_num,
                                                  len(self.result[-1][1]))
            else:
                self.result.append((stored_suite,
                                    list(self.tests.values())))
        else:
            result_dir = self._make_result_dir()
            self.result.append((stored_suite, list(self.tests.values())))
            results = [(suite, list(self.tests.values()))]
            suite_report = SuiteReporter(results, suite.suite_name,
                                         result_dir)
            suite_report.generate_data_report()
        if kwargs.get("is_clear", False):
            self.tests.clear()

    def _end_suites(self, kwargs):
        """
        Finish a suites group: write one report for all collected results

        Nothing is written when ``suite_report`` is set, because each suite
        then already produced its own report.
        """
        if kwargs.get("suite_report", False):
            return
        result_dir = self._make_result_dir()
        suites_name = kwargs.get("suites_name", "")
        product_info = kwargs.get("product_info", "")
        suite_report = SuiteReporter(self.result, suites_name, result_dir,
                                     product_info=product_info)
        suite_report.generate_data_report()

    def _end_case(self, test_result):
        """
        Finish a case: copy run time, stack trace and code to its record
        """
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            # No record is tracked, e.g. the case was removed by __skipped__
            # or never started; there is nothing to update.
            LOG.debug("No tracked test case for the ended event, ignored")
            return
        test.run_time = test_result.run_time
        test.stacktrace = test_result.stacktrace
        test.code = test_result.code

    def _end_task(self, test_result, kwargs):
        """
        Finish a task: let the reporter plugin generate the final reports

        The reporter registered for ``test_type`` is used; when there is
        none, the reporter registered for TestType.all is used instead.
        """
        test_type = str(kwargs.get("test_type", TestType.all))
        reporter = get_plugin(plugin_type=Plugin.REPORTER,
                              plugin_id=test_type)
        if not reporter:
            reporter = get_plugin(plugin_type=Plugin.REPORTER,
                                  plugin_id=TestType.all)[0]
        else:
            reporter = reporter[0]
        reporter.__generate_reports__(self.report_path,
                                      task_info=test_result)

    def __skipped__(self, lifecycle, test_result):
        """
        Drop the record of the current case when a case is skipped
        """
        del test_result
        if lifecycle == LifeCycle.TestCase:
            self._remove_current_test_result()

    def __failed__(self, lifecycle, test_result):
        """
        Mark the tracked suite or case as failed and keep its stack trace

        Events for which no record is tracked are ignored.
        """
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if suite is None:
                LOG.debug("No tracked test suite for the failed event, "
                          "ignored")
                return
            suite.stacktrace = test_result.stacktrace
            suite.code = ResultCode.FAILED.value
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                LOG.debug("No tracked test case for the failed event, "
                          "ignored")
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.FAILED.value
346
347
@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    Listener registered under ListenerType.upload

    Every callback is currently a no-op.
    """

    def __started__(self, lifecycle, test_result):
        """
        Ignore start events
        """
        return None

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        """
        Ignore end events; the test case branch is an empty placeholder
        """
        del test_result, kwargs
        if lifecycle == LifeCycle.TestCase:
            # Nothing is done for ended test cases.
            return None
        return None

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """
        Ignore skip events
        """
        return None

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """
        Ignore failure events
        """
        return None
366
367
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    Listener that records the test cases that were started

    Each case is kept once, as a TestDescription, in ``tests``.
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """
        Remember a started case that has both a class name and a test name
        """
        if lifecycle == LifeCycle.TestCase and test_result.test_class \
                and test_result.test_name:
            description = TestDescription(test_result.test_class,
                                          test_result.test_name)
            already_known = description in self.tests
            if not already_known:
                self.tests.append(description)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        Ignore end events
        """
        return None

    def __skipped__(self, lifecycle, test_result):
        """
        Ignore skip events
        """
        return None

    def __failed__(self, lifecycle, test_result):
        """
        Ignore failure events
        """
        return None

    def get_current_run_results(self):
        """
        Return the list of collected TestDescription objects
        """
        return self.tests
397
398
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Listener that records the test cases that were started or failed

    Each case is kept once, as a TestDescription, in ``tests``.
    """

    def __init__(self):
        self.tests = []

    def _collect(self, lifecycle, test_result):
        """
        Remember a case that has both a class name and a test name
        """
        if lifecycle == LifeCycle.TestCase and test_result.test_class \
                and test_result.test_name:
            description = TestDescription(test_result.test_class,
                                          test_result.test_name)
            already_known = description in self.tests
            if not already_known:
                self.tests.append(description)

    def __started__(self, lifecycle, test_result):
        """
        Collect the case that was started
        """
        self._collect(lifecycle, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        Ignore end events
        """
        return None

    def __skipped__(self, lifecycle, test_result):
        """
        Ignore skip events
        """
        return None

    def __failed__(self, lifecycle, test_result):
        """
        Collect the case that failed
        """
        self._collect(lifecycle, test_result)

    def get_current_run_results(self):
        """
        Return the list of collected TestDescription objects
        """
        return self.tests
434