• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21import json
22import stat
23import shutil
24import re
25from datetime import datetime
26from enum import Enum
27
28from xdevice import ConfigConst
29from xdevice import ParamError
30from xdevice import IDriver
31from xdevice import platform_logger
32from xdevice import Plugin
33from xdevice import get_plugin
34from xdevice import JsonParser
35from xdevice import ShellHandler
36from xdevice import TestDescription
37from xdevice import get_device_log_file
38from xdevice import check_result_report
39from xdevice import get_kit_instances
40from xdevice import get_config_value
41from xdevice import do_module_kit_setup
42from xdevice import do_module_kit_teardown
43from xdevice import DeviceTestType
44from xdevice import CommonParserType
45from xdevice import FilePermission
46from xdevice import ResourceManager
47from xdevice import get_file_absolute_path
48from xdevice import exec_cmd
49
50from ohos.executor.listener import CollectingPassListener
51from ohos.constants import CKit
52from ohos.environment.dmlib import process_command_ret
53
54__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver",
55           "OHYaraTestDriver", "oh_jsunit_para_parse"]
56
57TIME_OUT = 300 * 1000
58
59LOG = platform_logger("OpenHarmony")
60
61
def oh_jsunit_para_parse(runner, junit_paras):
    """Forward the supported JSUnit test arguments to ``runner.add_arg``.

    ``class`` and ``notClass`` are forwarded as a comma-joined string of all
    their values.  ``testType``, ``size`` and ``level`` forward only their
    first value, and only when it belongs to the accepted set for that key.
    ``stress`` forwards its first value unchecked.  Any other key is ignored.

    Args:
        runner: object exposing ``add_arg(name, value)``.
        junit_paras: mapping (or iterable of key/value pairs) from argument
            name to a list of string values.
    """
    # Accepted first values for the keys that are validated before forwarding.
    accepted_values = {
        "testType": ("function", "performance", "reliability", "security"),
        "size": ("small", "medium", "large"),
        "level": ("0", "1", "2", "3"),
    }
    for raw_name, para_values in dict(junit_paras).items():
        # Dispatch on the stripped name but use the values stored under the
        # raw key: looking the stripped name up again would miss keys that
        # carry surrounding whitespace.
        para_name = raw_name.strip()
        para_values = para_values or []
        if para_name in ("class", "notClass"):
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name in accepted_values:
            # An empty value list has nothing to validate or forward.
            if para_values and para_values[0] in accepted_values[para_name]:
                runner.add_arg(para_name, para_values[0])
        elif para_name == "stress":
            if para_values:
                runner.add_arg(para_name, para_values[0])
91
92
93@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
94class OHKernelTestDriver(IDriver):
95    """
96        OpenHarmonyKernelTest
97    """
98    def __init__(self):
99        self.timeout = 30 * 1000
100        self.result = ""
101        self.error_message = ""
102        self.kits = []
103        self.config = None
104        self.runner = None
105        # log
106        self.device_log = None
107        self.hilog = None
108        self.log_proc = None
109        self.hilog_proc = None
110
111    def __check_environment__(self, device_options):
112        pass
113
114    def __check_config__(self, config):
115        pass
116
117    def __execute__(self, request):
118        try:
119            LOG.debug("Start to Execute OpenHarmony Kernel Test")
120
121            self.config = request.config
122            self.config.device = request.config.environment.devices[0]
123
124            config_file = request.root.source.config_file
125
126            self.result = "%s.xml" % \
127                          os.path.join(request.config.report_path,
128                                       "result", request.get_module_name())
129            self.device_log = get_device_log_file(
130                request.config.report_path,
131                request.config.device.__get_serial__(),
132                "device_log")
133
134            self.hilog = get_device_log_file(
135                request.config.report_path,
136                request.config.device.__get_serial__(),
137                "device_hilog")
138
139            device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT |
140                                      os.O_APPEND, FilePermission.mode_755)
141            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
142                                 FilePermission.mode_755)
143            self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
144            with os.fdopen(device_log_open, "a") as log_file_pipe, \
145                    os.fdopen(hilog_open, "a") as hilog_file_pipe:
146                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
147                    start_catch_device_log(log_file_pipe, hilog_file_pipe)
148                self._run_oh_kernel(config_file, request.listeners, request)
149                log_file_pipe.flush()
150                hilog_file_pipe.flush()
151        except Exception as exception:
152            self.error_message = exception
153            if not getattr(exception, "error_no", ""):
154                setattr(exception, "error_no", "03409")
155            LOG.exception(self.error_message, exc_info=False, error_no="03409")
156            raise exception
157        finally:
158            do_module_kit_teardown(request)
159            self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog)
160            self.config.device.device_log_collector.stop_catch_device_log(self.log_proc)
161            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
162            self.result = check_result_report(
163                request.config.report_path, self.result, self.error_message)
164
165    def _run_oh_kernel(self, config_file, listeners=None, request=None):
166        try:
167            json_config = JsonParser(config_file)
168            kits = get_kit_instances(json_config, self.config.resource_path,
169                                     self.config.testcases_path)
170            self._get_driver_config(json_config)
171            do_module_kit_setup(request, kits)
172            self.runner = OHKernelTestRunner(self.config)
173            self.runner.suite_name = request.get_module_name()
174            self.runner.run(listeners)
175        finally:
176            do_module_kit_teardown(request)
177
178    def _get_driver_config(self, json_config):
179        target_test_path = get_config_value('native-test-device-path',
180                                            json_config.get_driver(), False)
181        test_suite_name = get_config_value('test-suite-name',
182                                           json_config.get_driver(), False)
183        test_suites_list = get_config_value('test-suites-list',
184                                            json_config.get_driver(), False)
185        timeout_limit = get_config_value('timeout-limit',
186                                         json_config.get_driver(), False)
187        conf_file = get_config_value('conf-file',
188                                     json_config.get_driver(), False)
189        self.config.arg_list = {}
190        if target_test_path:
191            self.config.target_test_path = target_test_path
192        if test_suite_name:
193            self.config.arg_list["test-suite-name"] = test_suite_name
194        if test_suites_list:
195            self.config.arg_list["test-suites-list"] = test_suites_list
196        if timeout_limit:
197            self.config.arg_list["timeout-limit"] = timeout_limit
198        if conf_file:
199            self.config.arg_list["conf-file"] = conf_file
200        timeout_config = get_config_value('shell-timeout',
201                                          json_config.get_driver(), False)
202        if timeout_config:
203            self.config.timeout = int(timeout_config)
204        else:
205            self.config.timeout = TIME_OUT
206
207    def __result__(self):
208        return self.result if os.path.exists(self.result) else ""
209
210
class OHKernelTestRunner:
    """Builds and launches the on-device ``runtest`` command for one suite."""

    def __init__(self, config):
        """Keep ``config`` and share its ``arg_list`` mapping."""
        self.config = config
        self.arg_list = config.arg_list
        self.suite_name = None

    def run(self, listeners):
        """Execute the kernel test script on the device.

        The shell command has the form, for example:
            cd /data/local/tmp/OH_kernel_test; chmod +x *;
            sh runtest test -t OpenHarmony_RK3568_config
            -n OpenHarmony_RK3568_skiptest -l 60
        """
        receiver = self._get_shell_handler(listeners)
        shell_command = "cd %s; chmod +x *; sh runtest test %s" % (
            self.config.target_test_path, self.get_args_command())
        self.config.device.execute_shell_command(
            shell_command,
            timeout=self.config.timeout,
            receiver=receiver,
            retry=0,
        )

    def _get_shell_handler(self, listeners):
        """Return a ``ShellHandler`` wrapping at most one kernel-test parser."""
        found = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
        # Only the first registered parser is used.
        selected = found[:1] if found else found
        instances = []
        for plugin in selected:
            fresh_parser = plugin.__class__()
            fresh_parser.suites_name = self.suite_name
            fresh_parser.listeners = listeners
            instances.append(fresh_parser)
        return ShellHandler(instances)

    def get_args_command(self):
        """Render ``arg_list`` as ``runtest`` options, in insertion order."""
        option_by_key = {
            "test-suite-name": "-t",
            "test-suites-list": "-t",
            "conf-file": "-n",
            "timeout-limit": "-l",
        }
        rendered = []
        for key, value in self.arg_list.items():
            option = option_by_key.get(key)
            if option is not None:
                rendered.append(" %s %s" % (option, value))
        return "".join(rendered)
250
251
252@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
253class OHJSUnitTestDriver(IDriver):
254    """
255       OHJSUnitTestDriver is a Test that runs a native test package on
256       given device.
257    """
258
259    def __init__(self):
260        self.timeout = 80 * 1000
261        self.start_time = None
262        self.result = ""
263        self.error_message = ""
264        self.kits = []
265        self.config = None
266        self.runner = None
267        self.rerun = True
268        self.rerun_all = True
269        # log
270        self.device_log = None
271        self.hilog = None
272        self.log_proc = None
273        self.hilog_proc = None
274
275    def __check_environment__(self, device_options):
276        pass
277
278    def __check_config__(self, config):
279        pass
280
281    def __execute__(self, request):
282        try:
283            LOG.debug("Start execute OpenHarmony JSUnitTest")
284            self.result = os.path.join(
285                request.config.report_path, "result",
286                '.'.join((request.get_module_name(), "xml")))
287            self.config = request.config
288            self.config.device = request.config.environment.devices[0]
289
290            config_file = request.root.source.config_file
291            suite_file = request.root.source.source_file
292
293            if not suite_file:
294                raise ParamError(
295                    "test source '%s' not exists" %
296                    request.root.source.source_string, error_no="00110")
297            LOG.debug("Test case file path: %s" % suite_file)
298            self.config.device.set_device_report_path(request.config.report_path)
299            self.hilog = get_device_log_file(request.config.report_path,
300                                        request.config.device.__get_serial__() + "_" + request.
301                                        get_module_name(),
302                                        "device_hilog")
303
304            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
305                                 0o755)
306            self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
307            self.config.device.execute_shell_command(command="hilog -r")
308            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
309                if hasattr(self.config, ConfigConst.device_log) \
310                        and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
311                        and hasattr(self.config.device, "clear_crash_log"):
312                    self.config.device.device_log_collector.clear_crash_log()
313                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
314                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
315                self._run_oh_jsunit(config_file, request)
316        except Exception as exception:
317            self.error_message = exception
318            if not getattr(exception, "error_no", ""):
319                setattr(exception, "error_no", "03409")
320            LOG.exception(self.error_message, exc_info=True, error_no="03409")
321            raise exception
322        finally:
323            try:
324                self._handle_logs(request)
325            finally:
326                self.result = check_result_report(
327                    request.config.report_path, self.result, self.error_message)
328
329    def __dry_run_execute__(self, request):
330        LOG.debug("Start dry run xdevice JSUnit Test")
331        self.config = request.config
332        self.config.device = request.config.environment.devices[0]
333        config_file = request.root.source.config_file
334        suite_file = request.root.source.source_file
335
336        if not suite_file:
337            raise ParamError(
338                "test source '%s' not exists" %
339                request.root.source.source_string, error_no="00110")
340        LOG.debug("Test case file path: %s" % suite_file)
341        self._dry_run_oh_jsunit(config_file, request)
342
343    def _dry_run_oh_jsunit(self, config_file, request):
344        try:
345            if not os.path.exists(config_file):
346                LOG.error("Error: Test cases don't exist %s." % config_file)
347                raise ParamError(
348                    "Error: Test cases don't exist %s." % config_file,
349                    error_no="00102")
350            json_config = JsonParser(config_file)
351            self.kits = get_kit_instances(json_config,
352                                          self.config.resource_path,
353                                          self.config.testcases_path)
354
355            self._get_driver_config(json_config)
356            self.config.device.connector_command("target mount")
357            do_module_kit_setup(request, self.kits)
358            self.runner = OHJSUnitTestRunner(self.config)
359            self.runner.suites_name = request.get_module_name()
360            # execute test case
361            self._get_runner_config(json_config)
362            oh_jsunit_para_parse(self.runner, self.config.testargs)
363
364            test_to_run = self._collect_test_to_run()
365            LOG.info("Collected suite count is: {}, test count is: {}".
366                     format(len(self.runner.expect_tests_dict.keys()),
367                            len(test_to_run) if test_to_run else 0))
368        finally:
369            do_module_kit_teardown(request)
370
371    def _run_oh_jsunit(self, config_file, request):
372        try:
373            if not os.path.exists(config_file):
374                LOG.error("Error: Test cases don't exist %s." % config_file)
375                raise ParamError(
376                    "Error: Test cases don't exist %s." % config_file,
377                    error_no="00102")
378            json_config = JsonParser(config_file)
379            self.kits = get_kit_instances(json_config,
380                                          self.config.resource_path,
381                                          self.config.testcases_path)
382
383            self._get_driver_config(json_config)
384            self.config.device.connector_command("target mount")
385            self._start_smart_perf()
386            do_module_kit_setup(request, self.kits)
387            self.runner = OHJSUnitTestRunner(self.config)
388            self.runner.suites_name = request.get_module_name()
389            self._get_runner_config(json_config)
390            if hasattr(self.config, "history_report_path") and \
391                    self.config.testargs.get("test"):
392                self._do_test_retry(request.listeners, self.config.testargs)
393            else:
394                if self.rerun:
395                    self.runner.retry_times = self.runner.MAX_RETRY_TIMES
396                    # execute test case
397                self._do_tf_suite()
398                self._make_exclude_list_file(request)
399                oh_jsunit_para_parse(self.runner, self.config.testargs)
400                self._do_test_run(listener=request.listeners)
401
402        finally:
403            do_module_kit_teardown(request)
404
405    def _get_driver_config(self, json_config):
406        package = get_config_value('package-name',
407                                   json_config.get_driver(), False)
408        module = get_config_value('module-name',
409                                  json_config.get_driver(), False)
410        bundle = get_config_value('bundle-name',
411                                  json_config. get_driver(), False)
412        is_rerun = get_config_value('rerun', json_config.get_driver(), False)
413
414        self.config.package_name = package
415        self.config.module_name = module
416        self.config.bundle_name = bundle
417        self.rerun = True if is_rerun == 'true' else False
418
419        if not package and not module:
420            raise ParamError("Neither package nor module is found"
421                             " in config file.", error_no="03201")
422        timeout_config = get_config_value("shell-timeout",
423                                          json_config.get_driver(), False)
424        if timeout_config:
425            self.config.timeout = int(timeout_config)
426        else:
427            self.config.timeout = TIME_OUT
428
429    def _get_runner_config(self, json_config):
430        test_timeout = get_config_value('test-timeout',
431                                        json_config.get_driver(), False)
432        if test_timeout:
433            self.runner.add_arg("wait_time", int(test_timeout))
434
435        testcase_timeout = get_config_value('testcase-timeout',
436                                            json_config.get_driver(), False)
437        if testcase_timeout:
438            self.runner.add_arg("timeout", int(testcase_timeout))
439        self.runner.compile_mode = get_config_value(
440            'compile-mode', json_config.get_driver(), False)
441
442    def _do_test_run(self, listener):
443        test_to_run = self._collect_test_to_run()
444        LOG.info("Collected suite count is: {}, test count is: {}".
445                 format(len(self.runner.expect_tests_dict.keys()),
446                        len(test_to_run) if test_to_run else 0))
447        if not test_to_run or not self.rerun:
448            self.runner.run(listener)
449            self.runner.notify_finished()
450        else:
451            self._run_with_rerun(listener, test_to_run)
452
453    def _collect_test_to_run(self):
454        run_results = self.runner.dry_run()
455        return run_results
456
457    def _run_tests(self, listener):
458        test_tracker = CollectingPassListener()
459        listener_copy = listener.copy()
460        listener_copy.append(test_tracker)
461        self.runner.run(listener_copy)
462        test_run = test_tracker.get_current_run_results()
463        return test_run
464
465    def _run_with_rerun(self, listener, expected_tests):
466        LOG.debug("Ready to run with rerun, expect run: %s"
467                  % len(expected_tests))
468        test_run = self._run_tests(listener)
469        self.runner.retry_times -= 1
470        LOG.debug("Run with rerun, has run: %s" % len(test_run)
471                  if test_run else 0)
472        if len(test_run) < len(expected_tests):
473            expected_tests = TestDescription.remove_test(expected_tests,
474                                                         test_run)
475            if not expected_tests:
476                LOG.debug("No tests to re-run twice,please check")
477                self.runner.notify_finished()
478            else:
479                self._rerun_twice(expected_tests, listener)
480        else:
481            LOG.debug("Rerun once success")
482            self.runner.notify_finished()
483
484    def _rerun_twice(self, expected_tests, listener):
485        tests = []
486        for test in expected_tests:
487            tests.append("%s#%s" % (test.class_name, test.test_name))
488        self.runner.add_arg("class", ",".join(tests))
489        LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
490        test_run = self._run_tests(listener)
491        self.runner.retry_times -= 1
492        LOG.debug("Rerun twice, has run: %s" % len(test_run))
493        if len(test_run) < len(expected_tests):
494            expected_tests = TestDescription.remove_test(expected_tests,
495                                                         test_run)
496            if not expected_tests:
497                LOG.debug("No tests to re-run third,please check")
498                self.runner.notify_finished()
499            else:
500                self._rerun_third(expected_tests, listener)
501        else:
502            LOG.debug("Rerun twice success")
503            self.runner.notify_finished()
504
505    def _rerun_third(self, expected_tests, listener):
506        tests = []
507        for test in expected_tests:
508            tests.append("%s#%s" % (test.class_name, test.test_name))
509        self.runner.add_arg("class", ",".join(tests))
510        LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
511        self._run_tests(listener)
512        LOG.debug("Rerun third success")
513        self.runner.notify_finished()
514
515    def _make_exclude_list_file(self, request):
516        if "all-test-file-exclude-filter" in self.config.testargs:
517            json_file_list = self.config.testargs.get(
518                "all-test-file-exclude-filter")
519            self.config.testargs.pop("all-test-file-exclude-filter")
520            if not json_file_list:
521                LOG.warning("all-test-file-exclude-filter value is empty!")
522            else:
523                if not os.path.isfile(json_file_list[0]):
524                    LOG.warning(
525                        "[{}] is not a valid file".format(json_file_list[0]))
526                    return
527                file_open = os.open(json_file_list[0], os.O_RDONLY,
528                                    stat.S_IWUSR | stat.S_IRUSR)
529                with os.fdopen(file_open, "r") as file_handler:
530                    json_data = json.load(file_handler)
531                exclude_list = json_data.get(
532                    DeviceTestType.oh_jsunit_test, [])
533                filter_list = []
534                for exclude in exclude_list:
535                    if request.get_module_name() not in exclude:
536                        continue
537                    filter_list.extend(exclude.get(request.get_module_name()))
538                if not isinstance(self.config.testargs, dict):
539                    return
540                if 'notClass' in self.config.testargs.keys():
541                    filter_list.extend(self.config.testargs.get('notClass', []))
542                self.config.testargs.update({"notClass": filter_list})
543
544    def _do_test_retry(self, listener, testargs):
545        tests_dict = dict()
546        case_list = list()
547        for test in testargs.get("test"):
548            test_item = test.split("#")
549            if len(test_item) != 2:
550                continue
551            case_list.append(test)
552            if test_item[0] not in tests_dict:
553                tests_dict.update({test_item[0] : []})
554            tests_dict.get(test_item[0]).append(
555                TestDescription(test_item[0], test_item[1]))
556        self.runner.add_arg("class", ",".join(case_list))
557        self.runner.expect_tests_dict = tests_dict
558        self.config.testargs.pop("test")
559        self.runner.run(listener)
560        self.runner.notify_finished()
561
562    def _do_tf_suite(self):
563        if hasattr(self.config, "tf_suite") and \
564                self.config.tf_suite.get("cases", []):
565            case_list = self.config["tf_suite"]["cases"]
566            self.config.testargs.update({"class": case_list})
567
568    def _start_smart_perf(self):
569        if not hasattr(self.config, ConfigConst.kits_in_module):
570            return
571        if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
572            return
573        sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
574        sp_kits.target_name = self.config.bundle_name
575        param_config = self.config.get(ConfigConst.kits_params).get(
576            CKit.smartperf, "")
577        sp_kits.__check_config__(param_config)
578        self.kits.insert(0, sp_kits)
579
580    def _handle_logs(self, request):
581        serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
582        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
583        if hasattr(self.config, ConfigConst.device_log) and \
584                self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
585                and hasattr(self.config.device, "start_get_crash_log"):
586            self.config.device.device_log_collector.\
587                start_get_crash_log(log_tar_file_name, module_name=request.get_module_name())
588        self.config.device.device_log_collector.\
589            remove_log_address(self.device_log, self.hilog)
590        self.config.device.device_log_collector.\
591            stop_catch_device_log(self.log_proc)
592        self.config.device.device_log_collector.\
593            stop_catch_device_log(self.hilog_proc)
594
595    def __result__(self):
596        return self.result if os.path.exists(self.result) else ""
597
598
class OHJSUnitTestRunner:
    """Builds and executes ``aa test`` commands for one JSUnit module."""

    MAX_RETRY_TIMES = 3

    def __init__(self, config):
        """Start with no runner arguments and a single allowed attempt."""
        self.config = config
        self.suites_name = None
        self.compile_mode = ""
        # Arguments rendered onto the command line by get_args_command().
        self.arg_list = {}
        # Expected tests per suite; filled by dry_run().
        self.expect_tests_dict = dict()
        self.suite_recorder = {}
        # Retry bookkeeping.
        self.rerun_attemp = 3
        self.retry_times = 1
        self.finished = False
        # Parser to notify once the run is over; set by _get_shell_handler().
        self.finished_observer = None

    def dry_run(self):
        """Run the dry-run command and return the tests its parser collected.

        Also stores the parser's per-suite mapping in ``expect_tests_dict``.
        """
        found = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
        # Only the first registered parser is used.
        selected = found[:1] if found else found
        parser_instances = [plugin.__class__() for plugin in selected]
        receiver = ShellHandler(parser_instances)
        dry_run_command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            dry_run_command,
            timeout=self.config.timeout,
            receiver=receiver,
            retry=0,
        )
        list_parser = parser_instances[0]
        self.expect_tests_dict = list_parser.tests_dict
        return list_parser.tests

    def run(self, listener):
        """Execute the test command, feeding its output to the parser."""
        receiver = self._get_shell_handler(listener)
        run_command = self._get_run_command()
        self.config.device.execute_shell_command(
            run_command,
            timeout=self.config.timeout,
            receiver=receiver,
            retry=0,
        )

    def notify_finished(self):
        """Tell the observing parser the task is over and use up one retry."""
        observer = self.finished_observer
        if observer:
            observer.notify_task_finished()
        self.retry_times -= 1

    def _get_shell_handler(self, listener):
        """Return a ``ShellHandler`` wrapping at most one JSUnit parser.

        The parser created here also becomes ``finished_observer``.
        """
        found = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
        selected = found[:1] if found else found
        parser_instances = []
        for plugin in selected:
            fresh_parser = plugin.__class__()
            fresh_parser.suites_name = self.suites_name
            fresh_parser.listeners = listener
            fresh_parser.runner = self
            parser_instances.append(fresh_parser)
            self.finished_observer = fresh_parser
        return ShellHandler(parser_instances)

    def add_arg(self, name, value):
        """Record a runner argument; a falsy name or value is ignored."""
        if name and value:
            self.arg_list[name] = value

    def remove_arg(self, name):
        """Forget a runner argument if it is present."""
        if name:
            self.arg_list.pop(name, None)

    def get_args_command(self):
        """Render ``arg_list`` as command-line options, in insertion order.

        ``wait_time`` is passed as ``-w <value>``; every other argument as
        ``-s <name> <value>``.
        """
        rendered = []
        for key, value in self.arg_list.items():
            if key == "wait_time":
                rendered.append(" -w %s " % (value,))
            else:
                rendered.append(" -s %s %s " % (key, value))
        return "".join(rendered)

    def _get_run_command(self):
        """Return the ``aa test`` command, or ``""`` without package/module.

        Shapes:
            aa test -p ${packageName} -b ${bundleName} -s unittest OpenHarmonyTestRunner
            aa test -m ${moduleName} -b ${bundleName} -s unittest <runner path>
        """
        if self.config.package_name:
            return "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner {}".format(
                self.config.package_name,
                self.config.bundle_name,
                self.get_args_command())
        if self.config.module_name:
            return "aa test -m {} -b {} -s unittest {} {}".format(
                self.config.module_name,
                self.config.bundle_name,
                self.get_oh_test_runner_path(),
                self.get_args_command())
        return ""

    def _get_dry_run_command(self):
        """Return the run command with ``-s dryRun true`` appended, or ``""``."""
        if self.config.package_name:
            return "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner {} -s dryRun true".format(
                self.config.package_name,
                self.config.bundle_name,
                self.get_args_command())
        if self.config.module_name:
            return "aa test -m {} -b {} -s unittest {} {} -s dryRun true".format(
                self.config.module_name,
                self.config.bundle_name,
                self.get_oh_test_runner_path(),
                self.get_args_command())
        return ""

    def get_oh_test_runner_path(self):
        """Return the test runner name for the configured compile mode."""
        is_esmodule = self.compile_mode == "esmodule"
        return "/ets/testrunner/OpenHarmonyTestRunner" if is_esmodule \
            else "OpenHarmonyTestRunner"
713
714
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
class OHRustTestDriver(IDriver):
    """Driver that pushes a rust test binary to the device and runs it."""

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.config = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the rust test suite referenced by ``request``.

        Any exception is logged and stored in ``self.error_message`` instead
        of being propagated. The hilog task is always stopped and
        ``self.result`` is always passed through ``check_result_report``.
        """
        try:
            LOG.debug("Start to execute open harmony rust test")
            self.config = request.config
            self.config.device = request.config.environment.devices[0]
            self.config.target_test_path = "/system/bin"

            suite_file = request.root.source.source_file
            LOG.debug("Testsuite filepath:{}".format(suite_file))

            if not suite_file:
                LOG.error("test source '{}' not exists".format(
                    request.root.source.source_string))
                return

            self.result = "{}.xml".format(
                os.path.join(request.config.report_path,
                             "result", request.get_module_name()))
            self.config.device.set_device_report_path(request.config.report_path)
            self.config.device.device_log_collector.start_hilog_task()
            self._init_oh_rust()
            self._run_oh_rust(suite_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
        finally:
            # the log archive is named "<serial>_<ns timestamp>", with ':'
            # replaced by '_'
            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
                                    time.time_ns())
            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
            self.config.device.device_log_collector.stop_hilog_task(
                log_tar_file_name, module_name=request.get_module_name())
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _init_oh_rust(self):
        """Remount the device root file system read-write."""
        self.config.device.connector_command("target mount")
        self.config.device.execute_shell_command(
            "mount -o rw,remount,rw /")

    def _run_oh_rust(self, suite_file, request=None):
        """Push the suite and its resources, then execute it on the device.

        Args:
            suite_file: local path of the test binary to push and run.
            request: test request providing the listeners and module name.
        """
        # push testsuite file
        self.config.device.push_file(suite_file, self.config.target_test_path)
        # push resource file
        resource_manager = ResourceManager()
        resource_data_dict, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dict,
                                               resource_dir,
                                               self.config.device)
        try:
            for listener in request.listeners:
                listener.device_sn = self.config.device.device_sn

            parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
            if parsers:
                # only the first registered parser is used
                parsers = parsers[:1]
            parser_instances = []
            for parser in parsers:
                parser_instance = parser.__class__()
                parser_instance.suite_name = request.get_module_name()
                parser_instance.listeners = request.listeners
                parser_instances.append(parser_instance)
            handler = ShellHandler(parser_instances)

            command = "cd {}; chmod +x *; ./{}".format(
                self.config.target_test_path, os.path.basename(suite_file))
            self.config.device.execute_shell_command(
                command, timeout=TIME_OUT, receiver=handler, retry=0)
        finally:
            # always undo the preparer step, even when the run above raised
            resource_manager.process_cleaner_data(resource_data_dict,
                                                  resource_dir,
                                                  self.config.device)

    def __result__(self):
        """Return the report path if the file exists, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
802
803
class OHYaraConfig(Enum):
    """String constants shared by the yara test driver."""

    # keys of the tool hap description
    HAP_FILE = "hap-file"
    BUNDLE_NAME = "bundle-name"
    CLEANUP_APPS = "cleanup-apps"

    # keys read from the version mapping and vul info json files
    OS_FULLNAME_LIST = "osFullNameList"
    VULNERABILITIES = "vulnerabilities"
    VUL_ID = "vul_id"
    OPENHARMONY_SA = "openharmony-sa"
    AFFECTED_VERSION = "affected_versions"
    MONTH = "month"
    SEVERITY = "severity"
    VUL_DESCRIPTION = "vul_description"
    DISCLOSURE = "disclosure"
    AFFECTED_FILES = "affected_files"
    YARA_RULES = "yara_rules"

    # values stored as the final risk of a vul item
    PASS = "pass"
    FAIL = "fail"
    BLOCK = "block"

    # texts placed into the trace of a vul item
    ERROR_MSG_001 = (
        "The patch label is longer than two months (60 days), "
        "which violates the OHCA agreement."
    )
    ERROR_MSG_002 = (
        "This test case is beyond the patch label scope "
        "and does not need to be executed."
    )
    ERROR_MSG_003 = "Modify the code according to the patch requirements: "
828
829
class VulItem:
    """Scan state of one vulnerability entry."""

    # Class-level defaults, kept so that reading them from the class itself
    # keeps working.
    vul_id = ""
    month = ""
    severity = ""
    vul_description = dict()
    disclosure = dict()
    affected_files = ""
    affected_versions = ""
    yara_rules = ""
    trace = ""
    final_risk = OHYaraConfig.PASS.value
    complete = False

    def __init__(self):
        # The dict defaults above are single objects shared by every
        # instance; give each item its own dicts so that mutating one item
        # can never leak into another.
        self.vul_description = dict()
        self.disclosure = dict()
842
843
844@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test)
845class OHYaraTestDriver(IDriver):
846    def __init__(self):
847        self.result = ""
848        self.error_message = ""
849        self.config = None
850        self.tool_hap_info = dict()
851        self.security_patch = None
852        self.system_version = None
853
854    def __check_environment__(self, device_options):
855        pass
856
857    def __check_config__(self, config):
858        pass
859
860    def __execute__(self, request):
861        try:
862            LOG.debug("Start to execute open harmony yara test")
863            self.result = os.path.join(
864                request.config.report_path, "result",
865                '.'.join((request.get_module_name(), "xml")))
866            self.config = request.config
867            self.config.device = request.config.environment.devices[0]
868
869            config_file = request.root.source.config_file
870            suite_file = request.root.source.source_file
871
872            if not suite_file:
873                raise ParamError(
874                    "test source '%s' not exists" %
875                    request.root.source.source_string, error_no="00110")
876            LOG.debug("Test case file path: %s" % suite_file)
877            self.config.device.set_device_report_path(request.config.report_path)
878            self._run_oh_yara(config_file, request)
879
880        except Exception as exception:
881            self.error_message = exception
882            if not getattr(exception, "error_no", ""):
883                setattr(exception, "error_no", "03409")
884            LOG.exception(self.error_message, exc_info=False, error_no="03409")
885        finally:
886            if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value):
887                cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)]
888                result = self.config.device.connector_command(cmd)
889                LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format(
890                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result))
891
892            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
893                                    time.time_ns())
894            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
895            self.config.device.device_log_collector.stop_hilog_task(
896                log_tar_file_name, module_name=request.get_module_name())
897
898            self.result = check_result_report(
899                request.config.report_path, self.result, self.error_message)
900
901    def _get_driver_config(self, json_config):
902        yara_bin = get_config_value('yara-bin',
903                                    json_config.get_driver(), False)
904        version_mapping_file = get_config_value('version-mapping-file',
905                                                json_config.get_driver(), False)
906        vul_info_file = get_config_value('vul-info-file',
907                                         json_config.get_driver(), False)
908        # get absolute file path
909        self.config.yara_bin = get_file_absolute_path(yara_bin)
910        self.config.version_mapping_file = get_file_absolute_path(version_mapping_file)
911        self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
912
913        # get tool hap info
914        # default value
915        self.tool_hap_info = {
916            OHYaraConfig.HAP_FILE.value: "sststool.hap",
917            OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool",
918            OHYaraConfig.CLEANUP_APPS.value: "true"
919        }
920        tool_hap_info = get_config_value('tools-hap-info',
921                                         json_config.get_driver(), False)
922        if tool_hap_info:
923            self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \
924                tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap")
925            self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \
926                tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool")
927            self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \
928                tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true")
929
930    def _run_oh_yara(self, config_file, request=None):
931        message_list = list()
932
933        json_config = JsonParser(config_file)
934        self._get_driver_config(json_config)
935
936        # get device info
937        self.security_patch = self.config.device.execute_shell_command(
938            "param get const.ohos.version.security_patch").strip()
939        self.system_version = self.config.device.execute_shell_command(
940            "param get const.ohos.fullname").strip()
941
942        if "fail" in self.system_version:
943            self._get_full_name_by_tool_hap()
944
945        vul_items = self._get_vul_items()
946        # if security patch expire, case fail
947        current_date_str = datetime.now().strftime('%Y-%m')
948        if self._check_if_expire_or_risk(current_date_str):
949            LOG.info("Security patch has expired. Set all case fail.")
950            for _, item in enumerate(vul_items):
951                item.complete = True
952                item.final_risk = OHYaraConfig.FAIL.value
953                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
954        else:
955            LOG.info("Security patch is shorter than two months. Start yara test.")
956            # parse version mapping file
957            mapping_info = self._do_parse_json(self.config.version_mapping_file)
958            os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
959            # check if system version in version mapping list
960            vul_version = os_full_name_list.get(self.system_version, None)
961            # not in the maintenance scope, skip all case
962            if vul_version is None:
963                LOG.debug("The system version is not in the maintenance scope, skip it. "
964                          "system versions is {}".format(self.system_version))
965            else:
966                for _, item in enumerate(vul_items):
967                    LOG.debug("Affected files: {}".format(item.affected_files))
968                    for index, affected_file in enumerate(item.affected_files):
969                        has_inter = False
970                        for i, _ in enumerate(item.affected_versions):
971                            if self._check_if_intersection(vul_version, item.affected_versions[i]):
972                                has_inter = True
973                                break
974                        if not has_inter:
975                            LOG.debug("Yara rule [{}] affected versions has no intersection "
976                                      "in mapping version, skip it. Mapping version is {}, "
977                                      "affected versions is {}".format(item.vul_id, vul_version,
978                                                                       item.affected_versions))
979                            continue
980                        local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
981                                                  request.get_module_name(), item.yara_rules[index].split('.')[0])
982                        if not os.path.exists(local_path):
983                            os.makedirs(local_path)
984                        yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
985                        self.config.device.pull_file(affected_file, local_path)
986                        affected_file = os.path.join(local_path, os.path.basename(affected_file))
987                        if not os.path.exists(affected_file):
988                            LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
989                            item.final_risk = OHYaraConfig.PASS.value
990                            continue
991                        cmd = [self.config.yara_bin, yara_file, affected_file]
992                        result = exec_cmd(cmd)
993                        LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
994                        if "testcase pass" in result:
995                            item.final_risk = OHYaraConfig.PASS.value
996                            break
997                        else:
998                            if self._check_if_expire_or_risk(item.month, check_risk=True):
999                                item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
1000                                                           item.disclosure.get("zh", ""))
1001                                item.final_risk = OHYaraConfig.FAIL.value
1002                            else:
1003                                item.final_risk = OHYaraConfig.BLOCK.value
1004                                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
1005                        # if no risk delete files, if rule has risk keep it
1006                        if item.final_risk != OHYaraConfig.FAIL.value:
1007                            local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
1008                                                      request.get_module_name(), item.yara_rules[index].split('.')[0])
1009                            if os.path.exists(local_path):
1010                                LOG.debug(
1011                                    "Yara rule [{}] has no risk, remove affected files.".format(
1012                                        item.yara_rules[index]))
1013                                shutil.rmtree(local_path)
1014                    item.complete = True
1015        self._generate_yara_report(request, vul_items, message_list)
1016        self._generate_xml_report(request, vul_items, message_list)
1017
1018    def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False):
1019        from dateutil.relativedelta import relativedelta
1020        self.security_patch = self.security_patch.replace(' ', '')
1021        self.security_patch = self.security_patch.replace('/', '-')
1022        # get current date
1023        source_date = datetime.strptime(date_str, '%Y-%m')
1024        security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m')
1025        # check if expire 2 months
1026        rd = relativedelta(source_date, security_patch_date)
1027        months = rd.months + (rd.years * 12)
1028        if check_risk:
1029            # vul time before security patch time no risk
1030            LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}"
1031                      .format(self.security_patch[:-3], date_str, months))
1032            if months > 0:
1033                return False
1034            else:
1035                return True
1036        else:
1037            # check if security patch time expire current time 2 months
1038            LOG.debug("Security patch time: {}, current time: {}, delta_months: {}"
1039                      .format(self.security_patch[:-3], date_str, months))
1040            if months > expire_time:
1041                return True
1042            else:
1043                return False
1044
1045    @staticmethod
1046    def _check_if_intersection(source_version, dst_version):
1047        # para dst_less_sor control if dst less than source
1048        def _do_check(soruce, dst, dst_less_sor=True):
1049            if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \
1050                    re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst):
1051                source_vers = soruce.split(".")
1052                dst_vers = dst.split(".")
1053                for index, _ in enumerate(source_vers):
1054                    if dst_less_sor:
1055                        # check if all source number less than dst number
1056                        if int(source_vers[index]) < int(dst_vers[index]):
1057                            return False
1058                    else:
1059                        # check if all source number larger than dst number
1060                        if int(source_vers[index]) > int(dst_vers[index]):
1061                            return False
1062                return True
1063            return False
1064
1065        source_groups = source_version.split("-")
1066        dst_groups = dst_version.split("-")
1067        if source_version == dst_version:
1068            return True
1069        elif len(source_groups) == 1 and len(dst_groups) == 1:
1070            return source_version == dst_version
1071        elif len(source_groups) == 1 and len(dst_groups) == 2:
1072            return _do_check(source_groups[0], dst_groups[0]) and \
1073                   _do_check(source_groups[0], dst_groups[1], dst_less_sor=False)
1074        elif len(source_groups) == 2 and len(dst_groups) == 1:
1075            return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \
1076                   _do_check(source_groups[1], dst_groups[0])
1077        elif len(source_groups) == 2 and len(dst_groups) == 2:
1078            return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \
1079                   _do_check(source_groups[1], dst_groups[0])
1080        return False
1081
1082    def _get_vul_items(self):
1083        vul_items = list()
1084        vul_info = self._do_parse_json(self.config.vul_info_file)
1085        vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, [])
1086        for _, vul in enumerate(vulnerabilities):
1087            affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
1088            item = VulItem()
1089            item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "")
1090            item.affected_versions = affected_versions
1091            item.month = vul.get(OHYaraConfig.MONTH.value, "")
1092            item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
1093            item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
1094            item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
1095            item.affected_files = \
1096                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1097                    OHYaraConfig.AFFECTED_FILES.value, [])
1098            item.yara_rules = \
1099                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1100                    OHYaraConfig.YARA_RULES.value, [])
1101            vul_items.append(item)
1102        LOG.debug("Vul size is {}".format(len(vul_items)))
1103        return vul_items
1104
1105    @staticmethod
1106    def _do_parse_json(file_path):
1107        json_content = None
1108        if not os.path.exists(file_path):
1109            raise ParamError("The json file {} does not exist".format(
1110                file_path), error_no="00110")
1111        flags = os.O_RDONLY
1112        modes = stat.S_IWUSR | stat.S_IRUSR
1113        with os.fdopen(os.open(file_path, flags, modes),
1114                       "r", encoding="utf-8") as file_content:
1115            json_content = json.load(file_content)
1116        if json_content is None:
1117            raise ParamError("The json file {} parse error".format(
1118                file_path), error_no="00110")
1119        return json_content
1120
1121    def _get_full_name_by_tool_hap(self):
1122        # check if tool hap has installed
1123        result = self.config.device.execute_shell_command(
1124            "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1125        LOG.debug(result)
1126        if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result:
1127            hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value))
1128            self.config.device.push_file(hap_path, "/data/local/tmp")
1129            result = self.config.device.execute_shell_command(
1130                "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path)))
1131            LOG.debug(result)
1132            self.config.device.execute_shell_command(
1133                "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format(
1134                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1135        self.config.device.execute_shell_command(
1136            "aa start -a {}.MainAbility -b {}".format(
1137                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value),
1138                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1139        time.sleep(1)
1140        self.system_version = self.config.device.execute_shell_command(
1141            "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format(
1142                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '')
1143        LOG.debug(self.system_version)
1144
1145    def _generate_yara_report(self, request, vul_items, result_message):
1146        import csv
1147        result_message.clear()
1148        yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv"
1149                                   .format(request.config.device.device_sn))
1150        if os.path.exists(yara_report):
1151            data = []
1152        else:
1153            data = [
1154                ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch],
1155                ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"]
1156            ]
1157        fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
1158        for _, item in enumerate(vul_items):
1159            data.append([item.vul_id, item.severity,
1160                         item.month, item.final_risk,
1161                         item.disclosure.get("zh", ""), item.vul_description.get("zh", "")])
1162            result = "{}|{}|{}|{}|{}|{}|{}\n".format(
1163                item.vul_id, item.severity,
1164                item.month, item.final_risk,
1165                item.disclosure.get("zh", ""), item.vul_description.get("zh", ""),
1166                item.trace)
1167            result_message.append(result)
1168        with os.fdopen(fd, "a", newline='') as file_handler:
1169            writer = csv.writer(file_handler)
1170            writer.writerows(data)
1171
1172    def _generate_xml_report(self, request, vul_items, message_list):
1173        result_message = "".join(message_list)
1174        listener_copy = request.listeners.copy()
1175        parsers = get_plugin(
1176            Plugin.PARSER, CommonParserType.oh_yara)
1177        if parsers:
1178            parsers = parsers[:1]
1179        for listener in listener_copy:
1180            listener.device_sn = self.config.device.device_sn
1181        parser_instances = []
1182        for parser in parsers:
1183            parser_instance = parser.__class__()
1184            parser_instance.suites_name = request.get_module_name()
1185            parser_instance.vul_items = vul_items
1186            parser_instance.listeners = listener_copy
1187            parser_instances.append(parser_instance)
1188        handler = ShellHandler(parser_instances)
1189        process_command_ret(result_message, handler)
1190
1191    def __result__(self):
1192        return self.result if os.path.exists(self.result) else ""
1193
1194    @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test)
1195    class ValidatorTestDriver(IDriver):
1196
1197        def __init__(self):
1198            self.error_message = ""
1199            self.xml_path = ""
1200            self.result = ""
1201            self.config = None
1202            self.kits = []
1203
1204        def __check_environment__(self, device_options):
1205            pass
1206
1207        def __check_config__(self, config):
1208            pass
1209
1210        def __execute__(self, request):
1211            try:
1212                self.result = os.path.join(
1213                    request.config.report_path, "result",
1214                    ".".join((request.get_module_name(), "xml")))
1215                self.config = request.config
1216                self.config.device = request.config.environment.devices[0]
1217                config_file = request.root.source.config_file
1218                self._run_validate_test(config_file, request)
1219            except Exception as exception:
1220                self.error_message = exception
1221                if not getattr(exception, "error_no", ""):
1222                    setattr(exception, "error_no", "03409")
1223                LOG.exception(self.error_message, exc_info=True, error_no="03409")
1224                raise exception
1225            finally:
1226                self.result = check_result_report(request.config.report_path,
1227                                                  self.result, self.error_message)
1228
1229        def _run_validate_test(self, config_file, request):
1230            is_update = False
1231            try:
1232                if "update" in self.config.testargs.keys():
1233                    if dict(self.config.testargs).get("update")[0] == "true":
1234                        is_update = True
1235                json_config = JsonParser(config_file)
1236                self.kits = get_kit_instances(json_config, self.config.resource_path,
1237                                              self.config.testcases_path)
1238                self._get_driver_config(json_config)
1239                if is_update:
1240                    do_module_kit_setup(request, self.kits)
1241                while True:
1242                    print("Is test finished?Y/N")
1243                    usr_input = input(">>>")
1244                    if usr_input == "Y" or usr_input == "y":
1245                        LOG.debug("Finish current test")
1246                        break
1247                    else:
1248                        print("continue")
1249                        LOG.debug("Your input is:{}, continue".format(usr_input))
1250                if self.xml_path:
1251                    result_dir = os.path.join(request.config.report_path, "result")
1252                    if not os.path.exists(result_dir):
1253                        os.makedirs(result_dir)
1254                    self.config.device.pull_file(self.xml_path, self.result)
1255            finally:
1256                if is_update:
1257                    do_module_kit_teardown(request)
1258
1259        def _get_driver_config(self, json_config):
1260            self.xml_path = get_config_value("xml_path", json_config.get_driver(), False)
1261        def __result__(self):
1262            return self.result if os.path.exists(self.result) else ""
1263