#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import re
import time
from queue import Queue

from xdevice import IParser
from xdevice import Plugin
from xdevice import StateRecorder
from xdevice import TestDescription
from xdevice import LifeCycle
from xdevice import ResultCode
from xdevice import platform_logger
from xdevice import check_pub_key_exist
from xdevice import get_cst_time

from ohos.constants import ParserType

__all__ = ["CppTestListParserLite", "CTestParser", "OpenSourceParser",
           "CppTestParserLite"]

# NOTE(review): the bracketed GoogleTest markers below appear to have had
# their inner padding collapsed in this copy of the file (_TEST_OK_TAG and
# _ALT_OK_TAG are identical here, which makes the _ALT_OK_TAG branch of
# CppTestParserLite.process_test unreachable). Confirm the exact literals
# against the repository version before relying on them.
_INFORMATIONAL_START = "[----------]"
_TEST_START_RUN_TAG = "[==========] Running"
_TEST_RUN_TAG = "[==========]"
_CPP_TEST_DRYRUN_TAG = "Running main() "
_TEST_START_TAG = "[ RUN ]"
_TEST_OK_TAG = "[ OK ]"
_TEST_SKIPPED_TAG = "[ SKIPPED ]"
_TEST_FAILED_TAG = "[ FAILED ]"
_ALT_OK_TAG = "[ OK ]"
_TIMEOUT_TAG = "[ TIMEOUT ]"

_CTEST_START_TEST_RUN_TAG = "Framework inited."
_CTEST_END_TEST_RUN_TAG = "Framework finished."
_CTEST_SUITE_TEST_RUN_TAG = "Start to run test suite:"
_CTEST_SUITE_TIME_RUN_TAG = "Run test suite "
_CTEST_SETUP_TAG = "setup"
_CTEST_RUN_TAG = "-----------------------"

_TEST_PASSED_LOWER = "pass"

_COMPILE_PASSED = "compile PASSED"
_COMPILE_PARA = r"(.* compile .*)"

_PRODUCT_PARA = r"(.*The .* is .*)"
_PRODUCT_PARA_START = r"To Obtain Product Params Start"
_PRODUCT_PARA_END = r"To Obtain Product Params End"

_START_JSUNIT_RUN_MARKER = "[start] start run suites"
_START_JSUNIT_SUITE_RUN_MARKER = "[suite start]"
_START_JSUNIT_SUITE_END_MARKER = "[suite end]"
_END_JSUNIT_RUN_MARKER = "[end] run suites end"
_PASS_JSUNIT_MARKER = "[%s]" % "pass"
_FAIL_JSUNIT_MARKER = "[fail]"
_ACE_LOG_MARKER = "[Console Info]"

LOG = platform_logger("ParserLite")


@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_lite)
class CppTestParserLite(IParser):
    """Parse GoogleTest-style console output line by line.

    Each recognised marker updates ``self.state_machine`` and is forwarded
    to every object in ``self.listeners`` through its ``__started__`` /
    ``__ended__`` hooks. Result objects are shallow-copied before they are
    handed to a listener.
    """

    def __init__(self):
        self.state_machine = StateRecorder()
        self.suite_name = ""
        self.listeners = []
        # Filled by handle_product_info() while is_params is True.
        self.product_info = {}
        self.is_params = False

    def get_suite_name(self):
        """Return the name reported for the whole run (``suites_name``)."""
        return self.suite_name

    def get_listeners(self):
        """Return the listeners that receive lifecycle callbacks."""
        return self.listeners

    def __process__(self, lines):
        """Feed a batch of log lines to :meth:`parse`.

        Lines seen before the run has started are also appended to the
        state machine's ``trace_logs``.
        """
        if not self.state_machine.suites_is_started():
            self.state_machine.trace_logs.extend(lines)
        for line in lines:
            if not check_pub_key_exist():
                LOG.debug(line)
            self.parse(line)

    def __done__(self):
        """Close the current suite and, if a run was named, the whole run."""
        suite_result = self.state_machine.suite()
        suite_result.is_completed = True
        for listener in self.get_listeners():
            suite = copy.copy(suite_result)
            listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
        self.state_machine.running_test_index = 0

        suites_result = self.state_machine.get_suites()
        if not suites_result.suites_name:
            return
        for listener in self.get_listeners():
            suites = copy.copy(suites_result)
            listener.__ended__(LifeCycle.TestSuites, test_result=suites,
                               suites_name=suites.suites_name,
                               product_info=suites.product_info)
        self.state_machine.current_suites = None

    @staticmethod
    def _is_test_run(line):
        return _TEST_RUN_TAG in line

    @staticmethod
    def _is_test_start_run(line):
        return _TEST_START_RUN_TAG in line

    @staticmethod
    def _is_informational_start(line):
        return _INFORMATIONAL_START in line

    @staticmethod
    def _is_test_start(line):
        return _TEST_START_TAG in line

    @staticmethod
    def _text_after(line, tag):
        """Return the stripped text following the first *tag* in *line*.

        Raises ``ValueError`` when *tag* does not occur in *line*.
        """
        return line[line.index(tag) + len(tag):].strip()

    def _process_informational_line(self, line):
        """Dispatch an informational line to suite start or suite end."""
        pattern = r"(.*) (\(\d+ ms total\))"
        # The slice is positional: it drops len(tag) leading characters and
        # does not search for the tag.
        message = line[len(_INFORMATIONAL_START):].strip()
        if re.match(pattern, line.strip()):
            self.handle_suite_ended_tag(message)
        elif re.match(r'(\d+) test[s]? from (.*)', message):
            self.handle_suite_started_tag(message)

    def _process_test_run_line(self, line):
        """Handle a run-level marker line as the end of the run."""
        if not self.state_machine.suites_is_running():
            return
        message = line[len(_TEST_RUN_TAG):].strip()
        self.handle_suites_ended_tag(message)

    def parse(self, line):
        """Classify one log line and update the parser state accordingly."""
        if _PRODUCT_PARA_START in line:
            self.is_params = True
        elif _PRODUCT_PARA_END in line:
            self.is_params = False
        if re.match(_PRODUCT_PARA, line) and self.is_params:
            handle_product_info(line, self.product_info)

        if self.state_machine.suites_is_started() or self._is_test_run(line):
            # Order matters: the "Running" marker also contains the plain
            # run marker, so it has to be tested first.
            if self._is_test_start_run(line):
                self.handle_suites_started_tag(line)
            elif self._is_informational_start(line):
                self._process_informational_line(line)
            elif self._is_test_run(line):
                self._process_test_run_line(line)
            elif self._is_test_start(line):
                self.handle_test_started_tag(
                    self._text_after(line, _TEST_START_TAG))
            else:
                self.process_test(line)

    def _end_started_test(self, line, tag, result_code):
        """End the running test for *tag*, or log an error if none runs."""
        message = self._text_after(line, tag)
        if not self.state_machine.test_is_running():
            LOG.error(
                "Found {} without {} before, wrong GTest log format".format(
                    line, _TEST_START_TAG), error_no="00405")
            return
        self.handle_test_ended_tag(message, result_code)

    def process_test(self, line):
        """Handle a per-test result marker, or collect it as test output."""
        if _TEST_SKIPPED_TAG in line:
            self._end_started_test(line, _TEST_SKIPPED_TAG,
                                   ResultCode.SKIPPED)
        elif _TEST_OK_TAG in line:
            self._end_started_test(line, _TEST_OK_TAG, ResultCode.PASSED)
        elif _ALT_OK_TAG in line:
            message = self._text_after(line, _ALT_OK_TAG)
            self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.PASSED)
        elif _TEST_FAILED_TAG in line:
            message = self._text_after(line, _TEST_FAILED_TAG)
            if not self.state_machine.suite_is_running():
                return
            if not self.state_machine.test_is_running():
                # A failure reported without a start marker: synthesise one.
                self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.FAILED)
        elif _TIMEOUT_TAG in line:
            message = self._text_after(line, _TIMEOUT_TAG)
            self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.FAILED)
        elif self.state_machine.test_is_running():
            self.append_test_output(line)

    def handle_test_started_tag(self, message):
        """Start a new test described by *message* (``Class.name`` text)."""
        test_class, test_name, _ = self.parse_test_description(message)
        test_result = self.state_machine.test(reset=True)
        test_result.test_class = test_class
        test_result.test_name = test_name
        for listener in self.get_listeners():
            test_result = copy.copy(test_result)
            listener.__started__(LifeCycle.TestCase, test_result)

    @classmethod
    def parse_test_description(cls, message):
        """Split *message* into ``(test_class, test_name, run_time_ms)``.

        ``run_time_ms`` is 0 when *message* carries no ``(N ms)`` suffix.
        Raises ``ValueError`` when the description contains no ``.``.
        """
        run_time = 0
        matcher = re.match(r'(.*) \((\d+) ms\)(.*)', message)
        if matcher:
            test_class, test_name = matcher.group(1).rsplit(".", 1)
            run_time = int(matcher.group(2))
        else:
            test_class, test_name = message.rsplit(".", 1)
        return test_class.split(" ")[-1], test_name.split(" ")[0], run_time

    def handle_test_ended_tag(self, message, test_status):
        """Finish the running test and notify listeners.

        The result is forced to FAILED when the class or test name in
        *message* differs from the test that was started.
        """
        test_class, test_name, run_time = self.parse_test_description(
            message)
        test_result = self.state_machine.test()
        test_result.run_time = int(run_time)
        test_result.code = test_status.value
        if not test_result.is_running():
            LOG.error(
                "Test has no start tag when trying to end test: %s", message,
                error_no="00405")
            return
        found_unexpected_test = False
        if test_result.test_class != test_class:
            LOG.error(
                "Expected class: {} but got:{} ".format(test_result.test_class,
                                                        test_class),
                error_no="00405")
            found_unexpected_test = True
        if test_result.test_name != test_name:
            LOG.error(
                "Expected test: {} but got: {}".format(test_result.test_name,
                                                       test_name),
                error_no="00405")
            found_unexpected_test = True
        test_result.current = self.state_machine.running_test_index + 1
        self.state_machine.test().is_completed = True
        if found_unexpected_test:
            test_result.code = ResultCode.FAILED.value

        for listener in self.get_listeners():
            result = copy.copy(test_result)
            listener.__ended__(LifeCycle.TestCase, result)
        self.state_machine.running_test_index += 1

    def fake_run_marker(self, message):
        """Start a test for a result line that had no start marker.

        Only the first space-separated token of *message* (the test
        identifier) is used. The full token list must not be forwarded:
        ``parse_test_description`` applies ``re.match`` to its argument,
        which raises ``TypeError`` for a list.
        """
        fake_marker = re.compile(" +").split(message)[0]
        self.handle_test_started_tag(fake_marker)

    def handle_suites_started_tag(self, message):
        """Reset the run record and announce the start of the whole run."""
        self.state_machine.get_suites(reset=True)
        matcher = re.match(r'.* Running (\d+) test[s]? from .*', message)
        expected_test_num = int(matcher.group(1)) if matcher else -1
        if expected_test_num >= 0:
            test_suites = self.state_machine.get_suites()
            test_suites.suites_name = self.get_suite_name()
            test_suites.test_num = expected_test_num
            test_suites.product_info = self.product_info
            for listener in self.get_listeners():
                suite_report = copy.copy(test_suites)
                listener.__started__(LifeCycle.TestSuites, suite_report)

    def handle_suite_started_tag(self, message):
        """Reset the suite record and announce the start of one suite."""
        self.state_machine.suite(reset=True)
        matcher = re.match(r'(\d+) test[s]? from (.*)', message)
        expected_test_num = int(matcher.group(1)) if matcher else -1
        if expected_test_num >= 0:
            test_suite = self.state_machine.suite()
            test_suite.suite_name = matcher.group(2)
            test_suite.test_num = expected_test_num
            for listener in self.get_listeners():
                suite_report = copy.copy(test_suite)
                listener.__started__(LifeCycle.TestSuite, suite_report)

    def handle_suite_ended_tag(self, message):
        """Record the suite run time, if present, and close the suite."""
        suite_result = self.state_machine.suite()
        matcher = re.match(r'.*\((\d+) ms total\)', message)
        if matcher:
            suite_result.run_time = int(matcher.group(1))
        suite_result.is_completed = True
        for listener in self.get_listeners():
            suite = copy.copy(suite_result)
            listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
        self.state_machine.running_test_index = 0

    def handle_suites_ended_tag(self, message):
        """Record the total run time, if present, and close the run."""
        suites = self.state_machine.get_suites()
        matcher = re.match(r'.*\((\d+) ms total\)', message)
        if matcher:
            suites.run_time = int(matcher.group(1))
        suites.is_completed = True
        for listener in self.get_listeners():
            copy_suites = copy.copy(suites)
            listener.__ended__(LifeCycle.TestSuites, test_result=copy_suites,
                               suites_name=suites.suites_name,
                               product_info=suites.product_info)

    def append_test_output(self, message):
        """Append *message* to the running test's stacktrace text."""
        if self.state_machine.test().stacktrace:
            # Separate successive output lines with CRLF.
            self.state_machine.test().stacktrace = "{}\r\n".format(
                self.state_machine.test().stacktrace)
        self.state_machine.test().stacktrace = "{}{}".format(
            self.state_machine.test().stacktrace, message)

    @staticmethod
    def handle_test_run_failed(error_msg):
        """Log *error_msg* (or "Unknown error") at debug level."""
        if not error_msg:
            error_msg = "Unknown error"
        if not check_pub_key_exist():
            LOG.debug("Error msg:%s" % error_msg)

    def mark_test_as_failed(self, test):
        """Report *test* as a failed single-test run and finish parsing.

        Nothing is reported when there is neither a current suite nor a
        ``test.class_name`` to name the suite after.
        """
        if not self.state_machine.current_suite and not test.class_name:
            return
        suites_result = self.state_machine.get_suites(reset=True)
        suites_result.suites_name = self.get_suite_name()

        # Read the suite name before suite(reset=True) replaces the record.
        suite_name = self.state_machine.current_suite.suite_name if \
            self.state_machine.current_suite else None
        suite_result = self.state_machine.suite(reset=True)
        test_result = self.state_machine.test(reset=True)
        suite_result.suite_name = suite_name or test.class_name
        suite_result.suite_num = 1
        test_result.test_class = test.class_name
        test_result.test_name = test.test_name
        test_result.stacktrace = "error_msg: Unknown error"
        test_result.num_tests = 1
        test_result.run_time = 0
        test_result.code = ResultCode.FAILED.value
        for listener in self.get_listeners():
            suite_report = copy.copy(suites_result)
            listener.__started__(LifeCycle.TestSuites, suite_report)
        for listener in self.get_listeners():
            suite_report = copy.copy(suite_result)
            listener.__started__(LifeCycle.TestSuite, suite_report)
        for listener in self.get_listeners():
            test_result = copy.copy(test_result)
            listener.__started__(LifeCycle.TestCase, test_result)
        for listener in self.get_listeners():
            test_result = copy.copy(test_result)
            listener.__ended__(LifeCycle.TestCase, test_result)
        for listener in self.get_listeners():
            suite_report = copy.copy(suite_result)
            listener.__ended__(LifeCycle.TestSuite, suite_report,
                               is_clear=True)
        self.__done__()


367@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_list_lite) 368class CppTestListParserLite(IParser): 369 def __init__(self): 370 self.last_test_class_name = None 371 self.state_machine = StateRecorder() 372 self.listeners = [] 373 self.tests = [] 374 self.suites_name = "" 375 self.class_result = None 376 self.method_result = None 377 378 def __process__(self, lines): 379 for line in lines: 380 if not check_pub_key_exist(): 381 LOG.debug(line) 382 self.parse(line) 383 384 def get_suite_name(self): 385 return self.suites_name 386 387 def get_listeners(self): 388 return self.listeners 389 390 def __done__(self): 391 if self.state_machine.is_started(): 392 self.handle_suite_ended_tag() 393 suites_result = self.state_machine.get_suites() 394 if not suites_result.suites_name: 395 return 396 for listener in self.get_listeners(): 397 suites = copy.copy(suites_result) 398 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 399 suites_name=suites.suites_name) 400 self.state_machine.current_suites = None 401 402 def _is_class(self, line): 403 self.class_result = re.compile('^([a-zA-Z]+.*)\\.$').match(line) 404 return self.class_result 405 406 def _is_method(self, line): 407 self.method_result = re.compile( 408 '\\s+([a-zA-Z_]+[\\S]*)(.*)?(\\s+.*)?$').match(line) 409 return self.method_result 410 411 def _process_class_line(self, line): 412 del line 413 if not self.state_machine.suites_is_started(): 414 self.handle_suites_started_tag() 415 self.last_test_class_name = self.class_result.group(1) 416 if self.state_machine.is_started(): 417 self.handle_suite_ended_tag() 418 self.handle_suite_started_tag(self.class_result.group(1)) 419 420 def _process_method_line(self, line): 421 if not self.last_test_class_name: 422 LOG.error( 423 "Parsed new test case name %s but no test class" 424 " name has been set" % line, error_no="00405") 425 else: 426 test = TestDescription(self.last_test_class_name, 427 self.method_result.group(1)) 428 self.tests.append(test) 429 
self.handle_test_tag(self.last_test_class_name, 430 self.method_result.group(1)) 431 432 @staticmethod 433 def _is_cpp_test_dryrun(line): 434 return True if line.find(_CPP_TEST_DRYRUN_TAG) != -1 else False 435 436 def parse(self, line): 437 if self.state_machine.suites_is_started() or self._is_cpp_test_dryrun( 438 line): 439 if self._is_cpp_test_dryrun(line): 440 self.handle_suites_started_tag() 441 elif self._is_class(line): 442 self._process_class_line(line) 443 elif self._is_method(line): 444 self._process_method_line(line) 445 else: 446 if not check_pub_key_exist(): 447 LOG.debug("Line ignored: %s" % line) 448 449 def handle_test_tag(self, test_class, test_name): 450 test_result = self.state_machine.test(reset=True) 451 test_result.test_class = test_class 452 test_result.test_name = test_name 453 for listener in self.get_listeners(): 454 test_result = copy.copy(test_result) 455 listener.__started__(LifeCycle.TestCase, test_result) 456 self.state_machine.test().is_completed = True 457 test_result.code = ResultCode.SKIPPED.value 458 for listener in self.get_listeners(): 459 result = copy.copy(test_result) 460 listener.__ended__(LifeCycle.TestCase, result) 461 self.state_machine.running_test_index += 1 462 test_suites = self.state_machine.get_suites() 463 test_suite = self.state_machine.suite() 464 test_suites.test_num += 1 465 test_suite.test_num += 1 466 467 def handle_suites_started_tag(self): 468 self.state_machine.get_suites(reset=True) 469 test_suites = self.state_machine.get_suites() 470 test_suites.suites_name = self.get_suite_name() 471 test_suites.test_num = 0 472 for listener in self.get_listeners(): 473 suite_report = copy.copy(test_suites) 474 listener.__started__(LifeCycle.TestSuites, suite_report) 475 476 def handle_suite_started_tag(self, class_name): 477 self.state_machine.suite(reset=True) 478 test_suite = self.state_machine.suite() 479 test_suite.suite_name = class_name 480 test_suite.test_num = 0 481 for listener in self.get_listeners(): 482 
test_suite_copy = copy.copy(test_suite) 483 listener.__started__(LifeCycle.TestSuite, test_suite_copy) 484 485 def handle_suite_ended_tag(self): 486 suite_result = self.state_machine.suite() 487 suite_result.is_completed = True 488 for listener in self.get_listeners(): 489 suite = copy.copy(suite_result) 490 listener.__ended__(LifeCycle.TestSuite, suite) 491 492 def handle_suites_ended_tag(self): 493 suites = self.state_machine.get_suites() 494 suites.is_completed = True 495 for listener in self.get_listeners(): 496 copy_suites = copy.copy(suites) 497 listener.__ended__(LifeCycle.TestSuites, test_result=copy_suites, 498 suites_name=suites.suites_name) 499 500 def mark_test_as_failed(self, test): 501 if not self.state_machine.current_suite and not test.class_name: 502 return 503 suite_name = self.state_machine.current_suite.suite_name if \ 504 self.state_machine.current_suite else None 505 suite_result = self.state_machine.suite(reset=True) 506 test_result = self.state_machine.test(reset=True) 507 suite_result.suite_name = suite_name or test.class_name 508 suite_result.suite_num = 1 509 test_result.test_class = test.class_name 510 test_result.test_name = test.test_name 511 test_result.stacktrace = "error_msg: Unknown error" 512 test_result.num_tests = 1 513 test_result.run_time = 0 514 test_result.code = ResultCode.FAILED.value 515 for listener in self.get_listeners(): 516 suite_report = copy.copy(suite_result) 517 listener.__started__(LifeCycle.TestSuite, suite_report) 518 for listener in self.get_listeners(): 519 test_result = copy.copy(test_result) 520 listener.__started__(LifeCycle.TestCase, test_result) 521 for listener in self.get_listeners(): 522 test_result = copy.copy(test_result) 523 listener.__ended__(LifeCycle.TestCase, test_result) 524 self.__done__() 525 526 527@Plugin(type=Plugin.PARSER, id=ParserType.ctest_lite) 528class CTestParser(IParser): 529 last_line = "" 530 pattern = r"(\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) " 531 532 def 
__init__(self): 533 self.state_machine = StateRecorder() 534 self.suites_name = "" 535 self.listeners = [] 536 self.product_info = {} 537 self.is_params = False 538 self.result_lines = [] 539 540 def get_suite_name(self): 541 return self.suites_name 542 543 def get_listeners(self): 544 return self.listeners 545 546 def __process__(self, lines): 547 if not self.state_machine.suites_is_started(): 548 self.state_machine.trace_logs.extend(lines) 549 for line in lines: 550 self.parse(line) 551 552 def __done__(self): 553 suites = self.state_machine.get_suites() 554 suites.is_completed = True 555 556 for listener in self.get_listeners(): 557 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 558 suites_name=suites.suites_name, 559 product_info=suites.product_info) 560 self.state_machine.current_suites = None 561 562 @staticmethod 563 def _is_ctest_start_test_run(line): 564 return True if line.endswith(_CTEST_START_TEST_RUN_TAG) else False 565 566 @staticmethod 567 def _is_ctest_end_test_run(line): 568 return True if line.endswith(_CTEST_END_TEST_RUN_TAG) else False 569 570 @staticmethod 571 def _is_ctest_run(line): 572 return re.match(r"[\s\S]*(Tests)[\s\S]*(Failures)[\s\S]*(Ignored)[\s\S]*", line) 573 574 def _is_ctest_suite_test_run(self, line): 575 return re.match("{}{}".format(self.pattern, _CTEST_SUITE_TEST_RUN_TAG), 576 line) 577 578 def is_ctest_suite_time_run(self, line): 579 return re.match("{}{}".format(self.pattern, _CTEST_SUITE_TIME_RUN_TAG), 580 line) 581 582 def _process_ctest_suite_test_run_line(self, line): 583 _, message_index = re.match( 584 "{}{}".format(self.pattern, _CTEST_SUITE_TEST_RUN_TAG), 585 line).span() 586 self.handle_suite_started_tag(line[message_index:].strip()) 587 588 @staticmethod 589 def _is_execute_result_line(line): 590 return re.match( 591 r"(.*" + "\\.c:" + "\\d+:.*:(PASS|FAIL|OK|IGNORE"")\\.*)", 592 line.strip()) 593 594 @staticmethod 595 def _is_result_line(line): 596 return line.find("PASS") != -1 or line.find("FAIL") 
!= -1 or line.find( 597 "IGNORE") != -1 598 599 def parse(self, line): 600 self._parse_product_info(line) 601 602 if self.state_machine.suites_is_started() or \ 603 self._is_ctest_start_test_run(line): 604 try: 605 test_matcher = re.match(r".*(\d+ Tests).+", line) 606 failed_matcher = \ 607 re.match(r".*(Failures).*", line) 608 ignore_matcher = \ 609 re.match(r".*(Ignored).*", line) 610 if (test_matcher or failed_matcher or ignore_matcher) and \ 611 not self._is_ctest_run(line): 612 if test_matcher: 613 self.result_lines.append(test_matcher.group(1)) 614 if failed_matcher: 615 self.result_lines.append(failed_matcher.group(1)) 616 if ignore_matcher: 617 self.result_lines.append(ignore_matcher.group(1)) 618 line = " ".join(self.result_lines) 619 self.result_lines.clear() 620 if self._is_ctest_start_test_run(line): 621 self.handle_suites_started_tag() 622 elif self._is_ctest_end_test_run(line): 623 self.process_suites_ended_tag() 624 elif self._is_ctest_run(line): 625 self.handle_suite_ended_tag(line) 626 elif self._is_ctest_suite_test_run(line) and \ 627 not self.state_machine.suite_is_running(): 628 self._process_ctest_suite_test_run_line(line) 629 elif self.is_ctest_suite_time_run(line) and \ 630 not self.state_machine.suite_is_running(): 631 self.handle_suite_started_tag(line) 632 elif self._is_result_line(line) and \ 633 self.state_machine.suite_is_running(): 634 if line.find(":") != -1 and line.count( 635 ":") >= 3 and self._is_execute_result_line(line): 636 self.handle_one_test_tag(line.strip(), False) 637 else: 638 self.handle_one_test_tag(line.strip(), True) 639 except AttributeError as _: 640 LOG.error("Parsing log: %s failed" % (line.strip()), 641 error_no="00405") 642 self.last_line = line 643 644 def _parse_product_info(self, line): 645 if _PRODUCT_PARA_START in line: 646 self.is_params = True 647 elif _PRODUCT_PARA_END in line: 648 self.is_params = False 649 if self.is_params and re.match(_PRODUCT_PARA, line): 650 handle_product_info(line, 
self.product_info) 651 652 def parse_error_test_description(self, message): 653 end_time = re.match(self.pattern, message).group().strip() 654 start_time = re.match(self.pattern, 655 self.last_line.strip()).group().strip() 656 start_timestamp = int(time.mktime( 657 time.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 658 start_time.split(".")[-1]) 659 end_timestamp = int(time.mktime( 660 time.strptime(end_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 661 end_time.split(".")[-1]) 662 run_time = end_timestamp - start_timestamp 663 status_dict = {"PASS": ResultCode.PASSED, "FAIL": ResultCode.FAILED, 664 "IGNORE": ResultCode.SKIPPED} 665 status = "" 666 if message.find("PASS") != -1: 667 status = "PASS" 668 elif message.find("FAIL") != -1: 669 status = "FAIL" 670 elif message.find("IGNORE") != -1: 671 status = "IGNORE" 672 status = status_dict.get(status) 673 return "", "", status, run_time 674 675 def parse_test_description(self, message): 676 677 test_class = message.split(".c:")[0].split(" ")[-1].split("/")[-1] 678 message_index = message.index(".c:") 679 end_time = re.match(self.pattern, message).group().strip() 680 start_time = re.match(self.pattern, 681 self.last_line.strip()).group().strip() 682 start_timestamp = int(time.mktime( 683 time.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 684 start_time.split(".")[-1]) 685 end_timestamp = int(time.mktime( 686 time.strptime(end_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 687 end_time.split(".")[-1]) 688 run_time = end_timestamp - start_timestamp 689 message_list = message[message_index + 3:].split(":") 690 test_name, status = message_list[1].strip(), message_list[2].strip() 691 status_dict = {"PASS": ResultCode.PASSED, "FAIL": ResultCode.FAILED, 692 "IGNORE": ResultCode.SKIPPED} 693 status = status_dict.get(status) 694 return test_class, test_name, status, run_time 695 696 def handle_one_test_tag(self, message, is_error): 697 if is_error: 698 test_class, test_name, status, run_time = \ 
699 self.parse_error_test_description(message) 700 else: 701 test_class, test_name, status, run_time = \ 702 self.parse_test_description(message) 703 test_result = self.state_machine.test(reset=True) 704 test_result.test_class = test_class 705 test_result.test_name = test_name 706 test_result.run_time = run_time 707 self.state_machine.running_test_index += 1 708 test_result.current = self.state_machine.running_test_index 709 test_result.code = status.value 710 self.state_machine.suite().run_time += run_time 711 for listener in self.get_listeners(): 712 test_result = copy.copy(test_result) 713 listener.__started__(LifeCycle.TestCase, test_result) 714 715 test_suite = self.state_machine.suite() 716 test_suites = self.state_machine.get_suites() 717 718 found_unexpected_test = False 719 720 if found_unexpected_test or ResultCode.FAILED == status: 721 if "FAIL:" in message and not message.endswith("FAIL:"): 722 test_result.stacktrace = message[ 723 message.rindex("FAIL:") + len( 724 "FAIL:"):] 725 for listener in self.get_listeners(): 726 result = copy.copy(test_result) 727 listener.__failed__(LifeCycle.TestCase, result) 728 elif ResultCode.SKIPPED == status: 729 for listener in self.get_listeners(): 730 result = copy.copy(test_result) 731 listener.__failed__(LifeCycle.TestCase, result) 732 733 self.state_machine.test().is_completed = True 734 test_suite.test_num += 1 735 test_suites.test_num += 1 736 737 for listener in self.get_listeners(): 738 result = copy.copy(test_result) 739 listener.__ended__(LifeCycle.TestCase, result) 740 741 def handle_suites_started_tag(self): 742 self.state_machine.get_suites(reset=True) 743 test_suites = self.state_machine.get_suites() 744 test_suites.suites_name = self.suites_name 745 test_suites.product_info = self.product_info 746 test_suites.test_num = 0 747 for listener in self.get_listeners(): 748 suite_report = copy.copy(test_suites) 749 listener.__started__(LifeCycle.TestSuites, suite_report) 750 751 def 
handle_suite_started_tag(self, message): 752 if re.match("{}{}".format(self.pattern, _CTEST_SUITE_TIME_RUN_TAG), 753 message.strip()): 754 message = self.state_machine.suite().suite_name 755 self.state_machine.suite(reset=True) 756 test_suite = self.state_machine.suite() 757 test_suite.suite_name = message 758 test_suite.test_num = 0 759 for listener in self.get_listeners(): 760 suite_report = copy.copy(test_suite) 761 listener.__started__(LifeCycle.TestSuite, suite_report) 762 763 def handle_suite_ended_tag(self, line): 764 suite_result = self.state_machine.suite() 765 suites = self.state_machine.get_suites() 766 suite_result.run_time = suite_result.run_time 767 suites.run_time += suite_result.run_time 768 suite_result.is_completed = True 769 770 for listener in self.get_listeners(): 771 suite = copy.copy(suite_result) 772 listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True) 773 self.state_machine.running_test_index = 0 774 775 def process_suites_ended_tag(self): 776 suites = self.state_machine.get_suites() 777 suites.is_completed = True 778 779 for listener in self.get_listeners(): 780 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 781 suites_name=suites.suites_name, 782 product_info=suites.product_info) 783 784 def append_test_output(self, message): 785 if self.state_machine.test().stacktrace: 786 self.state_machine.test().stacktrace = "{}\r\n".format( 787 self.state_machine.test().stacktrace) 788 self.state_machine.test().stacktrace = "{}{}".format( 789 self.state_machine.test().stacktrace, message) 790 791 792@Plugin(type=Plugin.PARSER, id=ParserType.open_source_test) 793class OpenSourceParser(IParser): 794 def __init__(self): 795 self.state_machine = StateRecorder() 796 self.suite_name = "" 797 self.test_name = "" 798 self.test_num = 1 799 self.listeners = [] 800 self.output = "" 801 self.lines = [] 802 self.start_time = None 803 804 def get_suite_name(self): 805 return self.suite_name 806 807 def get_listeners(self): 808 return 
self.listeners 809 810 def __process__(self, lines): 811 if not self.start_time: 812 self.start_time = get_cst_time() 813 self.lines.extend(lines) 814 815 def __done__(self): 816 if not self.state_machine.suites_is_started(): 817 self.state_machine.trace_logs.extend(self.lines) 818 self.handle_suite_started_tag(self.test_num) 819 820 test_result = self.state_machine.test(reset=True, 821 test_index=self.test_name) 822 test_result.run_time = 0 823 test_result.test_class = self.suite_name 824 test_result.test_name = self.test_name 825 test_result.test_num = 1 826 test_result.current = 1 827 for listener in self.get_listeners(): 828 result = copy.copy(test_result) 829 listener.__started__(LifeCycle.TestCase, result) 830 for line in self.lines: 831 self.output = "{}{}".format(self.output, line) 832 if _TEST_PASSED_LOWER in line.lower(): 833 test_result.code = ResultCode.PASSED.value 834 if self.start_time: 835 end_time = get_cst_time() 836 run_time = (end_time - self.start_time).total_seconds() 837 test_result.run_time = int(run_time * 1000) 838 for listener in self.get_listeners(): 839 result = copy.copy(test_result) 840 listener.__ended__(LifeCycle.TestCase, result) 841 break 842 else: 843 test_result.code = ResultCode.FAILED.value 844 test_result.stacktrace = "\\n".join(self.lines) 845 if self.start_time: 846 end_time = get_cst_time() 847 run_time = (end_time - self.start_time).total_seconds() 848 test_result.run_time = int(run_time * 1000) 849 for listener in self.get_listeners(): 850 result = copy.copy(test_result) 851 listener.__ended__(LifeCycle.TestCase, result) 852 853 self.state_machine.test().is_completed = True 854 self.handle_suite_ended_tag() 855 856 def handle_suite_started_tag(self, test_num): 857 test_suite = self.state_machine.suite() 858 if test_num >= 0: 859 test_suite.suite_name = self.suite_name 860 test_suite.test_num = test_num 861 for listener in self.get_listeners(): 862 suite_report = copy.copy(test_suite) 863 
listener.__started__(LifeCycle.TestSuite, suite_report) 864 865 def handle_suite_ended_tag(self): 866 suite_result = self.state_machine.suite() 867 for listener in self.get_listeners(): 868 suite = copy.copy(suite_result) 869 listener.__ended__(LifeCycle.TestSuite, suite, 870 suite_report=True) 871 872 873@Plugin(type=Plugin.PARSER, id=ParserType.build_only_test) 874class BuildOnlyParser(IParser): 875 def __init__(self): 876 self.state_machine = StateRecorder() 877 self.suite_name = "" 878 self.test_name = "" 879 self.test_num = 0 880 self.listeners = [] 881 self.output = "" 882 883 def get_suite_name(self): 884 return self.suite_name 885 886 def get_listeners(self): 887 return self.listeners 888 889 def __process__(self, lines): 890 if not self.state_machine.suites_is_started(): 891 self.state_machine.trace_logs.extend(lines) 892 self.handle_suite_started_tag(self.test_num) 893 894 self.state_machine.running_test_index = \ 895 self.state_machine.running_test_index + 1 896 897 for line in lines: 898 if re.match(_COMPILE_PARA, line): 899 self.test_name = str(line).split('compile')[0].strip() 900 test_result = self.state_machine.test(reset=True) 901 test_result.run_time = 0 902 test_result.test_class = self.suite_name 903 test_result.test_name = self.test_name 904 for listener in self.get_listeners(): 905 result = copy.copy(test_result) 906 listener.__started__(LifeCycle.TestCase, result) 907 if _COMPILE_PASSED in line: 908 test_result.code = ResultCode.PASSED.value 909 for listener in self.get_listeners(): 910 result = copy.copy(test_result) 911 listener.__ended__(LifeCycle.TestCase, result) 912 else: 913 test_result.code = ResultCode.FAILED.value 914 for listener in self.get_listeners(): 915 result = copy.copy(test_result) 916 listener.__failed__(LifeCycle.TestCase, result) 917 self.state_machine.test().is_completed = True 918 919 def __done__(self): 920 self.handle_suite_ended_tag() 921 922 def handle_suite_started_tag(self, test_num): 923 test_suite = 
self.state_machine.suite() 924 if test_num >= 0: 925 test_suite.suite_name = self.suite_name 926 test_suite.test_num = test_num 927 for listener in self.get_listeners(): 928 suite_report = copy.copy(test_suite) 929 listener.__started__(LifeCycle.TestSuite, suite_report) 930 931 def handle_suite_ended_tag(self): 932 suite_result = self.state_machine.suite() 933 for listener in self.get_listeners(): 934 suite = copy.copy(suite_result) 935 listener.__ended__(LifeCycle.TestSuite, suite, 936 suite_report=True) 937 938 939@Plugin(type=Plugin.PARSER, id=ParserType.jsuit_test_lite) 940class JSUnitParserLite(IParser): 941 last_line = "" 942 pattern = r"(\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) " 943 944 def __init__(self): 945 self.state_machine = StateRecorder() 946 self.suites_name = "" 947 self.listeners = [] 948 949 def get_listeners(self): 950 return self.listeners 951 952 def __process__(self, lines): 953 if not self.state_machine.suites_is_started(): 954 self.state_machine.trace_logs.extend(lines) 955 for line in lines: 956 self.parse(line) 957 958 def __done__(self): 959 pass 960 961 def parse(self, line): 962 if (self.state_machine.suites_is_started() or 963 line.find(_START_JSUNIT_RUN_MARKER) != -1) and \ 964 line.find(_ACE_LOG_MARKER) != -1: 965 if line.find(_START_JSUNIT_RUN_MARKER) != -1: 966 self.handle_suites_started_tag() 967 elif line.endswith(_END_JSUNIT_RUN_MARKER): 968 self.handle_suites_ended_tag() 969 elif line.find(_START_JSUNIT_SUITE_RUN_MARKER) != -1: 970 self.handle_suite_started_tag(line.strip()) 971 elif line.endswith(_START_JSUNIT_SUITE_END_MARKER): 972 self.handle_suite_ended_tag() 973 elif _PASS_JSUNIT_MARKER in line or _FAIL_JSUNIT_MARKER \ 974 in line: 975 self.handle_one_test_tag(line.strip()) 976 self.last_line = line 977 978 def parse_test_description(self, message): 979 pattern = r"\[(pass|fail)\]" 980 year = time.strftime("%Y") 981 filter_message = message.split("[Console Info]")[1].strip() 982 end_time = "%s-%s" % \ 983 (year, 
re.match(self.pattern, message).group().strip()) 984 start_time = "%s-%s" % \ 985 (year, re.match(self.pattern, 986 self.last_line.strip()).group().strip()) 987 start_timestamp = int(time.mktime( 988 time.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 989 start_time.split(".")[-1]) 990 end_timestamp = int(time.mktime( 991 time.strptime(end_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int( 992 end_time.split(".")[-1]) 993 run_time = end_timestamp - start_timestamp 994 _, status_end_index = re.match(pattern, filter_message).span() 995 status = filter_message[:status_end_index] 996 test_name = filter_message[status_end_index:] 997 status_dict = {"pass": ResultCode.PASSED, "fail": ResultCode.FAILED, 998 "ignore": ResultCode.SKIPPED} 999 status = status_dict.get(status[1:-1]) 1000 return test_name, status, run_time 1001 1002 def handle_suites_started_tag(self): 1003 self.state_machine.get_suites(reset=True) 1004 test_suites = self.state_machine.get_suites() 1005 test_suites.suites_name = self.suites_name 1006 test_suites.test_num = 0 1007 for listener in self.get_listeners(): 1008 suite_report = copy.copy(test_suites) 1009 listener.__started__(LifeCycle.TestSuites, suite_report) 1010 1011 def handle_suites_ended_tag(self): 1012 suites = self.state_machine.get_suites() 1013 suites.is_completed = True 1014 1015 for listener in self.get_listeners(): 1016 listener.__ended__(LifeCycle.TestSuites, test_result=suites, 1017 suites_name=suites.suites_name) 1018 1019 def handle_one_test_tag(self, message): 1020 test_name, status, run_time = \ 1021 self.parse_test_description(message) 1022 test_result = self.state_machine.test(reset=True) 1023 test_suite = self.state_machine.suite() 1024 test_result.test_class = test_suite.suite_name 1025 test_result.test_name = test_name 1026 test_result.run_time = run_time 1027 test_result.code = status.value 1028 test_result.current = self.state_machine.running_test_index + 1 1029 self.state_machine.suite().run_time += run_time 1030 
for listener in self.get_listeners(): 1031 test_result = copy.copy(test_result) 1032 listener.__started__(LifeCycle.TestCase, test_result) 1033 1034 test_suites = self.state_machine.get_suites() 1035 found_unexpected_test = False 1036 1037 if found_unexpected_test or ResultCode.FAILED == status: 1038 for listener in self.get_listeners(): 1039 result = copy.copy(test_result) 1040 listener.__failed__(LifeCycle.TestCase, result) 1041 elif ResultCode.SKIPPED == status: 1042 for listener in self.get_listeners(): 1043 result = copy.copy(test_result) 1044 listener.__skipped__(LifeCycle.TestCase, result) 1045 1046 self.state_machine.test().is_completed = True 1047 test_suite.test_num += 1 1048 test_suites.test_num += 1 1049 for listener in self.get_listeners(): 1050 result = copy.copy(test_result) 1051 listener.__ended__(LifeCycle.TestCase, result) 1052 self.state_machine.running_test_index += 1 1053 1054 def fake_run_marker(self, message): 1055 fake_marker = re.compile(" +").split(message) 1056 self.processTestStartedTag(fake_marker) 1057 1058 def handle_suite_started_tag(self, message): 1059 self.state_machine.suite(reset=True) 1060 test_suite = self.state_machine.suite() 1061 if re.match(r".*\[suite start\].*", message): 1062 _, index = re.match(r".*\[suite start\]", message).span() 1063 test_suite.suite_name = message[index:] 1064 test_suite.test_num = 0 1065 for listener in self.get_listeners(): 1066 suite_report = copy.copy(test_suite) 1067 listener.__started__(LifeCycle.TestSuite, suite_report) 1068 1069 def handle_suite_ended_tag(self): 1070 suite_result = self.state_machine.suite() 1071 suites = self.state_machine.get_suites() 1072 suite_result.run_time = suite_result.run_time 1073 suites.run_time += suite_result.run_time 1074 suite_result.is_completed = True 1075 1076 for listener in self.get_listeners(): 1077 suite = copy.copy(suite_result) 1078 listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True) 1079 1080 def append_test_output(self, message): 1081 if 
self.state_machine.test().stacktrace: 1082 self.state_machine.test().stacktrace = \ 1083 "%s\r\n" % self.state_machine.test().stacktrace 1084 self.state_machine.test().stacktrace = \ 1085 ''.join((self.state_machine.test().stacktrace, message)) 1086 1087 1088def handle_product_info(message, product_info): 1089 message = message[message.index("The"):] 1090 items = message[len("The "):].split(" is ") 1091 product_info.setdefault(items[0].strip(), 1092 items[1].strip().strip("[").strip("]")) 1093