• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import uuid
21from dataclasses import dataclass
22
23from _core.plugin import Plugin
24from _core.plugin import get_plugin
25from _core.constants import ListenerType
26from _core.constants import TestType
27from _core.interface import LifeCycle
28from _core.interface import IListener
29from _core.logger import platform_logger
30from _core.report.suite_reporter import SuiteReporter
31from _core.report.suite_reporter import ResultCode
32from _core.report.encrypt import check_pub_key_exist
33
34__all__ = ["LogListener", "ReportListener", "UploadListener",
35           "CollectingTestListener", "CollectingLiteGTestListener",
36           "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder",
37           "TestDescription"]
38
39LOG = platform_logger("Listener")
40
41
42@dataclass
43class CaseResult:
44    index = ""
45    code = ResultCode.FAILED.value
46    test_name = None
47    test_class = None
48    stacktrace = ""
49    run_time = 0
50    is_completed = False
51    num_tests = 0
52    current = 0
53
54    def is_running(self):
55        return self.test_name is not None and not self.is_completed
56
57
58@dataclass
59class SuiteResult:
60    index = ""
61    code = ResultCode.UNKNOWN.value
62    suite_name = None
63    test_num = 0
64    stacktrace = ""
65    run_time = 0
66    is_completed = False
67    is_started = False
68    suite_num = 0
69
70
71@dataclass
72class SuitesResult:
73    index = ""
74    code = ResultCode.UNKNOWN.value
75    suites_name = None
76    test_num = 0
77    stacktrace = ""
78    run_time = 0
79    is_completed = False
80    product_info = {}
81
82
83@dataclass
84class StateRecorder:
85    current_suite = None
86    current_suites = None
87    current_test = None
88    trace_logs = []
89    running_test_index = 0
90
91    def is_started(self):
92        return self.current_suite is not None
93
94    def suites_is_started(self):
95        return self.current_suites is not None
96
97    def suite_is_running(self):
98        suite = self.current_suite
99        return suite is not None and suite.suite_name is not None and \
100            not suite.is_completed
101
102    def suites_is_running(self):
103        suites = self.current_suites
104        return suites is not None and suites.suites_name is not None and \
105            not suites.is_completed
106
107    def test_is_running(self):
108        test = self.current_test
109        return test is not None and test.is_running()
110
111    def suite(self, reset=False):
112        if reset or not self.current_suite:
113            self.current_suite = SuiteResult()
114            self.current_suite.index = uuid.uuid4().hex
115        return self.current_suite
116
117    def get_suites(self, reset=False):
118        if reset or not self.current_suites:
119            self.current_suites = SuitesResult()
120            self.current_suites.index = uuid.uuid4().hex
121        return self.current_suites
122
123    def test(self, reset=False, test_index=None):
124        if reset or not self.current_test:
125            self.current_test = CaseResult()
126            if test_index:
127                self.current_test.index = test_index
128            else:
129                self.current_test.index = uuid.uuid4().hex
130        return self.current_test
131
132
133class TestDescription(object):
134    def __init__(self, class_name, test_name):
135        self.class_name = class_name
136        self.test_name = test_name
137
138    def __eq__(self, other):
139        return self.class_name == other.class_name and \
140               self.test_name == other.test_name
141
142    @classmethod
143    def remove_test(cls, tests, execute_tests):
144        for execute_test in execute_tests:
145            if execute_test in tests:
146                tests.remove(execute_test)
147        return tests
148
149
150@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
151class LogListener(IListener):
152    """
153    listener test status information to the console and log
154    """
155    test_num = 0
156    device_sn = ""
157
158    def __started__(self, lifecycle, test_result):
159        if check_pub_key_exist():
160            return
161        if lifecycle == LifeCycle.TestSuite:
162            LOG.debug("Start test suite [%s] with %s tests"
163                      % (test_result.suite_name, test_result.test_num))
164            self.test_num = test_result.test_num
165        elif lifecycle == LifeCycle.TestCase:
166            LOG.debug("testStarted(%s#%s)" % (test_result.test_class,
167                                              test_result.test_name))
168
169    def __ended__(self, lifecycle, test_result, **kwargs):
170        if check_pub_key_exist():
171            return
172
173        from _core.utils import convert_serial
174        del kwargs
175        if lifecycle == LifeCycle.TestSuite:
176            LOG.debug("End test suite [%s] and cost %sms."
177                      % (test_result.suite_name, test_result.run_time))
178        elif lifecycle == LifeCycle.TestCase:
179            LOG.debug("testEnded(%s#%s)" % (test_result.test_class,
180                                            test_result.test_name))
181            ret = ResultCode(test_result.code).name
182            if self.test_num:
183                LOG.info("[%s/%s %s] %s#%s %s" %
184                         (test_result.current, self.test_num,
185                          convert_serial(self.device_sn),
186                          test_result.test_class, test_result.test_name, ret))
187            else:
188                LOG.info("[%s/- %s] %s#%s %s" %
189                         (test_result.current, convert_serial(self.device_sn),
190                          test_result.test_class, test_result.test_name,
191                          ret))
192
193    @staticmethod
194    def __skipped__(lifecycle, test_result, **kwargs):
195        if check_pub_key_exist():
196            return
197
198        del kwargs
199        if lifecycle == LifeCycle.TestSuite:
200            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
201        elif lifecycle == LifeCycle.TestCase:
202            ret = ResultCode(test_result.code).name
203            LOG.debug("[{}] {}#{}".format(ret, test_result.test_class,
204                                          test_result.test_name))
205
206    @staticmethod
207    def __failed__(lifecycle, test_result, **kwargs):
208        pass
209
210
211@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
212class ReportListener(IListener):
213    """
214    listener test status information to the console
215    """
216
217    def __init__(self):
218        self.result = list()
219        self.suites = dict()
220        self.tests = dict()
221        self.current_suite_id = 0
222        self.current_test_id = 0
223        self.report_path = ""
224
225    def _get_suite_result(self, test_result, create=False):
226        if test_result.index in self.suites:
227            return self.suites.get(test_result.index)
228        elif create:
229            suite = SuiteResult()
230            rid = uuid.uuid4().hex if test_result.index == "" else \
231                test_result.index
232            suite.index = rid
233            return self.suites.setdefault(rid, suite)
234        else:
235            return self.suites.get(self.current_suite_id)
236
237    def _get_test_result(self, test_result, create=False):
238        if test_result.index in self.tests:
239            return self.tests.get(test_result.index)
240        elif create:
241            test = CaseResult()
242            rid = uuid.uuid4().hex if test_result.index == "" else \
243                test_result.index
244            test.index = rid
245            return self.tests.setdefault(rid, test)
246        else:
247            return self.tests.get(self.current_test_id)
248
249    def _remove_current_test_result(self):
250        if self.current_test_id in self.tests:
251            del self.tests[self.current_test_id]
252
253    def __started__(self, lifecycle, test_result):
254        if lifecycle == LifeCycle.TestSuites:
255            suites = self._get_suite_result(test_result=test_result,
256                                            create=True)
257            suites.suites_name = test_result.suites_name
258            suites.test_num = test_result.test_num
259            self.current_suite_id = suites.index
260        elif lifecycle == LifeCycle.TestSuite:
261            suite = self._get_suite_result(test_result=test_result,
262                                           create=True)
263            suite.suite_name = test_result.suite_name
264            suite.test_num = test_result.test_num
265            self.current_suite_id = suite.index
266        elif lifecycle == LifeCycle.TestCase:
267            test = self._get_test_result(test_result=test_result, create=True)
268            test.test_name = test_result.test_name
269            test.test_class = test_result.test_class
270            self.current_test_id = test.index
271
272    def __ended__(self, lifecycle, test_result=None, **kwargs):
273        if lifecycle == LifeCycle.TestSuite:
274            suite = self._get_suite_result(test_result=test_result,
275                                           create=False)
276            if not suite:
277                return
278            suite.run_time = test_result.run_time
279            suite.code = test_result.code
280            is_clear = kwargs.get("is_clear", False)
281            suite.test_num = max(test_result.test_num, len(self.tests))
282            # generate suite report
283            if not kwargs.get("suite_report", False):
284                if len(self.result) > 0 and self.result[-1][0].suite_name == \
285                        self.suites[suite.index].suite_name:
286                    self.result[-1][1].extend(list(self.tests.values()))
287                    self.result[-1][0].test_num = max(suite.test_num,
288                                                      len(self.result[-1][1]))
289                else:
290                    self.result.append((self.suites[suite.index],
291                                        list(self.tests.values())))
292            else:
293                result_dir = os.path.join(self.report_path, "result")
294                os.makedirs(result_dir, exist_ok=True)
295                self.result.append((self.suites[suite.index],
296                                    list(self.tests.values())))
297                results = [(suite, list(self.tests.values()))]
298                suite_report = SuiteReporter(results, suite.suite_name,
299                                             result_dir)
300                suite_report.generate_data_report()
301            if is_clear:
302                self.tests.clear()
303        elif lifecycle == LifeCycle.TestSuites:
304            if not kwargs.get("suite_report", False):
305                result_dir = os.path.join(self.report_path, "result")
306                os.makedirs(result_dir, exist_ok=True)
307                suites_name = kwargs.get("suites_name", "")
308                product_info = kwargs.get("product_info", "")
309                suite_report = SuiteReporter(self.result, suites_name,
310                                             result_dir,
311                                             product_info=product_info)
312                suite_report.generate_data_report()
313        elif lifecycle == LifeCycle.TestCase:
314            test = self._get_test_result(test_result=test_result, create=False)
315            test.run_time = test_result.run_time
316            test.stacktrace = test_result.stacktrace
317            test.code = test_result.code
318        elif lifecycle == LifeCycle.TestTask:
319            test_type = str(kwargs.get("test_type", TestType.all))
320            reporter = get_plugin(plugin_type=Plugin.REPORTER,
321                                  plugin_id=test_type)
322            if not reporter:
323                reporter = get_plugin(plugin_type=Plugin.REPORTER,
324                                      plugin_id=TestType.all)[0]
325            else:
326                reporter = reporter[0]
327            reporter.__generate_reports__(self.report_path,
328                                          task_info=test_result)
329
330    def __skipped__(self, lifecycle, test_result):
331        del test_result
332        if lifecycle == LifeCycle.TestCase:
333            self._remove_current_test_result()
334
335    def __failed__(self, lifecycle, test_result):
336        if lifecycle == LifeCycle.TestSuite:
337            suite = self._get_suite_result(test_result=test_result,
338                                           create=False)
339            suite.stacktrace = test_result.stacktrace
340            suite.code = ResultCode.FAILED.value
341        elif lifecycle == LifeCycle.TestCase:
342            test = self._get_test_result(test_result=test_result, create=False)
343            test.stacktrace = test_result.stacktrace
344            test.code = ResultCode.FAILED.value
345
346
347@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
348class UploadListener(IListener):
349    def __started__(self, lifecycle, test_result):
350        pass
351
352    @staticmethod
353    def __ended__(lifecycle, test_result, **kwargs):
354        del test_result, kwargs
355        if lifecycle == LifeCycle.TestCase:
356            pass
357
358    @staticmethod
359    def __skipped__(lifecycle, test_result, **kwargs):
360        pass
361
362    @staticmethod
363    def __failed__(lifecycle, test_result, **kwargs):
364        pass
365
366
367@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
368class CollectingTestListener(IListener):
369    """
370    listener test status information to the console
371    """
372
373    def __init__(self):
374        self.tests = []
375
376    def __started__(self, lifecycle, test_result):
377        if lifecycle == LifeCycle.TestCase:
378            if not test_result.test_class or not test_result.test_name:
379                return
380            test = TestDescription(test_result.test_class,
381                                   test_result.test_name)
382            if test not in self.tests:
383                self.tests.append(test)
384
385    def __ended__(self, lifecycle, test_result=None, **kwargs):
386        pass
387
388    def __skipped__(self, lifecycle, test_result):
389        pass
390
391    def __failed__(self, lifecycle, test_result):
392        pass
393
394    def get_current_run_results(self):
395        return self.tests
396
397
398@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
399class CollectingLiteGTestListener(IListener):
400    """
401    listener test status information to the console
402    """
403
404    def __init__(self):
405        self.tests = []
406
407    def __started__(self, lifecycle, test_result):
408        if lifecycle == LifeCycle.TestCase:
409            if not test_result.test_class or not test_result.test_name:
410                return
411            test = TestDescription(test_result.test_class,
412                                   test_result.test_name)
413            if test not in self.tests:
414                self.tests.append(test)
415
416    def __ended__(self, lifecycle, test_result=None, **kwargs):
417        pass
418
419    def __skipped__(self, lifecycle, test_result):
420        pass
421
422    def __failed__(self, lifecycle, test_result):
423        if lifecycle == LifeCycle.TestCase:
424            if not test_result.test_class or not test_result.test_name:
425                return
426            test = TestDescription(test_result.test_class,
427                                   test_result.test_name)
428            if test not in self.tests:
429                self.tests.append(test)
430
431    def get_current_run_results(self):
432        return self.tests
433