• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import uuid
21from dataclasses import dataclass
22
23from _core.plugin import Plugin
24from _core.plugin import get_plugin
25from _core.constants import ListenerType
26from _core.constants import TestType
27from _core.interface import LifeCycle
28from _core.interface import IListener
29from _core.logger import platform_logger
30from _core.logger import LogQueue
31from _core.report.suite_reporter import SuiteReporter
32from _core.report.suite_reporter import ResultCode
33from _core.report.encrypt import check_pub_key_exist
34
35__all__ = ["LogListener", "ReportListener", "UploadListener",
36           "CollectingTestListener", "CollectingLiteGTestListener",
37           "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder",
38           "TestDescription"]
39
40LOG = platform_logger("Listener")
41
42
43@dataclass
44class CaseResult:
45    index = ""
46    code = ResultCode.FAILED.value
47    test_name = None
48    test_class = None
49    stacktrace = ""
50    run_time = 0
51    is_completed = False
52    num_tests = 0
53    current = 0
54
55    def is_running(self):
56        return self.test_name is not None and not self.is_completed
57
58
59@dataclass
60class SuiteResult:
61    index = ""
62    code = ResultCode.UNKNOWN.value
63    suite_name = None
64    test_num = 0
65    stacktrace = ""
66    run_time = 0
67    is_completed = False
68    is_started = False
69    suite_num = 0
70
71
72@dataclass
73class SuitesResult:
74    index = ""
75    code = ResultCode.UNKNOWN.value
76    suites_name = None
77    test_num = 0
78    stacktrace = ""
79    run_time = 0
80    is_completed = False
81    product_info = {}
82
83
84@dataclass
85class StateRecorder:
86    current_suite = None
87    current_suites = None
88    current_test = None
89    trace_logs = []
90    running_test_index = 0
91
92    def is_started(self):
93        return self.current_suite is not None
94
95    def suites_is_started(self):
96        return self.current_suites is not None
97
98    def suite_is_running(self):
99        suite = self.current_suite
100        return suite is not None and suite.suite_name is not None and \
101            not suite.is_completed
102
103    def suites_is_running(self):
104        suites = self.current_suites
105        return suites is not None and suites.suites_name is not None and \
106            not suites.is_completed
107
108    def test_is_running(self):
109        test = self.current_test
110        return test is not None and test.is_running()
111
112    def suite(self, reset=False):
113        if reset or not self.current_suite:
114            self.current_suite = SuiteResult()
115            self.current_suite.index = uuid.uuid4().hex
116        return self.current_suite
117
118    def get_suites(self, reset=False):
119        if reset or not self.current_suites:
120            self.current_suites = SuitesResult()
121            self.current_suites.index = uuid.uuid4().hex
122        return self.current_suites
123
124    def test(self, reset=False, test_index=None):
125        if reset or not self.current_test:
126            self.current_test = CaseResult()
127            if test_index:
128                self.current_test.index = test_index
129            else:
130                self.current_test.index = uuid.uuid4().hex
131        return self.current_test
132
133
134class TestDescription(object):
135    def __init__(self, class_name, test_name):
136        self.class_name = class_name
137        self.test_name = test_name
138
139    def __eq__(self, other):
140        return self.class_name == other.class_name and \
141               self.test_name == other.test_name
142
143    @classmethod
144    def remove_test(cls, tests, execute_tests):
145        for execute_test in execute_tests:
146            if execute_test in tests:
147                tests.remove(execute_test)
148        return tests
149
150
151@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
152class LogListener(IListener):
153    """
154    Listener test status information to the console and log
155    """
156    test_num = 0
157    device_sn = ""
158    log_queue = LogQueue(log=LOG)
159
160    def __started__(self, lifecycle, test_result):
161        if check_pub_key_exist():
162            return
163        if lifecycle == LifeCycle.TestSuite:
164            self.log_queue.debug(log_data="Start test suite [{}] with {} tests"
165                                 .format(test_result.suite_name, test_result.test_num))
166            self.test_num = test_result.test_num
167        elif lifecycle == LifeCycle.TestCase:
168            self.log_queue.debug(log_data="TestStarted({}#{})"
169                                 .format(test_result.test_class, test_result.test_name))
170
171    def __ended__(self, lifecycle, test_result, **kwargs):
172        if check_pub_key_exist():
173            return
174
175        from _core.utils import convert_serial
176        del kwargs
177        if lifecycle == LifeCycle.TestSuite:
178            self.log_queue.debug(log_data="End test suite cost {}ms."
179                                 .format(test_result.run_time), clear=True)
180            self.log_queue.info(log_data="End test suite [{}]."
181                                .format(test_result.suite_name), clear=True)
182        elif lifecycle == LifeCycle.TestCase:
183            self.log_queue.debug(log_data="TestEnded({}#{})"
184                                 .format(test_result.test_class, test_result.test_name))
185            ret = ResultCode(test_result.code).name
186            if self.test_num:
187                self.log_queue.info(log_data="[{}/{} {}] {}#{} {}".
188                                    format(test_result.current, self.test_num, convert_serial(self.device_sn),
189                                           test_result.test_class, test_result.test_name, ret))
190            else:
191                self.log_queue.info(log_data="[{}/- {}] {}#{} {}".
192                                    format(test_result.current, convert_serial(self.device_sn),
193                                           test_result.test_class, test_result.test_name, ret))
194
195    @staticmethod
196    def __skipped__(lifecycle, test_result, **kwargs):
197        if check_pub_key_exist():
198            return
199
200        del kwargs
201        if lifecycle == LifeCycle.TestSuite:
202            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
203        elif lifecycle == LifeCycle.TestCase:
204            ret = ResultCode(test_result.code).name
205            LOG.debug("[{}] {}#{}".format(ret, test_result.test_class,
206                                          test_result.test_name))
207
208    @staticmethod
209    def __failed__(lifecycle, test_result, **kwargs):
210        pass
211
212
213@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
214class ReportListener(IListener):
215    """
216    Listener test status information to the console
217    """
218
219    def __init__(self):
220        self.result = list()
221        self.suites = dict()
222        self.tests = dict()
223        self.current_suite_id = 0
224        self.current_test_id = 0
225        self.report_path = ""
226
227    def _get_suite_result(self, test_result, create=False):
228        if test_result.index in self.suites:
229            return self.suites.get(test_result.index)
230        elif create:
231            suite = SuiteResult()
232            rid = uuid.uuid4().hex if test_result.index == "" else \
233                test_result.index
234            suite.index = rid
235            return self.suites.setdefault(rid, suite)
236        else:
237            return self.suites.get(self.current_suite_id)
238
239    def _get_test_result(self, test_result, create=False):
240        if test_result.index in self.tests:
241            return self.tests.get(test_result.index)
242        elif create:
243            test = CaseResult()
244            rid = uuid.uuid4().hex if test_result.index == "" else \
245                test_result.index
246            test.index = rid
247            return self.tests.setdefault(rid, test)
248        else:
249            return self.tests.get(self.current_test_id)
250
251    def _remove_current_test_result(self):
252        if self.current_test_id in self.tests:
253            del self.tests[self.current_test_id]
254
255    def __started__(self, lifecycle, test_result):
256        if lifecycle == LifeCycle.TestSuites:
257            suites = self._get_suite_result(test_result=test_result,
258                                            create=True)
259            suites.suites_name = test_result.suites_name
260            suites.test_num = test_result.test_num
261            self.current_suite_id = suites.index
262        elif lifecycle == LifeCycle.TestSuite:
263            suite = self._get_suite_result(test_result=test_result,
264                                           create=True)
265            suite.suite_name = test_result.suite_name
266            suite.test_num = test_result.test_num
267            self.current_suite_id = suite.index
268        elif lifecycle == LifeCycle.TestCase:
269            test = self._get_test_result(test_result=test_result, create=True)
270            test.test_name = test_result.test_name
271            test.test_class = test_result.test_class
272            self.current_test_id = test.index
273
274    def __ended__(self, lifecycle, test_result=None, **kwargs):
275        if lifecycle == LifeCycle.TestSuite:
276            suite = self._get_suite_result(test_result=test_result,
277                                           create=False)
278            if not suite:
279                return
280            suite.run_time = test_result.run_time
281            suite.code = test_result.code
282            is_clear = kwargs.get("is_clear", False)
283            suite.test_num = max(test_result.test_num, len(self.tests))
284            # generate suite report
285            if not kwargs.get("suite_report", False):
286                if len(self.result) > 0 and self.result[-1][0].suite_name == \
287                        self.suites[suite.index].suite_name:
288                    self.result[-1][1].extend(list(self.tests.values()))
289                    self.result[-1][0].test_num = max(suite.test_num,
290                                                      len(self.result[-1][1]))
291                else:
292                    self.result.append((self.suites[suite.index],
293                                        list(self.tests.values())))
294            else:
295                result_dir = os.path.join(self.report_path, "result")
296                os.makedirs(result_dir, exist_ok=True)
297                self.result.append((self.suites[suite.index],
298                                    list(self.tests.values())))
299                results = [(suite, list(self.tests.values()))]
300                suite_report = SuiteReporter(results, suite.suite_name,
301                                             result_dir)
302                suite_report.generate_data_report()
303            if is_clear:
304                self.tests.clear()
305        elif lifecycle == LifeCycle.TestSuites:
306            if not kwargs.get("suite_report", False):
307                result_dir = os.path.join(self.report_path, "result")
308                os.makedirs(result_dir, exist_ok=True)
309                suites_name = kwargs.get("suites_name", "")
310                product_info = kwargs.get("product_info", "")
311                suite_report = SuiteReporter(self.result, suites_name,
312                                             result_dir,
313                                             product_info=product_info)
314                suite_report.generate_data_report()
315        elif lifecycle == LifeCycle.TestCase:
316            test = self._get_test_result(test_result=test_result, create=False)
317            test.run_time = test_result.run_time
318            test.stacktrace = test_result.stacktrace
319            test.code = test_result.code
320        elif lifecycle == LifeCycle.TestTask:
321            test_type = str(kwargs.get("test_type", TestType.all))
322            reporter = get_plugin(plugin_type=Plugin.REPORTER,
323                                  plugin_id=test_type)
324            if not reporter:
325                reporter = get_plugin(plugin_type=Plugin.REPORTER,
326                                      plugin_id=TestType.all)[0]
327            else:
328                reporter = reporter[0]
329            reporter.__generate_reports__(self.report_path,
330                                          task_info=test_result)
331
332    def __skipped__(self, lifecycle, test_result):
333        del test_result
334        if lifecycle == LifeCycle.TestCase:
335            self._remove_current_test_result()
336
337    def __failed__(self, lifecycle, test_result):
338        if lifecycle == LifeCycle.TestSuite:
339            suite = self._get_suite_result(test_result=test_result,
340                                           create=False)
341            suite.stacktrace = test_result.stacktrace
342            suite.code = ResultCode.FAILED.value
343        elif lifecycle == LifeCycle.TestCase:
344            test = self._get_test_result(test_result=test_result, create=False)
345            test.stacktrace = test_result.stacktrace
346            test.code = ResultCode.FAILED.value
347
348
349@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
350class UploadListener(IListener):
351    def __started__(self, lifecycle, test_result):
352        pass
353
354    @staticmethod
355    def __ended__(lifecycle, test_result, **kwargs):
356        del test_result, kwargs
357        if lifecycle == LifeCycle.TestCase:
358            pass
359
360    @staticmethod
361    def __skipped__(lifecycle, test_result, **kwargs):
362        pass
363
364    @staticmethod
365    def __failed__(lifecycle, test_result, **kwargs):
366        pass
367
368
369@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
370class CollectingTestListener(IListener):
371    """
372    Listener test status information to the console
373    """
374
375    def __init__(self):
376        self.tests = []
377
378    def __started__(self, lifecycle, test_result):
379        if lifecycle == LifeCycle.TestCase:
380            if not test_result.test_class or not test_result.test_name:
381                return
382            test = TestDescription(test_result.test_class,
383                                   test_result.test_name)
384            if test not in self.tests:
385                self.tests.append(test)
386
387    def __ended__(self, lifecycle, test_result=None, **kwargs):
388        pass
389
390    def __skipped__(self, lifecycle, test_result):
391        pass
392
393    def __failed__(self, lifecycle, test_result):
394        pass
395
396    def get_current_run_results(self):
397        return self.tests
398
399
400@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
401class CollectingLiteGTestListener(IListener):
402    """
403    Listener test status information to the console
404    """
405
406    def __init__(self):
407        self.tests = []
408
409    def __started__(self, lifecycle, test_result):
410        if lifecycle == LifeCycle.TestCase:
411            if not test_result.test_class or not test_result.test_name:
412                return
413            test = TestDescription(test_result.test_class,
414                                   test_result.test_name)
415            if test not in self.tests:
416                self.tests.append(test)
417
418    def __ended__(self, lifecycle, test_result=None, **kwargs):
419        pass
420
421    def __skipped__(self, lifecycle, test_result):
422        pass
423
424    def __failed__(self, lifecycle, test_result):
425        if lifecycle == LifeCycle.TestCase:
426            if not test_result.test_class or not test_result.test_name:
427                return
428            test = TestDescription(test_result.test_class,
429                                   test_result.test_name)
430            if test not in self.tests:
431                self.tests.append(test)
432
433    def get_current_run_results(self):
434        return self.tests
435