1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import uuid 21from dataclasses import dataclass 22 23from _core.plugin import Plugin 24from _core.plugin import get_plugin 25from _core.constants import ListenerType 26from _core.constants import TestType 27from _core.interface import LifeCycle 28from _core.interface import IListener 29from _core.logger import platform_logger 30from _core.logger import LogQueue 31from _core.report.suite_reporter import SuiteReporter 32from _core.report.suite_reporter import ResultCode 33from _core.report.encrypt import check_pub_key_exist 34 35__all__ = ["LogListener", "ReportListener", "UploadListener", 36 "CollectingTestListener", "CollectingLiteGTestListener", 37 "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder", 38 "TestDescription"] 39 40LOG = platform_logger("Listener") 41 42 43@dataclass 44class CaseResult: 45 index = "" 46 code = ResultCode.FAILED.value 47 test_name = None 48 test_class = None 49 stacktrace = "" 50 run_time = 0 51 is_completed = False 52 num_tests = 0 53 current = 0 54 55 def is_running(self): 56 return self.test_name is not None and not self.is_completed 57 58 59@dataclass 60class SuiteResult: 61 index = "" 62 code = ResultCode.UNKNOWN.value 63 suite_name = None 64 test_num = 0 65 stacktrace = "" 66 run_time = 0 67 is_completed = False 68 is_started = False 69 suite_num = 0 70 71 72@dataclass 73class SuitesResult: 74 index = "" 75 code = ResultCode.UNKNOWN.value 76 suites_name = None 77 test_num = 0 78 stacktrace = "" 79 run_time = 0 80 is_completed = False 81 product_info = {} 82 83 84@dataclass 85class StateRecorder: 86 current_suite = None 87 current_suites = None 88 current_test = None 89 trace_logs = [] 90 running_test_index = 0 91 92 def is_started(self): 93 return self.current_suite is not None 94 95 def suites_is_started(self): 96 return self.current_suites is not None 97 98 def suite_is_running(self): 99 suite = self.current_suite 100 return suite is not None and suite.suite_name is not None and \ 101 not suite.is_completed 102 103 def suites_is_running(self): 104 suites = self.current_suites 105 return suites is not None and suites.suites_name is not None and \ 106 not suites.is_completed 107 108 def test_is_running(self): 109 test = self.current_test 110 return test is not None and test.is_running() 111 112 def suite(self, reset=False): 113 if reset or not self.current_suite: 114 self.current_suite = SuiteResult() 115 self.current_suite.index = uuid.uuid4().hex 116 return self.current_suite 117 118 def get_suites(self, reset=False): 119 if reset or not self.current_suites: 120 self.current_suites = SuitesResult() 121 self.current_suites.index = uuid.uuid4().hex 122 return self.current_suites 123 124 def test(self, reset=False, test_index=None): 125 if reset or not self.current_test: 126 self.current_test = CaseResult() 127 if test_index: 128 self.current_test.index = test_index 129 else: 130 self.current_test.index = uuid.uuid4().hex 131 return self.current_test 132 133 134class TestDescription(object): 135 def __init__(self, class_name, test_name): 136 self.class_name = class_name 137 self.test_name = test_name 138 139 def __eq__(self, other): 140 return self.class_name == other.class_name and \ 141 self.test_name == other.test_name 142 143 @classmethod 144 def remove_test(cls, tests, execute_tests): 145 for execute_test in execute_tests: 146 if execute_test in tests: 147 tests.remove(execute_test) 148 return tests 149 150 151@Plugin(type=Plugin.LISTENER, id=ListenerType.log) 152class LogListener(IListener): 153 """ 154 Listener test status information to the console and log 155 """ 156 test_num = 0 157 device_sn = "" 158 log_queue = LogQueue(log=LOG) 159 160 def __started__(self, lifecycle, test_result): 161 if check_pub_key_exist(): 162 return 163 if lifecycle == LifeCycle.TestSuite: 164 self.log_queue.debug(log_data="Start test suite [{}] with {} tests" 165 .format(test_result.suite_name, test_result.test_num)) 166 self.test_num = test_result.test_num 167 elif lifecycle == LifeCycle.TestCase: 168 self.log_queue.debug(log_data="TestStarted({}#{})" 169 .format(test_result.test_class, test_result.test_name)) 170 171 def __ended__(self, lifecycle, test_result, **kwargs): 172 if check_pub_key_exist(): 173 return 174 175 from _core.utils import convert_serial 176 del kwargs 177 if lifecycle == LifeCycle.TestSuite: 178 self.log_queue.debug(log_data="End test suite [{}] and cost {}ms." 179 .format(test_result.suite_name, test_result.run_time)) 180 self.log_queue.clear() 181 elif lifecycle == LifeCycle.TestCase: 182 self.log_queue.debug(log_data="TestEnded({}#{})" 183 .format(test_result.test_class, test_result.test_name)) 184 ret = ResultCode(test_result.code).name 185 if self.test_num: 186 self.log_queue.debug(log_data="[{}/{} {}] {}#{} {}". 187 format(test_result.current, self.test_num, convert_serial(self.device_sn), 188 test_result.test_class, test_result.test_name, ret)) 189 else: 190 self.log_queue.debug(log_data="[{}/- {}] {}#{} {}". 191 format(test_result.current, convert_serial(self.device_sn), 192 test_result.test_class, test_result.test_name, ret)) 193 194 @staticmethod 195 def __skipped__(lifecycle, test_result, **kwargs): 196 if check_pub_key_exist(): 197 return 198 199 del kwargs 200 if lifecycle == LifeCycle.TestSuite: 201 LOG.debug("Test suite [{}] skipped".format(test_result.suite_name)) 202 elif lifecycle == LifeCycle.TestCase: 203 ret = ResultCode(test_result.code).name 204 LOG.debug("[{}] {}#{}".format(ret, test_result.test_class, 205 test_result.test_name)) 206 207 @staticmethod 208 def __failed__(lifecycle, test_result, **kwargs): 209 pass 210 211 212@Plugin(type=Plugin.LISTENER, id=ListenerType.report) 213class ReportListener(IListener): 214 """ 215 Listener test status information to the console 216 """ 217 218 def __init__(self): 219 self.result = list() 220 self.suites = dict() 221 self.tests = dict() 222 self.current_suite_id = 0 223 self.current_test_id = 0 224 self.report_path = "" 225 226 def _get_suite_result(self, test_result, create=False): 227 if test_result.index in self.suites: 228 return self.suites.get(test_result.index) 229 elif create: 230 suite = SuiteResult() 231 rid = uuid.uuid4().hex if test_result.index == "" else \ 232 test_result.index 233 suite.index = rid 234 return self.suites.setdefault(rid, suite) 235 else: 236 return self.suites.get(self.current_suite_id) 237 238 def _get_test_result(self, test_result, create=False): 239 if test_result.index in self.tests: 240 return self.tests.get(test_result.index) 241 elif create: 242 test = CaseResult() 243 rid = uuid.uuid4().hex if test_result.index == "" else \ 244 test_result.index 245 test.index = rid 246 return self.tests.setdefault(rid, test) 247 else: 248 return self.tests.get(self.current_test_id) 249 250 def _remove_current_test_result(self): 251 if self.current_test_id in self.tests: 252 del self.tests[self.current_test_id] 253 254 def __started__(self, lifecycle, test_result): 255 if lifecycle == LifeCycle.TestSuites: 256 suites = self._get_suite_result(test_result=test_result, 257 create=True) 258 suites.suites_name = test_result.suites_name 259 suites.test_num = test_result.test_num 260 self.current_suite_id = suites.index 261 elif lifecycle == LifeCycle.TestSuite: 262 suite = self._get_suite_result(test_result=test_result, 263 create=True) 264 suite.suite_name = test_result.suite_name 265 suite.test_num = test_result.test_num 266 self.current_suite_id = suite.index 267 elif lifecycle == LifeCycle.TestCase: 268 test = self._get_test_result(test_result=test_result, create=True) 269 test.test_name = test_result.test_name 270 test.test_class = test_result.test_class 271 self.current_test_id = test.index 272 273 def __ended__(self, lifecycle, test_result=None, **kwargs): 274 if lifecycle == LifeCycle.TestSuite: 275 suite = self._get_suite_result(test_result=test_result, 276 create=False) 277 if not suite: 278 return 279 suite.run_time = test_result.run_time 280 suite.code = test_result.code 281 is_clear = kwargs.get("is_clear", False) 282 suite.test_num = max(test_result.test_num, len(self.tests)) 283 # generate suite report 284 if not kwargs.get("suite_report", False): 285 if len(self.result) > 0 and self.result[-1][0].suite_name == \ 286 self.suites[suite.index].suite_name: 287 self.result[-1][1].extend(list(self.tests.values())) 288 self.result[-1][0].test_num = max(suite.test_num, 289 len(self.result[-1][1])) 290 else: 291 self.result.append((self.suites[suite.index], 292 list(self.tests.values()))) 293 else: 294 result_dir = os.path.join(self.report_path, "result") 295 os.makedirs(result_dir, exist_ok=True) 296 self.result.append((self.suites[suite.index], 297 list(self.tests.values()))) 298 results = [(suite, list(self.tests.values()))] 299 suite_report = SuiteReporter(results, suite.suite_name, 300 result_dir) 301 suite_report.generate_data_report() 302 if is_clear: 303 self.tests.clear() 304 elif lifecycle == LifeCycle.TestSuites: 305 if not kwargs.get("suite_report", False): 306 result_dir = os.path.join(self.report_path, "result") 307 os.makedirs(result_dir, exist_ok=True) 308 suites_name = kwargs.get("suites_name", "") 309 product_info = kwargs.get("product_info", "") 310 suite_report = SuiteReporter(self.result, suites_name, 311 result_dir, 312 product_info=product_info) 313 suite_report.generate_data_report() 314 elif lifecycle == LifeCycle.TestCase: 315 test = self._get_test_result(test_result=test_result, create=False) 316 test.run_time = test_result.run_time 317 test.stacktrace = test_result.stacktrace 318 test.code = test_result.code 319 elif lifecycle == LifeCycle.TestTask: 320 test_type = str(kwargs.get("test_type", TestType.all)) 321 reporter = get_plugin(plugin_type=Plugin.REPORTER, 322 plugin_id=test_type) 323 if not reporter: 324 reporter = get_plugin(plugin_type=Plugin.REPORTER, 325 plugin_id=TestType.all)[0] 326 else: 327 reporter = reporter[0] 328 reporter.__generate_reports__(self.report_path, 329 task_info=test_result) 330 331 def __skipped__(self, lifecycle, test_result): 332 del test_result 333 if lifecycle == LifeCycle.TestCase: 334 self._remove_current_test_result() 335 336 def __failed__(self, lifecycle, test_result): 337 if lifecycle == LifeCycle.TestSuite: 338 suite = self._get_suite_result(test_result=test_result, 339 create=False) 340 suite.stacktrace = test_result.stacktrace 341 suite.code = ResultCode.FAILED.value 342 elif lifecycle == LifeCycle.TestCase: 343 test = self._get_test_result(test_result=test_result, create=False) 344 test.stacktrace = test_result.stacktrace 345 test.code = ResultCode.FAILED.value 346 347 348@Plugin(type=Plugin.LISTENER, id=ListenerType.upload) 349class UploadListener(IListener): 350 def __started__(self, lifecycle, test_result): 351 pass 352 353 @staticmethod 354 def __ended__(lifecycle, test_result, **kwargs): 355 del test_result, kwargs 356 if lifecycle == LifeCycle.TestCase: 357 pass 358 359 @staticmethod 360 def __skipped__(lifecycle, test_result, **kwargs): 361 pass 362 363 @staticmethod 364 def __failed__(lifecycle, test_result, **kwargs): 365 pass 366 367 368@Plugin(type=Plugin.LISTENER, id=ListenerType.collect) 369class CollectingTestListener(IListener): 370 """ 371 Listener test status information to the console 372 """ 373 374 def __init__(self): 375 self.tests = [] 376 377 def __started__(self, lifecycle, test_result): 378 if lifecycle == LifeCycle.TestCase: 379 if not test_result.test_class or not test_result.test_name: 380 return 381 test = TestDescription(test_result.test_class, 382 test_result.test_name) 383 if test not in self.tests: 384 self.tests.append(test) 385 386 def __ended__(self, lifecycle, test_result=None, **kwargs): 387 pass 388 389 def __skipped__(self, lifecycle, test_result): 390 pass 391 392 def __failed__(self, lifecycle, test_result): 393 pass 394 395 def get_current_run_results(self): 396 return self.tests 397 398 399@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite) 400class CollectingLiteGTestListener(IListener): 401 """ 402 Listener test status information to the console 403 """ 404 405 def __init__(self): 406 self.tests = [] 407 408 def __started__(self, lifecycle, test_result): 409 if lifecycle == LifeCycle.TestCase: 410 if not test_result.test_class or not test_result.test_name: 411 return 412 test = TestDescription(test_result.test_class, 413 test_result.test_name) 414 if test not in self.tests: 415 self.tests.append(test) 416 417 def __ended__(self, lifecycle, test_result=None, **kwargs): 418 pass 419 420 def __skipped__(self, lifecycle, test_result): 421 pass 422 423 def __failed__(self, lifecycle, test_result): 424 if lifecycle == LifeCycle.TestCase: 425 if not test_result.test_class or not test_result.test_name: 426 return 427 test = TestDescription(test_result.test_class, 428 test_result.test_name) 429 if test not in self.tests: 430 self.tests.append(test) 431 432 def get_current_run_results(self): 433 return self.tests 434