• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import time
22import json
23import shutil
24import zipfile
25import tempfile
26import stat
27from dataclasses import dataclass
28
29from xdevice import ConfigConst
30from xdevice import ParamError
31from xdevice import ExecuteTerminate
32from xdevice import IDriver
33from xdevice import platform_logger
34from xdevice import Plugin
35from xdevice import get_plugin
36from xdevice import JsonParser
37from xdevice import ShellHandler
38from xdevice import TestDescription
39from xdevice import ResourceManager
40from xdevice import get_device_log_file
41from xdevice import check_result_report
42from xdevice import get_kit_instances
43from xdevice import get_config_value
44from xdevice import do_module_kit_setup
45from xdevice import do_module_kit_teardown
46from xdevice import DeviceTestType
47from xdevice import CommonParserType
48from xdevice import FilePermission
49from xdevice import CollectingTestListener
50from xdevice import ShellCommandUnresponsiveException
51from xdevice import HapNotSupportTest
52from xdevice import HdcCommandRejectedException
53from xdevice import HdcError
54from xdevice import DeviceConnectorType
55from xdevice import get_filename_extension
56from xdevice import junit_para_parse
57from xdevice import gtest_para_parse
58from xdevice import reset_junit_para
59from xdevice import disable_keyguard
60from xdevice import unlock_screen
61from xdevice import unlock_device
62from xdevice import get_cst_time
63
64from ohos.environment.dmlib import process_command_ret
65from ohos.environment.dmlib import DisplayOutputReceiver
66from ohos.testkit.kit import junit_dex_para_parse
67from ohos.parser.parser import _ACE_LOG_MARKER
68
69__all__ = ["CppTestDriver", "DexTestDriver", "HapTestDriver",
70           "JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner",
71           "RemoteDexRunner"]
72LOG = platform_logger("Drivers")
73DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
74ON_DEVICE_TEST_DIR_LOCATION = "/%s/%s/%s/" % ("data", "local", "tmp")
75
76FAILED_RUN_TEST_ATTEMPTS = 3
77TIME_OUT = 900 * 1000
78
79
80def get_xml_output(config, json_config):
81    xml_output = config.testargs.get("xml-output")
82    if not xml_output:
83        if get_config_value('xml-output', json_config.get_driver(), False):
84            xml_output = get_config_value('xml-output',
85                                          json_config.get_driver(), False)
86        else:
87            xml_output = "false"
88    else:
89        xml_output = xml_output[0]
90    xml_output = str(xml_output).lower()
91    return xml_output
92
93
94def get_result_savepath(testsuit_path, result_rootpath):
95    findkey = "%stests%s" % (os.sep, os.sep)
96    filedir, _ = os.path.split(testsuit_path)
97    pos = filedir.find(findkey)
98    if -1 != pos:
99        subpath = filedir[pos + len(findkey):]
100        pos1 = subpath.find(os.sep)
101        if -1 != pos1:
102            subpath = subpath[pos1 + len(os.sep):]
103            result_path = os.path.join(result_rootpath, "result", subpath)
104        else:
105            result_path = os.path.join(result_rootpath, "result")
106    else:
107        result_path = os.path.join(result_rootpath, "result")
108
109    if not os.path.exists(result_path):
110        os.makedirs(result_path)
111
112    LOG.info("Result save path = %s" % result_path)
113    return result_path
114
115
116# all testsuit common Unavailable test result xml
117def _create_empty_result_file(filepath, filename, error_message):
118    error_message = str(error_message)
119    error_message = error_message.replace("\"", """)
120    error_message = error_message.replace("<", "&lt;")
121    error_message = error_message.replace(">", "&gt;")
122    error_message = error_message.replace("&", "&amp;")
123    if filename.endswith(".hap"):
124        filename = filename.split(".")[0]
125    if not os.path.exists(filepath):
126        file_open = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
127                            FilePermission.mode_755)
128        with os.fdopen(file_open, "w") as file_desc:
129            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
130                                       time.localtime())
131            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
132            file_desc.write('<testsuites tests="0" failures="0" '
133                            'disabled="0" errors="0" timestamp="%s" '
134                            'time="0" name="AllTests">\n' % time_stamp)
135            file_desc.write(
136                '  <testsuite name="%s" tests="0" failures="0" '
137                'disabled="0" errors="0" time="0.0" '
138                'unavailable="1" message="%s">\n' %
139                (filename, error_message))
140            file_desc.write('  </testsuite>\n')
141            file_desc.write('</testsuites>\n')
142            file_desc.flush()
143    return
144
145
146class ResultManager(object):
147    def __init__(self, testsuit_path, result_rootpath, device,
148                 device_testpath):
149        self.testsuite_path = testsuit_path
150        self.result_rootpath = result_rootpath
151        self.device = device
152        self.device_testpath = device_testpath
153        self.testsuite_name = os.path.basename(self.testsuite_path)
154        self.is_coverage = False
155
156    def set_is_coverage(self, is_coverage):
157        self.is_coverage = is_coverage
158
159    def get_test_results(self, error_message=""):
160        # Get test result files
161        filepath = self.obtain_test_result_file()
162        if not os.path.exists(filepath):
163            _create_empty_result_file(filepath, self.testsuite_name,
164                                      error_message)
165
166        # Get coverage data files
167        if self.is_coverage:
168            self.obtain_coverage_data()
169
170        return filepath
171
172    def obtain_test_result_file(self):
173        result_savepath = get_result_savepath(self.testsuite_path,
174                                              self.result_rootpath)
175        if self.testsuite_path.endswith('.hap'):
176            filepath = os.path.join(result_savepath, "%s.xml" % str(
177                self.testsuite_name).split(".")[0])
178
179            remote_result_name = ""
180            if self.device.is_file_exist(os.path.join(self.device_testpath,
181                                                      "testcase_result.xml")):
182                remote_result_name = "testcase_result.xml"
183            elif self.device.is_file_exist(os.path.join(self.device_testpath,
184                                                        "report.xml")):
185                remote_result_name = "report.xml"
186
187            if remote_result_name:
188                self.device.pull_file(
189                    os.path.join(self.device_testpath, remote_result_name),
190                    filepath)
191            else:
192                LOG.error("%s no report file", self.device_testpath)
193
194        else:
195            filepath = os.path.join(result_savepath, "%s.xml" %
196                                    self.testsuite_name)
197            remote_result_file = os.path.join(self.device_testpath,
198                                              "%s.xml" % self.testsuite_name)
199
200            if self.device.is_file_exist(remote_result_file):
201                self.device.pull_file(remote_result_file, result_savepath)
202            else:
203                LOG.error("%s not exists", remote_result_file)
204        return filepath
205
206    def is_exist_target_in_device(self, path, target):
207        command = "ls -l %s | grep %s" % (path, target)
208
209        check_result = False
210        stdout_info = self.device.execute_shell_command(command)
211        if stdout_info != "" and stdout_info.find(target) != -1:
212            check_result = True
213        return check_result
214
215    def obtain_coverage_data(self):
216        java_cov_path = os.path.abspath(
217            os.path.join(self.result_rootpath, "..", "coverage/data/exec"))
218        dst_target_name = "%s.exec" % self.testsuite_name
219        src_target_name = "jacoco.exec"
220        if self.is_exist_target_in_device(self.device_testpath,
221                                          src_target_name):
222            if not os.path.exists(java_cov_path):
223                os.makedirs(java_cov_path)
224            self.device.pull_file(
225                os.path.join(self.device_testpath, src_target_name),
226                os.path.join(java_cov_path, dst_target_name))
227
228        cxx_cov_path = os.path.abspath(
229            os.path.join(self.result_rootpath, "..", "coverage/data/cxx",
230                         self.testsuite_name))
231        target_name = "obj"
232        if self.is_exist_target_in_device(self.device_testpath, target_name):
233            if not os.path.exists(cxx_cov_path):
234                os.makedirs(cxx_cov_path)
235            src_file = os.path.join(self.device_testpath, target_name)
236            self.device.pull_file(src_file, cxx_cov_path)
237
238
239@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
240class CppTestDriver(IDriver):
241    """
242    CppTestDriver is a Test that runs a native test package on given harmony
243    device.
244    """
245
246    def __init__(self):
247        self.result = ""
248        self.error_message = ""
249        self.config = None
250        self.rerun = True
251        self.rerun_all = True
252        self.runner = None
253        # log
254        self.device_log = None
255        self.hilog = None
256        self.log_proc = None
257        self.hilog_proc = None
258
259    def __check_environment__(self, device_options):
260        pass
261
262    def __check_config__(self, config):
263        pass
264
265    def __execute__(self, request):
266        try:
267            LOG.debug("Start execute xdevice extension CppTest")
268
269            self.config = request.config
270            self.config.device = request.config.environment.devices[0]
271
272            config_file = request.root.source.config_file
273            self.result = "%s.xml" % \
274                          os.path.join(request.config.report_path,
275                                       "result", request.root.source.test_name)
276
277            self.device_log = get_device_log_file(
278                request.config.report_path,
279                request.config.device.__get_serial__() + "_" + request.
280                get_module_name(),
281                "device_log")
282
283            self.hilog = get_device_log_file(
284                request.config.report_path,
285                request.config.device.__get_serial__() + "_" + request.
286                get_module_name(),
287                "device_hilog")
288
289            device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT |
290                                      os.O_APPEND, FilePermission.mode_755)
291            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
292                                 FilePermission.mode_755)
293            self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
294            with os.fdopen(device_log_open, "a") as log_file_pipe, \
295                    os.fdopen(hilog_open, "a") as hilog_file_pipe:
296                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
297                    start_catch_device_log(log_file_pipe, hilog_file_pipe)
298                self._run_cpp_test(config_file, listeners=request.listeners,
299                                   request=request)
300                log_file_pipe.flush()
301                hilog_file_pipe.flush()
302
303        except Exception as exception:
304            self.error_message = exception
305            if not getattr(exception, "error_no", ""):
306                setattr(exception, "error_no", "03404")
307            LOG.exception(self.error_message, exc_info=False, error_no="03404")
308            raise exception
309
310        finally:
311            self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog)
312            self.config.device.device_log_collector.stop_catch_device_log(self.log_proc)
313            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
314            self.result = check_result_report(
315                request.config.report_path, self.result, self.error_message)
316
317    def _run_cpp_test(self, config_file, listeners=None, request=None):
318        try:
319            if not os.path.exists(config_file):
320                LOG.error("Error: Test cases don't exit %s." % config_file,
321                          error_no="00102")
322                raise ParamError(
323                    "Error: Test cases don't exit %s." % config_file,
324                    error_no="00102")
325
326            json_config = JsonParser(config_file)
327            kits = get_kit_instances(json_config, self.config.resource_path,
328                                     self.config.testcases_path)
329
330            for listener in listeners:
331                listener.device_sn = self.config.device.device_sn
332
333            self._get_driver_config(json_config)
334            do_module_kit_setup(request, kits)
335            self.runner = RemoteCppTestRunner(self.config)
336            self.runner.suite_name = request.root.source.test_name
337
338            if hasattr(self.config, "history_report_path") and \
339                    self.config.testargs.get("test"):
340                self._do_test_retry(listeners, self.config.testargs)
341            else:
342                gtest_para_parse(self.config.testargs, self.runner, request)
343                self._do_test_run(listeners)
344
345        finally:
346            do_module_kit_teardown(request)
347
348    def _do_test_retry(self, listener, testargs):
349        for test in testargs.get("test"):
350            test_item = test.split("#")
351            if len(test_item) != 2:
352                continue
353            self.runner.add_instrumentation_arg(
354                "gtest_filter", "%s.%s" % (test_item[0], test_item[1]))
355            self.runner.run(listener)
356
357    def _do_test_run(self, listener):
358        test_to_run = self._collect_test_to_run()
359        LOG.info("Collected test count is: %s" % (len(test_to_run)
360                 if test_to_run else 0))
361        if not test_to_run:
362            self.runner.run(listener)
363        else:
364            self._run_with_rerun(listener, test_to_run)
365
366    def _collect_test_to_run(self):
367        if self.rerun:
368            self.runner.add_instrumentation_arg("gtest_list_tests", True)
369            run_results = self.runner.dry_run()
370            self.runner.remove_instrumentation_arg("gtest_list_tests")
371            return run_results
372        return None
373
374    def _run_tests(self, listener):
375        test_tracker = CollectingTestListener()
376        listener_copy = listener.copy()
377        listener_copy.append(test_tracker)
378        self.runner.run(listener_copy)
379        test_run = test_tracker.get_current_run_results()
380        return test_run
381
382    def _run_with_rerun(self, listener, expected_tests):
383        LOG.debug("Ready to run with rerun, expect run: %s"
384                  % len(expected_tests))
385        test_run = self._run_tests(listener)
386        LOG.debug("Run with rerun, has run: %s" % len(test_run)
387                  if test_run else 0)
388        if len(test_run) < len(expected_tests):
389            expected_tests = TestDescription.remove_test(expected_tests,
390                                                         test_run)
391            if not expected_tests:
392                LOG.debug("No tests to re-run, all tests executed at least "
393                          "once.")
394            if self.rerun_all:
395                self._rerun_all(expected_tests, listener)
396            else:
397                self._rerun_serially(expected_tests, listener)
398
399    def _rerun_all(self, expected_tests, listener):
400        tests = []
401        for test in expected_tests:
402            tests.append("%s.%s" % (test.class_name, test.test_name))
403        self.runner.add_instrumentation_arg("gtest_filter", ":".join(tests))
404        LOG.debug("Ready to rerun file, expect run: %s" % len(expected_tests))
405        test_run = self._run_tests(listener)
406        LOG.debug("Rerun file, has run: %s" % len(test_run))
407        if len(test_run) < len(expected_tests):
408            expected_tests = TestDescription.remove_test(expected_tests,
409                                                         test_run)
410            if not expected_tests:
411                LOG.debug("Rerun textFile success")
412            self._rerun_serially(expected_tests, listener)
413
414    def _rerun_serially(self, expected_tests, listener):
415        LOG.debug("Rerun serially, expected run: %s" % len(expected_tests))
416        for test in expected_tests:
417            self.runner.add_instrumentation_arg(
418                "gtest_filter", "%s.%s" % (test.class_name, test.test_name))
419            self.runner.rerun(listener, test)
420            self.runner.remove_instrumentation_arg("gtest_filter")
421
422    def _get_driver_config(self, json_config):
423        target_test_path = get_config_value('native-test-device-path',
424                                            json_config.get_driver(), False)
425        if target_test_path:
426            self.config.target_test_path = target_test_path
427        else:
428            self.config.target_test_path = DEFAULT_TEST_PATH
429
430        self.config.module_name = get_config_value(
431            'module-name', json_config.get_driver(), False)
432
433        timeout_config = get_config_value('native-test-timeout',
434                                          json_config.get_driver(), False)
435        if timeout_config:
436            self.config.timeout = int(timeout_config)
437        else:
438            self.config.timeout = TIME_OUT
439
440        rerun = get_config_value('rerun', json_config.get_driver(), False)
441        if isinstance(rerun, bool):
442            self.rerun = rerun
443        elif str(rerun).lower() == "false":
444            self.rerun = False
445        else:
446            self.rerun = True
447
448    def __result__(self):
449        return self.result if os.path.exists(self.result) else ""
450
451
452class RemoteCppTestRunner:
453    def __init__(self, config):
454        self.arg_list = {}
455        self.suite_name = None
456        self.config = config
457        self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS
458
459    def dry_run(self):
460        parsers = get_plugin(Plugin.PARSER, CommonParserType.cpptest_list)
461        if parsers:
462            parsers = parsers[:1]
463        parser_instances = []
464        for parser in parsers:
465            parser_instance = parser.__class__()
466            parser_instances.append(parser_instance)
467        handler = ShellHandler(parser_instances)
468
469        command = "cd %s; chmod +x *; ./%s %s" \
470                  % (self.config.target_test_path, self.config.module_name,
471                     self.get_args_command())
472
473        self.config.device.execute_shell_command(
474            command, timeout=self.config.timeout, receiver=handler, retry=0)
475        return parser_instances[0].tests
476
477    def run(self, listener):
478        handler = self._get_shell_handler(listener)
479        command = "cd %s; chmod +x *; ./%s %s" \
480                  % (self.config.target_test_path, self.config.module_name,
481                     self.get_args_command())
482
483        self.config.device.execute_shell_command(
484            command, timeout=self.config.timeout, receiver=handler, retry=0)
485
486    def rerun(self, listener, test):
487        if self.rerun_attempt:
488            test_tracker = CollectingTestListener()
489            listener_copy = listener.copy()
490            listener_copy.append(test_tracker)
491            handler = self._get_shell_handler(listener_copy)
492            try:
493                command = "cd %s; chmod +x *; ./%s %s" \
494                          % (self.config.target_test_path,
495                             self.config.module_name,
496                             self.get_args_command())
497
498                self.config.device.execute_shell_command(
499                    command, timeout=self.config.timeout, receiver=handler,
500                    retry=0)
501
502            except ShellCommandUnresponsiveException as _:
503                LOG.debug("Exception: ShellCommandUnresponsiveException")
504            finally:
505                if not len(test_tracker.get_current_run_results()):
506                    LOG.debug("No test case is obtained finally")
507                    self.rerun_attempt -= 1
508                    handler.parsers[0].mark_test_as_blocked(test)
509        else:
510            LOG.debug("Not execute and mark as blocked finally")
511            handler = self._get_shell_handler(listener)
512            handler.parsers[0].mark_test_as_blocked(test)
513
514    def add_instrumentation_arg(self, name, value):
515        if not name or not value:
516            return
517        self.arg_list[name] = value
518
519    def remove_instrumentation_arg(self, name):
520        if not name:
521            return
522        if name in self.arg_list:
523            del self.arg_list[name]
524
525    def get_args_command(self):
526        args_commands = ""
527        for key, value in self.arg_list.items():
528            if key == "gtest_list_tests":
529                args_commands = "%s --%s" % (args_commands, key)
530            else:
531                args_commands = "%s --%s=%s" % (args_commands, key, value)
532        return args_commands
533
534    def _get_shell_handler(self, listener):
535        parsers = get_plugin(Plugin.PARSER, CommonParserType.cpptest)
536        if parsers:
537            parsers = parsers[:1]
538        parser_instances = []
539        for parser in parsers:
540            parser_instance = parser.__class__()
541            parser_instance.suite_name = self.suite_name
542            parser_instance.listeners = listener
543            parser_instances.append(parser_instance)
544        handler = ShellHandler(parser_instances)
545        return handler
546
547
548@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
549class JSUnitTestDriver(IDriver):
550    """
551    JSUnitTestDriver is a Test that runs a native test package on given device.
552    """
553
554    def __init__(self):
555        self.xml_output = "false"
556        self.timeout = 30 * 1000
557        self.start_time = None
558        self.result = ""
559        self.error_message = ""
560        self.kits = []
561        self.config = None
562        # log
563        self.device_log = None
564        self.hilog = None
565        self.log_proc = None
566        self.hilog_proc = None
567
568    def __check_environment__(self, device_options):
569        pass
570
571    def __check_config__(self, config):
572        pass
573
574    def __execute__(self, request):
575
576        device = request.config.environment.devices[0]
577        exe_out = device.execute_shell_command(
578            "param get const.product.software.version")
579        LOG.debug("Software version is {}".format(exe_out))
580        self.run_js_outer(request)
581
582    def generate_console_output(self, request, timeout):
583        LOG.info("prepare to read device log, may wait some time")
584        message_list = list()
585        label_list, suite_info, is_suites_end = self.read_device_log_timeout(
586            self.hilog, message_list, timeout)
587        if not is_suites_end:
588            message_list.append(_ACE_LOG_MARKER + ": [end] run suites end\n")
589            LOG.warning("there is no suites end")
590        if len(label_list[0]) > 0 and sum(label_list[0]) != 0:
591            # the problem happened! when the sum of label list is not zero
592            self._insert_suite_end(label_list, message_list)
593
594        result_message = "".join(message_list)
595        message_list.clear()
596        expect_tests_dict = self._parse_suite_info(suite_info)
597        self._analyse_tests(request, result_message, expect_tests_dict)
598
599    @classmethod
600    def _insert_suite_end(cls, label_list, message_list):
601        for i in range(len(label_list[0])):
602            if label_list[0][i] != 1:  # skipp
603                continue
604            # check the start label, then peek next position
605            if i + 1 == len(label_list[0]):  # next position at the tail
606                message_list.insert(-1, _ACE_LOG_MARKER + ": [suite end]\n")
607                LOG.warning("there is no suite end")
608                continue
609            if label_list[0][i + 1] != 1:  # 0 present the end label
610                continue
611            message_list.insert(label_list[1][i + 1],
612                                _ACE_LOG_MARKER + ": [suite end]\n")
613            LOG.warning("there is no suite end")
614            for j in range(i + 1, len(label_list[1])):
615                label_list[1][j] += 1  # move the index to next
616
617    def _analyse_tests(self, request, result_message, expect_tests_dict):
618        exclude_list = self._make_exclude_list_file(request)
619        exclude_list.extend(self._get_retry_skip_list(expect_tests_dict))
620        listener_copy = request.listeners.copy()
621        parsers = get_plugin(
622            Plugin.PARSER, CommonParserType.jsunit)
623        if parsers:
624            parsers = parsers[:1]
625        for listener in listener_copy:
626            listener.device_sn = self.config.device.device_sn
627        parser_instances = []
628        for parser in parsers:
629            parser_instance = parser.__class__()
630            parser_instance.suites_name = request.get_module_name()
631            parser_instance.listeners = listener_copy
632            parser_instances.append(parser_instance)
633        handler = ShellHandler(parser_instances)
634        handler.parsers[0].expect_tests_dict = expect_tests_dict
635        handler.parsers[0].exclude_list = exclude_list
636        process_command_ret(result_message, handler)
637
638    def _get_retry_skip_list(self, expect_tests_dict):
639        # get already pass case
640        skip_list = []
641        if hasattr(self.config, "history_report_path") and \
642                self.config.testargs.get("test"):
643            for class_name in expect_tests_dict.keys():
644                for test_desc in expect_tests_dict.get(class_name, list()):
645                    test = "{}#{}".format(test_desc.class_name, test_desc.test_name)
646                    if test not in self.config.testargs.get("test"):
647                        skip_list.append(test)
648        LOG.debug("Retry skip list: {}, total skip case: {}".
649                  format(skip_list, len(skip_list)))
650        return skip_list
651
652    @classmethod
653    def _parse_suite_info(cls, suite_info):
654        tests_dict = dict()
655        test_count = 0
656        if suite_info:
657            json_str = "".join(suite_info)
658            LOG.debug("Suites info: %s" % json_str)
659            try:
660                suite_dict_list = json.loads(json_str).get("suites", [])
661                for suite_dict in suite_dict_list:
662                    for class_name, test_name_dict_list in suite_dict.items():
663                        tests_dict.update({class_name.strip(): []})
664                        for test_name_dict in test_name_dict_list:
665                            for test_name in test_name_dict.values():
666                                test = TestDescription(class_name.strip(),
667                                                       test_name.strip())
668                                tests_dict.get(class_name.strip()).append(test)
669                                test_count += 1
670            except json.decoder.JSONDecodeError as json_error:
671                LOG.warning("Suites info is invalid: %s" % json_error)
672        LOG.debug("Collect suite count is %s, test count is %s" %
673                  (len(tests_dict), test_count))
674        return tests_dict
675
676    def read_device_log_timeout(self, device_log_file,
677                                message_list, timeout):
678        LOG.info("The timeout is {} seconds".format(timeout))
679        pattern = "^\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s+(\\d+)"
680        while time.time() - self.start_time <= timeout:
681            with open(device_log_file, "r", encoding='utf-8',
682                      errors='ignore') as file_read_pipe:
683                pid = ""
684                message_list.clear()
685                label_list = [[], []]  # [-1, 1 ..] [line1, line2 ..]
686                suite_info = []
687                while True:
688                    try:
689                        line = file_read_pipe.readline()
690                    except UnicodeError as error:
691                        LOG.warning("While read log file: %s" % error)
692                    if not line:
693                        time.sleep(5)  # wait for log write to file
694                        break
695                    if line.lower().find(_ACE_LOG_MARKER + ":") != -1:
696                        if "[suites info]" in line:
697                            _, pos = re.match(".+\\[suites info]", line).span()
698                            suite_info.append(line[pos:].strip())
699
700                        if "[start] start run suites" in line:  # 发现了任务开始标签
701                            pid, is_update = \
702                                self._init_suites_start(line, pattern, pid)
703                            if is_update:
704                                message_list.clear()
705                                label_list[0].clear()
706                                label_list[1].clear()
707                        if not pid or pid not in line:
708                            continue
709                        message_list.append(line)
710                        if "[suite end]" in line:
711                            label_list[0].append(-1)
712                            label_list[1].append(len(message_list) - 1)
713                        if "[suite start]" in line:
714                            label_list[0].append(1)
715                            label_list[1].append(len(message_list) - 1)
716                        if "[end] run suites end" in line:
717                            LOG.info("Find the end mark then analysis result")
718                            LOG.debug("current JSApp pid= %s" % pid)
719                            return label_list, suite_info, True
720        else:
721            LOG.error("Hjsunit run timeout {}s reached".format(timeout))
722            LOG.debug("current JSApp pid= %s" % pid)
723            return label_list, suite_info, False
724
725    @classmethod
726    def _init_suites_start(cls, line, pattern, pid):
727        matcher = re.match(pattern, line.strip())
728        if matcher and matcher.group(1):
729            pid = matcher.group(1)
730            return pid, True
731        return pid, False
732
733    def run_js_outer(self, request):
734        try:
735            LOG.debug("Start execute xdevice extension JSUnit Test")
736            LOG.debug("Outer version about Community")
737            self.result = os.path.join(
738                request.config.report_path, "result",
739                '.'.join((request.get_module_name(), "xml")))
740            self.config = request.config
741            self.config.device = request.config.environment.devices[0]
742
743            config_file = request.root.source.config_file
744            suite_file = request.root.source.source_file
745
746            if not suite_file:
747                raise ParamError(
748                    "test source '%s' not exists" %
749                    request.root.source.source_string, error_no="00110")
750
751            LOG.debug("Test case file path: %s" % suite_file)
752            # avoid hilog service stuck issue
753            self.config.device.connector_command("shell stop_service hilogd",
754                                           timeout=30 * 1000)
755            self.config.device.connector_command("shell start_service hilogd",
756                                           timeout=30 * 1000)
757            time.sleep(10)
758
759            self.config.device.set_device_report_path(request.config.report_path)
760            self.config.device.connector_command("shell hilog -r", timeout=30 * 1000)
761            self._run_jsunit_outer(config_file, request)
762        except Exception as exception:
763            self.error_message = exception
764            if not getattr(exception, "error_no", ""):
765                setattr(exception, "error_no", "03409")
766            LOG.exception(self.error_message, exc_info=False, error_no="03409")
767            raise exception
768        finally:
769            serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
770            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
771            if hasattr(self.config, ConfigConst.device_log) and \
772                    self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on:
773                self.config.device.device_log_collector.start_get_crash_log(log_tar_file_name,
774                                                                            module_name=request.get_module_name())
775            self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog)
776            self.config.device.device_log_collector.stop_catch_device_log(self.log_proc)
777            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
778            do_module_kit_teardown(request)
779            self.result = check_result_report(
780                request.config.report_path, self.result, self.error_message)
781
782    def _run_jsunit_outer(self, config_file, request):
783        if not os.path.exists(config_file):
784            LOG.error("Error: Test cases don't exist %s." % config_file)
785            raise ParamError(
786                "Error: Test cases don't exist %s." % config_file,
787                error_no="00102")
788
789        json_config = JsonParser(config_file)
790        self.kits = get_kit_instances(json_config,
791                                      self.config.resource_path,
792                                      self.config.testcases_path)
793
794        package, ability_name = self._get_driver_config_outer(json_config)
795        self.config.device.connector_command("target mount")
796        do_module_kit_setup(request, self.kits)
797
798        self.hilog = get_device_log_file(
799            request.config.report_path,
800            request.config.device.__get_serial__() + "_" + request.
801            get_module_name(),
802            "device_hilog")
803
804        hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
805                             0o755)
806        self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
807        with os.fdopen(hilog_open, "a") as hilog_file_pipe:
808            if hasattr(self.config, ConfigConst.device_log) and \
809                    self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on:
810                self.config.device.device_log_collector.clear_crash_log()
811            self.log_proc, self.hilog_proc = self.config.device.device_log_collector. \
812                start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
813
814        # execute test case
815        command = "shell aa start -d 123 -a %s -b %s" \
816                  % (ability_name, package)
817        result_value = self.config.device.connector_command(command)
818        if result_value and "start ability successfully" in \
819                str(result_value).lower():
820            setattr(self, "start_success", True)
821            LOG.info("execute %s's testcase success. result value=%s"
822                     % (package, result_value))
823        else:
824            LOG.info("execute %s's testcase failed. result value=%s"
825                     % (package, result_value))
826            raise RuntimeError("hjsunit test run error happened!")
827
828        self.start_time = time.time()
829        timeout_config = get_config_value('test-timeout',
830                                          json_config.get_driver(),
831                                          False, 60000)
832        timeout = int(timeout_config) / 1000
833        self.generate_console_output(request, timeout)
834
835    def _jsunit_clear_outer(self):
836        self.config.device.execute_shell_command(
837            "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur"))
838
839    def _get_driver_config_outer(self, json_config):
840        package = get_config_value('package', json_config.get_driver(), False)
841        default_ability = "{}.MainAbility".format(package)
842        ability_name = get_config_value('abilityName', json_config.
843                                        get_driver(), False, default_ability)
844        self.xml_output = get_xml_output(self.config, json_config)
845        timeout_config = get_config_value('native-test-timeout',
846                                          json_config.get_driver(), False)
847        if timeout_config:
848            self.timeout = int(timeout_config)
849
850        if not package:
851            raise ParamError("Can't find package in config file.",
852                             error_no="03201")
853        return package, ability_name
854
855    def _make_exclude_list_file(self, request):
856        filter_list = []
857        if "all-test-file-exclude-filter" in self.config.testargs:
858            json_file_list = self.config.testargs.get(
859                "all-test-file-exclude-filter")
860            self.config.testargs.pop("all-test-file-exclude-filter")
861            if not json_file_list:
862                LOG.debug("all-test-file-exclude-filter value is empty!")
863            else:
864                if not os.path.isfile(json_file_list[0]):
865                    LOG.warning(
866                        "[{}] is not a valid file".format(json_file_list[0]))
867                    return []
868                file_open = os.open(json_file_list[0], os.O_RDONLY,
869                                    stat.S_IWUSR | stat.S_IRUSR)
870                with os.fdopen(file_open, "r") as file_handler:
871                    json_data = json.load(file_handler)
872                exclude_list = json_data.get(
873                    DeviceTestType.jsunit_test, [])
874
875                for exclude in exclude_list:
876                    if request.get_module_name() not in exclude:
877                        continue
878                    filter_list.extend(exclude.get(request.get_module_name()))
879        return filter_list
880
881    def __result__(self):
882        return self.result if os.path.exists(self.result) else ""
883
884
885@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ltp_posix_test)
886class LTPPosixTestDriver(IDriver):
887    def __init__(self):
888        self.timeout = 80 * 1000
889        self.start_time = None
890        self.result = ""
891        self.error_message = ""
892        self.kits = []
893        self.config = None
894        self.handler = None
895        # log
896        self.hilog = None
897        self.log_proc = None
898
899    def __check_environment__(self, device_options):
900        pass
901
902    def __check_config__(self, config):
903        pass
904
905    def __execute__(self, request):
906        try:
907            LOG.debug("Start execute xdevice extension LTP Posix Test")
908            self.result = os.path.join(
909                request.config.report_path, "result",
910                '.'.join((request.get_module_name(), "xml")))
911            self.config = request.config
912            self.config.device = request.config.environment.devices[0]
913
914            config_file = request.root.source.config_file
915            suite_file = request.root.source.source_file
916
917            if not suite_file:
918                raise ParamError(
919                    "test source '%s' not exists" %
920                    request.root.source.source_string, error_no="00110")
921
922            LOG.debug("Test case file path: %s" % suite_file)
923            # avoid hilog service stuck issue
924            self.config.device.connector_command("shell stop_service hilogd",
925                                           timeout=30 * 1000)
926            self.config.device.connector_command("shell start_service hilogd",
927                                           timeout=30 * 1000)
928            time.sleep(10)
929
930            self.config.device.connector_command("shell hilog -r", timeout=30 * 1000)
931            self._run_posix(config_file, request)
932        except Exception as exception:
933            self.error_message = exception
934            if not getattr(exception, "error_no", ""):
935                setattr(exception, "error_no", "03409")
936            LOG.exception(self.error_message, exc_info=True, error_no="03409")
937            raise exception
938        finally:
939            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
940            self.config.device.device_log_collector.stop_catch_device_log(self.log_proc)
941            self.result = check_result_report(
942                request.config.report_path, self.result, self.error_message)
943
944    def _run_posix(self, config_file, request):
945        try:
946            if not os.path.exists(config_file):
947                LOG.error("Error: Test cases don't exist %s." % config_file)
948                raise ParamError(
949                    "Error: Test cases don't exist %s." % config_file,
950                    error_no="00102")
951
952            json_config = JsonParser(config_file)
953            self.kits = get_kit_instances(json_config,
954                                          self.config.resource_path,
955                                          self.config.testcases_path)
956            self.config.device.connector_command("target mount")
957            test_list = None
958            dst = None
959            for kit in self.kits:
960                test_list, dst = kit.__setup__(request.config.device,
961                                               request=request)
962            # apply execute right
963            self.config.device.connector_command("shell chmod -R 777 {}".format(dst))
964
965            self.hilog = get_device_log_file(
966                request.config.report_path,
967                request.config.device.__get_serial__() + "_" + request.
968                get_module_name(),
969                "device_hilog")
970
971            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
972                                 0o755)
973            self.config.device.device_log_collector.add_log_address(None, self.hilog)
974            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
975                _, self.log_proc = self.config.device.device_log_collector.\
976                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
977                if hasattr(self.config, "history_report_path") and \
978                        self.config.testargs.get("test"):
979                    self._do_test_retry(request, self.config.testargs)
980                else:
981                    self._do_test_run(request, test_list)
982        finally:
983            do_module_kit_teardown(request)
984
985    def _do_test_retry(self, request, testargs):
986        un_pass_list = []
987        for test in testargs.get("test"):
988            test_item = test.split("#")
989            if len(test_item) != 2:
990                continue
991            un_pass_list.append(test_item[1])
992        LOG.debug("LTP Posix un pass list: [{}]".format(un_pass_list))
993        self._do_test_run(request, un_pass_list)
994
995    def _do_test_run(self, request, test_list):
996        for test_bin in test_list:
997            if not test_bin.endswith(".run-test"):
998                continue
999            listeners = request.listeners
1000            for listener in listeners:
1001                listener.device_sn = self.config.device.device_sn
1002            parsers = get_plugin(Plugin.PARSER,
1003                                 "OpenSourceTest")
1004            parser_instances = []
1005            for parser in parsers:
1006                parser_instance = parser.__class__()
1007                parser_instance.suite_name = request.root.source. \
1008                    test_name
1009                parser_instance.test_name = test_bin.replace("./", "")
1010                parser_instance.listeners = listeners
1011                parser_instances.append(parser_instance)
1012            self.handler = ShellHandler(parser_instances)
1013            self.handler.add_process_method(_ltp_output_method)
1014            result_message = self.config.device.connector_command(
1015                "shell {}".format(test_bin))
1016            LOG.info("get result from command {}".
1017                     format(result_message))
1018            process_command_ret(result_message, self.handler)
1019
1020    def __result__(self):
1021        return self.result if os.path.exists(self.result) else ""
1022
1023
1024def _lock_screen(device):
1025    device.execute_shell_command("svc power stayon false")
1026    time.sleep(1)
1027
1028
1029def _sleep_according_to_result(result):
1030    if result:
1031        time.sleep(1)
1032
1033
1034def _ltp_output_method(handler, output, end_mark="\n"):
1035    content = output
1036    if handler.unfinished_line:
1037        content = "".join((handler.unfinished_line, content))
1038        handler.unfinished_line = ""
1039    lines = content.split(end_mark)
1040    if content.endswith(end_mark):
1041        # get rid of the tail element of this list contains empty str
1042        return lines[:-1]
1043    else:
1044        handler.unfinished_line = lines[-1]
1045        # not return the tail element of this list contains unfinished str,
1046        # so we set position -1
1047        return lines