• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21import json
22import shutil
23import zipfile
24import tempfile
25import stat
26import re
27from dataclasses import dataclass
28
29from xdevice import ParamError
30from xdevice import ExecuteTerminate
31from xdevice import IDriver
32from xdevice import platform_logger
33from xdevice import Plugin
34from xdevice import get_plugin
35from xdevice import JsonParser
36from xdevice import ShellHandler
37from xdevice import TestDescription
38from xdevice import ResourceManager
39from xdevice import get_device_log_file
40from xdevice import check_result_report
41from xdevice import get_kit_instances
42from xdevice import get_config_value
43from xdevice import do_module_kit_setup
44from xdevice import do_module_kit_teardown
45
46from xdevice_extension._core.constants import DeviceTestType
47from xdevice_extension._core.constants import DeviceConnectorType
48from xdevice_extension._core.constants import CommonParserType
49from xdevice_extension._core.constants import FilePermission
50from xdevice_extension._core.environment.dmlib import DisplayOutputReceiver
51from xdevice_extension._core.exception import ShellCommandUnresponsiveException
52from xdevice_extension._core.exception import HapNotSupportTest
53from xdevice_extension._core.exception import HdcCommandRejectedException
54from xdevice_extension._core.exception import HdcError
55from xdevice_extension._core.testkit.kit import junit_para_parse
56from xdevice_extension._core.testkit.kit import reset_junit_para
57from xdevice_extension._core.utils import get_filename_extension
58from xdevice_extension._core.utils import start_standing_subprocess
59from xdevice_extension._core.executor.listener import CollectingTestListener
60from xdevice_extension._core.testkit.kit import gtest_para_parse
61from xdevice_extension._core.environment.dmlib import process_command_ret
62
63
# Names exported by a star-import of this module.
__all__ = ["CppTestDriver", "HapTestDriver", "OHKernelTestDriver",
           "JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner",
           "RemoteDexRunner", "disable_keyguard"]
# Module-wide logger obtained from xdevice under the name "Drivers".
LOG = platform_logger("Drivers")
# Evaluates to "/data/test/"; CppTestDriver falls back to it when the driver
# config has no 'native-test-device-path' entry.
DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
# Evaluates to "/data/local/tmp/".
ON_DEVICE_TEST_DIR_LOCATION = "/%s/%s/%s/" % ("data", "local", "tmp")

# Initial value of RemoteCppTestRunner.rerun_attempt.
FAILED_RUN_TEST_ATTEMPTS = 3
# Default for config.timeout when the driver config does not set one.
# NOTE(review): presumably milliseconds (900 s) - confirm against consumers.
TIME_OUT = 900 * 1000
73
74
@dataclass
class ZunitConst:
    """
    Holder for fixed strings, read as class attributes.

    No attribute is annotated, so @dataclass generates no instance fields;
    the values below are plain class-level constants.
    """

    # Application identifier string.
    z_unit_app = "ohos.unittest.App"

    # "KEY=" prefixes.
    output_dir = "OUTPUT_DIR="
    output_file = "OUTPUT_FILE="
    test_class = "TEST_CLASS="
    exec_class = "EXEC_CLASS="
    exec_method = "EXEC_METHOD="
    exec_level = "EXEC_LEVEL="
    jacoco_exec_file = "JACOCO_EXEC_FILE="

    # File and directory names.
    jtest_status_filename = "jtest_status.txt"
    remote_command_dir = "commandtmp"
87
88
def get_level_para_string(level_string):
    """
    Convert a comma separated list of level numbers into "LevelN" form.

    Example: "1, 2,1" -> "Level1,Level2".

    :param level_string: comma separated level numbers; surrounding
        whitespace around each entry is ignored
    :return: comma separated "Level<number>" entries in first-seen order,
        without duplicates; entries that are not purely digits are dropped.
        Returns "" when nothing valid is found.
    """
    levels = []
    for raw_item in level_string.split(","):
        # Strip before validating: " 2".isdigit() is False, so checking the
        # unstripped text would silently drop padded entries.
        item = raw_item.strip()
        if item.isdigit() and item not in levels:
            levels.append(item)
    return ",".join("Level%s" % item for item in levels)
99
100
def get_execute_java_test_files(suite_file):
    """
    Collect the test class names listed in the ".info" file of a suite.

    The info file sits next to the suite file and has the same name with the
    extension replaced by ".info". Each line is expected to look like
    "<class name>,<anything>"; only class names ending in "Test" are kept.

    :param suite_file: path of the suite file
    :return: comma separated class names, or "" when the info file is
        missing, unreadable or lists no matching class
    """
    test_info_file = "%s.info" % os.path.splitext(suite_file)[0]
    if not os.path.exists(test_info_file):
        return ""

    class_names = []
    try:
        # The file is only read, so it is opened read-only.
        info_fd = os.open(test_info_file, os.O_RDONLY)
        with os.fdopen(info_fd, "r") as file_desc:
            for line in file_desc:
                class_name, separator, _ = line.partition(",")
                if not separator:
                    # Blank or malformed line: skip it instead of aborting
                    # the parsing of all following lines.
                    continue
                class_name = class_name.strip()
                if class_name.endswith("Test"):
                    class_names.append(class_name)
    except (IOError, ValueError) as err_msg:
        # ValueError also covers undecodable file content.
        LOG.exception("Error to read info file: %s" % err_msg,
                      exc_info=False)
    return ",".join(class_names)
122
123
def get_java_test_para(testcase, testlevel):
    """
    Derive the (class, method, level) execution filter for a java test run.

    :param testcase: "" or a test case given as "<class>.<method>" or just
        "<method>"
    :param testlevel: "" or a comma separated list of level numbers
    :return: tuple (exec_class, exec_method, exec_level). Only one of the two
        inputs is honoured; when both or neither are given the result is
        ("*", "*", "").
    """
    case_given = testcase != ""
    level_given = testlevel != ""

    if case_given and not level_given:
        # Split on the last dot: everything before it is the class name.
        class_part, dot, method_part = testcase.rpartition(".")
        if dot:
            return class_part, method_part, ""
        return "*", testcase, ""

    if level_given and not case_given:
        return "*", "*", get_level_para_string(testlevel)

    return "*", "*", ""
145
146
def get_xml_output(config, json_config):
    """
    Resolve the "xml-output" setting as a lower-case string.

    Precedence: the first element of config.testargs["xml-output"], then the
    'xml-output' value of the json driver section, then "false".

    :param config: object whose ``testargs`` mapping may hold "xml-output"
    :param json_config: parsed json config providing ``get_driver()``; only
        consulted when testargs has no usable value
    :return: the chosen value converted with str() and lower-cased
    """
    from_args = config.testargs.get("xml-output")
    if from_args:
        chosen = from_args[0]
    elif get_config_value('xml-output', json_config.get_driver(), False):
        chosen = get_config_value('xml-output',
                                  json_config.get_driver(), False)
    else:
        chosen = "false"
    return str(chosen).lower()
159
160
def get_result_savepath(testsuit_path, result_rootpath):
    """
    Work out (and create) the local directory that receives a suite's result.

    The directory is "<result_rootpath>/result". When the suite's directory
    contains a "tests" path component, the part of the directory that follows
    "tests/<first component>/" is appended, so results keep the sub-folder
    layout of the suite.

    :param testsuit_path: local path of the test suite file
    :param result_rootpath: root directory of the report
    :return: the result directory; it is created when it does not exist
    """
    marker = "%stests%s" % (os.sep, os.sep)
    suite_dir = os.path.split(testsuit_path)[0]

    result_path = os.path.join(result_rootpath, "result")
    marker_at = suite_dir.find(marker)
    if marker_at != -1:
        tail = suite_dir[marker_at + len(marker):]
        sep_at = tail.find(os.sep)
        if sep_at != -1:
            # Drop the first component after "tests" and keep the rest.
            result_path = os.path.join(result_rootpath, "result",
                                       tail[sep_at + len(os.sep):])

    if not os.path.exists(result_path):
        os.makedirs(result_path)

    LOG.info("result_savepath = %s" % result_path)
    return result_path
181
182
# Placeholder result XML shared by all test suites: marks a suite as unavailable.
184def _create_empty_result_file(filepath, filename, error_message):
185    error_message = str(error_message)
186    error_message = error_message.replace("\"", """)
187    error_message = error_message.replace("<", "&lt;")
188    error_message = error_message.replace(">", "&gt;")
189    error_message = error_message.replace("&", "&amp;")
190    if filename.endswith(".hap"):
191        filename = filename.split(".")[0]
192    if not os.path.exists(filepath):
193        file_open = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
194                            0o755)
195        with os.fdopen(file_open, "w") as file_desc:
196            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
197                                       time.localtime())
198            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
199            file_desc.write('<testsuites tests="0" failures="0" '
200                            'disabled="0" errors="0" timestamp="%s" '
201                            'time="0" name="AllTests">\n' % time_stamp)
202            file_desc.write(
203                '  <testsuite name="%s" tests="0" failures="0" '
204                'disabled="0" errors="0" time="0.0" '
205                'unavailable="1" message="%s">\n' %
206                (filename, error_message))
207            file_desc.write('  </testsuite>\n')
208            file_desc.write('</testsuites>\n')
209            file_desc.flush()
210    return
211
212
class ResultManager(object):
    """
    Fetches the result file (and optionally coverage data) that a test suite
    left in a directory on the device and stores it below the local report
    root.
    """

    def __init__(self, testsuit_path, result_rootpath, device,
                 device_testpath):
        """
        :param testsuit_path: local path of the test suite file
        :param result_rootpath: local root directory of the report
        :param device: device object used to query and pull remote files
        :param device_testpath: directory on the device holding the results
        """
        self.device = device
        self.device_testpath = device_testpath
        self.result_rootpath = result_rootpath
        self.testsuite_path = testsuit_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable coverage collection in get_test_results()."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """
        Pull the suite's result file; create a placeholder when none arrived.

        :param error_message: message stored in the placeholder result file
        :return: local path of the result XML
        """
        local_report = self.obtain_test_result_file()
        if not os.path.exists(local_report):
            _create_empty_result_file(local_report, self.testsuite_name,
                                      error_message)

        if self.is_coverage:
            self.obtain_coverage_data()

        return local_report

    def obtain_test_result_file(self):
        """
        Pull the result XML of the suite from the device.

        For a ".hap" suite the remote file is "testcase_result.xml" or, when
        that is absent, "report.xml"; it is stored locally as
        "<suite name before the first dot>.xml". For any other suite the
        remote "<suite name>.xml" is pulled into the result directory.

        :return: expected local path of the result XML (the file may be
            missing when nothing could be pulled)
        """
        save_dir = get_result_savepath(self.testsuite_path,
                                       self.result_rootpath)

        if not self.testsuite_path.endswith('.hap'):
            xml_name = "%s.xml" % self.testsuite_name
            local_report = os.path.join(save_dir, xml_name)
            remote_report = os.path.join(self.device_testpath,
                                         "%s.xml" % self.testsuite_name)
            if self.device.is_file_exist(remote_report):
                self.device.pull_file(remote_report, save_dir)
            else:
                LOG.error("%s not exists", remote_report)
            return local_report

        base_name = str(self.testsuite_name).split(".")[0]
        local_report = os.path.join(save_dir, "%s.xml" % base_name)

        # First existing candidate wins; later ones are not probed.
        remote_name = ""
        for candidate in ("testcase_result.xml", "report.xml"):
            if self.device.is_file_exist(
                    os.path.join(self.device_testpath, candidate)):
                remote_name = candidate
                break

        if remote_name:
            self.device.pull_file(
                os.path.join(self.device_testpath, remote_name),
                local_report)
        else:
            LOG.error("%s no report file", self.device_testpath)
        return local_report

    def is_exist_target_in_device(self, path, target):
        """
        Check via "ls -l <path> | grep <target>" whether target is listed.

        :param path: directory on the device
        :param target: text to look for in the listing
        :return: True when the command output contains ``target``
        """
        listing = self.device.execute_shell_command(
            "ls -l %s | grep %s" % (path, target))
        return listing != "" and listing.find(target) != -1

    def obtain_coverage_data(self):
        """
        Pull coverage artifacts found in the device test directory.

        "jacoco.exec" is stored as
        "<report root>/../coverage/data/exec/<suite name>.exec"; an "obj"
        entry is pulled into "<report root>/../coverage/data/cxx/<suite name>".
        Local directories are created on demand.
        """
        exec_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/exec"))
        jacoco_name = "jacoco.exec"
        if self.is_exist_target_in_device(self.device_testpath, jacoco_name):
            if not os.path.exists(exec_dir):
                os.makedirs(exec_dir)
            self.device.pull_file(
                os.path.join(self.device_testpath, jacoco_name),
                os.path.join(exec_dir, "%s.exec" % self.testsuite_name))

        cxx_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/cxx",
                         self.testsuite_name))
        obj_name = "obj"
        if self.is_exist_target_in_device(self.device_testpath, obj_name):
            if not os.path.exists(cxx_dir):
                os.makedirs(cxx_dir)
            self.device.pull_file(
                os.path.join(self.device_testpath, obj_name), cxx_dir)
304
305
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
class CppTestDriver(IDriver):
    """
    CppTestDriver is a Test that runs a native test package on given device.

    The test binary is started through RemoteCppTestRunner. Unless rerun is
    disabled in the driver config, the test list is collected first and
    tests that did not report a result are run again.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.config = None
        # rerun: collect the test list first and re-run missing tests.
        self.rerun = True
        # rerun_all: re-run all missing tests in one go before falling back
        # to one-by-one reruns.
        self.rerun_all = True
        self.runner = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Run the native test described by the request and produce its report.

        :param request: execution request providing config, listeners and
            the test source (config file, test name)
        :raises Exception: any error of the run is logged and re-raised with
            an ``error_no`` attribute ("03404" when none was set)
        """
        try:
            LOG.debug("Start execute xdevice extension CppTest")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            self.result = "%s.xml" % \
                          os.path.join(request.config.report_path,
                                       "result", request.root.source.test_name)

            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)
            # Wrap the descriptor immediately so that it is closed even when
            # the test run below raises.
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.hdc_command("shell hilog -r")
                self._run_cpp_test(config_file, listeners=request.listeners,
                                   request=request)
                # NOTE(review): log capture starts only after the run has
                # finished and is given 30 seconds - confirm this is intended.
                self.config.device.start_catch_device_log(hilog_file_pipe)
                time.sleep(30)
                hilog_file_pipe.flush()

        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03404")
            LOG.exception(self.error_message, exc_info=False, error_no="03404")
            raise exception

        finally:
            self.config.device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_cpp_test(self, config_file, listeners=None, request=None):
        """
        Set up kits and the runner from the json config and run the tests.

        :param config_file: path of the json test config
        :param listeners: listeners that receive test events
        :param request: execution request (kit setup/teardown, test name)
        :raises ParamError: when ``config_file`` does not exist
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exit %s." % config_file,
                          error_no="00102")
                raise ParamError(
                    "Error: Test cases don't exit %s." % config_file,
                    error_no="00102")

            json_config = JsonParser(config_file)
            kits = get_kit_instances(json_config, self.config.resource_path,
                                     self.config.testcases_path)

            for listener in listeners:
                listener.device_sn = self.config.device.device_sn

            self._get_driver_config(json_config)
            do_module_kit_setup(request, kits)
            self.runner = RemoteCppTestRunner(self.config)
            self.runner.suite_name = request.root.source.test_name

            # A history report together with explicit tests means a retry of
            # those tests only.
            if hasattr(self.config, "history_report_path") and \
                    self.config.testargs.get("test"):
                self._do_test_retry(listeners, self.config.testargs)
            else:
                gtest_para_parse(self.config.testargs, self.runner)
                self._do_test_run(listeners)

        finally:
            do_module_kit_teardown(request)

    def _do_test_retry(self, listener, testargs):
        """
        Run each "<class>#<method>" entry of testargs["test"] on its own.

        Entries that do not contain exactly one "#" are skipped.
        """
        for test in testargs.get("test"):
            test_item = test.split("#")
            if len(test_item) != 2:
                continue
            self.runner.add_instrumentation_arg(
                "gtest_filter", "%s.%s" % (test_item[0], test_item[1]))
            self.runner.run(listener)

    def _do_test_run(self, listener):
        """Run the suite, with rerun support when a test list is available."""
        test_to_run = self._collect_test_to_run()
        # _collect_test_to_run() returns None when rerun is disabled, so the
        # count must not be taken unconditionally.
        LOG.debug("collected test count is: %s"
                  % (len(test_to_run) if test_to_run else 0))
        if not test_to_run:
            self.runner.run(listener)
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """
        List the tests of the binary via a dry run.

        :return: the tests reported by the runner's dry run, or None when
            rerun is disabled
        """
        if self.rerun:
            self.runner.add_instrumentation_arg("gtest_list_tests", True)
            run_results = self.runner.dry_run()
            self.runner.remove_instrumentation_arg("gtest_list_tests")
            return run_results
        return None

    def _run_tests(self, listener):
        """
        Run with an extra collecting listener.

        :return: the run results gathered by the collecting listener
        """
        test_tracker = CollectingTestListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        test_run = test_tracker.get_current_run_results()
        return test_run

    def _run_with_rerun(self, listener, expected_tests):
        """
        Run the suite and re-run the expected tests that produced no result.

        :param listener: listeners that receive test events
        :param expected_tests: tests collected by the dry run
        """
        LOG.debug("ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        LOG.debug("run with rerun, has run: %s" % len(test_run))
        if len(test_run) >= len(expected_tests):
            return

        expected_tests = TestDescription.remove_test(expected_tests,
                                                     test_run)
        if not expected_tests:
            LOG.debug("No tests to re-run, all tests executed at least "
                      "once.")
            # An empty filter is ignored by the runner, so continuing here
            # would execute the whole suite a second time.
            return
        if self.rerun_all:
            self._rerun_all(expected_tests, listener)
        else:
            self._rerun_serially(expected_tests, listener)

    def _rerun_all(self, expected_tests, listener):
        """
        Re-run all missing tests with one combined gtest_filter; tests still
        missing afterwards are re-run one by one.
        """
        tests = []
        for test in expected_tests:
            tests.append("%s.%s" % (test.class_name, test.test_name))
        self.runner.add_instrumentation_arg("gtest_filter", ":".join(tests))
        LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        LOG.debug("rerun file, has run: %s" % len(test_run))
        if len(test_run) < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("Rerun textFile success")
                return
            self._rerun_serially(expected_tests, listener)

    def _rerun_serially(self, expected_tests, listener):
        """Re-run every given test individually through runner.rerun()."""
        LOG.debug("rerun serially, expected run: %s" % len(expected_tests))
        for test in expected_tests:
            self.runner.add_instrumentation_arg(
                "gtest_filter", "%s.%s" % (test.class_name, test.test_name))
            self.runner.rerun(listener, test)
            self.runner.remove_instrumentation_arg("gtest_filter")

    def _get_driver_config(self, json_config):
        """
        Copy driver settings from the json config onto self.config / self.

        Reads 'native-test-device-path' (default DEFAULT_TEST_PATH),
        'module-name', 'native-test-timeout' (default TIME_OUT) and 'rerun'
        (anything but False / "false" enables rerun).
        """
        target_test_path = get_config_value('native-test-device-path',
                                            json_config.get_driver(), False)
        if target_test_path:
            self.config.target_test_path = target_test_path
        else:
            self.config.target_test_path = DEFAULT_TEST_PATH

        self.config.module_name = get_config_value(
            'module-name', json_config.get_driver(), False)

        timeout_config = get_config_value('native-test-timeout',
                                          json_config.get_driver(), False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

        rerun = get_config_value('rerun', json_config.get_driver(), False)
        if isinstance(rerun, bool):
            self.rerun = rerun
        elif str(rerun).lower() == "false":
            self.rerun = False
        else:
            self.rerun = True

    def __result__(self):
        """Return the result file path, or "" when the file does not exist."""
        return self.result if os.path.exists(self.result) else ""
496
497
class RemoteCppTestRunner:
    """
    Runs a native test binary on the device through "hdc_std ... shell" and
    feeds the captured output to the registered output parser.
    """

    def __init__(self, config):
        self.config = config
        self.suite_name = None
        # Command line options of the test binary, keyed by option name.
        self.arg_list = {}
        # Remaining reruns that may end without any result.
        self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS

    def _full_command(self):
        """Return the complete hdc_std command line for the test binary."""
        on_device = "{}/{} {}".format(self.config.target_test_path,
                                      self.config.module_name,
                                      self.get_args_command())
        prefix = "hdc_std -t %s shell " % self.config.device.device_sn
        return "%s %s" % (prefix, on_device)

    @staticmethod
    def _consume(handler, output):
        """Hand the output (carriage returns removed) to the handler."""
        handler.__read__(output.replace("\r", ""))
        handler.__done__()

    def dry_run(self):
        """
        Execute the binary with the current arguments and parse the output
        with the test-list parser.

        :return: the ``tests`` attribute of the list parser
        """
        plugins = get_plugin(Plugin.PARSER, CommonParserType.cpptest_list)
        if plugins:
            plugins = plugins[:1]
        parser_instances = [plugin.__class__() for plugin in plugins]
        handler = ShellHandler(parser_instances)

        # apply execute right
        chmod_cmd = "shell chmod 777 {}/{}".format(
            self.config.target_test_path, self.config.module_name)
        LOG.info("The apply execute command is {}".format(chmod_cmd))
        self.config.device.hdc_command(chmod_cmd, timeout=30 * 1000)

        # dry run command
        command = self._full_command()
        LOG.info("The dry_command execute command is {}".format(command))
        output = start_standing_subprocess(command.split(),
                                           return_result=True)
        LOG.debug("dryrun output ---")
        self._consume(handler, output)
        return parser_instances[0].tests

    def run(self, listener):
        """Execute the binary and report results to the given listeners."""
        handler = self._get_shell_handler(listener)
        command = self._full_command()
        LOG.debug("run command is: %s" % command)
        output = start_standing_subprocess(command.split(), return_result=True)
        LOG.debug("run output ---")
        self._consume(handler, output)

    def rerun(self, listener, test):
        """
        Re-run with the current arguments on behalf of a single test.

        The test is marked as blocked when the run yields no result at all
        (which also uses up one rerun attempt) or when no attempts are left.

        :param listener: listeners that receive test events
        :param test: the test this rerun is executed for
        """
        if not self.rerun_attempt:
            LOG.debug("not execute and mark as blocked finally")
            handler = self._get_shell_handler(listener)
            handler.parsers[0].mark_test_as_blocked(test)
            return

        test_tracker = CollectingTestListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        handler = self._get_shell_handler(listener_copy)
        try:
            command = self._full_command()
            LOG.debug("rerun command is: %s" % command)
            output = start_standing_subprocess(command.split(),
                                               return_result=True)
            LOG.debug("run output ---")
            self._consume(handler, output)

        except ShellCommandUnresponsiveException:
            LOG.debug("Exception: ShellCommandUnresponsiveException")
        finally:
            if len(test_tracker.get_current_run_results()) == 0:
                LOG.debug("No test case is obtained finally")
                self.rerun_attempt -= 1
                handler.parsers[0].mark_test_as_blocked(test)

    def add_instrumentation_arg(self, name, value):
        """Store an option; ignored when name or value is falsy."""
        if name and value:
            self.arg_list[name] = value

    def remove_instrumentation_arg(self, name):
        """Forget an option; unknown or falsy names are ignored."""
        if name:
            self.arg_list.pop(name, None)

    def get_args_command(self):
        """
        Render the stored options as a command line fragment.

        "gtest_list_tests" is rendered as a bare flag, every other option as
        "--name=value"; each option is preceded by a space.
        """
        rendered = []
        for key, value in self.arg_list.items():
            if key == "gtest_list_tests":
                rendered.append(" --%s" % key)
            else:
                rendered.append(" --%s=%s" % (key, value))
        return "".join(rendered)

    def _get_shell_handler(self, listener):
        """Build a ShellHandler around a fresh instance of the first
        registered cpp test parser, bound to this suite and the listeners."""
        plugins = get_plugin(Plugin.PARSER, CommonParserType.cpptest)
        if plugins:
            plugins = plugins[:1]
        parser_instances = []
        for plugin in plugins:
            instance = plugin.__class__()
            instance.suite_name = self.suite_name
            instance.listeners = listener
            parser_instances.append(instance)
        return ShellHandler(parser_instances)
612
613
614@Plugin(type=Plugin.DRIVER, id=DeviceTestType.junit_test)
615class JUnitTestDriver(IDriver):
616    """
617    JUnitTestDriver is a Test that runs a native test package on given device.
618    """
619
620    def __init__(self):
621        self.result = ""
622        self.error_message = ""
623        self.kits = []
624        self.config = None
625        self.rerun = True
626        self.runner = None
627        self.rerun_using_test_file = True
628        self.temp_file_list = []
629        self.is_no_test = False
630
631    def __check_environment__(self, device_options):
632        pass
633
634    def __check_config__(self, config):
635        if hasattr(config, "devices") and len(config.devices) > 1:
636            for device in config.devices:
637                device_name = device.get("name")
638                if not device_name:
639                    self.error_message = "JUnitTest Load Error(03100)"
640                    raise ParamError("device name not set in config file",
641                                     error_no="03100")
642
643    def __execute__(self, request):
644        try:
645            LOG.debug("Start execute xdevice extension JUnit Test")
646
647            self.config = request.config
648            self.config.device = request.get_devices()[0]
649            self.config.devices = request.get_devices()
650
651            config_file = request.get_config_file()
652            LOG.info("config file: %s", config_file)
653            self.result = os.path.join(request.get("report_path"), "result",
654                                       ".".join((request.get_test_name(),
655                                                 "xml")))
656            self.__check_config__(self.config)
657
658            device_log_pipes = []
659            try:
660                for device in self.config.devices:
661                    device_name = device.get("name", "")
662                    hilog = get_device_log_file(
663                        request.config.report_path, device.__get_serial__(),
664                        "device_hilog", device_name)
665                    hilog_open = os.open(hilog, os.O_WRONLY |
666                                         os.O_CREAT | os.O_APPEND, 0o755)
667                    hilog_pipe = os.fdopen(hilog_open, "a")
668                    device.start_catch_device_log(hilog_pipe)
669                    device_log_pipes.extend([hilog_pipe])
670
671                self._run_junit(config_file, listeners=request.listeners,
672                                request=request)
673            finally:
674                for device_log_pipe in device_log_pipes:
675                    device_log_pipe.flush()
676                    device_log_pipe.close()
677                for device in self.config.devices:
678                    device.stop_catch_device_log()
679
680        except Exception as exception:
681            if not getattr(exception, "error_no", ""):
682                setattr(exception, "error_no", "03405")
683            LOG.exception(self.error_message, exc_info=False, error_no="03405")
684            raise exception
685
686        finally:
687            if not self._is_ignore_report():
688                self.result = check_result_report(
689                    request.config.report_path, self.result,
690                    self.error_message)
691            else:
692                LOG.debug("hide result and not generate report")
693
694    def _run_junit(self, config_file, listeners, request):
695        try:
696            if not os.path.exists(config_file):
697                error_msg = "Error: Test cases %s don't exist." % config_file
698                LOG.error(error_msg, error_no="00102")
699                raise ParamError(error_msg, error_no="00102")
700
701            for device in self.config.devices:
702                cmd = "target mount" \
703                    if device.usb_type == DeviceConnectorType.hdc \
704                    else "remount"
705                device.hdc_command(cmd)
706            json_config = JsonParser(config_file)
707            self.kits = get_kit_instances(json_config,
708                                          self.config.resource_path,
709                                          self.config.testcases_path)
710
711            for listener in listeners:
712                listener.device_sn = self.config.device.device_sn
713
714            self._get_driver_config(json_config)
715            do_module_kit_setup(request, self.kits)
716            self.runner = RemoteTestRunner(self.config)
717            self.runner.suite_name = request.get_test_name()
718            self.runner.suite_file = "%s.hap" % \
719                                     get_filename_extension(config_file)[0]
720
721            self._get_runner_config(json_config)
722            if hasattr(self.config, "history_report_path") and \
723                    self.config.testargs.get("test"):
724                self._do_test_retry(listeners, self.config.testargs)
725            else:
726                self._do_include_tests()
727                self.runner.junit_para = junit_para_parse(
728                    self.config.device, self.config.testargs, "-s")
729                self._do_test_run(listeners)
730
731        finally:
732            do_module_kit_teardown(request)
733            if self.runner and self.runner.junit_para and (
734                    self.runner.junit_para.find("testFile") != -1
735                    or self.runner.junit_para.find("notTestFile") != -1):
736                self._junit_clear()
737
738    def _junit_clear(self):
739        self.config.device.execute_shell_command(
740            "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur"))
741        for temp_file in self.temp_file_list:
742            if os.path.exists(temp_file):
743                os.remove(temp_file)
744        self.temp_file_list.clear()
745
746    def _get_driver_config(self, json_config):
747        package = get_config_value('package', json_config.get_driver(), False)
748        runner = "ohos.testkit.runner.Runner"
749        include_tests = get_config_value("include-tests",
750                                         json_config.get_driver(), True, [])
751        if not package:
752            raise ParamError("Can't find package in config file.")
753        self.config.package = package
754        self.config.runner = runner
755        self.config.include_tests = include_tests
756
757        self.config.xml_output = get_xml_output(self.config, json_config)
758
759        timeout_config = get_config_value('shell-timeout',
760                                          json_config.get_driver(), False)
761        if timeout_config:
762            self.config.timeout = int(timeout_config)
763        else:
764            self.config.timeout = TIME_OUT
765
766        nohup = get_config_value('nohup', json_config.get_driver(), False)
767        if nohup and (nohup == "true" or nohup == "True"):
768            self.config.nohup = True
769        else:
770            self.config.nohup = False
771
772    def _get_runner_config(self, json_config):
773        test_timeout = get_config_value('test-timeout',
774                                        json_config.get_driver(), False)
775        if test_timeout:
776            self.runner.add_instrumentation_arg("timeout_sec",
777                                                int(test_timeout))
778
779    def _do_test_retry(self, listener, testargs):
780        for test in testargs.get("test"):
781            test_item = test.split("#")
782            if len(test_item) != 2:
783                continue
784            self.runner.class_name = test_item[0]
785            self.runner.test_name = test_item[1]
786            self.runner.run(listener)
787
788    def _do_test_run(self, listener):
789        if not self._check_package():
790            LOG.error("%s is not supported test" % self.config.package)
791            raise HapNotSupportTest("%s is not supported test" %
792                                    self.config.package)
793        test_to_run = self._collect_test_to_run()
794        LOG.debug("collected test count is: %s" % len(test_to_run))
795        if not test_to_run:
796            self.is_no_test = True
797            self.runner.run(listener)
798        else:
799            self._run_with_rerun(listener, test_to_run)
800
801    def _check_package(self):
802        command = '''systemdumper -s 401 -a "-bundle %s"''' % \
803                  self.config.package
804        output = self.config.device.execute_shell_command(command)
805        LOG.debug("systemdumper output: %s" % output)
806        if output and "ohos.testkit.runner.EntryAbility" in output:
807            return True
808        else:
809            LOG.info("try hidumper command to check package")
810            command = '''hidumper -s 401 -a "-bundle %s"''' % \
811                      self.config.package
812            output = self.config.device.execute_shell_command(command)
813            LOG.debug("hidumper output: %s" % output)
814            if output and "ohos.testkit.runner.EntryAbility" in output:
815                return True
816        return False
817
818    def _run_tests(self, listener):
819        test_tracker = CollectingTestListener()
820        listener_copy = listener.copy()
821        listener_copy.append(test_tracker)
822        self.runner.run(listener_copy)
823        test_run = test_tracker.get_current_run_results()
824        return test_run
825
826    def _run_with_rerun(self, listener, expected_tests):
827        LOG.debug("ready to run with rerun, expect run: %s"
828                  % len(expected_tests))
829        test_run = self._run_tests(listener)
830        LOG.debug("run with rerun, has run: %s" % len(expected_tests))
831        if len(test_run) < len(expected_tests):
832            expected_tests = TestDescription.remove_test(
833                expected_tests, test_run)
834            if not expected_tests:
835                LOG.debug("No tests to re-run, all tests executed at least "
836                          "once.")
837            if self.rerun_using_test_file:
838                self._rerun_file(expected_tests, listener)
839            else:
840                self._rerun_serially(expected_tests, listener)
841
842    def _make_test_file(self, expected_tests, test_file_path):
843        file_name = 'xdevice_testFile_%s.txt' % self.runner.suite_name
844        file_path = os.path.join(test_file_path, file_name)
845        try:
846            file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT |
847                                     os.O_APPEND, 0o755)
848            with os.fdopen(file_path_open, "a") as file_desc:
849                for test in expected_tests:
850                    file_desc.write("%s#%s" % (test.class_name,
851                                               test.test_name))
852                    file_desc.write("\n")
853                    file_desc.flush()
854        except(IOError, ValueError) as err_msg:
855            LOG.exception("Error for make long command file: ", err_msg,
856                          exc_info=False, error_no="03200")
857        return file_name, file_path
858
859    def _rerun_file(self, expected_tests, listener):
860        test_file_path = tempfile.mkdtemp(prefix="test_file_",
861                                          dir=self.config.report_path)
862        file_name, file_path = self._make_test_file(
863            expected_tests, test_file_path)
864        self.config.device.push_file(file_path, ON_DEVICE_TEST_DIR_LOCATION)
865        file_path_on_device = ''.join((ON_DEVICE_TEST_DIR_LOCATION, file_name))
866        self.runner.add_instrumentation_arg("testFile", file_path_on_device)
867        self.runner.junit_para = reset_junit_para(self.runner.junit_para, "-s")
868        LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests))
869        test_run = self._run_tests(listener)
870        LOG.debug("rerun file, has run: %s" % len(test_run))
871        self.config.device.execute_shell_command("rm %s" % file_path_on_device)
872        if len(test_run) < len(expected_tests):
873            expected_tests = TestDescription.remove_test(expected_tests,
874                                                         test_run)
875            if not expected_tests:
876                LOG.debug("Rerun textFile success")
877            self._rerun_serially(expected_tests, listener)
878        shutil.rmtree(test_file_path)
879
880    def _rerun_serially(self, expected_tests, listener):
881        LOG.debug("rerun serially, expected run: %s" % len(expected_tests))
882        self.runner.remove_instrumentation_arg("testFile")
883        for test in expected_tests:
884            self.runner.class_name = test.class_name
885            self.runner.test_name = test.test_name
886            self.runner.rerun(listener, test)
887
888    def _collect_test_to_run(self):
889        if self.rerun and self.config.xml_output == "false" and \
890                not self.config.nohup:
891            self.runner.set_test_collection(True)
892            tests = self._collect_test_and_retry()
893            self.runner.set_test_collection(False)
894            return tests
895        return None
896
897    def _collect_test_and_retry(self):
898        collector = CollectingTestListener()
899        listener = [collector]
900        self.runner.run(listener)
901        run_results = collector.get_current_run_results()
902        return run_results
903
904    def _do_include_tests(self):
905        """
906        handler the include-tests parameters in json file of current module.
907        the main approach is to inject new dict into "testargs".
908        """
909        if not self.config.include_tests:
910            return
911        keys_list = [key.strip() for key in self.config.testargs.keys()]
912        if "test-file-include-filter" in keys_list:
913            return
914        test_filter = self._slice_include_tests()
915        if not test_filter:
916            LOG.error("invalid include-tests! please check json file.")
917            return
918        if "test" in keys_list or "class" in keys_list:
919            test_list = list()
920            if "test" in keys_list:
921                for element in self.config.testargs.get("test", []):
922                    if self._filter_valid_test(element, test_filter):
923                        test_list.append(element.strip())
924                self.config.testargs.pop("test")
925            if "class" in keys_list:
926                for element in self.config.testargs.get("class", []):
927                    if self._filter_valid_test(element, test_filter):
928                        test_list.append(element.strip())
929                self.config.testargs.pop("class")
930        else:
931            test_list = [ele.strip() for ele in self.config.include_tests]
932        if test_list:
933            import datetime
934            prefix = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
935
936            save_file = \
937                os.path.join(self.config.report_path, "temp_%s.txt" % prefix)
938            save_file_open = os.open(save_file, os.O_WRONLY
939                                     | os.O_CREAT, 0o755)
940            with os.fdopen(save_file_open, "w") as save_handler:
941                for test in test_list:
942                    save_handler.write("{}\n".format(test.strip()))
943                save_handler.flush()
944            self.temp_file_list.append(save_file)
945            include_tests_key = "test-file-include-filter"
946            self.config.testargs.update(
947                {include_tests_key: self.temp_file_list})
948            LOG.debug("handle include-tests, write to %s, data length is %s" %
949                      (self.temp_file_list[0], len(test_list)))
950        else:
951            msg = "there is any valid test after filter by 'include-tests'"
952            LOG.error(msg)
953            raise ParamError(msg)
954
955    def _slice_include_tests(self):
956        test_filter = dict()
957        for include_test in self.config.include_tests:
958            include_test = include_test.strip()
959            if include_test:
960                # element like 'class#method'
961                if "#" in include_test:
962                    test_list = test_filter.get("test_in", [])
963                    test_list.append(include_test)
964                    test_filter.update({"test_in": test_list})
965                # element like 'class'
966                else:
967                    class_list = test_filter.get("class_in", [])
968                    class_list.append(include_test)
969                    test_filter.update({"class_in": class_list})
970            else:
971                LOG.warning("there is empty element in include-tests")
972        if len([ele for test in test_filter.values() for ele in test]) > 0:
973            return test_filter
974
975    @classmethod
976    def _filter_valid_test(cls, element, test_filter):
977        element = element.strip()
978        # if element in the list which element like 'class#method'
979        if element in test_filter.get("test_in", []):
980            return element
981        # if element is the list which element like 'class'
982        element_items = element.split("#")
983        if element_items[0].strip() in test_filter.get("class_in", []):
984            return element
985        raise ParamError("{} not match 'include-tests'!".format(element))
986
987    def _is_ignore_report(self):
988        if self.config.task and not self.config.testlist and \
989                not self.config.testfile:
990            if self.is_no_test and self.config.testargs.get("level", None):
991                return True
992
993    def __result__(self):
994        return self.result if os.path.exists(self.result) else ""
995
996
class RemoteTestRunner:
    """
    RemoteTestRunner builds the "aa start ... -s unittest" command for the
    configured package and executes it on the configured device.

    instrumentation arguments are kept in arg_list and rendered as
    "-s <name> <value>"; class_name / test_name narrow the run to one class
    or one test.
    """

    def __init__(self, config):
        self.arg_list = {}
        self.suite_name = None
        self.suite_file = None
        self.class_name = None
        self.test_name = None
        # stays None until a caller assigns the junit parameters
        self.junit_para = None
        self.config = config
        self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS

    def __check_environment__(self, device_options):
        pass

    def run(self, listener):
        """
        execute the test command and deliver the results.

        with nohup the command runs in the background, with xml_output
        "true" it runs without a receiver; in both cases the results are
        fetched through ResultManager afterwards. otherwise the output is
        streamed to a shell handler that feeds the given listeners.
        """
        handler = self._get_shell_handler(listener)
        # execute test case
        # junit_para may still be None; formatting None with %s would put
        # the literal text "None" on the command line
        command = "aa start -p %s " \
                  "-n ohos.testkit.runner.EntryAbility" \
                  " -s unittest %s -s rawLog true %s %s" \
                  % (self.config.package, self.config.runner,
                     self.junit_para or "", self.get_args_command())

        try:
            if self.config.nohup:
                nohup_command = "nohup %s &" % command
                result_value = self.config.device.execute_shell_cmd_background(
                    nohup_command, timeout=self.config.timeout)
            elif self.config.xml_output == "true":
                result_value = self.config.device.execute_shell_command(
                    command, timeout=self.config.timeout,
                    retry=0)
            else:
                self.config.device.execute_shell_command(
                    command, timeout=self.config.timeout, receiver=handler,
                    retry=0)
                return
        except ShellCommandUnresponsiveException:
            LOG.debug("Exception: ShellCommandUnresponsiveException")
        else:
            # reached only by the nohup and xml_output branches; the
            # streaming branch returns inside the try block
            self.config.target_test_path = "/%s/%s/%s/%s/%s/" % \
                                           ("data", "user", "0",
                                            self.config.package, "cache")
            result = ResultManager(self.suite_file, self.config.report_path,
                                   self.config.device,
                                   self.config.target_test_path)
            result.get_test_results(result_value)

    def rerun(self, listener, test):
        """
        re-run the currently selected test, or mark it as blocked.

        while rerun attempts are left the command is executed again; when
        the run yields no result, one attempt is used up and the test is
        marked as blocked. without attempts left the test is marked as
        blocked right away.
        """
        if self.rerun_attempt:
            listener_copy = listener.copy()
            test_tracker = CollectingTestListener()
            listener_copy.append(test_tracker)
            handler = self._get_shell_handler(listener_copy)
            try:
                command = "aa start -p %s " \
                          "-n ohos.testkit.runner.EntryAbility" \
                          " -s unittest %s -s rawLog true %s" \
                          % (self.config.package, self.config.runner,
                             self.get_args_command())

                self.config.device.execute_shell_command(
                    command, timeout=self.config.timeout, receiver=handler,
                    retry=0)

            except Exception as error:
                LOG.error("rerun error %s, %s" % (error, error.__class__))
            finally:
                if not test_tracker.get_current_run_results():
                    LOG.debug("No test case is obtained finally")
                    self.rerun_attempt -= 1
                    handler.parsers[0].mark_test_as_blocked(test)
        else:
            LOG.debug("not execute and mark as blocked finally")
            handler = self._get_shell_handler(listener)
            handler.parsers[0].mark_test_as_blocked(test)

    def set_test_collection(self, collect):
        """
        add the "log true" instrumentation argument when collect is true,
        remove it otherwise.
        """
        if collect:
            self.add_instrumentation_arg("log", "true")
        else:
            self.remove_instrumentation_arg("log")

    def add_instrumentation_arg(self, name, value):
        """
        store an instrumentation argument; calls with an empty name or an
        empty value are ignored.
        """
        if not name or not value:
            return
        self.arg_list[name] = value

    def remove_instrumentation_arg(self, name):
        """
        drop an instrumentation argument if it is present.
        """
        if not name:
            return
        if name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """
        render arg_list and the class / test selection as " -s <key> <value>"
        fragments; the result is empty or starts with a blank.
        """
        args_commands = "".join(
            " -s %s %s" % (key, value)
            for key, value in self.arg_list.items())
        if self.class_name and self.test_name:
            args_commands = "%s -s class %s#%s" % (args_commands,
                                                   self.class_name,
                                                   self.test_name)
        elif self.class_name:
            args_commands = "%s -s class %s" % (args_commands, self.class_name)
        return args_commands

    def _get_shell_handler(self, listener):
        """
        create a ShellHandler around a fresh instance of the first junit
        parser plugin, bound to the suite name and the given listeners.
        """
        parsers = get_plugin(Plugin.PARSER, CommonParserType.junit)
        if parsers:
            parsers = parsers[:1]
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suite_name = self.suite_name
            parser_instance.listeners = listener
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        return handler
1115
1116
class RemoteDexRunner:
    """
    RemoteDexRunner builds the app_process command line that starts
    ohos.testkit.runner.JUnitRunner for a module below config.remote_path
    and executes it on the configured device.
    """

    def __init__(self, config):
        self.arg_list = {}
        self.suite_name = None
        self.junit_para = ""
        self.config = config
        self.class_name = None
        self.test_name = None
        self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS

    def _build_command(self, runner_args):
        """
        return the full shell command with runner_args placed right after
        the JUnitRunner class name.
        """
        template = "export BOOTCLASSPATH=$BOOTCLASSPATH:" \
                   "{remote_path}/{module_name};cd {remote_path}; " \
                   "app_process -cp {remote_path}/{module_name} / " \
                   "ohos.testkit.runner.JUnitRunner {runner_args} " \
                   "--rawLog true --coverage false " \
                   "--classpathToScan {remote_path}/{module_name}"
        return template.format(remote_path=self.config.remote_path,
                               module_name=self.config.module_name,
                               runner_args=runner_args)

    def run(self, listener):
        """
        run the module and stream the output to the given listeners.

        on ConnectionResetError the command is repeated through hdc, but
        only when the sole listener is a CollectingTestListener.
        """
        handler = self._get_shell_handler(listener)
        command = self._build_command(
            "{}{}".format(self.junit_para, self.get_args_command()))

        try:
            self.config.device.execute_shell_command(
                command, timeout=self.config.timeout,
                receiver=handler, retry=0)
        except ConnectionResetError:
            only_collecting = len(listener) == 1 and \
                isinstance(listener[0], CollectingTestListener)
            if not only_collecting:
                return
            LOG.info("try subprocess ")
            listener[0].tests.clear()
            result = self.config.device.hdc_command(
                ["shell", command], timeout=self.config.timeout, retry=0,
                join_result=True)
            handler.__read__(result)
            handler.__done__()
            LOG.info("get current testcase: %s " %
                     len(listener[0].get_current_run_results()))

    def rerun(self, listener, test):
        """
        re-run the currently selected test, or mark it as blocked.

        when the re-run yields no result, one rerun attempt is used up and
        the test is marked as blocked; without attempts left the test is
        marked as blocked without running.
        """
        if not self.rerun_attempt:
            LOG.debug("not execute and mark as blocked finally")
            handler = self._get_shell_handler(listener)
            handler.parsers[0].mark_test_as_blocked(test)
            return

        listener_copy = listener.copy()
        test_tracker = CollectingTestListener()
        listener_copy.append(test_tracker)
        handler = self._get_shell_handler(listener_copy)
        try:
            command = self._build_command(self.get_args_command())
            self.config.device.execute_shell_command(
                command, timeout=self.config.timeout,
                receiver=handler, retry=0)
        except ShellCommandUnresponsiveException as _:
            LOG.debug("Exception: ShellCommandUnresponsiveException")
        finally:
            if len(test_tracker.get_current_run_results()) == 0:
                LOG.debug("No test case is obtained finally")
                self.rerun_attempt -= 1
                handler.parsers[0].mark_test_as_blocked(test)

    def set_test_collection(self, collect):
        """
        switch the "log true" argument on (collect is true) or off.
        """
        if not collect:
            self.remove_instrumentation_arg("log")
            return
        self.add_instrumentation_arg("log", "true")

    def add_instrumentation_arg(self, name, value):
        """
        remember an argument; empty names or empty values are ignored.
        """
        if name and value:
            self.arg_list[name] = value

    def remove_instrumentation_arg(self, name):
        """
        forget an argument if it is present.
        """
        if name:
            self.arg_list.pop(name, None)

    def get_args_command(self):
        """
        render arg_list and the class / test selection as
        " --<key> <value>" fragments.
        """
        fragments = [" --%s %s" % (key, value)
                     for key, value in self.arg_list.items()]
        if self.class_name and self.test_name:
            fragments.append(" --class %s#%s" % (self.class_name,
                                                 self.test_name))
        elif self.class_name:
            fragments.append(" --class %s" % self.class_name)
        return "".join(fragments)

    def _get_shell_handler(self, listener):
        """
        wrap a fresh instance of the first junit parser plugin, bound to
        the suite name and the listeners, in a ShellHandler.
        """
        found = get_plugin(Plugin.PARSER, CommonParserType.junit)
        selected = found[:1] if found else found
        instances = []
        for prototype in selected:
            instance = prototype.__class__()
            instance.suite_name = self.suite_name
            instance.listeners = listener
            instances.append(instance)
        return ShellHandler(instances)
1232
1233
1234@Plugin(type=Plugin.DRIVER, id=DeviceTestType.hap_test)
1235class HapTestDriver(IDriver):
1236    """
1237    HapTestDriver is a Test that runs a native test package on given device.
1238    """
1239    # test driver config
1240    config = None
1241    instrument_hap_file_suffix = '_ad.hap'
1242    result = ""
1243
1244    def __init__(self):
1245        self.ability_name = ""
1246        self.package_name = ""
1247        self.activity_name = ""
1248
1249    def __check_environment__(self, device_options):
1250        pass
1251
1252    def __check_config__(self, config):
1253        pass
1254
1255    def __execute__(self, request):
1256        try:
1257            LOG.debug("Start execute xdevice extension HapTest")
1258
1259            self.config = request.config
1260            self.config.target_test_path = DEFAULT_TEST_PATH
1261            self.config.device = request.config.environment.devices[0]
1262
1263            suite_file = request.root.source.source_file
1264            if not suite_file:
1265                LOG.error("test source '%s' not exists" %
1266                          request.root.source.source_string, error_no="00110")
1267                return
1268
1269            LOG.debug("Testsuite FilePath: %s" % suite_file)
1270            package_name, ability_name = self._get_package_and_ability_name(
1271                suite_file)
1272            self.package_name = package_name
1273            self.ability_name = ability_name
1274            self.activity_name = "%s.MainAbilityShellActivity" % \
1275                                 self.package_name
1276            self.config.test_hap_out_path = \
1277                "/data/data/%s/files/test/result/" % self.package_name
1278            self.config.test_suite_timeout = 300 * 1000
1279
1280            serial = request.config.device.__get_serial__()
1281            device_log_file = get_device_log_file(request.config.report_path,
1282                                                  serial)
1283            device_log_file_open = os.open(
1284                device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
1285            with os.fdopen(device_log_file_open, "a")as file_pipe:
1286                self.config.device.start_catch_device_log(file_pipe)
1287                self._init_junit_test()
1288                self._run_junit_test(suite_file)
1289                file_pipe.flush()
1290        finally:
1291            self.config.device.stop_catch_device_log()
1292
1293    def _init_junit_test(self):
1294        cmd = "target mount" \
1295            if self.config.device.usb_type == DeviceConnectorType.hdc \
1296            else "remount"
1297        self.config.device.hdc_command(cmd)
1298        self.config.device.execute_shell_command(
1299            "rm -rf %s" % self.config.target_test_path)
1300        self.config.device.execute_shell_command(
1301            "mkdir -p %s" % self.config.target_test_path)
1302        self.config.device.execute_shell_command(
1303            "mount -o rw,remount,rw /%s" % "system")
1304
1305    def _run_junit_test(self, suite_file):
1306        filename = os.path.basename(suite_file)
1307        suitefile_target_test_path = self.config.test_hap_out_path
1308        junit_test_para = self._get_junit_test_para(filename, suite_file)
1309        is_coverage_test = True if self.config.coverage == "coverage" else \
1310            False
1311
1312        # push testsuite file
1313        self.config.device.push_file(suite_file, self.config.target_test_path)
1314
1315        resource_manager = ResourceManager()
1316        resource_data_dic, resource_dir = \
1317            resource_manager.get_resource_data_dic(suite_file)
1318        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
1319                                               self.config.device)
1320
1321        # execute testcase
1322        install_result = self._install_hap(filename)
1323        result = ResultManager(suite_file, self.config.report_path,
1324                               self.config.device,
1325                               self.config.test_hap_out_path)
1326        result.set_is_coverage(is_coverage_test)
1327        if install_result:
1328            return_message = self._execute_suitefile_junittest(
1329                filename, junit_test_para, suitefile_target_test_path)
1330
1331            self.result = result.get_test_results(return_message)
1332            self._uninstall_hap(self.package_name)
1333        else:
1334            self.result = result.get_test_results("Error: install hap failed.")
1335            LOG.error("Error: install hap failed.", error_no="03204")
1336
1337        resource_manager.process_cleaner_data(resource_data_dic, resource_dir,
1338                                              self.config.device)
1339
1340    def _get_junit_test_para(self, filename, suite_file):
1341        if not filename.endswith(self.instrument_hap_file_suffix):
1342            exec_class, exec_method, exec_level = get_java_test_para(
1343                self.config.testcase, self.config.testlevel)
1344            java_test_file = get_execute_java_test_files(suite_file)
1345            junit_test_para = self._get_hap_test_para(java_test_file,
1346                                                      exec_class, exec_method,
1347                                                      exec_level)
1348        else:
1349            junit_test_para = get_execute_java_test_files(suite_file)
1350        return junit_test_para
1351
1352    @staticmethod
1353    def _get_hap_test_para(java_test_file, exec_class, exec_method,
1354                           exec_level):
1355        hap_test_para = "%s%s#%s%s#%s%s#%s%s" % (
1356            ZunitConst.test_class, java_test_file,
1357            ZunitConst.exec_class, exec_class,
1358            ZunitConst.exec_method, exec_method,
1359            ZunitConst.exec_level, exec_level)
1360        return hap_test_para
1361
1362    def _execute_suitefile_junittest(self, filename, testpara,
1363                                     target_test_path):
1364        return_message = self._execute_hapfile_junittest(filename, testpara,
1365                                                         target_test_path)
1366        return return_message
1367
1368    def _execute_hapfile_junittest(self, filename, testpara, target_test_path):
1369        _unlock_screen(self.config.device)
1370        _unlock_device(self.config.device)
1371
1372        try:
1373            if not filename.endswith(self.instrument_hap_file_suffix):
1374                return_message = self.start_hap_activity(testpara, filename)
1375                LOG.info("HAP Testcase is executing, please wait a moment...")
1376                if "Error" not in return_message:
1377                    self._check_hap_finished(target_test_path)
1378            else:
1379                return_message = self.start_instrument_hap_activity(testpara)
1380        except (ExecuteTerminate, HdcCommandRejectedException,
1381                ShellCommandUnresponsiveException, HdcError) as exception:
1382            return_message = str(exception.args)
1383            if not getattr(exception, "error_no", ""):
1384                setattr(exception, "error_no", "03203")
1385
1386        _lock_screen(self.config.device)
1387        return return_message
1388
1389    def _init_hap_device(self):
1390        self.config.device.execute_shell_command(
1391            "rm -rf %s" % self.config.test_hap_out_path)
1392        self.config.device.execute_shell_command(
1393            "mkdir -p %s" % self.config.test_hap_out_path)
1394
1395    def _install_hap(self, filename):
1396        message = self.config.device.execute_shell_command(
1397            "bm install -p %s" % os.path.join(self.config.target_test_path,
1398                                              filename))
1399        message = str(message).rstrip()
1400        if message == "" or "Success" in message:
1401            return_code = True
1402            if message != "":
1403                LOG.info(message)
1404        else:
1405            return_code = False
1406            if message != "":
1407                LOG.warning(message)
1408
1409        _sleep_according_to_result(return_code)
1410        return return_code
1411
1412    def start_hap_activity(self, testpara, filename):
1413        execute_para = testpara
1414        if self.config.coverage == "coverage":
1415            execute_para = ''.join((execute_para, ' ',
1416                                    ZunitConst.jacoco_exec_file, filename,
1417                                    ".exec"))
1418        try:
1419            display_receiver = DisplayOutputReceiver()
1420            self.config.device.execute_shell_command(
1421                "am start -S -n %s/%s --es param '%s'" %
1422                (self.package_name, self.activity_name,
1423                 execute_para), receiver=display_receiver,
1424                timeout=self.config.test_suite_timeout)
1425            _sleep_according_to_result(display_receiver.output)
1426            return_message = display_receiver.output
1427
1428        except (ExecuteTerminate, HdcCommandRejectedException,
1429                ShellCommandUnresponsiveException, HdcError) as exception:
1430            return_message = exception.args
1431            if not getattr(exception, "error_no", ""):
1432                setattr(exception, "error_no", "03203")
1433        return return_message
1434
1435    def start_instrument_hap_activity(self, testpara):
1436        from xdevice import Variables
1437        try:
1438            display_receiver = DisplayOutputReceiver()
1439            if self.config.coverage != "coverage":
1440                self.config.device.execute_shell_command(
1441                    "aa start -p %s -n %s -s AbilityTestCase %s -w %s" %
1442                    (self.package_name, self.ability_name, testpara,
1443                     str(self.config.test_suite_timeout)),
1444                    receiver=display_receiver,
1445                    timeout=self.config.test_suite_timeout)
1446            else:
1447                build_variant_outpath = os.path.join(
1448                    Variables.source_code_rootpath, "out",
1449                    self.config.build_variant)
1450                strip_num = len(build_variant_outpath.split(os.sep)) - 1
1451                self.config.device.execute_shell_command(
1452                    "cd %s; export GCOV_PREFIX=%s; "
1453                    "export GCOV_PREFIX_STRIP=%d; "
1454                    "aa start -p %s -n %s -s AbilityTestCase %s -w %s" %
1455                    (self.config.target_test_path,
1456                     self.config.target_test_path,
1457                     strip_num, self.package_name, self.ability_name, testpara,
1458                     str(self.config.test_suite_timeout)),
1459                    receiver=display_receiver,
1460                    timeout=self.config.test_suite_timeout)
1461            _sleep_according_to_result(display_receiver.output)
1462            return_message = display_receiver.output
1463        except (ExecuteTerminate, HdcCommandRejectedException,
1464                ShellCommandUnresponsiveException, HdcError) as exception:
1465            return_message = exception.args
1466            if not getattr(exception, "error_no", ""):
1467                setattr(exception, "error_no", "03203")
1468        return return_message
1469
1470    def _check_hap_finished(self, target_test_path):
1471        run_timeout = True
1472        sleep_duration = 3
1473        target_file = os.path.join(target_test_path,
1474                                   ZunitConst.jtest_status_filename)
1475        for _ in range(
1476                int(self.config.test_suite_timeout / (1000 * sleep_duration))):
1477            check_value = self.config.device.is_file_exist(target_file)
1478            LOG.info("%s state: %s", self.config.device.device_sn,
1479                     self.config.device.test_device_state.value)
1480            if not check_value:
1481                time.sleep(sleep_duration)
1482                continue
1483            run_timeout = False
1484            break
1485        if run_timeout:
1486            return_code = False
1487            LOG.error("HAP Testcase executed timeout or exception, please "
1488                      "check detail information from system log",
1489                      error_no="03205")
1490        else:
1491            return_code = True
1492            LOG.info("HAP Testcase executed finished")
1493        return return_code
1494
1495    def _uninstall_hap(self, package_name):
1496        return_message = self.config.device.execute_shell_command(
1497            "pm uninstall %s" % package_name)
1498        _sleep_according_to_result(return_message)
1499        return return_message
1500
1501    @staticmethod
1502    def _get_package_and_ability_name(hap_filepath):
1503        package_name = ""
1504        ability_name = ""
1505
1506        if os.path.exists(hap_filepath):
1507            filename = os.path.basename(hap_filepath)
1508
1509            # unzip the hap file
1510            hap_bak_path = os.path.abspath(
1511                os.path.join(os.path.dirname(hap_filepath),
1512                             "%s.bak" % filename))
1513            try:
1514                with zipfile.ZipFile(hap_filepath) as zf_desc:
1515                    zf_desc.extractall(path=hap_bak_path)
1516            except RuntimeError as error:
1517                LOG.error(error, error_no="03206")
1518
1519            # verify config.json file
1520            app_profile_path = os.path.join(hap_bak_path,
1521                                            "config.json")
1522            if not os.path.exists(app_profile_path):
1523                LOG.info("file %s not exists" % app_profile_path)
1524                return package_name, ability_name
1525
1526            if os.path.isdir(app_profile_path):
1527                LOG.info("%s is a folder, and not a file" % app_profile_path)
1528                return package_name, ability_name
1529
1530            # get package_name and ability_name value.
1531            app_profile_path_open = os.open(app_profile_path, os.O_RDONLY,
1532                                            stat.S_IWUSR | stat.S_IRUSR)
1533            with os.fdopen(app_profile_path_open, 'r') as load_f:
1534                load_dict = json.load(load_f)
1535            profile_list = load_dict.values()
1536            for profile in profile_list:
1537                package_name = profile.get("package")
1538                if not package_name:
1539                    continue
1540                abilities = profile.get("abilities")
1541                for abilitie in abilities:
1542                    abilities_name = abilitie.get("name")
1543                    if abilities_name.startswith("."):
1544                        ability_name = ''.join(
1545                            (package_name,
1546                             abilities_name[abilities_name.find("."):]))
1547                    else:
1548                        ability_name = abilities_name
1549                    break
1550                break
1551
1552            # delete hap_bak_path
1553            if os.path.exists(hap_bak_path):
1554                shutil.rmtree(hap_bak_path)
1555        else:
1556            LOG.info("file %s not exists" % hap_filepath)
1557
1558        return package_name, ability_name
1559
1560    def __result__(self):
1561        return self.result if os.path.exists(self.result) else ""
1562
1563
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
class JSUnitTestDriver(IDriver):
    """
    JSUnitTestDriver is a Test that starts a JS unit test ability on given
    device and analyses its result from the captured device hilog.
    """

    def __init__(self):
        self.xml_output = "false"
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the JSUnit module described by *request*.

        Restarts the hilogd service, clears the hilog buffer and then runs
        the test. Any exception is logged, tagged with error number "03409"
        when it has none, and re-raised. Log catching is stopped and the
        result report is checked in every case.
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")

            LOG.debug("Test case file path: %s" % suite_file)
            # avoid hilog service stuck issue
            self.config.device.hdc_command("shell stop_service hilogd",
                                           timeout=30 * 1000)
            self.config.device.hdc_command("shell start_service hilogd",
                                           timeout=30 * 1000)
            time.sleep(10)

            self.config.device.hdc_command("shell hilog -r", timeout=30 * 1000)
            self._run_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
            raise exception
        finally:
            self.config.device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def generate_console_output(self, device_log_file, request, timeout):
        """Read the device log and feed the collected lines to the parser.

        A run-end marker is appended when none was found, and missing
        "[suite end]" markers are inserted when the start/end labels do not
        balance.

        Args:
            device_log_file: path of the captured device log on the host.
            request: the current test request.
            timeout: seconds to wait for the run-end marker, counted from
                ``self.start_time``.
        """
        LOG.info("prepare to read device log, may wait some time")
        message_list = list()
        label_list, suite_info, is_suites_end = self.read_device_log_timeout(
            device_log_file, message_list, timeout)
        if not is_suites_end:
            message_list.append("app Log: [end] run suites end\n")
            LOG.warning("there is no suites end")
        if len(label_list[0]) > 0 and sum(label_list[0]) != 0:
            # the problem happened! when the sum of label list is not zero
            self._insert_suite_end(label_list, message_list)

        result_message = "".join(message_list)
        message_list.clear()
        expect_tests_dict = self._parse_suite_info(suite_info)
        self._analyse_tests(request, result_message, expect_tests_dict)

    @classmethod
    def _insert_suite_end(cls, label_list, message_list):
        """Insert a "[suite end]" line after every unterminated suite start.

        Args:
            label_list: ``[labels, positions]`` where a label is 1 for a
                suite start and -1 for a suite end, and the matching
                position is the index of that line in *message_list*.
                The positions are updated in place.
            message_list: collected log lines, modified in place.
        """
        for i in range(len(label_list[0])):
            if label_list[0][i] != 1:  # skip everything but start labels
                continue
            # check the start label, then peek next position
            if i + 1 == len(label_list[0]):  # next position at the tail
                message_list.insert(-1, "app Log: [suite end]\n")
                LOG.warning("there is no suite end")
                continue
            if label_list[0][i + 1] != 1:  # followed by an end label
                continue
            message_list.insert(label_list[1][i + 1],
                                "app Log: [suite end]\n")
            LOG.warning("there is no suite end")
            for j in range(i + 1, len(label_list[1])):
                label_list[1][j] += 1  # move the index to next

    def _analyse_tests(self, request, result_message, expect_tests_dict):
        """Parse *result_message* with the first registered JSUnit parser."""
        listener_copy = request.listeners.copy()
        parsers = get_plugin(
            Plugin.PARSER, CommonParserType.jsunit)
        if parsers:
            parsers = parsers[:1]
        for listener in listener_copy:
            listener.device_sn = self.config.device.device_sn
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = request.get_module_name()
            parser_instance.listeners = listener_copy
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        handler.parsers[0].expect_tests_dict = expect_tests_dict
        process_command_ret(result_message, handler)

    @classmethod
    def _parse_suite_info(cls, suite_info):
        """Build the expected tests from the "[suites info]" JSON fragments.

        Args:
            suite_info: list of string fragments that are joined and parsed
                as JSON.

        Returns:
            A dict mapping each class name to a list of TestDescription
            objects. It is empty when there is no suite info, and holds
            whatever was collected before the error when the JSON is
            invalid.
        """
        tests_dict = dict()
        test_count = 0
        if suite_info:
            LOG.debug("Suites info: %s" % suite_info)
            json_str = "".join(suite_info)
            try:
                suite_dict_list = json.loads(json_str).get("suites", [])
                for suite_dict in suite_dict_list:
                    for class_name, test_name_dict_list in suite_dict.items():
                        tests_dict.update({class_name: []})
                        for test_name_dict in test_name_dict_list:
                            for test_name in test_name_dict.values():
                                test = TestDescription(class_name, test_name)
                                tests_dict.get(class_name).append(test)
                                test_count += 1
            except json.decoder.JSONDecodeError as json_error:
                LOG.warning("Suites info is invalid: %s" % json_error)
        LOG.debug("Collect suite count is %s, test count is %s" %
                  (len(tests_dict), test_count))
        return tests_dict

    def read_device_log_timeout(self, device_log_file,
                                message_list, timeout):
        """Collect the JSApp lines of one test run from the device log.

        The log file is re-read from the beginning on every pass until the
        "[end] run suites end" marker is seen or *timeout* seconds have
        passed since ``self.start_time``.

        Args:
            device_log_file: path of the captured device log on the host.
            message_list: list that receives the collected lines; it is
                cleared at the start of every pass.
            timeout: number of seconds to wait for the end marker.

        Returns:
            ``(label_list, suite_info, is_suites_end)`` where label_list is
            ``[labels, positions]`` (1 / -1 for suite start / end and the
            index of that line in *message_list*), suite_info is the list of
            "[suites info]" fragments and is_suites_end tells whether the
            end marker was found.
        """
        LOG.info("The timeout is {} seconds".format(timeout))
        pattern = "^\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s+(\\d+)"
        suites_info_mark = "[suites info]"
        # Bound before the loop so the timeout path below is well defined
        # even when the deadline has already passed and no pass is made.
        pid = ""
        label_list = [[], []]  # [-1, 1 ..] [line1, line2 ..]
        suite_info = []
        while time.time() - self.start_time <= timeout:
            with open(device_log_file, "r", encoding='utf-8',
                      errors='ignore') as file_read_pipe:
                pid = ""
                message_list.clear()
                label_list = [[], []]  # [-1, 1 ..] [line1, line2 ..]
                suite_info = []
                while True:
                    try:
                        line = file_read_pipe.readline()
                    except UnicodeError as error:
                        LOG.warning("While read log file: %s" % error)
                        # No usable line was read; go on with the next one
                        # instead of reusing a stale or unbound value.
                        continue
                    if not line:
                        time.sleep(5)  # wait for log write to file
                        break
                    if line.lower().find("jsapp:") == -1:
                        continue
                    if suites_info_mark in line:
                        # keep the text after the last marker in the line
                        suite_info.append(
                            line.rpartition(suites_info_mark)[2].strip())

                    if "[start] start run suites" in line:
                        # found the label that marks the start of a run
                        pid, is_update = \
                            self._init_suites_start(line, pattern, pid)
                        if is_update:
                            message_list.clear()
                            label_list[0].clear()
                            label_list[1].clear()

                    if not pid or pid not in line:
                        continue
                    message_list.append(line)
                    if "[suite end]" in line:
                        label_list[0].append(-1)
                        label_list[1].append(len(message_list) - 1)
                    if "[suite start]" in line:
                        label_list[0].append(1)
                        label_list[1].append(len(message_list) - 1)
                    if "[end] run suites end" in line:
                        LOG.info("Find the end mark then analysis result")
                        LOG.debug("current JSApp pid= %s" % pid)
                        return label_list, suite_info, True

        LOG.error("Hjsunit run timeout {}s reached".format(timeout))
        LOG.debug("current JSApp pid= %s" % pid)
        return label_list, suite_info, False

    @classmethod
    def _init_suites_start(cls, line, pattern, pid):
        """Extract the pid from a run-start log line.

        Returns:
            ``(pid, True)`` with the pid captured by *pattern* when the line
            matches, otherwise the unchanged *pid* and False.
        """
        matcher = re.match(pattern, line.strip())
        if matcher and matcher.group(1):
            pid = matcher.group(1)
            return pid, True
        return pid, False

    def _run_jsunit(self, config_file, request):
        """Set up the kits, start the test ability and analyse the log.

        Raises:
            ParamError: when *config_file* does not exist or has no package.
            RuntimeError: when the ability could not be started.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            package, ability_name = self._get_driver_config(json_config)
            self.config.device.hdc_command("target mount")
            do_module_kit_setup(request, self.kits)

            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" + request.
                get_module_name(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)

            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)

            # execute test case
            command = "shell aa start -d 123 -a %s -b %s" \
                      % (ability_name, package)

            result_value = self.config.device.hdc_command(command)
            if result_value and "start ability successfully" in\
                    str(result_value).lower():
                setattr(self, "start_success", True)
                LOG.info("execute %s's testcase success. result value=%s"
                         % (package, result_value))
            else:
                LOG.info("execute %s's testcase failed. result value=%s"
                         % (package, result_value))
                raise RuntimeError("hjsunit test run error happened!")

            self.start_time = time.time()
            timeout_config = get_config_value('test-timeout',
                                              json_config.get_driver(),
                                              False, 60000)
            # the configured value is in milliseconds
            timeout = int(timeout_config) / 1000
            self.generate_console_output(hilog, request, timeout)
        finally:
            do_module_kit_teardown(request)

    def _jsunit_clear(self):
        """Remove /data/local/tmp/ajur on the device."""
        self.config.device.execute_shell_command(
            "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur"))

    def _get_driver_config(self, json_config):
        """Read package, ability name and timeouts from the driver config.

        Returns:
            ``(package, ability_name)``; the ability name defaults to
            ``<package>.MainAbility``.

        Raises:
            ParamError: when the config has no package.
        """
        package = get_config_value('package', json_config.get_driver(), False)
        default_ability = "{}.MainAbility".format(package)
        ability_name = get_config_value('abilityName', json_config.
                                        get_driver(), False, default_ability)
        self.xml_output = get_xml_output(self.config, json_config)
        timeout_config = get_config_value('native-test-timeout',
                                          json_config.get_driver(), False)
        if timeout_config:
            self.timeout = int(timeout_config)

        if not package:
            raise ParamError("Can't find package in config file.",
                             error_no="03201")
        return package, ability_name

    def __result__(self):
        """Return ``self.result`` if that path exists, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
1832
1833
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ltp_posix_test)
class LTPPosixTestDriver(IDriver):
    """
    LTPPosixTestDriver runs the ``.run-test`` binaries returned by the module
    kits on the device and parses their output with the "OpenSourceTest"
    parser.
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.handler = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the LTP posix module described by *request*.

        Restarts the hilogd service, clears the hilog buffer and then runs
        the tests. Any exception is logged, tagged with error number "03409"
        when it has none, and re-raised. Log catching is stopped and the
        result report is checked in every case.
        """
        try:
            LOG.debug("Start execute xdevice extension LTP Posix Test")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")

            LOG.debug("Test case file path: %s" % suite_file)
            # avoid hilog service stuck issue
            self.config.device.hdc_command("shell stop_service hilogd",
                                           timeout=30 * 1000)
            self.config.device.hdc_command("shell start_service hilogd",
                                           timeout=30 * 1000)
            time.sleep(10)

            self.config.device.hdc_command("shell hilog -r", timeout=30 * 1000)
            self._run_posix(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise exception
        finally:
            self.config.device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_posix(self, config_file, request):
        """Set up the kits and run every ``.run-test`` binary they return.

        Args:
            config_file: path of the module json config on the host.
            request: the current test request.

        Raises:
            ParamError: when *config_file* does not exist, or when no kit
                returned a test list.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)
            self.config.device.hdc_command("target mount")
            test_list = None
            dst = None
            # the values returned by the last kit are the ones that are used
            for kit in self.kits:
                test_list, dst = kit.__setup__(request.config.device,
                                               request=request)
            if test_list is None:
                # Fail with a clear message instead of running chmod on
                # "None" and crashing on the iteration below.
                raise ParamError(
                    "Error: no kit provided a test list for %s."
                    % config_file, error_no="00102")
            # apply execute right
            self.config.device.hdc_command("shell chmod -R 777 {}".format(dst))

            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" + request.
                get_module_name(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)
            # the log file is only created and held open here; nothing in
            # this method writes to it
            with os.fdopen(hilog_open, "a"):
                # the listeners and the parser plugins are the same for
                # every binary, so resolve them once
                listeners = request.listeners
                for listener in listeners:
                    listener.device_sn = self.config.device.device_sn
                parsers = get_plugin(Plugin.PARSER,
                                     "OpenSourceTest")
                for test_bin in test_list:
                    if not test_bin.endswith(".run-test"):
                        continue
                    # a fresh set of parser instances for every binary
                    parser_instances = []
                    for parser in parsers:
                        parser_instance = parser.__class__()
                        parser_instance.suite_name = request.root.source.\
                            test_name
                        parser_instance.test_name = test_bin.replace("./", "")
                        parser_instance.listeners = listeners
                        parser_instances.append(parser_instance)
                    self.handler = ShellHandler(parser_instances)
                    result_message = self.config.device.hdc_command(
                        "shell {}".format(test_bin))
                    LOG.info("get result from command {}".
                             format(result_message))
                    process_command_ret(result_message, self.handler)
        finally:
            do_module_kit_teardown(request)

    def __result__(self):
        """Return ``self.result`` if that path exists, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
1946
1947
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
class OHKernelTestDriver(IDriver):
    """
        Driver for the OpenHarmony kernel test: reads the driver options
        from the module config and delegates the run to OHKernelTestRunner.
    """
    def __init__(self):
        self.timeout = 30 * 1000
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the kernel test module while catching the device hilog.

        Any exception is logged, tagged with error number "03409" when it
        has none, and re-raised. Kit teardown, stopping of the log catching
        and the result report check happen in every case.
        """
        try:
            LOG.debug("Start to Execute OpenHarmony Kernel Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            kernel_config_file = request.root.source.config_file

            report_stem = os.path.join(request.config.report_path,
                                       "result", request.get_module_name())
            self.result = "%s.xml" % report_stem
            hilog_path = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_hilog")
            hilog_fd = os.open(
                hilog_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                FilePermission.mode_755)
            with os.fdopen(hilog_fd, "a") as hilog_stream:
                self.config.device.start_catch_device_log(hilog_stream)
                self._run_oh_kernel(kernel_config_file, request.listeners,
                                    request)
                hilog_stream.flush()
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                exception.error_no = "03409"
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
            raise exception
        finally:
            do_module_kit_teardown(request)
            self.config.device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_kernel(self, config_file, listeners=None, request=None):
        """Set up the kits and run the tests through OHKernelTestRunner."""
        try:
            parsed_config = JsonParser(config_file)
            module_kits = get_kit_instances(parsed_config,
                                            self.config.resource_path,
                                            self.config.testcases_path)
            self._get_driver_config(parsed_config)
            do_module_kit_setup(request, module_kits)
            self.runner = OHKernelTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            self.runner.run(listeners)
        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """Copy the driver options of *json_config* onto ``self.config``.

        Sets ``target_test_path`` when configured, fills ``arg_list`` with
        the configured runner arguments and sets ``timeout`` from
        'shell-timeout', falling back to TIME_OUT.
        """
        def read_option(key):
            return get_config_value(key, json_config.get_driver(), False)

        device_test_path = read_option('native-test-device-path')
        # read in this order; it is also the insertion order of arg_list
        runner_options = [(key, read_option(key)) for key in
                          ('test-suite-name', 'test-suites-list',
                           'timeout-limit', 'conf-file')]

        self.config.arg_list = {}
        if device_test_path:
            self.config.target_test_path = device_test_path
        for key, value in runner_options:
            if value:
                self.config.arg_list[key] = value

        shell_timeout = read_option('shell-timeout')
        if shell_timeout:
            self.config.timeout = int(shell_timeout)
        else:
            self.config.timeout = TIME_OUT

    def __result__(self):
        """Return ``self.result`` if that path exists, otherwise ""."""
        if os.path.exists(self.result):
            return self.result
        return ""
2045
2046
class OHKernelTestRunner:
    """Builds and runs the ``runtest`` shell command of the kernel test."""

    def __init__(self, config):
        self.suite_name = None
        self.config = config
        self.arg_list = config.arg_list

    def run(self, listeners):
        """Run ``sh runtest test`` in the device test directory.

        The output is passed to a shell handler built for *listeners*.
        """
        shell_handler = self._get_shell_handler(listeners)
        # hdc shell cd /data/local/tmp/OH_kernel_test;
        # sh runtest test -t OpenHarmony_RK3568_config
        # -n OpenHarmony_RK3568_skiptest -l 60
        run_command = "cd %s; chmod +x *; sh runtest test %s" % (
            self.config.target_test_path, self.get_args_command())
        self.config.device.execute_shell_command(
            run_command, timeout=self.config.timeout,
            receiver=shell_handler, retry=0)

    def _get_shell_handler(self, listeners):
        """Create a ShellHandler around the first kernel test parser."""
        def new_instance(parser):
            instance = parser.__class__()
            instance.suites_name = self.suite_name
            instance.listeners = listeners
            return instance

        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
        if parsers:
            parsers = parsers[:1]
        return ShellHandler([new_instance(parser) for parser in parsers])

    def get_args_command(self):
        """Translate ``self.arg_list`` into ``runtest`` command options.

        Returns:
            A string of " <flag> <value>" pieces in the order of the
            entries in ``arg_list``; unknown keys are skipped.
        """
        flag_by_key = {
            "test-suite-name": "-t",
            "test-suites-list": "-t",
            "conf-file": "-n",
            "timeout-limit": "-l",
        }
        return "".join(
            " %s %s" % (flag_by_key[key], value)
            for key, value in self.arg_list.items()
            if key in flag_by_key)
2086
2087
def disable_keyguard(device):
    """Run the screen unlock step and then the device unlock step."""
    for unlock_step in (_unlock_screen, _unlock_device):
        unlock_step(device)
2091
2092
def _unlock_screen(device):
    """Send ``svc power stayon true`` to *device*, then wait one second."""
    stay_on_command = "svc power stayon true"
    device.execute_shell_command(stay_on_command)
    time.sleep(1)
2096
2097
def _unlock_device(device):
    """Send key event 82 and ``wm dismiss-keyguard`` to *device*.

    Each command is followed by a one second pause.
    """
    for shell_command in ("input keyevent 82", "wm dismiss-keyguard"):
        device.execute_shell_command(shell_command)
        time.sleep(1)
2103
2104
def _lock_screen(device):
    """Pause for one second; no command is sent to *device*."""
    pause_seconds = 1
    time.sleep(pause_seconds)
2107
2108
2109def _sleep_according_to_result(result):
2110    if result:
2111        time.sleep(1)
2112