1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import uuid 21from dataclasses import dataclass 22 23from _core.plugin import Plugin 24from _core.plugin import get_plugin 25from _core.constants import ListenerType 26from _core.constants import TestType 27from _core.interface import LifeCycle 28from _core.interface import IListener 29from _core.logger import platform_logger 30from _core.logger import LogQueue 31from _core.report.suite_reporter import SuiteReporter 32from _core.report.suite_reporter import ResultCode 33from _core.report.encrypt import check_pub_key_exist 34 35__all__ = ["LogListener", "ReportListener", "UploadListener", 36 "CollectingTestListener", "CollectingLiteGTestListener", 37 "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder", 38 "TestDescription"] 39 40LOG = platform_logger("Listener") 41 42 43@dataclass 44class CaseResult: 45 index = "" 46 code = ResultCode.FAILED.value 47 test_name = None 48 test_class = None 49 stacktrace = "" 50 run_time = 0 51 is_completed = False 52 num_tests = 0 53 current = 0 54 55 def is_running(self): 56 return self.test_name is not None and not self.is_completed 57 58 59@dataclass 60class SuiteResult: 61 index = "" 62 code = ResultCode.UNKNOWN.value 63 suite_name = None 64 test_num = 0 65 stacktrace = "" 66 run_time = 0 67 is_completed = False 68 is_started = False 69 suite_num = 0 70 71 72@dataclass 73class SuitesResult: 74 index = "" 75 code = ResultCode.UNKNOWN.value 76 suites_name = None 77 test_num = 0 78 stacktrace = "" 79 run_time = 0 80 is_completed = False 81 product_info = {} 82 83 84@dataclass 85class StateRecorder: 86 current_suite = None 87 current_suites = None 88 current_test = None 89 trace_logs = [] 90 running_test_index = 0 91 92 def is_started(self): 93 return self.current_suite is not None 94 95 def suites_is_started(self): 96 return self.current_suites is not None 97 98 def suite_is_running(self): 99 suite = self.current_suite 100 return suite is not None and suite.suite_name is not None and \ 101 not suite.is_completed 102 103 def suites_is_running(self): 104 suites = self.current_suites 105 return suites is not None and suites.suites_name is not None and \ 106 not suites.is_completed 107 108 def test_is_running(self): 109 test = self.current_test 110 return test is not None and test.is_running() 111 112 def suite(self, reset=False): 113 if reset or not self.current_suite: 114 self.current_suite = SuiteResult() 115 self.current_suite.index = uuid.uuid4().hex 116 return self.current_suite 117 118 def get_suites(self, reset=False): 119 if reset or not self.current_suites: 120 self.current_suites = SuitesResult() 121 self.current_suites.index = uuid.uuid4().hex 122 return self.current_suites 123 124 def test(self, reset=False, test_index=None): 125 if reset or not self.current_test: 126 self.current_test = CaseResult() 127 if test_index: 128 self.current_test.index = test_index 129 else: 130 self.current_test.index = uuid.uuid4().hex 131 return self.current_test 132 133 134class TestDescription(object): 135 def __init__(self, class_name, test_name): 136 self.class_name = class_name 137 self.test_name = test_name 138 139 def __eq__(self, other): 140 return self.class_name == other.class_name and \ 141 self.test_name == other.test_name 142 143 @classmethod 144 def remove_test(cls, tests, execute_tests): 145 for execute_test in execute_tests: 146 if execute_test in tests: 147 tests.remove(execute_test) 148 return tests 149 150 151@Plugin(type=Plugin.LISTENER, id=ListenerType.log) 152class LogListener(IListener): 153 """ 154 Listener test status information to the console and log 155 """ 156 test_num = 0 157 device_sn = "" 158 log_queue = LogQueue(log=LOG) 159 160 def __started__(self, lifecycle, test_result): 161 if check_pub_key_exist(): 162 return 163 if lifecycle == LifeCycle.TestSuite: 164 self.log_queue.debug(log_data="Start test suite [{}] with {} tests" 165 .format(test_result.suite_name, test_result.test_num)) 166 self.test_num = test_result.test_num 167 elif lifecycle == LifeCycle.TestCase: 168 self.log_queue.debug(log_data="TestStarted({}#{})" 169 .format(test_result.test_class, test_result.test_name)) 170 171 def __ended__(self, lifecycle, test_result, **kwargs): 172 if check_pub_key_exist(): 173 return 174 175 from _core.utils import convert_serial 176 del kwargs 177 if lifecycle == LifeCycle.TestSuite: 178 self.log_queue.debug(log_data="End test suite cost {}ms." 179 .format(test_result.run_time), clear=True) 180 self.log_queue.info(log_data="End test suite [{}]." 181 .format(test_result.suite_name), clear=True) 182 elif lifecycle == LifeCycle.TestCase: 183 self.log_queue.debug(log_data="TestEnded({}#{})" 184 .format(test_result.test_class, test_result.test_name)) 185 ret = ResultCode(test_result.code).name 186 if self.test_num: 187 self.log_queue.info(log_data="[{}/{} {}] {}#{} {}". 188 format(test_result.current, self.test_num, convert_serial(self.device_sn), 189 test_result.test_class, test_result.test_name, ret)) 190 else: 191 self.log_queue.info(log_data="[{}/- {}] {}#{} {}". 192 format(test_result.current, convert_serial(self.device_sn), 193 test_result.test_class, test_result.test_name, ret)) 194 195 @staticmethod 196 def __skipped__(lifecycle, test_result, **kwargs): 197 if check_pub_key_exist(): 198 return 199 200 del kwargs 201 if lifecycle == LifeCycle.TestSuite: 202 LOG.debug("Test suite [{}] skipped".format(test_result.suite_name)) 203 elif lifecycle == LifeCycle.TestCase: 204 ret = ResultCode(test_result.code).name 205 LOG.debug("[{}] {}#{}".format(ret, test_result.test_class, 206 test_result.test_name)) 207 208 @staticmethod 209 def __failed__(lifecycle, test_result, **kwargs): 210 pass 211 212 213@Plugin(type=Plugin.LISTENER, id=ListenerType.report) 214class ReportListener(IListener): 215 """ 216 Listener test status information to the console 217 """ 218 219 def __init__(self): 220 self.result = list() 221 self.suites = dict() 222 self.tests = dict() 223 self.current_suite_id = 0 224 self.current_test_id = 0 225 self.report_path = "" 226 227 def _get_suite_result(self, test_result, create=False): 228 if test_result.index in self.suites: 229 return self.suites.get(test_result.index) 230 elif create: 231 suite = SuiteResult() 232 rid = uuid.uuid4().hex if test_result.index == "" else \ 233 test_result.index 234 suite.index = rid 235 return self.suites.setdefault(rid, suite) 236 else: 237 return self.suites.get(self.current_suite_id) 238 239 def _get_test_result(self, test_result, create=False): 240 if test_result.index in self.tests: 241 return self.tests.get(test_result.index) 242 elif create: 243 test = CaseResult() 244 rid = uuid.uuid4().hex if test_result.index == "" else \ 245 test_result.index 246 test.index = rid 247 return self.tests.setdefault(rid, test) 248 else: 249 return self.tests.get(self.current_test_id) 250 251 def _remove_current_test_result(self): 252 if self.current_test_id in self.tests: 253 del self.tests[self.current_test_id] 254 255 def __started__(self, lifecycle, test_result): 256 if lifecycle == LifeCycle.TestSuites: 257 suites = self._get_suite_result(test_result=test_result, 258 create=True) 259 suites.suites_name = test_result.suites_name 260 suites.test_num = test_result.test_num 261 self.current_suite_id = suites.index 262 elif lifecycle == LifeCycle.TestSuite: 263 suite = self._get_suite_result(test_result=test_result, 264 create=True) 265 suite.suite_name = test_result.suite_name 266 suite.test_num = test_result.test_num 267 self.current_suite_id = suite.index 268 elif lifecycle == LifeCycle.TestCase: 269 test = self._get_test_result(test_result=test_result, create=True) 270 test.test_name = test_result.test_name 271 test.test_class = test_result.test_class 272 self.current_test_id = test.index 273 274 def __ended__(self, lifecycle, test_result=None, **kwargs): 275 if lifecycle == LifeCycle.TestSuite: 276 suite = self._get_suite_result(test_result=test_result, 277 create=False) 278 if not suite: 279 return 280 suite.run_time = test_result.run_time 281 suite.code = test_result.code 282 is_clear = kwargs.get("is_clear", False) 283 suite.test_num = max(test_result.test_num, len(self.tests)) 284 # generate suite report 285 if not kwargs.get("suite_report", False): 286 if len(self.result) > 0 and self.result[-1][0].suite_name == \ 287 self.suites[suite.index].suite_name: 288 self.result[-1][1].extend(list(self.tests.values())) 289 self.result[-1][0].test_num = max(suite.test_num, 290 len(self.result[-1][1])) 291 else: 292 self.result.append((self.suites[suite.index], 293 list(self.tests.values()))) 294 else: 295 result_dir = os.path.join(self.report_path, "result") 296 os.makedirs(result_dir, exist_ok=True) 297 self.result.append((self.suites[suite.index], 298 list(self.tests.values()))) 299 results = [(suite, list(self.tests.values()))] 300 suite_report = SuiteReporter(results, suite.suite_name, 301 result_dir) 302 suite_report.generate_data_report() 303 if is_clear: 304 self.tests.clear() 305 elif lifecycle == LifeCycle.TestSuites: 306 if not kwargs.get("suite_report", False): 307 result_dir = os.path.join(self.report_path, "result") 308 os.makedirs(result_dir, exist_ok=True) 309 suites_name = kwargs.get("suites_name", "") 310 product_info = kwargs.get("product_info", "") 311 suite_report = SuiteReporter(self.result, suites_name, 312 result_dir, 313 product_info=product_info) 314 suite_report.generate_data_report() 315 elif lifecycle == LifeCycle.TestCase: 316 test = self._get_test_result(test_result=test_result, create=False) 317 test.run_time = test_result.run_time 318 test.stacktrace = test_result.stacktrace 319 test.code = test_result.code 320 elif lifecycle == LifeCycle.TestTask: 321 test_type = str(kwargs.get("test_type", TestType.all)) 322 reporter = get_plugin(plugin_type=Plugin.REPORTER, 323 plugin_id=test_type) 324 if not reporter: 325 reporter = get_plugin(plugin_type=Plugin.REPORTER, 326 plugin_id=TestType.all)[0] 327 else: 328 reporter = reporter[0] 329 reporter.__generate_reports__(self.report_path, 330 task_info=test_result) 331 332 def __skipped__(self, lifecycle, test_result): 333 del test_result 334 if lifecycle == LifeCycle.TestCase: 335 self._remove_current_test_result() 336 337 def __failed__(self, lifecycle, test_result): 338 if lifecycle == LifeCycle.TestSuite: 339 suite = self._get_suite_result(test_result=test_result, 340 create=False) 341 suite.stacktrace = test_result.stacktrace 342 suite.code = ResultCode.FAILED.value 343 elif lifecycle == LifeCycle.TestCase: 344 test = self._get_test_result(test_result=test_result, create=False) 345 test.stacktrace = test_result.stacktrace 346 test.code = ResultCode.FAILED.value 347 348 349@Plugin(type=Plugin.LISTENER, id=ListenerType.upload) 350class UploadListener(IListener): 351 def __started__(self, lifecycle, test_result): 352 pass 353 354 @staticmethod 355 def __ended__(lifecycle, test_result, **kwargs): 356 del test_result, kwargs 357 if lifecycle == LifeCycle.TestCase: 358 pass 359 360 @staticmethod 361 def __skipped__(lifecycle, test_result, **kwargs): 362 pass 363 364 @staticmethod 365 def __failed__(lifecycle, test_result, **kwargs): 366 pass 367 368 369@Plugin(type=Plugin.LISTENER, id=ListenerType.collect) 370class CollectingTestListener(IListener): 371 """ 372 Listener test status information to the console 373 """ 374 375 def __init__(self): 376 self.tests = [] 377 378 def __started__(self, lifecycle, test_result): 379 if lifecycle == LifeCycle.TestCase: 380 if not test_result.test_class or not test_result.test_name: 381 return 382 test = TestDescription(test_result.test_class, 383 test_result.test_name) 384 if test not in self.tests: 385 self.tests.append(test) 386 387 def __ended__(self, lifecycle, test_result=None, **kwargs): 388 pass 389 390 def __skipped__(self, lifecycle, test_result): 391 pass 392 393 def __failed__(self, lifecycle, test_result): 394 pass 395 396 def get_current_run_results(self): 397 return self.tests 398 399 400@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite) 401class CollectingLiteGTestListener(IListener): 402 """ 403 Listener test status information to the console 404 """ 405 406 def __init__(self): 407 self.tests = [] 408 409 def __started__(self, lifecycle, test_result): 410 if lifecycle == LifeCycle.TestCase: 411 if not test_result.test_class or not test_result.test_name: 412 return 413 test = TestDescription(test_result.test_class, 414 test_result.test_name) 415 if test not in self.tests: 416 self.tests.append(test) 417 418 def __ended__(self, lifecycle, test_result=None, **kwargs): 419 pass 420 421 def __skipped__(self, lifecycle, test_result): 422 pass 423 424 def __failed__(self, lifecycle, test_result): 425 if lifecycle == LifeCycle.TestCase: 426 if not test_result.test_class or not test_result.test_name: 427 return 428 test = TestDescription(test_result.test_class, 429 test_result.test_name) 430 if test not in self.tests: 431 self.tests.append(test) 432 433 def get_current_run_results(self): 434 return self.tests 435