#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import uuid

from xdevice import Plugin
from xdevice import IListener
from xdevice import LifeCycle
from xdevice import platform_logger
from xdevice import ListenerType
from xdevice import TestDescription
from xdevice import ResultCode
from xdevice import UniversalReportListener

from ohos.executor.bean import StackCaseResult


__all__ = ["CollectingLiteGTestListener", "CollectingPassListener"]

LOG = platform_logger("Listener")


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Collect a de-duplicated list of the test cases that were started or
    reported as failed during the current run.
    """

    def __init__(self):
        # TestDescription objects, in first-seen order, without duplicates.
        self.tests = []

    def _collect(self, lifecycle, test_result):
        """
        Record the test case described by test_result, once.

        Events that are not test-case events, and results lacking a class
        name or a test name, are ignored.
        """
        if lifecycle != LifeCycle.TestCase:
            return
        if not test_result.test_class or not test_result.test_name:
            return
        test = TestDescription(test_result.test_class,
                               test_result.test_name)
        if test not in self.tests:
            self.tests.append(test)

    def __started__(self, lifecycle, test_result):
        """Record the test case that has just started."""
        self._collect(lifecycle, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """Ignored: end events are not collected by this listener."""
        pass

    def __skipped__(self, lifecycle, test_result):
        """Ignored: skip events are not collected by this listener."""
        pass

    def __failed__(self, lifecycle, test_result):
        """Record the failed test case (same handling as a start event)."""
        self._collect(lifecycle, test_result)

    def get_current_run_results(self):
        """Return the list of collected TestDescription objects."""
        return self.tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_pass)
class CollectingPassListener(IListener):
    """
    Collect a de-duplicated list of the test cases that ended with the
    PASSED result code during the current run.
    """

    def __init__(self):
        # TestDescription objects, in first-seen order, without duplicates.
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """Ignored: start events are not collected by this listener."""
        pass

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        Record the test case when it ended with ResultCode.PASSED.

        A test case seen a second time is not stored again; a warning is
        logged instead. Events without a test_result are ignored.
        """
        if lifecycle != LifeCycle.TestCase:
            return
        # test_result defaults to None, so guard before reading attributes.
        if test_result is None:
            return
        if not test_result.test_class or not test_result.test_name:
            return
        if test_result.code != ResultCode.PASSED.value:
            return
        test = TestDescription(test_result.test_class,
                               test_result.test_name)
        if test not in self.tests:
            self.tests.append(test)
        else:
            LOG.warning("Duplicate testcase: %s#%s" % (
                test_result.test_class, test_result.test_name))

    def __skipped__(self, lifecycle, test_result):
        """Ignored: skip events are not collected by this listener."""
        pass

    def __failed__(self, lifecycle, test_result):
        """Ignored: failure events are not collected by this listener."""
        pass

    def get_current_run_results(self):
        """Return the list of collected TestDescription objects."""
        return self.tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.stack_report)
class StackReportListener(UniversalReportListener):
    """
    Report listener that tracks nested suites with a stack of suite
    indexes, so each test case can be attributed to the suite that was
    innermost when the case started.

    NOTE(review): self.tests, self.suites, self.result,
    self.suite_distributions, self.current_test_id, self.current_suite_id
    and _get_suite_result() are not defined in this class; they are
    presumably provided by UniversalReportListener - confirm there.
    """

    def __init__(self):
        super().__init__()
        # Indexes of the suites that have started and not yet ended;
        # the last element is the innermost suite.
        self._suites_index_stack = []
        # suite name -> index of the first suite seen with that name.
        self._suite_name_mapping = {}
        # Not modified anywhere in this class; presumably set by the
        # caller for repeated runs - TODO confirm.
        self.cycle = 0

    def _get_test_result(self, test_result, create=False):
        """
        Look up the stored case record for test_result.

        Returns the record stored under test_result.index when there is
        one. Otherwise, when create is true, a new StackCaseResult is
        stored and returned; its index is test_result.index, or a random
        UUID hex string when that index is the empty string. When create
        is false, the record of the current test id is returned (None if
        there is no such record).
        """
        if test_result.index in self.tests:
            return self.tests.get(test_result.index)
        if not create:
            return self.tests.get(self.current_test_id)
        test = StackCaseResult()
        rid = uuid.uuid4().hex if test_result.index == "" else \
            test_result.index
        test.index = rid
        return self.tests.setdefault(rid, test)

    def _handle_case_start(self, test_result):
        """
        Create (or fetch) the case record, fill in its name and class,
        link it to the innermost open suite and make it the current test.
        """
        test = self._get_test_result(test_result=test_result, create=True)
        test.test_name = test_result.test_name
        test.test_class = test_result.test_class
        if self._suites_index_stack:
            test.parent_index = self._suites_index_stack[-1]
        self.current_test_id = test.index

    def _handle_testsuite_start(self, test_result):
        """
        Create (or fetch) the suite record, fill in its name and test
        count, make it the current suite and push it on the suite stack.
        """
        suite = self._get_suite_result(test_result=test_result,
                                       create=True)
        suite.suite_name = test_result.suite_name
        suite.test_num = test_result.test_num
        self.current_suite_id = suite.index
        self._suites_index_stack.append(suite.index)

    def _handle_testsuite_end(self, test_result, kwargs):
        """
        Finalise the suite record for test_result, file its case results
        and pop the innermost entry of the suite stack.

        Nothing happens (the stack is left untouched) when no suite
        record is found. kwargs is the keyword-argument dict of the end
        event, passed on as a plain dict.
        """
        suite = self._get_suite_result(test_result=test_result,
                                       create=False)
        if not suite:
            return
        suite.run_time = test_result.run_time
        suite.code = test_result.code
        suite.report = test_result.report
        suite.test_num = max(test_result.test_num, len(self.tests))
        self._handle_suite_end_data(suite, kwargs)
        if self._suites_index_stack:
            self._suites_index_stack.pop()

    def _handle_suite_end_data(self, suite, kwargs):
        """
        Move the case results belonging to suite out of self.tests and
        into self.result.

        Nothing is done when kwargs carries a truthy "suite_report".
        """
        if kwargs.get("suite_report", False):
            return

        # Cases whose parent is this suite, in their stored order.
        results_of_same_suite = [
            cur_test for cur_test in self.tests.values()
            if cur_test.parent_index == suite.index
        ]
        # They now belong to the suite: drop them from the pending cases.
        for cur_test in results_of_same_suite:
            self.tests.pop(cur_test.index)

        # Only the first suite seen under a given name is remembered.
        self._suite_name_mapping.setdefault(suite.suite_name, suite.index)

        if self.cycle > 0:
            suite_index = self._suite_name_mapping.get(suite.suite_name, "")
            if suite_index not in self.suite_distributions.keys():
                # Merge into the entry already recorded for the
                # first-seen suite of this name.
                # NOTE(review): when no entry matches, the collected
                # results are not recorded anywhere - confirm intended.
                for suite_item, result_list in self.result:
                    if suite_item.index != suite_index:
                        continue
                    self.suites.pop(suite.index)
                    # Drop a trailing result that is not completed
                    # before appending the new results.
                    if result_list and \
                            result_list[-1].is_completed is not True:
                        result_list.pop()
                    result_list.extend(results_of_same_suite)
                    break
            else:
                self.suite_distributions.update(
                    {suite.index: len(self.result)})
                self.result.append(
                    (self.suites.get(suite.index), results_of_same_suite))
        else:
            self.suite_distributions.update(
                {suite.index: len(self.result)})
            self.result.append(
                (self.suites.get(suite.index), results_of_same_suite))