• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19from ohos.parser import *
20from ohos.constants import ParserType
21from ohos.parser.constants import parse_product_info
22
__all__ = ["CppTestListParserLite", "CppTestParserLite"]

# Markers searched for in each line of GTest-style console output.

# Run-level and suite-level banners.
_INFORMATIONAL_START = "[----------]"
_TEST_START_RUN_TAG = "[==========] Running"
_TEST_RUN_TAG = "[==========]"

# Banner that starts a test listing for CppTestListParserLite.
_CPP_TEST_DRYRUN_TAG = "Running main() "

# Per-test markers: start of a case and its possible outcomes.
_TEST_START_TAG = "[ RUN      ]"
_TEST_OK_TAG = "[       OK ]"
_TEST_SKIPPED_TAG = "[  SKIPPED ]"
_TEST_FAILED_TAG = "[  FAILED  ]"
_ALT_OK_TAG = "[    OK    ]"
_TIMEOUT_TAG = "[ TIMEOUT  ]"

# Logger shared by both parsers defined in this module.
LOG = platform_logger("CppTestParserLite")
37
38
@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_lite)
class CppTestParserLite(IParser):
    """Turn GTest-style console output into test life-cycle events.

    Lines handed to ``__process__`` are matched against the module-level
    tag constants (``[==========]``, ``[----------]``, ``[ RUN      ]``,
    ...).  Every recognised tag updates ``state_machine`` and is forwarded
    to each object in ``listeners`` through ``__started__`` / ``__ended__``.
    """

    def __init__(self):
        # Holds the current suites / suite / test result objects.
        self.state_machine = StateRecorder()
        # Name copied into ``suites_name`` when a run starts.
        self.suite_name = ""
        # Objects notified through ``__started__`` / ``__ended__``.
        self.listeners = []
        # Both are handed to ``parse_product_info`` for every line.
        self.product_info = {}
        self.is_params = False

    def get_suite_name(self):
        """Return the name reported for the whole run."""
        return self.suite_name

    def get_listeners(self):
        """Return the listeners that receive life-cycle events."""
        return self.listeners

    def __process__(self, lines):
        """Parse a batch of output lines.

        Lines that arrive before the run has started are also appended to
        ``state_machine.trace_logs``.
        """
        if not self.state_machine.suites_is_started():
            self.state_machine.trace_logs.extend(lines)
        for line in lines:
            if not check_pub_key_exist():
                LOG.debug(line)
            self.parse(line)

    def __done__(self):
        """Close the current suite and, if the run has a name, the run."""
        suite_result = self.state_machine.suite()
        suite_result.is_completed = True
        for listener in self.get_listeners():
            suite = copy.copy(suite_result)
            listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
        self.state_machine.running_test_index = 0

        suites_result = self.state_machine.get_suites()
        if not suites_result.suites_name:
            return
        for listener in self.get_listeners():
            suites = copy.copy(suites_result)
            listener.__ended__(LifeCycle.TestSuites, test_result=suites,
                               suites_name=suites.suites_name)
        self.state_machine.current_suites = None

    @staticmethod
    def _text_after_tag(line, tag):
        """Return the stripped text that follows ``tag`` in ``line``.

        ``tag`` must occur in ``line``; ``str.index`` raises ``ValueError``
        otherwise.
        """
        return line[line.index(tag) + len(tag):].strip()

    @staticmethod
    def _is_test_run(line):
        return _TEST_RUN_TAG in line

    @staticmethod
    def _is_test_start_run(line):
        return _TEST_START_RUN_TAG in line

    @staticmethod
    def _is_informational_start(line):
        return _INFORMATIONAL_START in line

    @staticmethod
    def _is_test_start(line):
        return _TEST_START_TAG in line

    def _process_informational_line(self, line):
        """Handle a ``[----------]`` line: suite ended or suite started."""
        pattern = r"(.*) (\(\d+ ms total\))"
        # Slice relative to where the tag actually is, so text in front of
        # the tag does not end up in the message.
        message = self._text_after_tag(line, _INFORMATIONAL_START)
        if re.match(pattern, line.strip()):
            self.handle_suite_ended_tag(message)
        elif re.match(r'(\d+) test[s]? from (.*)', message):
            self.handle_suite_started_tag(message)

    def _process_test_run_line(self, line):
        """Handle a ``[==========]`` line that is not the run-start banner."""
        if not self.state_machine.suites_is_running():
            return
        message = self._text_after_tag(line, _TEST_RUN_TAG)
        self.handle_suites_ended_tag(message)

    def parse(self, line):
        """Dispatch one output line to the matching tag handler.

        Lines are ignored until the run has started or a ``[==========]``
        tag shows up.
        """
        parse_product_info(line, self.is_params, self.product_info)

        if not (self.state_machine.suites_is_started()
                or self._is_test_run(line)):
            return
        # The run-start banner contains the plain run tag, so it has to be
        # checked first.
        if self._is_test_start_run(line):
            self.handle_suites_started_tag(line)
        elif self._is_informational_start(line):
            self._process_informational_line(line)
        elif self._is_test_run(line):
            self._process_test_run_line(line)
        elif self._is_test_start(line):
            self.handle_test_started_tag(
                self._text_after_tag(line, _TEST_START_TAG))
        else:
            self.process_test(line)

    def _end_started_test(self, line, tag, result_code):
        """End the running test, or log an error if none was started."""
        if not self.state_machine.test_is_running():
            LOG.error(
                "Found {} without {} before, wrong GTest log format".
                format(line, _TEST_START_TAG), error_no="00405")
            return
        self.handle_test_ended_tag(self._text_after_tag(line, tag),
                                   result_code)

    def process_test(self, line):
        """Handle a result-tag line or collect output of the running test."""
        if _TEST_SKIPPED_TAG in line:
            self._end_started_test(line, _TEST_SKIPPED_TAG,
                                   ResultCode.SKIPPED)
        elif _TEST_OK_TAG in line:
            self._end_started_test(line, _TEST_OK_TAG, ResultCode.PASSED)
        elif _ALT_OK_TAG in line:
            message = self._text_after_tag(line, _ALT_OK_TAG)
            self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.PASSED)
        elif _TEST_FAILED_TAG in line:
            if not self.state_machine.suite_is_running():
                return
            message = self._text_after_tag(line, _TEST_FAILED_TAG)
            # A failure without a preceding start tag gets a synthetic one.
            if not self.state_machine.test_is_running():
                self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.FAILED)
        elif _TIMEOUT_TAG in line:
            message = self._text_after_tag(line, _TIMEOUT_TAG)
            self.fake_run_marker(message)
            self.handle_test_ended_tag(message, ResultCode.FAILED)
        elif self.state_machine.test_is_running():
            self.append_test_output(line)

    def handle_test_started_tag(self, message):
        """Start a new test described by ``message`` and notify listeners."""
        test_class, test_name, _ = self.parse_test_description(message)
        test_result = self.state_machine.test(reset=True)
        test_result.test_class = test_class
        test_result.test_name = test_name
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestCase, copy.copy(test_result))

    @classmethod
    def parse_test_description(cls, message):
        """Split a test description into class, name and run time.

        Args:
            message: text following a test tag, e.g. ``Suite.Case (5 ms)``.

        Returns:
            ``(test_class, test_name, run_time)``; ``run_time`` is ``0``
            when no ``(N ms)`` part is present and ``test_class`` is ``""``
            when the description contains no ``.``.
        """
        run_time = 0
        matcher = re.match(r'(.*) \((\d+) ms\)(.*)', message)
        if matcher:
            description = matcher.group(1)
            run_time = int(matcher.group(2))
        else:
            description = message
        # rpartition never fails to unpack, unlike rsplit(".", 1) on a
        # description without a dot.
        test_class, _, test_name = description.rpartition(".")
        return test_class.split(" ")[-1], test_name.split(" ")[0], run_time

    def handle_test_ended_tag(self, message, test_status):
        """Finish the running test with ``test_status`` and notify listeners.

        A class or test name that differs from the started test is logged
        and turns the result into ``FAILED``.
        """
        test_class, test_name, run_time = self.parse_test_description(
            message)
        test_result = self.state_machine.test()
        test_result.run_time = int(run_time)
        test_result.code = test_status.value
        if not test_result.is_running():
            LOG.error(
                "Test has no start tag when trying to end test: %s", message,
                error_no="00405")
            return
        found_unexpected_test = False
        if test_result.test_class != test_class:
            LOG.error(
                "Expected class: {} but got:{} ".format(test_result.test_class,
                                                        test_class),
                error_no="00405")
            found_unexpected_test = True
        if test_result.test_name != test_name:
            LOG.error(
                "Expected test: {} but got: {}".format(test_result.test_name,
                                                       test_name),
                error_no="00405")
            found_unexpected_test = True
        test_result.current = self.state_machine.running_test_index + 1
        self.state_machine.test().is_completed = True
        if found_unexpected_test:
            test_result.code = ResultCode.FAILED.value

        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestCase, copy.copy(test_result))
        self.state_machine.running_test_index += 1

    def fake_run_marker(self, message):
        """Emit a synthetic test start for a result line without one.

        ``message`` is passed on unchanged: ``parse_test_description``
        expects a string and already ignores a trailing ``(N ms)`` part, so
        the started and the ended test resolve to the same class and name.
        """
        self.handle_test_started_tag(message)

    def handle_suites_started_tag(self, message):
        """Reset the run and announce it if the banner states a test count."""
        self.state_machine.get_suites(reset=True)
        matcher = re.match(r'.* Running (\d+) test[s]? from .*', message)
        expected_test_num = int(matcher.group(1)) if matcher else -1
        if expected_test_num >= 0:
            test_suites = self.state_machine.get_suites()
            test_suites.suites_name = self.get_suite_name()
            test_suites.test_num = expected_test_num
            for listener in self.get_listeners():
                listener.__started__(LifeCycle.TestSuites,
                                     copy.copy(test_suites))

    def handle_suite_started_tag(self, message):
        """Reset the suite and announce it if ``message`` names one."""
        self.state_machine.suite(reset=True)
        matcher = re.match(r'(\d+) test[s]? from (.*)', message)
        expected_test_num = int(matcher.group(1)) if matcher else -1
        if expected_test_num >= 0:
            test_suite = self.state_machine.suite()
            test_suite.suite_name = matcher.group(2)
            test_suite.test_num = expected_test_num
            for listener in self.get_listeners():
                listener.__started__(LifeCycle.TestSuite,
                                     copy.copy(test_suite))

    def handle_suite_ended_tag(self, message):
        """Mark the current suite completed and notify listeners."""
        suite_result = self.state_machine.suite()
        matcher = re.match(r'.*\((\d+) ms total\)', message)
        if matcher:
            suite_result.run_time = int(matcher.group(1))
        suite_result.is_completed = True
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuite, copy.copy(suite_result),
                               is_clear=True)
        self.state_machine.running_test_index = 0

    def handle_suites_ended_tag(self, message):
        """Mark the run completed and notify listeners."""
        suites = self.state_machine.get_suites()
        matcher = re.match(r'.*\((\d+) ms total\)', message)
        if matcher:
            suites.run_time = int(matcher.group(1))
        suites.is_completed = True
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuites,
                               test_result=copy.copy(suites),
                               suites_name=suites.suites_name)

    def append_test_output(self, message):
        """Append ``message`` to the running test's stacktrace text."""
        test_result = self.state_machine.test()
        # Separate from earlier output with CRLF once there is any.
        if test_result.stacktrace:
            test_result.stacktrace = "{}\r\n".format(test_result.stacktrace)
        test_result.stacktrace = "{}{}".format(test_result.stacktrace,
                                               message)

    @staticmethod
    def handle_test_run_failed(error_msg):
        """Log ``error_msg`` (or ``Unknown error``) at debug level."""
        if not error_msg:
            error_msg = "Unknown error"
        if not check_pub_key_exist():
            LOG.debug("Error msg:%s" % error_msg)

    def mark_test_as_failed(self, test):
        """Report ``test`` as a failed case inside a freshly reset run.

        Args:
            test: object exposing ``class_name`` and ``test_name``.

        Nothing is reported when there is neither a current suite nor a
        class name on ``test``.
        """
        if not self.state_machine.current_suite and not test.class_name:
            return
        suites_result = self.state_machine.get_suites(reset=True)
        suites_result.suites_name = self.get_suite_name()

        # Remember the suite name before the suite is reset below.
        suite_name = self.state_machine.current_suite.suite_name if \
            self.state_machine.current_suite else None
        suite_result = self.state_machine.suite(reset=True)
        test_result = self.state_machine.test(reset=True)
        suite_result.suite_name = suite_name or test.class_name
        suite_result.suite_num = 1
        test_result.test_class = test.class_name
        test_result.test_name = test.test_name
        test_result.stacktrace = "error_msg: Unknown error"
        test_result.num_tests = 1
        test_result.run_time = 0
        test_result.code = ResultCode.FAILED.value
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestSuites,
                                 copy.copy(suites_result))
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestSuite,
                                 copy.copy(suite_result))
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestCase, copy.copy(test_result))
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestCase, copy.copy(test_result))
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuite, copy.copy(suite_result),
                               is_clear=True)
        # NOTE(review): __done__ emits the TestSuite ended event as well, so
        # listeners receive it twice here - confirm this is intended.
        self.__done__()
319
320
@Plugin(type=Plugin.PARSER, id=ParserType.cpp_test_list_lite)
class CppTestListParserLite(IParser):
    """Collect test cases from a test listing and report them as skipped.

    Parsing starts at the line containing ``_CPP_TEST_DRYRUN_TAG``.  After
    that, a line ending in ``.`` opens a suite and an indented line adds a
    test case to it; every case is stored in ``tests`` and announced to the
    listeners as started and then ended with the ``SKIPPED`` code.
    """

    def __init__(self):
        self.state_machine = StateRecorder()
        self.listeners = []
        self.suites_name = ""
        # Test descriptions collected so far.
        self.tests = []
        # Name of the most recently seen class line.
        self.last_test_class_name = None
        # Match objects left behind by _is_class / _is_method.
        self.class_result = None
        self.method_result = None

    def __process__(self, lines):
        """Parse every line of ``lines`` in order."""
        for text in lines:
            if not check_pub_key_exist():
                LOG.debug(text)
            self.parse(text)

    def get_suite_name(self):
        """Return the name reported for the whole listing."""
        return self.suites_name

    def get_listeners(self):
        """Return the listeners that receive life-cycle events."""
        return self.listeners

    def __done__(self):
        """Close an open suite and, if the listing has a name, the listing."""
        if self.state_machine.is_started():
            self.handle_suite_ended_tag()
        all_suites = self.state_machine.get_suites()
        if not all_suites.suites_name:
            return
        for listener in self.get_listeners():
            snapshot = copy.copy(all_suites)
            listener.__ended__(LifeCycle.TestSuites, test_result=snapshot,
                               suites_name=snapshot.suites_name)
        self.state_machine.current_suites = None

    def _is_class(self, line):
        """Match a class line and keep the match in ``class_result``."""
        self.class_result = re.match('^([a-zA-Z]+.*)\\.$', line)
        return self.class_result

    def _is_method(self, line):
        """Match a method line and keep the match in ``method_result``."""
        self.method_result = re.match(
            '\\s+([a-zA-Z_]+[\\S]*)(.*)?(\\s+.*)?$', line)
        return self.method_result

    def _process_class_line(self, line):
        """Open a suite for the class stored in ``class_result``.

        ``line`` itself is not used; the name comes from the stored match.
        """
        if not self.state_machine.suites_is_started():
            self.handle_suites_started_tag()
        class_name = self.class_result.group(1)
        self.last_test_class_name = class_name
        # A suite that is still open belongs to the previous class.
        if self.state_machine.is_started():
            self.handle_suite_ended_tag()
        self.handle_suite_started_tag(class_name)

    def _process_method_line(self, line):
        """Record the method stored in ``method_result`` as a test case."""
        if not self.last_test_class_name:
            LOG.error(
                "Parsed new test case name %s but no test class"
                " name has been set" % line, error_no="00405")
            return
        method_name = self.method_result.group(1)
        self.tests.append(
            TestDescription(self.last_test_class_name, method_name))
        self.handle_test_tag(self.last_test_class_name,
                             self.method_result.group(1))

    @staticmethod
    def _is_cpp_test_dryrun(line):
        return _CPP_TEST_DRYRUN_TAG in line

    def parse(self, line):
        """Dispatch one line; lines before the dry-run banner are dropped."""
        if not (self.state_machine.suites_is_started()
                or self._is_cpp_test_dryrun(line)):
            return
        if self._is_cpp_test_dryrun(line):
            self.handle_suites_started_tag()
        elif self._is_class(line):
            self._process_class_line(line)
        elif self._is_method(line):
            self._process_method_line(line)
        elif not check_pub_key_exist():
            LOG.debug("Line ignored: %s" % line)

    def handle_test_tag(self, test_class, test_name):
        """Announce one listed test as started and then ended as skipped."""
        case_result = self.state_machine.test(reset=True)
        case_result.test_class = test_class
        case_result.test_name = test_name
        # The name is rebound on purpose: with at least one listener it
        # refers to the last copy handed out once the loop is over.
        for listener in self.get_listeners():
            case_result = copy.copy(case_result)
            listener.__started__(LifeCycle.TestCase, case_result)
        self.state_machine.test().is_completed = True
        # NOTE(review): with listeners present this sets the code on the
        # copy from the loop above, not on the recorder's own result.
        case_result.code = ResultCode.SKIPPED.value
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestCase, copy.copy(case_result))
        self.state_machine.running_test_index += 1
        # Keep the run-level and suite-level counters in step.
        all_suites = self.state_machine.get_suites()
        current_suite = self.state_machine.suite()
        all_suites.test_num += 1
        current_suite.test_num += 1

    def handle_suites_started_tag(self):
        """Reset the listing and announce it with a test count of zero."""
        self.state_machine.get_suites(reset=True)
        all_suites = self.state_machine.get_suites()
        all_suites.suites_name = self.get_suite_name()
        all_suites.test_num = 0
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestSuites,
                                 copy.copy(all_suites))

    def handle_suite_started_tag(self, class_name):
        """Reset the suite, name it ``class_name`` and announce it."""
        self.state_machine.suite(reset=True)
        current_suite = self.state_machine.suite()
        current_suite.suite_name = class_name
        current_suite.test_num = 0
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestSuite,
                                 copy.copy(current_suite))

    def handle_suite_ended_tag(self):
        """Mark the current suite completed and notify listeners."""
        current_suite = self.state_machine.suite()
        current_suite.is_completed = True
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuite,
                               copy.copy(current_suite))

    def handle_suites_ended_tag(self):
        """Mark the listing completed and notify listeners."""
        all_suites = self.state_machine.get_suites()
        all_suites.is_completed = True
        for listener in self.get_listeners():
            listener.__ended__(LifeCycle.TestSuites,
                               test_result=copy.copy(all_suites),
                               suites_name=all_suites.suites_name)

    def mark_test_as_failed(self, test):
        """Report ``test`` as a failed case in a freshly reset suite.

        Nothing is reported when there is neither a current suite nor a
        class name on ``test``.
        """
        active_suite = self.state_machine.current_suite
        if not active_suite and not test.class_name:
            return
        # Remember the suite name before the suite is reset below.
        previous_name = active_suite.suite_name if active_suite else None
        suite_result = self.state_machine.suite(reset=True)
        case_result = self.state_machine.test(reset=True)
        suite_result.suite_name = previous_name or test.class_name
        suite_result.suite_num = 1
        case_result.test_class = test.class_name
        case_result.test_name = test.test_name
        case_result.stacktrace = "error_msg: Unknown error"
        case_result.num_tests = 1
        case_result.run_time = 0
        case_result.code = ResultCode.FAILED.value
        for listener in self.get_listeners():
            listener.__started__(LifeCycle.TestSuite,
                                 copy.copy(suite_result))
        # As in the original, each listener gets a copy of the previous copy.
        for listener in self.get_listeners():
            case_result = copy.copy(case_result)
            listener.__started__(LifeCycle.TestCase, case_result)
        for listener in self.get_listeners():
            case_result = copy.copy(case_result)
            listener.__ended__(LifeCycle.TestCase, case_result)
        self.__done__()
479