#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os

from _core.plugin import Plugin
from _core.constants import ListenerType
from _core.executor.abs import PlusReportListener
from _core.interface import LifeCycle
from _core.interface import IListener
from _core.logger import platform_logger
from _core.report.suite_reporter import ResultCode
from _core.report.encrypt import check_pub_key_exist

__all__ = ["LogListener", "ReportListener", "UploadListener",
           "CollectingTestListener", "CollectingLiteGTestListener",
           "TestDescription"]

LOG = platform_logger("Listener")


class TestDescription(object):
    """
    Identify a test case by its class name and its test name.

    Two descriptions are equal when both names match. Because ``__eq__`` is
    defined without ``__hash__``, instances are not hashable.
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        # Comparison stays duck-typed: any object exposing class_name and
        # test_name can match. Objects without those attributes are simply
        # "not equal" instead of raising AttributeError, which keeps
        # ``in`` / ``list.remove`` safe on mixed lists.
        try:
            return self.class_name == other.class_name and \
                self.test_name == other.test_name
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return "{}({!r}, {!r})".format(
            type(self).__name__, self.class_name, self.test_name)

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """
        Remove every entry of ``execute_tests`` from ``tests``.

        ``tests`` is modified in place and also returned. For each executed
        test at most one matching entry is removed (``list.remove``).
        """
        for execute_test in execute_tests:
            if execute_test in tests:
                tests.remove(execute_test)
        return tests


def _collect_test(tests, test_result):
    """
    Append the test described by ``test_result`` to ``tests`` exactly once.

    Results with an empty ``test_class`` or ``test_name`` are ignored.
    """
    if not test_result.test_class or not test_result.test_name:
        return
    test = TestDescription(test_result.test_class, test_result.test_name)
    if test not in tests:
        tests.append(test)


@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    Listener that writes test status information to the log
    """
    # Number of tests in the current suite; set when a suite starts.
    test_num = 0
    # Serial number of the device under test, shown in progress lines.
    device_sn = ""

    def __started__(self, lifecycle, test_result):
        # Nothing is logged when check_pub_key_exist() reports a key.
        if check_pub_key_exist():
            return
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Start test suite [{}] with {} tests"
                      .format(test_result.suite_name, test_result.test_num))
            self.test_num = test_result.test_num
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("TestStarted({}#{})"
                      .format(test_result.test_class, test_result.test_name))

    def __ended__(self, lifecycle, test_result, **kwargs):
        if check_pub_key_exist():
            return

        # Imported at call time, as in the original code.
        from _core.utils import convert_serial
        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("End test suite cost {}ms."
                      .format(test_result.run_time))
            LOG.info("End test suite [{}]."
                     .format(test_result.suite_name))
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("TestEnded({}#{})"
                      .format(test_result.test_class, test_result.test_name))
            ret = ResultCode(test_result.code).name
            # When the suite size is unknown (0/None) print "-" as the total.
            total = self.test_num if self.test_num else "-"
            LOG.info("[{}/{} {}] {}#{} {}"
                     .format(test_result.current, total,
                             convert_serial(self.device_sn),
                             test_result.test_class, test_result.test_name,
                             ret))

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        if check_pub_key_exist():
            return

        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
        elif lifecycle == LifeCycle.TestCase:
            ret = ResultCode(test_result.code).name
            LOG.debug("[{}] {}#{}".format(ret, test_result.test_class,
                                          test_result.test_name))

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        pass


@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(PlusReportListener):
    """
    Listener that collects suite/test results and generates result reports
    """

    def handle_half_break(self, suites_name, error_message=''):
        """
        Handle a test suite that terminated abnormally: record the results
        of the cases that already ran into the result file.
        """
        test_result = self.suites.get(self.current_suite_id)
        if test_result is None:
            return
        self.__ended__(lifecycle=LifeCycle.TestSuite, test_result=test_result)
        LOG.warning(f"Testsuite({test_result.suite_name}) is running abnormally")
        self.__ended__(lifecycle=LifeCycle.TestSuites, test_result=test_result,
                       suites_name=suites_name, message=error_message)

    def _handle_suite_end_data(self, suite, kwargs):
        """
        Store the finished suite together with its collected test results.

        ``kwargs`` is a plain dict; the keys read here are ``suite_report``
        (write a per-suite data report immediately) and ``is_clear``
        (empty ``self.tests`` afterwards).
        """
        is_clear = kwargs.get("is_clear", False)
        # generate suite report
        if not kwargs.get("suite_report", False):
            if len(self.result) > 0 and self.result[-1][0].suite_name == \
                    self.suites.get(suite.index).suite_name:
                # Same suite as the previous entry: merge the new tests into
                # it and keep the larger of the declared/actual test counts.
                self.result[-1][1].extend(list(self.tests.values()))
                self.result[-1][0].test_num = max(suite.test_num,
                                                  len(self.result[-1][1]))
            else:
                self.result.append((self.suites.get(suite.index),
                                    list(self.tests.values())))
        else:
            result_dir = os.path.join(self.report_path, "result")
            os.makedirs(result_dir, exist_ok=True)
            self.result.append((self.suites.get(suite.index),
                                list(self.tests.values())))
            results = [(suite, list(self.tests.values()))]
            self._generate_data_report(result_dir, results, suite.suite_name)
        if is_clear:
            self.tests.clear()


@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    Listener registered for the upload type; every callback is a no-op
    """

    def __started__(self, lifecycle, test_result):
        pass

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        del lifecycle, test_result, kwargs

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        pass

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        pass


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    Listener that collects the description of every started test case
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        if lifecycle == LifeCycle.TestCase:
            _collect_test(self.tests, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    # **kwargs is accepted for parity with the other listeners in this file.
    def __skipped__(self, lifecycle, test_result, **kwargs):
        pass

    def __failed__(self, lifecycle, test_result, **kwargs):
        pass

    def get_current_run_results(self):
        """Return the list of collected TestDescription objects."""
        return self.tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Listener that collects the description of every started or failed
    test case
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        if lifecycle == LifeCycle.TestCase:
            _collect_test(self.tests, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    # **kwargs is accepted for parity with the other listeners in this file.
    def __skipped__(self, lifecycle, test_result, **kwargs):
        pass

    def __failed__(self, lifecycle, test_result, **kwargs):
        # A failed case is collected as well, without duplicating an entry
        # already added by __started__.
        if lifecycle == LifeCycle.TestCase:
            _collect_test(self.tests, test_result)

    def get_current_run_results(self):
        """Return the list of collected TestDescription objects."""
        return self.tests