• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
import os
import uuid
from dataclasses import dataclass
from dataclasses import field
from typing import Optional

from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.constants import ListenerType
from _core.constants import TestType
from _core.interface import LifeCycle
from _core.interface import IListener
from _core.logger import platform_logger
from _core.report.suite_reporter import SuiteReporter
from _core.report.suite_reporter import ResultCode
from _core.report.encrypt import check_pub_key_exist
33
# Names exported by a star-import of this module.
__all__ = ["LogListener", "ReportListener", "UploadListener",
           "CollectingTestListener", "CollectingLiteGTestListener",
           "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder",
           "TestDescription"]

# Module-wide logger, obtained from platform_logger under the name "Listener".
LOG = platform_logger("Listener")
40
41
@dataclass
class CaseResult:
    """Mutable record describing one test case.

    The attributes are declared as annotated dataclass fields.  Without
    annotations ``@dataclass`` sees no fields at all: the generated
    ``__init__`` accepts no arguments, ``__repr__`` prints nothing useful
    and the generated ``__eq__`` reports every pair of instances as equal.
    ``CaseResult()`` still yields exactly the same default values.
    """
    # Identifier of the record; empty until one is assigned.
    index: str = ""
    # A ResultCode value; a case starts out as FAILED.
    # NOTE(review): annotated as int on the assumption that ResultCode
    # members carry integer values - confirm against ResultCode.
    code: int = ResultCode.FAILED.value
    # presumably strings once set; None until the case has been named
    test_name: Optional[str] = None
    test_class: Optional[str] = None
    stacktrace: str = ""
    # NOTE(review): time unit is not established in this block.
    run_time: int = 0
    is_completed: bool = False
    num_tests: int = 0
    current: int = 0

    def is_running(self):
        """Return True when the case has a name and is not yet completed."""
        return self.test_name is not None and not self.is_completed
56
57
@dataclass
class SuiteResult:
    """Mutable record describing one test suite.

    The attributes are declared as annotated dataclass fields.  Without
    annotations ``@dataclass`` sees no fields at all: the generated
    ``__init__`` accepts no arguments, ``__repr__`` prints nothing useful
    and the generated ``__eq__`` reports every pair of instances as equal.
    ``SuiteResult()`` still yields exactly the same default values.
    """
    # Identifier of the record; empty until one is assigned.
    index: str = ""
    # A ResultCode value; a suite starts out as UNKNOWN.
    # NOTE(review): annotated as int on the assumption that ResultCode
    # members carry integer values - confirm against ResultCode.
    code: int = ResultCode.UNKNOWN.value
    # presumably a string once set; None until the suite has been named
    suite_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    # NOTE(review): time unit is not established in this block.
    run_time: int = 0
    is_completed: bool = False
    is_started: bool = False
    suite_num: int = 0
69
70
@dataclass
class SuitesResult:
    """Mutable record describing a group of test suites.

    The attributes are declared as annotated dataclass fields so that
    ``@dataclass`` really generates ``__init__``/``__repr__``/``__eq__``
    from them.  ``SuitesResult()`` yields the same default values as
    before, except that ``product_info`` is now a fresh dict per instance
    instead of a single dict object shared by every instance through the
    class.
    """
    # Identifier of the record; empty until one is assigned.
    index: str = ""
    # A ResultCode value; a suites group starts out as UNKNOWN.
    # NOTE(review): annotated as int on the assumption that ResultCode
    # members carry integer values - confirm against ResultCode.
    code: int = ResultCode.UNKNOWN.value
    # presumably a string once set; None until the group has been named
    suites_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    # NOTE(review): time unit is not established in this block.
    run_time: int = 0
    is_completed: bool = False
    # default_factory gives each instance its own dict; a plain ``{}``
    # default would be shared (and is rejected by dataclass for fields).
    product_info: dict = field(default_factory=dict)
81
82
@dataclass
class StateRecorder:
    """Holds the suite, suites group and case that are currently tracked.

    The attributes are declared as annotated dataclass fields so that
    ``@dataclass`` really generates ``__init__``/``__repr__``/``__eq__``
    from them.  ``StateRecorder()`` yields the same default values as
    before, except that ``trace_logs`` is now a fresh list per recorder:
    previously one list object lived on the class, so lines appended
    through one recorder were visible through every other recorder.
    """
    current_suite: Optional["SuiteResult"] = None
    current_suites: Optional["SuitesResult"] = None
    current_test: Optional["CaseResult"] = None
    # default_factory gives each recorder its own list.
    trace_logs: list = field(default_factory=list)
    running_test_index: int = 0

    def is_started(self):
        """Return True once a current suite record exists."""
        return self.current_suite is not None

    def suites_is_started(self):
        """Return True once a current suites record exists."""
        return self.current_suites is not None

    def suite_is_running(self):
        """Return True when the current suite is named and not completed."""
        suite = self.current_suite
        return suite is not None and suite.suite_name is not None and \
            not suite.is_completed

    def suites_is_running(self):
        """Return True when the current suites group is named and not
        completed."""
        suites = self.current_suites
        return suites is not None and suites.suites_name is not None and \
            not suites.is_completed

    def test_is_running(self):
        """Return True when a current case exists and reports itself as
        running."""
        test = self.current_test
        return test is not None and test.is_running()

    def suite(self, reset=False):
        """Return the current suite record, creating one when needed.

        A new ``SuiteResult`` with a uuid4 hex ``index`` replaces the
        current one when ``reset`` is true or no suite is tracked yet.
        """
        if reset or not self.current_suite:
            self.current_suite = SuiteResult()
            self.current_suite.index = uuid.uuid4().hex
        return self.current_suite

    def get_suites(self, reset=False):
        """Return the current suites record, creating one when needed.

        A new ``SuitesResult`` with a uuid4 hex ``index`` replaces the
        current one when ``reset`` is true or no group is tracked yet.
        """
        if reset or not self.current_suites:
            self.current_suites = SuitesResult()
            self.current_suites.index = uuid.uuid4().hex
        return self.current_suites

    def test(self, reset=False, test_index=None):
        """Return the current case record, creating one when needed.

        A new ``CaseResult`` replaces the current one when ``reset`` is
        true or no case is tracked yet.  Its ``index`` is ``test_index``
        when that is truthy, otherwise a uuid4 hex string.
        """
        if reset or not self.current_test:
            self.current_test = CaseResult()
            if test_index:
                self.current_test.index = test_index
            else:
                self.current_test.index = uuid.uuid4().hex
        return self.current_test
131
132
class TestDescription(object):
    """Identifies a test case by its class name and its test name.

    Instances compare equal when both names match.  Because ``__eq__`` is
    defined without ``__hash__``, instances are not hashable.
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        """Compare by ``class_name`` and ``test_name``.

        Objects lacking either attribute (``None``, strings, ...) yield
        ``NotImplemented`` so that ``==``, ``!=`` and ``in`` treat them as
        "not equal" instead of raising ``AttributeError``.
        """
        if not (hasattr(other, "class_name") and
                hasattr(other, "test_name")):
            return NotImplemented
        return self.class_name == other.class_name and \
            self.test_name == other.test_name

    def __repr__(self):
        return "{}({!r}, {!r})".format(type(self).__name__,
                                       self.class_name, self.test_name)

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """Remove every entry of ``execute_tests`` from ``tests``.

        ``tests`` is modified in place and also returned.  For each entry
        of ``execute_tests`` at most one (the first) equal element is
        removed.
        """
        for execute_test in execute_tests:
            if execute_test in tests:
                tests.remove(execute_test)
        return tests
148
149
@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    Writes suite and case progress messages to the module logger.

    Every logging callback returns without output when
    check_pub_key_exist() is truthy.
    """
    test_num = 0
    device_sn = ""

    def __started__(self, lifecycle, test_result):
        """Log the start of a suite or case; remember the suite's size."""
        if check_pub_key_exist():
            return
        if lifecycle == LifeCycle.TestSuite:
            message = "Start test suite [{}] with {} tests".format(
                test_result.suite_name, test_result.test_num)
            LOG.debug(message)
            # kept for the "[current/total ...]" line printed per case
            self.test_num = test_result.test_num
            return
        if lifecycle == LifeCycle.TestCase:
            message = "TestStarted({}#{})".format(
                test_result.test_class, test_result.test_name)
            LOG.debug(message)

    def __ended__(self, lifecycle, test_result, **kwargs):
        """Log the end of a suite, or the outcome line of a case."""
        if check_pub_key_exist():
            return

        # imported here rather than at module level
        # NOTE(review): presumably to avoid an import cycle - confirm
        from _core.utils import convert_serial
        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            cost_message = "End test suite cost {}ms.".format(
                test_result.run_time)
            LOG.debug(cost_message)
            end_message = "End test suite [{}].".format(
                test_result.suite_name)
            LOG.info(end_message)
            return
        if lifecycle == LifeCycle.TestCase:
            LOG.debug("TestEnded({}#{})".format(
                test_result.test_class, test_result.test_name))
            outcome = ResultCode(test_result.code).name
            if self.test_num:
                line = "[{}/{} {}] {}#{} {}".format(
                    test_result.current, self.test_num,
                    convert_serial(self.device_sn), test_result.test_class,
                    test_result.test_name, outcome)
            else:
                # total unknown: a dash takes the place of the count
                line = "[{}/- {}] {}#{} {}".format(
                    test_result.current, convert_serial(self.device_sn),
                    test_result.test_class, test_result.test_name, outcome)
            LOG.info(line)

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """Log a skipped suite, or a skipped case with its result name."""
        if check_pub_key_exist():
            return

        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
            return
        if lifecycle == LifeCycle.TestCase:
            outcome = ResultCode(test_result.code).name
            LOG.debug("[{}] {}#{}".format(outcome, test_result.test_class,
                                          test_result.test_name))

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """Failure callback; this listener logs nothing for it."""
        return None
210
211
@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(IListener):
    """
    Accumulates suite and case records and turns them into reports.

    Attributes:
        result: list of ``(suite_record, [case_record, ...])`` tuples,
            extended as suites end.
        suites: suite records keyed by their ``index``.
        tests: case records keyed by their ``index``.
        current_suite_id: ``index`` of the most recently started suite
            (or suites group).
        current_test_id: ``index`` of the most recently started case.
        report_path: base directory; data reports are written below its
            ``result`` sub-directory.
    """

    def __init__(self):
        self.result = list()
        self.suites = dict()
        self.tests = dict()
        self.current_suite_id = 0
        self.current_test_id = 0
        self.report_path = ""

    def _get_suite_result(self, test_result, create=False):
        """Return the suite record that belongs to ``test_result``.

        The record stored under ``test_result.index`` wins.  Otherwise,
        with ``create`` a new ``SuiteResult`` is registered under that
        index (or under a fresh uuid4 hex when the index is ``""``);
        without ``create`` the record of the current suite is returned,
        which is ``None`` when there is none.
        """
        if test_result.index in self.suites:
            return self.suites.get(test_result.index)
        elif create:
            suite = SuiteResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            suite.index = rid
            return self.suites.setdefault(rid, suite)
        else:
            return self.suites.get(self.current_suite_id)

    def _get_test_result(self, test_result, create=False):
        """Return the case record that belongs to ``test_result``.

        Same lookup rules as ``_get_suite_result`` but against
        ``self.tests`` / ``self.current_test_id``; may return ``None``
        when ``create`` is false.
        """
        if test_result.index in self.tests:
            return self.tests.get(test_result.index)
        elif create:
            test = CaseResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            test.index = rid
            return self.tests.setdefault(rid, test)
        else:
            return self.tests.get(self.current_test_id)

    def _remove_current_test_result(self):
        """Drop the record of the current case, if one is stored."""
        if self.current_test_id in self.tests:
            del self.tests[self.current_test_id]

    def __started__(self, lifecycle, test_result):
        """Create (or fetch) the record for the started item and make it
        the current one."""
        if lifecycle == LifeCycle.TestSuites:
            # suites groups are stored in the same dict as single suites
            suites = self._get_suite_result(test_result=test_result,
                                            create=True)
            suites.suites_name = test_result.suites_name
            suites.test_num = test_result.test_num
            self.current_suite_id = suites.index
        elif lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=True)
            suite.suite_name = test_result.suite_name
            suite.test_num = test_result.test_num
            self.current_suite_id = suite.index
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=True)
            test.test_name = test_result.test_name
            test.test_class = test_result.test_class
            self.current_test_id = test.index

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """Finalize the record of the ended item and emit reports.

        Keyword arguments read here: ``is_clear`` and ``suite_report``
        (TestSuite), ``suite_report``, ``suites_name`` and
        ``product_info`` (TestSuites), ``test_type`` (TestTask).
        Events for which no record can be found are ignored.
        """
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if not suite:
                return
            suite.run_time = test_result.run_time
            suite.code = test_result.code
            is_clear = kwargs.get("is_clear", False)
            suite.test_num = max(test_result.test_num, len(self.tests))
            # generate suite report
            if not kwargs.get("suite_report", False):
                # Consecutive endings of a suite with the same name are
                # merged into the last collected entry.
                if len(self.result) > 0 and self.result[-1][0].suite_name == \
                        self.suites[suite.index].suite_name:
                    self.result[-1][1].extend(list(self.tests.values()))
                    self.result[-1][0].test_num = max(suite.test_num,
                                                      len(self.result[-1][1]))
                else:
                    self.result.append((self.suites[suite.index],
                                        list(self.tests.values())))
            else:
                # per-suite mode: collect the entry and write this
                # suite's data report right away
                result_dir = os.path.join(self.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                self.result.append((self.suites[suite.index],
                                    list(self.tests.values())))
                results = [(suite, list(self.tests.values()))]
                suite_report = SuiteReporter(results, suite.suite_name,
                                             result_dir)
                suite_report.generate_data_report()
            if is_clear:
                self.tests.clear()
        elif lifecycle == LifeCycle.TestSuites:
            if not kwargs.get("suite_report", False):
                # one data report covering everything collected so far
                result_dir = os.path.join(self.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                suites_name = kwargs.get("suites_name", "")
                product_info = kwargs.get("product_info", "")
                suite_report = SuiteReporter(self.result, suites_name,
                                             result_dir,
                                             product_info=product_info)
                suite_report.generate_data_report()
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                # no matching record and no current case: nothing to
                # update (same policy as the suite branch above)
                return
            test.run_time = test_result.run_time
            test.stacktrace = test_result.stacktrace
            test.code = test_result.code
        elif lifecycle == LifeCycle.TestTask:
            test_type = str(kwargs.get("test_type", TestType.all))
            reporter = get_plugin(plugin_type=Plugin.REPORTER,
                                  plugin_id=test_type)
            if not reporter:
                # fall back to the reporter registered for TestType.all
                reporter = get_plugin(plugin_type=Plugin.REPORTER,
                                      plugin_id=TestType.all)[0]
            else:
                reporter = reporter[0]
            reporter.__generate_reports__(self.report_path,
                                          task_info=test_result)

    def __skipped__(self, lifecycle, test_result):
        """Mark the matching case record as skipped; ignored when no
        record can be found."""
        if lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.SKIPPED.value

    def __failed__(self, lifecycle, test_result):
        """Mark the matching suite or case record as failed; ignored when
        no record can be found."""
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if suite is None:
                return
            suite.stacktrace = test_result.stacktrace
            suite.code = ResultCode.FAILED.value
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.FAILED.value
348
@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    Listener registered under ListenerType.upload.

    All of its callbacks are currently empty.
    """

    def __started__(self, lifecycle, test_result):
        """Start callback; does nothing."""
        return None

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        """End callback; discards its arguments and does nothing."""
        del test_result, kwargs
        if lifecycle == LifeCycle.TestCase:
            # case-end hook: intentionally empty
            return None
        return None

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """Skip callback; does nothing."""
        return None

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """Failure callback; does nothing."""
        return None
367
368
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    Collects a TestDescription for every distinct test case that starts.
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """Record a started case once; cases lacking a class name or a
        test name are ignored."""
        if lifecycle != LifeCycle.TestCase:
            return
        if not (test_result.test_class and test_result.test_name):
            return
        description = TestDescription(test_result.test_class,
                                      test_result.test_name)
        if description in self.tests:
            return
        self.tests.append(description)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """End callback; does nothing."""
        return None

    def __skipped__(self, lifecycle, test_result):
        """Skip callback; does nothing."""
        return None

    def __failed__(self, lifecycle, test_result):
        """Failure callback; does nothing."""
        return None

    def get_current_run_results(self):
        """Return the list of collected descriptions (not a copy)."""
        return self.tests
398
399
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Collects a TestDescription for every distinct test case that starts
    or fails.
    """

    def __init__(self):
        self.tests = []

    def _remember(self, test_result):
        """Append a description of ``test_result`` unless it lacks a class
        name or test name, or an equal description is already stored."""
        if not (test_result.test_class and test_result.test_name):
            return
        description = TestDescription(test_result.test_class,
                                      test_result.test_name)
        if description in self.tests:
            return
        self.tests.append(description)

    def __started__(self, lifecycle, test_result):
        """Record a started case."""
        if lifecycle == LifeCycle.TestCase:
            self._remember(test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """End callback; does nothing."""
        return None

    def __skipped__(self, lifecycle, test_result):
        """Skip callback; does nothing."""
        return None

    def __failed__(self, lifecycle, test_result):
        """Record a failed case, in the same way as a started one."""
        if lifecycle == LifeCycle.TestCase:
            self._remember(test_result)

    def get_current_run_results(self):
        """Return the list of collected descriptions (not a copy)."""
        return self.tests
435