1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19from ohos.parser import * 20from ohos.constants import ParserType 21from ohos.parser.constants import parse_product_info 22 23__all__ = ["CppTestListParserLite", "CppTestParserLite"] 24 25_INFORMATIONAL_START = "[----------]" 26_TEST_START_RUN_TAG = "[==========] Running" 27_TEST_RUN_TAG = "[==========]" 28_CPP_TEST_DRYRUN_TAG = "Running main() " 29_TEST_START_TAG = "[ RUN ]" 30_TEST_OK_TAG = "[ OK ]" 31_TEST_SKIPPED_TAG = "[ SKIPPED ]" 32_TEST_FAILED_TAG = "[ FAILED ]" 33_ALT_OK_TAG = "[ OK ]" 34_TIMEOUT_TAG = "[ TIMEOUT ]" 35 36LOG = platform_logger("CppTestParserLite") 37 38 39@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_lite) 40class CppTestParserLite(IParser): 41 def __init__(self): 42 self.state_machine = StateRecorder() 43 self.suite_name = "" 44 self.listeners = [] 45 self.product_info = {} 46 self.is_params = False 47 48 def get_suite_name(self): 49 return self.suite_name 50 51 def get_listeners(self): 52 return self.listeners 53 54 def __process__(self, lines): 55 if not self.state_machine.suites_is_started(): 56 self.state_machine.trace_logs.extend(lines) 57 for line in lines: 58 if not check_pub_key_exist(): 59 LOG.debug(line) 60 self.parse(line) 61 62 def __done__(self): 63 suite_result = self.state_machine.suite() 64 suite_result.is_completed = True 65 for listener in self.get_listeners(): 66 suite = copy.copy(suite_result) 67 listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True) 68 self.state_machine.running_test_index = 0 69 70 suites_result = self.state_machine.get_suites() 71 if not suites_result.suites_name: 72 return 73 for listener in self.get_listeners(): 74 suites = copy.copy(suites_result) 75 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 76 suites_name=suites.suites_name) 77 self.state_machine.current_suites = None 78 79 @staticmethod 80 def _is_test_run(line): 81 return True if _TEST_RUN_TAG in line else False 82 83 @staticmethod 84 def _is_test_start_run(line): 85 return True if _TEST_START_RUN_TAG in line else False 86 87 @staticmethod 88 def _is_informational_start(line): 89 return True if _INFORMATIONAL_START in line else False 90 91 @staticmethod 92 def _is_test_start(line): 93 return True if _TEST_START_TAG in line else False 94 95 def _process_informational_line(self, line): 96 pattern = r"(.*) (\(\d+ ms total\))" 97 message = line[len(_INFORMATIONAL_START):].strip() 98 if re.match(pattern, line.strip()): 99 self.handle_suite_ended_tag(message) 100 elif re.match(r'(\d+) test[s]? from (.*)', message): 101 self.handle_suite_started_tag(message) 102 103 def _process_test_run_line(self, line): 104 if not self.state_machine.suites_is_running(): 105 return 106 message = line[len(_TEST_RUN_TAG):].strip() 107 self.handle_suites_ended_tag(message) 108 109 def parse(self, line): 110 parse_product_info(line, self.is_params, self.product_info) 111 112 if self.state_machine.suites_is_started() or self._is_test_run(line): 113 if self._is_test_start_run(line): 114 self.handle_suites_started_tag(line) 115 elif self._is_informational_start(line): 116 self._process_informational_line(line) 117 elif self._is_test_run(line): 118 self._process_test_run_line(line) 119 elif self._is_test_start(line): 120 message = line[line.index(_TEST_START_TAG) + 121 len(_TEST_START_TAG):].strip() 122 self.handle_test_started_tag(message) 123 else: 124 self.process_test(line) 125 126 def process_test(self, line): 127 if _TEST_SKIPPED_TAG in line: 128 message = line[line.index(_TEST_SKIPPED_TAG) + len( 129 _TEST_SKIPPED_TAG):].strip() 130 if not self.state_machine.test_is_running(): 131 LOG.error( 132 "Found {} without {} before, wrong GTest log format". 133 format(line, _TEST_START_TAG), error_no="00405") 134 return 135 self.handle_test_ended_tag(message, ResultCode.SKIPPED) 136 elif _TEST_OK_TAG in line: 137 message = line[line.index(_TEST_OK_TAG) + len( 138 _TEST_OK_TAG):].strip() 139 if not self.state_machine.test_is_running(): 140 LOG.error( 141 "Found {} without {} before, wrong GTest log format". 142 format(line, _TEST_START_TAG), error_no="00405") 143 return 144 self.handle_test_ended_tag(message, ResultCode.PASSED) 145 elif _ALT_OK_TAG in line: 146 message = line[line.index(_ALT_OK_TAG) + len( 147 _ALT_OK_TAG):].strip() 148 self.fake_run_marker(message) 149 self.handle_test_ended_tag(message, ResultCode.PASSED) 150 elif _TEST_FAILED_TAG in line: 151 message = line[line.index(_TEST_FAILED_TAG) + len( 152 _TEST_FAILED_TAG):].strip() 153 if not self.state_machine.suite_is_running(): 154 return 155 if not self.state_machine.test_is_running(): 156 self.fake_run_marker(message) 157 self.handle_test_ended_tag(message, ResultCode.FAILED) 158 elif _TIMEOUT_TAG in line: 159 message = line[line.index(_TIMEOUT_TAG) + len( 160 _TIMEOUT_TAG):].strip() 161 self.fake_run_marker(message) 162 self.handle_test_ended_tag(message, ResultCode.FAILED) 163 elif self.state_machine.test_is_running(): 164 self.append_test_output(line) 165 166 def handle_test_started_tag(self, message): 167 test_class, test_name, _ = self.parse_test_description(message) 168 test_result = self.state_machine.test(reset=True) 169 test_result.test_class = test_class 170 test_result.test_name = test_name 171 for listener in self.get_listeners(): 172 test_result = copy.copy(test_result) 173 listener.__started__(LifeCycle.TestCase, test_result) 174 175 @classmethod 176 def parse_test_description(cls, message): 177 run_time = 0 178 matcher = re.match(r'(.*) \((\d+) ms\)(.*)', message) 179 if matcher: 180 test_class, test_name = matcher.group(1).rsplit(".", 1) 181 run_time = int(matcher.group(2)) 182 else: 183 test_class, test_name = message.rsplit(".", 1) 184 return test_class.split(" ")[-1], test_name.split(" ")[0], run_time 185 186 def handle_test_ended_tag(self, message, test_status): 187 test_class, test_name, run_time = self.parse_test_description( 188 message) 189 test_result = self.state_machine.test() 190 test_result.run_time = int(run_time) 191 test_result.code = test_status.value 192 if not test_result.is_running(): 193 LOG.error( 194 "Test has no start tag when trying to end test: %s", message, 195 error_no="00405") 196 return 197 found_unexpected_test = False 198 if test_result.test_class != test_class: 199 LOG.error( 200 "Expected class: {} but got:{} ".format(test_result.test_class, 201 test_class), 202 error_no="00405") 203 found_unexpected_test = True 204 if test_result.test_name != test_name: 205 LOG.error( 206 "Expected test: {} but got: {}".format(test_result.test_name, 207 test_name), 208 error_no="00405") 209 found_unexpected_test = True 210 test_result.current = self.state_machine.running_test_index + 1 211 self.state_machine.test().is_completed = True 212 if found_unexpected_test: 213 test_result.code = ResultCode.FAILED.value 214 215 for listener in self.get_listeners(): 216 result = copy.copy(test_result) 217 listener.__ended__(LifeCycle.TestCase, result) 218 self.state_machine.running_test_index += 1 219 220 def fake_run_marker(self, message): 221 fake_marker = re.compile(" +").split(message) 222 self.handle_test_started_tag(fake_marker) 223 224 def handle_suites_started_tag(self, message): 225 self.state_machine.get_suites(reset=True) 226 matcher = re.match(r'.* Running (\d+) test[s]? from .*', message) 227 expected_test_num = int(matcher.group(1)) if matcher else -1 228 if expected_test_num >= 0: 229 test_suites = self.state_machine.get_suites() 230 test_suites.suites_name = self.get_suite_name() 231 test_suites.test_num = expected_test_num 232 for listener in self.get_listeners(): 233 suite_report = copy.copy(test_suites) 234 listener.__started__(LifeCycle.TestSuites, suite_report) 235 236 def handle_suite_started_tag(self, message): 237 self.state_machine.suite(reset=True) 238 matcher = re.match(r'(\d+) test[s]? from (.*)', message) 239 expected_test_num = int(matcher.group(1)) if matcher else -1 240 if expected_test_num >= 0: 241 test_suite = self.state_machine.suite() 242 test_suite.suite_name = matcher.group(2) 243 test_suite.test_num = expected_test_num 244 for listener in self.get_listeners(): 245 suite_report = copy.copy(test_suite) 246 listener.__started__(LifeCycle.TestSuite, suite_report) 247 248 def handle_suite_ended_tag(self, message): 249 suite_result = self.state_machine.suite() 250 matcher = re.match(r'.*\((\d+) ms total\)', message) 251 if matcher: 252 suite_result.run_time = int(matcher.group(1)) 253 suite_result.is_completed = True 254 for listener in self.get_listeners(): 255 suite = copy.copy(suite_result) 256 listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True) 257 self.state_machine.running_test_index = 0 258 259 def handle_suites_ended_tag(self, message): 260 suites = self.state_machine.get_suites() 261 matcher = re.match(r'.*\((\d+) ms total\)', message) 262 if matcher: 263 suites.run_time = int(matcher.group(1)) 264 suites.is_completed = True 265 for listener in self.get_listeners(): 266 copy_suites = copy.copy(suites) 267 listener.__ended__(LifeCycle.TestSuites, test_result=copy_suites, 268 suites_name=suites.suites_name) 269 270 def append_test_output(self, message): 271 if self.state_machine.test().stacktrace: 272 self.state_machine.test().stacktrace = "{}\r\n".format( 273 self.state_machine.test().stacktrace) 274 self.state_machine.test().stacktrace = "{}{}".format( 275 self.state_machine.test().stacktrace, message) 276 277 @staticmethod 278 def handle_test_run_failed(error_msg): 279 if not error_msg: 280 error_msg = "Unknown error" 281 if not check_pub_key_exist(): 282 LOG.debug("Error msg:%s" % error_msg) 283 284 def mark_test_as_failed(self, test): 285 if not self.state_machine.current_suite and not test.class_name: 286 return 287 suites_result = self.state_machine.get_suites(reset=True) 288 suites_result.suites_name = self.get_suite_name() 289 290 suite_name = self.state_machine.current_suite.suite_name if \ 291 self.state_machine.current_suite else None 292 suite_result = self.state_machine.suite(reset=True) 293 test_result = self.state_machine.test(reset=True) 294 suite_result.suite_name = suite_name or test.class_name 295 suite_result.suite_num = 1 296 test_result.test_class = test.class_name 297 test_result.test_name = test.test_name 298 test_result.stacktrace = "error_msg: Unknown error" 299 test_result.num_tests = 1 300 test_result.run_time = 0 301 test_result.code = ResultCode.FAILED.value 302 for listener in self.get_listeners(): 303 suite_report = copy.copy(suites_result) 304 listener.__started__(LifeCycle.TestSuites, suite_report) 305 for listener in self.get_listeners(): 306 suite_report = copy.copy(suite_result) 307 listener.__started__(LifeCycle.TestSuite, suite_report) 308 for listener in self.get_listeners(): 309 test_result = copy.copy(test_result) 310 listener.__started__(LifeCycle.TestCase, test_result) 311 for listener in self.get_listeners(): 312 test_result = copy.copy(test_result) 313 listener.__ended__(LifeCycle.TestCase, test_result) 314 for listener in self.get_listeners(): 315 suite_report = copy.copy(suite_result) 316 listener.__ended__(LifeCycle.TestSuite, suite_report, 317 is_clear=True) 318 self.__done__() 319 320 321@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_list_lite) 322class CppTestListParserLite(IParser): 323 def __init__(self): 324 self.last_test_class_name = None 325 self.state_machine = StateRecorder() 326 self.listeners = [] 327 self.tests = [] 328 self.suites_name = "" 329 self.class_result = None 330 self.method_result = None 331 332 def __process__(self, lines): 333 for line in lines: 334 if not check_pub_key_exist(): 335 LOG.debug(line) 336 self.parse(line) 337 338 def get_suite_name(self): 339 return self.suites_name 340 341 def get_listeners(self): 342 return self.listeners 343 344 def __done__(self): 345 if self.state_machine.is_started(): 346 self.handle_suite_ended_tag() 347 suites_result = self.state_machine.get_suites() 348 if not suites_result.suites_name: 349 return 350 for listener in self.get_listeners(): 351 suites = copy.copy(suites_result) 352 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 353 suites_name=suites.suites_name) 354 self.state_machine.current_suites = None 355 356 def _is_class(self, line): 357 self.class_result = re.compile('^([a-zA-Z]+.*)\\.$').match(line) 358 return self.class_result 359 360 def _is_method(self, line): 361 self.method_result = re.compile( 362 '\\s+([a-zA-Z_]+[\\S]*)(.*)?(\\s+.*)?$').match(line) 363 return self.method_result 364 365 def _process_class_line(self, line): 366 del line 367 if not self.state_machine.suites_is_started(): 368 self.handle_suites_started_tag() 369 self.last_test_class_name = self.class_result.group(1) 370 if self.state_machine.is_started(): 371 self.handle_suite_ended_tag() 372 self.handle_suite_started_tag(self.class_result.group(1)) 373 374 def _process_method_line(self, line): 375 if not self.last_test_class_name: 376 LOG.error( 377 "Parsed new test case name %s but no test class" 378 " name has been set" % line, error_no="00405") 379 else: 380 test = TestDescription(self.last_test_class_name, 381 self.method_result.group(1)) 382 self.tests.append(test) 383 self.handle_test_tag(self.last_test_class_name, 384 self.method_result.group(1)) 385 386 @staticmethod 387 def _is_cpp_test_dryrun(line): 388 return True if line.find(_CPP_TEST_DRYRUN_TAG) != -1 else False 389 390 def parse(self, line): 391 if self.state_machine.suites_is_started() or self._is_cpp_test_dryrun( 392 line): 393 if self._is_cpp_test_dryrun(line): 394 self.handle_suites_started_tag() 395 elif self._is_class(line): 396 self._process_class_line(line) 397 elif self._is_method(line): 398 self._process_method_line(line) 399 else: 400 if not check_pub_key_exist(): 401 LOG.debug("Line ignored: %s" % line) 402 403 def handle_test_tag(self, test_class, test_name): 404 test_result = self.state_machine.test(reset=True) 405 test_result.test_class = test_class 406 test_result.test_name = test_name 407 for listener in self.get_listeners(): 408 test_result = copy.copy(test_result) 409 listener.__started__(LifeCycle.TestCase, test_result) 410 self.state_machine.test().is_completed = True 411 test_result.code = ResultCode.SKIPPED.value 412 for listener in self.get_listeners(): 413 result = copy.copy(test_result) 414 listener.__ended__(LifeCycle.TestCase, result) 415 self.state_machine.running_test_index += 1 416 test_suites = self.state_machine.get_suites() 417 test_suite = self.state_machine.suite() 418 test_suites.test_num += 1 419 test_suite.test_num += 1 420 421 def handle_suites_started_tag(self): 422 self.state_machine.get_suites(reset=True) 423 test_suites = self.state_machine.get_suites() 424 test_suites.suites_name = self.get_suite_name() 425 test_suites.test_num = 0 426 for listener in self.get_listeners(): 427 suite_report = copy.copy(test_suites) 428 listener.__started__(LifeCycle.TestSuites, suite_report) 429 430 def handle_suite_started_tag(self, class_name): 431 self.state_machine.suite(reset=True) 432 test_suite = self.state_machine.suite() 433 test_suite.suite_name = class_name 434 test_suite.test_num = 0 435 for listener in self.get_listeners(): 436 test_suite_copy = copy.copy(test_suite) 437 listener.__started__(LifeCycle.TestSuite, test_suite_copy) 438 439 def handle_suite_ended_tag(self): 440 suite_result = self.state_machine.suite() 441 suite_result.is_completed = True 442 for listener in self.get_listeners(): 443 suite = copy.copy(suite_result) 444 listener.__ended__(LifeCycle.TestSuite, suite) 445 446 def handle_suites_ended_tag(self): 447 suites = self.state_machine.get_suites() 448 suites.is_completed = True 449 for listener in self.get_listeners(): 450 copy_suites = copy.copy(suites) 451 listener.__ended__(LifeCycle.TestSuites, test_result=copy_suites, 452 suites_name=suites.suites_name) 453 454 def mark_test_as_failed(self, test): 455 if not self.state_machine.current_suite and not test.class_name: 456 return 457 suite_name = self.state_machine.current_suite.suite_name if \ 458 self.state_machine.current_suite else None 459 suite_result = self.state_machine.suite(reset=True) 460 test_result = self.state_machine.test(reset=True) 461 suite_result.suite_name = suite_name or test.class_name 462 suite_result.suite_num = 1 463 test_result.test_class = test.class_name 464 test_result.test_name = test.test_name 465 test_result.stacktrace = "error_msg: Unknown error" 466 test_result.num_tests = 1 467 test_result.run_time = 0 468 test_result.code = ResultCode.FAILED.value 469 for listener in self.get_listeners(): 470 suite_report = copy.copy(suite_result) 471 listener.__started__(LifeCycle.TestSuite, suite_report) 472 for listener in self.get_listeners(): 473 test_result = copy.copy(test_result) 474 listener.__started__(LifeCycle.TestCase, test_result) 475 for listener in self.get_listeners(): 476 test_result = copy.copy(test_result) 477 listener.__ended__(LifeCycle.TestCase, test_result) 478 self.__done__() 479