#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import uuid
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Optional

from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.constants import ListenerType
from _core.constants import TestType
from _core.interface import LifeCycle
from _core.interface import IListener
from _core.logger import platform_logger
from _core.report.suite_reporter import SuiteReporter
from _core.report.suite_reporter import ResultCode
from _core.report.encrypt import check_pub_key_exist

__all__ = ["LogListener", "ReportListener", "UploadListener",
           "CollectingTestListener", "CollectingLiteGTestListener",
           "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder",
           "TestDescription"]

LOG = platform_logger("Listener")


# NOTE: the attributes of the result classes below are annotated on purpose.
# @dataclass only turns *annotated* names into fields; un-annotated names are
# plain class attributes, which leaves the dataclass with no fields at all
# (every instance compares equal, empty repr) and makes mutable defaults a
# single object shared by all instances.

@dataclass
class CaseResult:
    """
    Result record of a single test case
    """
    index: str = ""
    # a ResultCode value; a fresh record starts out as FAILED
    code: Any = ResultCode.FAILED.value
    test_name: Optional[str] = None
    test_class: Optional[str] = None
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    num_tests: int = 0
    current: int = 0

    def is_running(self):
        """
        Return True when the case has a name and is not completed yet
        """
        return self.test_name is not None and not self.is_completed


@dataclass
class SuiteResult:
    """
    Result record of a single test suite
    """
    index: str = ""
    # a ResultCode value; a fresh record starts out as UNKNOWN
    code: Any = ResultCode.UNKNOWN.value
    suite_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    is_started: bool = False
    suite_num: int = 0


@dataclass
class SuitesResult:
    """
    Result record of a group of test suites
    """
    index: str = ""
    # a ResultCode value; a fresh record starts out as UNKNOWN
    code: Any = ResultCode.UNKNOWN.value
    suites_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    # default_factory gives every instance its own dict
    product_info: dict = field(default_factory=dict)


@dataclass
class StateRecorder:
    """
    Holds the suites / suite / case records that are currently in progress
    """
    current_suite: Optional[SuiteResult] = None
    current_suites: Optional[SuitesResult] = None
    current_test: Optional[CaseResult] = None
    # default_factory gives every recorder its own list
    trace_logs: list = field(default_factory=list)
    running_test_index: int = 0

    def is_started(self):
        """
        Return True when a current suite record exists
        """
        return self.current_suite is not None

    def suites_is_started(self):
        """
        Return True when a current suites record exists
        """
        return self.current_suites is not None

    def suite_is_running(self):
        """
        Return True when the current suite is named and not completed
        """
        suite = self.current_suite
        return suite is not None and suite.suite_name is not None and \
            not suite.is_completed

    def suites_is_running(self):
        """
        Return True when the current suites record is named and not completed
        """
        suites = self.current_suites
        return suites is not None and suites.suites_name is not None and \
            not suites.is_completed

    def test_is_running(self):
        """
        Return True when the current case exists and is running
        """
        test = self.current_test
        return test is not None and test.is_running()

    def suite(self, reset=False):
        """
        Return the current suite record

        A new SuiteResult with a random hex index is created first when
        ``reset`` is true or no record exists yet.
        """
        if reset or not self.current_suite:
            self.current_suite = SuiteResult()
            self.current_suite.index = uuid.uuid4().hex
        return self.current_suite

    def get_suites(self, reset=False):
        """
        Return the current suites record

        A new SuitesResult with a random hex index is created first when
        ``reset`` is true or no record exists yet.
        """
        if reset or not self.current_suites:
            self.current_suites = SuitesResult()
            self.current_suites.index = uuid.uuid4().hex
        return self.current_suites

    def test(self, reset=False, test_index=None):
        """
        Return the current case record

        A new CaseResult is created first when ``reset`` is true or no
        record exists yet; its index is ``test_index`` when that is truthy,
        otherwise a random hex string.
        """
        if reset or not self.current_test:
            self.current_test = CaseResult()
            if test_index:
                self.current_test.index = test_index
            else:
                self.current_test.index = uuid.uuid4().hex
        return self.current_test


class TestDescription(object):
    """
    Identifies a test case by its class name and test name
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        try:
            return self.class_name == other.class_name and \
                self.test_name == other.test_name
        except AttributeError:
            # "other" carries no class_name/test_name: let Python fall back
            # to its default comparison instead of raising
            return NotImplemented

    # instances are mutable, so they stay unhashable; defining __eq__ alone
    # already implied this, it is only spelled out here
    __hash__ = None

    def __repr__(self):
        return "{}({!r}, {!r})".format(type(self).__name__, self.class_name,
                                       self.test_name)

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """
        Remove every item of ``execute_tests`` that occurs in ``tests``

        ``tests`` is modified in place (one occurrence is removed per item
        of ``execute_tests``) and is also returned.
        """
        for execute_test in execute_tests:
            if execute_test in tests:
                tests.remove(execute_test)
        return tests


def _record_test(tests, test_result):
    """
    Append a TestDescription built from ``test_result`` to ``tests``

    Nothing is added when the test class or test name is empty, or when an
    equal description is already in the list.
    """
    if not test_result.test_class or not test_result.test_name:
        return
    test = TestDescription(test_result.test_class, test_result.test_name)
    if test not in tests:
        tests.append(test)


@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    Listener that writes test status information to the log

    Every callback returns without logging when check_pub_key_exist()
    is true.
    """
    test_num = 0
    device_sn = ""

    def __started__(self, lifecycle, test_result):
        """
        Log the start of a suite or case; remember the suite's test count
        """
        if check_pub_key_exist():
            return
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Start test suite [{}] with {} tests"
                      .format(test_result.suite_name, test_result.test_num))
            self.test_num = test_result.test_num
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("TestStarted({}#{})"
                      .format(test_result.test_class, test_result.test_name))

    def __ended__(self, lifecycle, test_result, **kwargs):
        """
        Log the end of a suite or case together with its result
        """
        if check_pub_key_exist():
            return

        # function-level import kept from the original code
        # NOTE(review): presumably avoids a circular import - confirm
        from _core.utils import convert_serial
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("End test suite cost {}ms."
                      .format(test_result.run_time))
            LOG.info("End test suite [{}]."
                     .format(test_result.suite_name))
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("TestEnded({}#{})"
                      .format(test_result.test_class, test_result.test_name))
            ret = ResultCode(test_result.code).name
            # "-" stands in for the total when no suite test count is known
            total = self.test_num if self.test_num else "-"
            LOG.info("[{}/{} {}] {}#{} {}"
                     .format(test_result.current, total,
                             convert_serial(self.device_sn),
                             test_result.test_class,
                             test_result.test_name, ret))

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """
        Log a skipped suite or case
        """
        if check_pub_key_exist():
            return

        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
        elif lifecycle == LifeCycle.TestCase:
            ret = ResultCode(test_result.code).name
            LOG.debug("[{}] {}#{}".format(ret, test_result.test_class,
                                          test_result.test_name))

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """
        Failures are not logged by this listener
        """


@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(IListener):
    """
    Listener that accumulates suite and case results and generates reports

    ``suites`` and ``tests`` map a result index to the SuiteResult /
    CaseResult kept by this listener; ``result`` is a list of
    ``(suite, [cases])`` tuples handed to SuiteReporter.
    """

    def __init__(self):
        self.result = list()
        self.suites = dict()
        self.tests = dict()
        self.current_suite_id = 0
        self.current_test_id = 0
        self.report_path = ""

    def _get_suite_result(self, test_result, create=False):
        """
        Look up the stored suite matching ``test_result.index``

        When there is no match, a new SuiteResult is stored and returned if
        ``create`` is true; otherwise the current suite is returned, which
        may be None.
        """
        if test_result.index in self.suites:
            return self.suites.get(test_result.index)
        elif create:
            suite = SuiteResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            suite.index = rid
            return self.suites.setdefault(rid, suite)
        else:
            return self.suites.get(self.current_suite_id)

    def _get_test_result(self, test_result, create=False):
        """
        Look up the stored case matching ``test_result.index``

        When there is no match, a new CaseResult is stored and returned if
        ``create`` is true; otherwise the current case is returned, which
        may be None.
        """
        if test_result.index in self.tests:
            return self.tests.get(test_result.index)
        elif create:
            test = CaseResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            test.index = rid
            return self.tests.setdefault(rid, test)
        else:
            return self.tests.get(self.current_test_id)

    def _remove_current_test_result(self):
        """
        Drop the stored record of the current case, if there is one
        """
        self.tests.pop(self.current_test_id, None)

    def __started__(self, lifecycle, test_result):
        """
        Create (or fetch) the record for the started item and make it current
        """
        if lifecycle == LifeCycle.TestSuites:
            suites = self._get_suite_result(test_result=test_result,
                                            create=True)
            suites.suites_name = test_result.suites_name
            suites.test_num = test_result.test_num
            self.current_suite_id = suites.index
        elif lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=True)
            suite.suite_name = test_result.suite_name
            suite.test_num = test_result.test_num
            self.current_suite_id = suite.index
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=True)
            test.test_name = test_result.test_name
            test.test_class = test_result.test_class
            self.current_test_id = test.index

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        Record the final state of the ended item and generate reports

        Keyword arguments read: ``is_clear`` and ``suite_report`` (suite),
        ``suite_report``, ``suites_name`` and ``product_info`` (suites),
        ``test_type`` (task).
        """
        if lifecycle == LifeCycle.TestSuite:
            self._end_suite(test_result, kwargs)
        elif lifecycle == LifeCycle.TestSuites:
            self._end_suites(kwargs)
        elif lifecycle == LifeCycle.TestCase:
            self._end_case(test_result)
        elif lifecycle == LifeCycle.TestTask:
            self._end_task(test_result, kwargs)

    def _end_suite(self, test_result, options):
        """
        Store the suite outcome and add it, with its cases, to ``result``
        """
        suite = self._get_suite_result(test_result=test_result, create=False)
        if suite is None:
            return
        suite.run_time = test_result.run_time
        suite.code = test_result.code
        suite.test_num = max(test_result.test_num, len(self.tests))
        if not options.get("suite_report", False):
            last = self.result[-1] if self.result else None
            if last is not None and last[0].suite_name == suite.suite_name:
                # same suite name as the previous entry: merge the cases
                # into that entry instead of adding a second one
                last[1].extend(list(self.tests.values()))
                last[0].test_num = max(suite.test_num, len(last[1]))
            else:
                self.result.append((suite, list(self.tests.values())))
        else:
            # generate suite report
            result_dir = os.path.join(self.report_path, "result")
            os.makedirs(result_dir, exist_ok=True)
            self.result.append((suite, list(self.tests.values())))
            results = [(suite, list(self.tests.values()))]
            suite_report = SuiteReporter(results, suite.suite_name,
                                         result_dir)
            suite_report.generate_data_report()
        if options.get("is_clear", False):
            self.tests.clear()

    def _end_suites(self, options):
        """
        Generate one data report for everything accumulated in ``result``
        """
        if options.get("suite_report", False):
            return
        result_dir = os.path.join(self.report_path, "result")
        os.makedirs(result_dir, exist_ok=True)
        suites_name = options.get("suites_name", "")
        product_info = options.get("product_info", "")
        suite_report = SuiteReporter(self.result, suites_name, result_dir,
                                     product_info=product_info)
        suite_report.generate_data_report()

    def _end_case(self, test_result):
        """
        Copy run time, stack trace and result code onto the stored case
        """
        test = self._get_test_result(test_result=test_result, create=False)
        if test is None:
            # no record for this case (it was never started here)
            return
        test.run_time = test_result.run_time
        test.stacktrace = test_result.stacktrace
        test.code = test_result.code

    def _end_task(self, test_result, options):
        """
        Run the reporter plugin for the task's test type

        Falls back to the reporter registered for TestType.all when no
        plugin is found for the requested type.
        """
        test_type = str(options.get("test_type", TestType.all))
        reporters = get_plugin(plugin_type=Plugin.REPORTER,
                               plugin_id=test_type)
        if not reporters:
            reporters = get_plugin(plugin_type=Plugin.REPORTER,
                                   plugin_id=TestType.all)
        reporters[0].__generate_reports__(self.report_path,
                                          task_info=test_result)

    def __skipped__(self, lifecycle, test_result):
        """
        Mark the stored case as skipped
        """
        if lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.SKIPPED.value

    def __failed__(self, lifecycle, test_result):
        """
        Mark the stored suite or case as failed
        """
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if suite is None:
                return
            suite.stacktrace = test_result.stacktrace
            suite.code = ResultCode.FAILED.value
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.FAILED.value


@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    Listener registered for the upload type; every callback is a no-op
    """

    def __started__(self, lifecycle, test_result):
        pass

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        pass

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        pass

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        pass


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    Listener that collects a TestDescription for every started test case
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        if lifecycle == LifeCycle.TestCase:
            _record_test(self.tests, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    def __skipped__(self, lifecycle, test_result):
        pass

    def __failed__(self, lifecycle, test_result):
        pass

    def get_current_run_results(self):
        """
        Return the list of collected TestDescription objects
        """
        return self.tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    Listener that collects a TestDescription for every started or failed
    test case
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        if lifecycle == LifeCycle.TestCase:
            _record_test(self.tests, test_result)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    def __skipped__(self, lifecycle, test_result):
        pass

    def __failed__(self, lifecycle, test_result):
        if lifecycle == LifeCycle.TestCase:
            _record_test(self.tests, test_result)

    def get_current_run_results(self):
        """
        Return the list of collected TestDescription objects
        """
        return self.tests