#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import uuid
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Optional

from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.constants import ListenerType
from _core.constants import TestType
from _core.interface import LifeCycle
from _core.interface import IListener
from _core.logger import platform_logger
from _core.report.suite_reporter import SuiteReporter
from _core.report.suite_reporter import ResultCode
from _core.report.encrypt import check_pub_key_exist

__all__ = ["LogListener", "ReportListener", "UploadListener",
           "CollectingTestListener", "CollectingLiteGTestListener",
           "CaseResult", "SuiteResult", "SuitesResult", "StateRecorder",
           "TestDescription"]

LOG = platform_logger("Listener")


@dataclass
class CaseResult:
    """
    result record of a single test case

    The attributes are declared as annotated dataclass fields so that every
    instance carries its own values and the generated __eq__/__repr__
    actually look at them.
    """
    index: str = ""
    code: Any = ResultCode.FAILED.value
    test_name: Optional[str] = None
    test_class: Optional[str] = None
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    num_tests: int = 0
    current: int = 0

    def is_running(self):
        """
        return True when a test name has been set and the case is not
        marked as completed yet
        """
        return self.test_name is not None and not self.is_completed


@dataclass
class SuiteResult:
    """
    result record of a single test suite
    """
    index: str = ""
    code: Any = ResultCode.UNKNOWN.value
    suite_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    is_started: bool = False
    suite_num: int = 0


@dataclass
class SuitesResult:
    """
    result record of a group of test suites
    """
    index: str = ""
    code: Any = ResultCode.UNKNOWN.value
    suites_name: Optional[str] = None
    test_num: int = 0
    stacktrace: str = ""
    run_time: int = 0
    is_completed: bool = False
    # default_factory gives every instance its own dict; a plain "= {}"
    # class attribute would be one dict shared by all instances
    product_info: dict = field(default_factory=dict)


@dataclass
class StateRecorder:
    """
    holder of the suites/suite/test records that are currently in progress
    """
    current_suite: Optional[SuiteResult] = None
    current_suites: Optional[SuitesResult] = None
    current_test: Optional[CaseResult] = None
    # default_factory gives every recorder its own list; a plain "= []"
    # class attribute would be one list shared by all recorders
    trace_logs: list = field(default_factory=list)
    running_test_index: int = 0

    def is_started(self):
        """
        return True when a current suite record exists
        """
        return self.current_suite is not None

    def suites_is_started(self):
        """
        return True when a current suites record exists
        """
        return self.current_suites is not None

    def suite_is_running(self):
        """
        return True when the current suite has a name and is not completed
        """
        suite = self.current_suite
        return suite is not None and suite.suite_name is not None and \
            not suite.is_completed

    def suites_is_running(self):
        """
        return True when the current suites has a name and is not completed
        """
        suites = self.current_suites
        return suites is not None and suites.suites_name is not None and \
            not suites.is_completed

    def test_is_running(self):
        """
        return True when the current test exists and reports is_running()
        """
        test = self.current_test
        return test is not None and test.is_running()

    def suite(self, reset=False):
        """
        return the current suite record

        A new SuiteResult with a fresh uuid index is created first when
        reset is true or no current suite is set.
        """
        if reset or not self.current_suite:
            self.current_suite = SuiteResult()
            self.current_suite.index = uuid.uuid4().hex
        return self.current_suite

    def get_suites(self, reset=False):
        """
        return the current suites record

        A new SuitesResult with a fresh uuid index is created first when
        reset is true or no current suites is set.
        """
        if reset or not self.current_suites:
            self.current_suites = SuitesResult()
            self.current_suites.index = uuid.uuid4().hex
        return self.current_suites

    def test(self, reset=False, test_index=None):
        """
        return the current test record

        A new CaseResult is created first when reset is true or no current
        test is set; its index is test_index when that is truthy, otherwise
        a fresh uuid.
        """
        if reset or not self.current_test:
            self.current_test = CaseResult()
            if test_index:
                self.current_test.index = test_index
            else:
                self.current_test.index = uuid.uuid4().hex
        return self.current_test


class TestDescription(object):
    """
    identity of a test case: its class name plus its test name
    """

    def __init__(self, class_name, test_name):
        self.class_name = class_name
        self.test_name = test_name

    def __eq__(self, other):
        """
        two descriptions are equal when class name and test name both match

        Objects without these attributes yield NotImplemented instead of
        raising AttributeError, so "==" and "in" stay safe on mixed data.
        """
        try:
            return self.class_name == other.class_name and \
                self.test_name == other.test_name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # defining __eq__ alone sets __hash__ to None; keep the class
        # hashable and consistent with __eq__
        return hash((self.class_name, self.test_name))

    @classmethod
    def remove_test(cls, tests, execute_tests):
        """
        remove every entry of execute_tests that occurs in tests

        tests is modified in place and returned; list.remove() drops one
        matching element per entry of execute_tests.
        """
        for execute_test in execute_tests:
            if execute_test in tests:
                tests.remove(execute_test)
        return tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.log)
class LogListener(IListener):
    """
    listener that writes test status information to the log
    """
    test_num = 0
    device_sn = ""

    def __started__(self, lifecycle, test_result):
        """
        log the start of a suite or a case

        Nothing is logged when check_pub_key_exist() returns a true value.
        For a suite the announced test number is kept for the progress
        output of __ended__.
        """
        if check_pub_key_exist():
            return
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Start test suite [%s] with %s tests"
                      % (test_result.suite_name, test_result.test_num))
            self.test_num = test_result.test_num
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("testStarted(%s#%s)" % (test_result.test_class,
                                              test_result.test_name))

    def __ended__(self, lifecycle, test_result, **kwargs):
        """
        log the end of a suite or a case

        Nothing is logged when check_pub_key_exist() returns a true value.
        For a case an info line with the progress counter, the converted
        device serial and the result name is written; "-" replaces the
        total when no test number is known.
        """
        if check_pub_key_exist():
            return

        # imported inside the method, as in the original code
        from _core.utils import convert_serial
        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("End test suite [%s] and cost %sms."
                      % (test_result.suite_name, test_result.run_time))
        elif lifecycle == LifeCycle.TestCase:
            LOG.debug("testEnded(%s#%s)" % (test_result.test_class,
                                            test_result.test_name))
            ret = ResultCode(test_result.code).name
            if self.test_num:
                LOG.info("[%s/%s %s] %s#%s %s" %
                         (test_result.current, self.test_num,
                          convert_serial(self.device_sn),
                          test_result.test_class, test_result.test_name, ret))
            else:
                LOG.info("[%s/- %s] %s#%s %s" %
                         (test_result.current, convert_serial(self.device_sn),
                          test_result.test_class, test_result.test_name,
                          ret))

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        """
        log a skipped suite or case

        Nothing is logged when check_pub_key_exist() returns a true value.
        """
        if check_pub_key_exist():
            return

        del kwargs
        if lifecycle == LifeCycle.TestSuite:
            LOG.debug("Test suite [{}] skipped".format(test_result.suite_name))
        elif lifecycle == LifeCycle.TestCase:
            ret = ResultCode(test_result.code).name
            LOG.debug("[{}] {}#{}".format(ret, test_result.test_class,
                                          test_result.test_name))

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        """
        failures are not logged by this listener
        """
        pass


@Plugin(type=Plugin.LISTENER, id=ListenerType.report)
class ReportListener(IListener):
    """
    listener that collects suite and case results and generates reports
    """

    def __init__(self):
        # list of (suite record, list of case records) tuples
        self.result = list()
        # suite records keyed by index
        self.suites = dict()
        # case records keyed by index
        self.tests = dict()
        self.current_suite_id = 0
        self.current_test_id = 0
        self.report_path = ""

    def _get_suite_result(self, test_result, create=False):
        """
        return the suite record belonging to test_result

        A record stored under test_result.index is returned as is.
        Otherwise, when create is true, a new SuiteResult is stored under
        test_result.index (or a fresh uuid when that index is "") and
        returned. Without create the record of the current suite is
        returned, which is None when there is none.
        """
        if test_result.index in self.suites:
            return self.suites.get(test_result.index)
        elif create:
            suite = SuiteResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            suite.index = rid
            return self.suites.setdefault(rid, suite)
        else:
            return self.suites.get(self.current_suite_id)

    def _get_test_result(self, test_result, create=False):
        """
        return the case record belonging to test_result

        A record stored under test_result.index is returned as is.
        Otherwise, when create is true, a new CaseResult is stored under
        test_result.index (or a fresh uuid when that index is "") and
        returned. Without create the record of the current test is
        returned, which is None when there is none.
        """
        if test_result.index in self.tests:
            return self.tests.get(test_result.index)
        elif create:
            test = CaseResult()
            rid = uuid.uuid4().hex if test_result.index == "" else \
                test_result.index
            test.index = rid
            return self.tests.setdefault(rid, test)
        else:
            return self.tests.get(self.current_test_id)

    def _remove_current_test_result(self):
        """
        drop the record of the current test, if one is stored
        """
        if self.current_test_id in self.tests:
            del self.tests[self.current_test_id]

    def __started__(self, lifecycle, test_result):
        """
        create (or reuse) the record of the started suites/suite/case, copy
        the names and test number from test_result and remember the record
        as the current one
        """
        if lifecycle == LifeCycle.TestSuites:
            suites = self._get_suite_result(test_result=test_result,
                                            create=True)
            suites.suites_name = test_result.suites_name
            suites.test_num = test_result.test_num
            self.current_suite_id = suites.index
        elif lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=True)
            suite.suite_name = test_result.suite_name
            suite.test_num = test_result.test_num
            self.current_suite_id = suite.index
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result,
                                         create=True)
            test.test_name = test_result.test_name
            test.test_class = test_result.test_class
            self.current_test_id = test.index

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        """
        store the final data of the ended item and generate reports

        Keyword arguments read here: is_clear and suite_report for a
        suite; suite_report, suites_name and product_info for suites;
        test_type for a task.
        """
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if not suite:
                return
            suite.run_time = test_result.run_time
            suite.code = test_result.code
            is_clear = kwargs.get("is_clear", False)
            suite.test_num = max(test_result.test_num, len(self.tests))
            # generate suite report
            if not kwargs.get("suite_report", False):
                # consecutive results of the same suite name are merged
                # into the previous entry instead of being appended
                if len(self.result) > 0 and self.result[-1][0].suite_name == \
                        self.suites[suite.index].suite_name:
                    self.result[-1][1].extend(list(self.tests.values()))
                    self.result[-1][0].test_num = max(suite.test_num,
                                                      len(self.result[-1][1]))
                else:
                    self.result.append((self.suites[suite.index],
                                        list(self.tests.values())))
            else:
                result_dir = os.path.join(self.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                self.result.append((self.suites[suite.index],
                                    list(self.tests.values())))
                results = [(suite, list(self.tests.values()))]
                suite_report = SuiteReporter(results, suite.suite_name,
                                             result_dir)
                suite_report.generate_data_report()
            if is_clear:
                self.tests.clear()
        elif lifecycle == LifeCycle.TestSuites:
            if not kwargs.get("suite_report", False):
                result_dir = os.path.join(self.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                suites_name = kwargs.get("suites_name", "")
                product_info = kwargs.get("product_info", "")
                suite_report = SuiteReporter(self.result, suites_name,
                                             result_dir,
                                             product_info=product_info)
                suite_report.generate_data_report()
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                # no record to update, e.g. it was removed by __skipped__;
                # same handling as a missing suite record above
                return
            test.run_time = test_result.run_time
            test.stacktrace = test_result.stacktrace
            test.code = test_result.code
        elif lifecycle == LifeCycle.TestTask:
            test_type = str(kwargs.get("test_type", TestType.all))
            reporter = get_plugin(plugin_type=Plugin.REPORTER,
                                  plugin_id=test_type)
            # fall back to the TestType.all reporter when none is found
            # for the requested test type
            if not reporter:
                reporter = get_plugin(plugin_type=Plugin.REPORTER,
                                      plugin_id=TestType.all)[0]
            else:
                reporter = reporter[0]
            reporter.__generate_reports__(self.report_path,
                                          task_info=test_result)

    def __skipped__(self, lifecycle, test_result):
        """
        drop the record of the current test when a case is skipped
        """
        del test_result
        if lifecycle == LifeCycle.TestCase:
            self._remove_current_test_result()

    def __failed__(self, lifecycle, test_result):
        """
        mark the suite or case record as failed and keep its stacktrace

        Nothing is done when no matching record exists.
        """
        if lifecycle == LifeCycle.TestSuite:
            suite = self._get_suite_result(test_result=test_result,
                                           create=False)
            if suite is None:
                return
            suite.stacktrace = test_result.stacktrace
            suite.code = ResultCode.FAILED.value
        elif lifecycle == LifeCycle.TestCase:
            test = self._get_test_result(test_result=test_result, create=False)
            if test is None:
                # the record may have been removed by __skipped__
                return
            test.stacktrace = test_result.stacktrace
            test.code = ResultCode.FAILED.value


@Plugin(type=Plugin.LISTENER, id=ListenerType.upload)
class UploadListener(IListener):
    """
    listener registered for the upload type; all callbacks are no-ops
    """

    def __started__(self, lifecycle, test_result):
        pass

    @staticmethod
    def __ended__(lifecycle, test_result, **kwargs):
        del test_result, kwargs
        if lifecycle == LifeCycle.TestCase:
            pass

    @staticmethod
    def __skipped__(lifecycle, test_result, **kwargs):
        pass

    @staticmethod
    def __failed__(lifecycle, test_result, **kwargs):
        pass


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
    """
    listener that collects the descriptions of the started test cases
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """
        record a started case once; cases without class or test name are
        ignored
        """
        if lifecycle == LifeCycle.TestCase:
            if not test_result.test_class or not test_result.test_name:
                return
            test = TestDescription(test_result.test_class,
                                   test_result.test_name)
            if test not in self.tests:
                self.tests.append(test)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    def __skipped__(self, lifecycle, test_result):
        pass

    def __failed__(self, lifecycle, test_result):
        pass

    def get_current_run_results(self):
        """
        return the list of collected TestDescription objects
        """
        return self.tests


@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
    """
    listener that collects the descriptions of started and failed test
    cases
    """

    def __init__(self):
        self.tests = []

    def __started__(self, lifecycle, test_result):
        """
        record a started case once; cases without class or test name are
        ignored
        """
        if lifecycle == LifeCycle.TestCase:
            if not test_result.test_class or not test_result.test_name:
                return
            test = TestDescription(test_result.test_class,
                                   test_result.test_name)
            if test not in self.tests:
                self.tests.append(test)

    def __ended__(self, lifecycle, test_result=None, **kwargs):
        pass

    def __skipped__(self, lifecycle, test_result):
        pass

    def __failed__(self, lifecycle, test_result):
        """
        record a failed case once if it is not collected yet; cases
        without class or test name are ignored
        """
        if lifecycle == LifeCycle.TestCase:
            if not test_result.test_class or not test_result.test_name:
                return
            test = TestDescription(test_result.test_class,
                                   test_result.test_name)
            if test not in self.tests:
                self.tests.append(test)

    def get_current_run_results(self):
        """
        return the list of collected TestDescription objects
        """
        return self.tests