#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19import os
20import time
21import json
22import stat
23import shutil
24import re
25from datetime import datetime
26from enum import Enum
27
28from xdevice import ConfigConst
29from xdevice import ParamError
30from xdevice import IDriver
31from xdevice import platform_logger
32from xdevice import Plugin
33from xdevice import get_plugin
34from xdevice import JsonParser
35from xdevice import ShellHandler
36from xdevice import TestDescription
37from xdevice import get_device_log_file
38from xdevice import check_result_report
39from xdevice import get_kit_instances
40from xdevice import get_config_value
41from xdevice import do_module_kit_setup
42from xdevice import do_module_kit_teardown
43from xdevice import DeviceTestType
44from xdevice import CommonParserType
45from xdevice import FilePermission
46from xdevice import ResourceManager
47from xdevice import get_file_absolute_path
48from xdevice import exec_cmd
49
50from ohos.executor.listener import CollectingPassListener
51from ohos.constants import CKit
52from ohos.environment.dmlib import process_command_ret
53
# Names exported by ``from <module> import *``.
# NOTE(review): OHYaraTestDriver is not defined in the part of the file
# reviewed here -- presumably it is defined further down; confirm.
__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver",
           "OHYaraTestDriver", "oh_jsunit_para_parse"]

# Fallback shell-command timeout applied by the drivers when the module
# config has no "shell-timeout" value (presumably milliseconds -- confirm
# against execute_shell_command).
TIME_OUT = 300 * 1000

# Module-wide logger created through xdevice's platform_logger.
LOG = platform_logger("OpenHarmony")
60
61
def oh_jsunit_para_parse(runner, junit_paras):
    """Forward the supported test arguments to ``runner.add_arg``.

    Only ``class``, ``notClass``, ``testType``, ``size``, ``level`` and
    ``stress`` are forwarded; every other key is ignored.  ``class`` and
    ``notClass`` are joined with commas, the remaining parameters use their
    first value only.  ``testType``, ``size`` and ``level`` are dropped when
    that first value is not one of the accepted values.

    Args:
        runner: object exposing ``add_arg(name, value)``.
        junit_paras: mapping (or iterable of key/value pairs) from parameter
            name to a list of string values.  Names are stripped of
            surrounding whitespace before being matched.
    """
    junit_paras = dict(junit_paras)
    # Parameters whose values are merged into one comma separated string.
    joined_names = ("class", "notClass")
    # Parameters restricted to a fixed set of accepted first values.
    accepted_values = {
        # function/performance/reliability/security
        "testType": ("function", "performance", "reliability", "security"),
        # size small/medium/large
        "size": ("small", "medium", "large"),
        # NOTE(review): an earlier comment listed 0/1/2/3/4 but only 0-3
        # have ever been accepted here -- confirm whether "4" is valid.
        "level": ("0", "1", "2", "3"),
    }
    for raw_name, para_values in junit_paras.items():
        # Match on the stripped name but keep the values stored under the
        # raw key; looking the stripped name up again would lose them.
        para_name = raw_name.strip()
        para_values = para_values or []
        if para_name in joined_names:
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name in accepted_values:
            if para_values and para_values[0] in accepted_values[para_name]:
                runner.add_arg(para_name, para_values[0])
        elif para_name == "stress":
            # An empty value list carries nothing to forward.
            if para_values:
                runner.add_arg(para_name, para_values[0])
91
92
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
class OHKernelTestDriver(IDriver):
    """
        OpenHarmonyKernelTest

        Runs the on-device ``runtest`` script through OHKernelTestRunner on
        the first device of the request while the device log and hilog are
        captured into files under the report path.
    """
    def __init__(self):
        self.timeout = 30 * 1000
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        # log
        self.device_log = None
        self.hilog = None
        self.log_proc = None
        self.hilog_proc = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the kernel test module described by *request*.

        The result path is ``<report_path>/result/<module_name>.xml``.  Any
        exception is recorded in ``self.error_message``, tagged with
        error_no "03409" when it carries none, logged and re-raised.  Log
        capture is always stopped and the result report is always checked.
        """
        try:
            LOG.debug("Start to Execute OpenHarmony Kernel Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file

            self.result = "%s.xml" % \
                          os.path.join(request.config.report_path,
                                       "result", request.get_module_name())
            self.device_log = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_log")

            self.hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_hilog")

            log_collector = self.config.device.device_log_collector
            open_flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
            # Each descriptor is handed to a file object as soon as it is
            # opened, so a failure while opening the second file or while
            # registering the log addresses cannot leak a raw descriptor.
            with os.fdopen(os.open(self.device_log, open_flags,
                                   FilePermission.mode_755),
                           "a") as log_file_pipe, \
                    os.fdopen(os.open(self.hilog, open_flags,
                                      FilePermission.mode_755),
                              "a") as hilog_file_pipe:
                log_collector.add_log_address(self.device_log, self.hilog)
                self.log_proc, self.hilog_proc = \
                    log_collector.start_catch_device_log(
                        log_file_pipe, hilog_file_pipe)
                self._run_oh_kernel(config_file, request.listeners, request)
                log_file_pipe.flush()
                hilog_file_pipe.flush()
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
            raise exception
        finally:
            # NOTE(review): _run_oh_kernel already tears the kits down in its
            # own finally; this second call is kept from the original code.
            do_module_kit_teardown(request)
            log_collector = self.config.device.device_log_collector
            log_collector.remove_log_address(self.device_log, self.hilog)
            log_collector.stop_catch_device_log(self.log_proc)
            log_collector.stop_catch_device_log(self.hilog_proc)
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_kernel(self, config_file, listeners=None, request=None):
        """Parse *config_file*, set up the module kits and run the suite.

        The module kits are torn down afterwards even when the run fails.
        """
        try:
            json_config = JsonParser(config_file)
            kits = get_kit_instances(json_config, self.config.resource_path,
                                     self.config.testcases_path)
            self._get_driver_config(json_config)
            do_module_kit_setup(request, kits)
            self.runner = OHKernelTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            self.runner.run(listeners)
        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """Copy the driver options of *json_config* onto ``self.config``.

        ``native-test-device-path`` becomes ``config.target_test_path``;
        ``test-suite-name``, ``test-suites-list``, ``timeout-limit`` and
        ``conf-file`` are collected in ``config.arg_list`` when they are set;
        ``shell-timeout`` becomes ``config.timeout`` and falls back to
        TIME_OUT.
        """
        driver_config = json_config.get_driver()
        target_test_path = get_config_value('native-test-device-path',
                                            driver_config, False)
        self.config.arg_list = {}
        if target_test_path:
            self.config.target_test_path = target_test_path
        # Only options that are actually set end up in the argument list.
        for option in ('test-suite-name', 'test-suites-list',
                       'timeout-limit', 'conf-file'):
            option_value = get_config_value(option, driver_config, False)
            if option_value:
                self.config.arg_list[option] = option_value
        timeout_config = get_config_value('shell-timeout',
                                          driver_config, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def __result__(self):
        """Return the result file path, or "" when the file does not exist."""
        return self.result if os.path.exists(self.result) else ""
209
210
class OHKernelTestRunner:
    """Builds and executes the on-device ``runtest`` command of a suite."""

    def __init__(self, config):
        self.suite_name = None
        self.config = config
        self.arg_list = config.arg_list

    def run(self, listeners):
        """Execute the kernel test script on the device.

        The shell output is fed to the handler built by
        ``_get_shell_handler``; the command is not retried.
        """
        shell_receiver = self._get_shell_handler(listeners)
        # hdc shell cd /data/local/tmp/OH_kernel_test;
        # sh runtest test -t OpenHarmony_RK3568_config
        # -n OpenHarmony_RK3568_skiptest -l 60
        shell_command = "cd %s; chmod +x *; sh runtest test %s" % (
            self.config.target_test_path, self.get_args_command())
        self.config.device.execute_shell_command(
            shell_command, timeout=self.config.timeout,
            receiver=shell_receiver, retry=0)

    def _get_shell_handler(self, listeners):
        """Return a ShellHandler wrapping at most one kernel test parser."""
        plugins = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
        if plugins:
            # Only the first registered parser is used.
            plugins = plugins[:1]
        instances = []
        for plugin in plugins:
            # A fresh parser object is created from the plugin's class.
            instance = plugin.__class__()
            instance.suites_name = self.suite_name
            instance.listeners = listeners
            instances.append(instance)
        return ShellHandler(instances)

    def get_args_command(self):
        """Return the ``runtest`` options built from ``self.arg_list``.

        Every known key contributes " <flag> <value>" in dictionary order;
        unknown keys are ignored.
        """
        flag_of = {
            "test-suite-name": "-t",
            "test-suites-list": "-t",
            "conf-file": "-n",
            "timeout-limit": "-l",
        }
        return "".join(
            " %s %s" % (flag_of[key], value)
            for key, value in self.arg_list.items()
            if key in flag_of)
250
251
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
    """
       OHJSUnitTestDriver runs an OpenHarmony JS unit test module on the
       first device of the request through OHJSUnitTestRunner and stores
       the result as ``<report_path>/result/<module_name>.xml``.
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        # Overwritten from the "rerun" driver option in _get_driver_config.
        self.rerun = True
        self.rerun_all = True
        # log
        self.device_log = None
        self.hilog = None
        self.log_proc = None
        self.hilog_proc = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the JS unit test module described by *request*.

        Raises:
            ParamError: when the request has no test source file.
            Exception: any other failure; it is recorded in
                ``self.error_message``, tagged with error_no "03409" when it
                carries none, logged and re-raised.

        Log capture is always stopped and the result report is always
        checked, whether the run succeeded or not.
        """
        try:
            LOG.debug("Start execute OpenHarmony JSUnitTest")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")
            LOG.debug("Test case file path: %s" % suite_file)
            self.config.device.set_device_report_path(request.config.report_path)
            self.hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" +
                request.get_module_name(),
                "device_hilog")

            log_collector = self.config.device.device_log_collector
            # The descriptor is wrapped in a file object immediately so that
            # a failure in the calls below cannot leak it.
            with os.fdopen(os.open(self.hilog,
                                   os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                   0o755),
                           "a") as hilog_file_pipe:
                log_collector.add_log_address(self.device_log, self.hilog)
                self.config.device.execute_shell_command(command="hilog -r")
                # NOTE(review): hasattr is checked on the device although the
                # method is called on its device_log_collector -- confirm.
                if hasattr(self.config, ConfigConst.device_log) \
                        and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
                        and hasattr(self.config.device, "clear_crash_log"):
                    log_collector.clear_crash_log()
                self.log_proc, self.hilog_proc = \
                    log_collector.start_catch_device_log(
                        hilog_file_pipe=hilog_file_pipe)
                self._run_oh_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise exception
        finally:
            try:
                self._handle_logs(request)
            finally:
                self.result = check_result_report(
                    request.config.report_path, self.result, self.error_message)

    def __dry_run_execute__(self, request):
        """Collect the tests of the module without executing them.

        Raises:
            ParamError: when the request has no test source file.
        """
        LOG.debug("Start dry run xdevice JSUnit Test")
        self.config = request.config
        self.config.device = request.config.environment.devices[0]
        config_file = request.root.source.config_file
        suite_file = request.root.source.source_file

        if not suite_file:
            raise ParamError(
                "test source '%s' not exists" %
                request.root.source.source_string, error_no="00110")
        LOG.debug("Test case file path: %s" % suite_file)
        self._dry_run_oh_jsunit(config_file, request)

    def _dry_run_oh_jsunit(self, config_file, request):
        """Set up the module, ask the runner for its test list and log it.

        Raises:
            ParamError: when *config_file* does not exist.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.connector_command("target mount")
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suites_name = request.get_module_name()
            # execute test case
            self._get_runner_config(json_config)
            oh_jsunit_para_parse(self.runner, self.config.testargs)

            test_to_run = self._collect_test_to_run()
            LOG.info("Collected suite count is: {}, test count is: {}".
                     format(len(self.runner.expect_tests_dict.keys()),
                            len(test_to_run) if test_to_run else 0))
        finally:
            do_module_kit_teardown(request)

    def _run_oh_jsunit(self, config_file, request):
        """Set up the module and run it, either as a retry or a full run.

        A retry of the listed cases is done when the config has a
        ``history_report_path`` attribute and ``testargs`` contains "test";
        otherwise the normal run (with optional reruns) is performed.  The
        module kits are torn down afterwards in every case.

        Raises:
            ParamError: when *config_file* does not exist.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.connector_command("target mount")
            self._start_smart_perf()
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suites_name = request.get_module_name()
            self._get_runner_config(json_config)
            if hasattr(self.config, "history_report_path") and \
                    self.config.testargs.get("test"):
                self._do_test_retry(request.listeners, self.config.testargs)
            else:
                if self.rerun:
                    self.runner.retry_times = self.runner.MAX_RETRY_TIMES
                # execute test case
                self._do_tf_suite()
                self._make_exclude_list_file(request)
                oh_jsunit_para_parse(self.runner, self.config.testargs)
                self._do_test_run(listener=request.listeners)

        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """Copy the driver options of *json_config* onto the config.

        Sets ``package_name``, ``module_name``, ``bundle_name`` and
        ``timeout`` (falling back to TIME_OUT) on ``self.config`` and sets
        ``self.rerun`` to whether the "rerun" option equals 'true'.

        Raises:
            ParamError: when neither a package nor a module name is set.
        """
        driver_config = json_config.get_driver()
        package = get_config_value('package-name', driver_config, False)
        module = get_config_value('module-name', driver_config, False)
        bundle = get_config_value('bundle-name', driver_config, False)
        is_rerun = get_config_value('rerun', driver_config, False)

        self.config.package_name = package
        self.config.module_name = module
        self.config.bundle_name = bundle
        self.rerun = is_rerun == 'true'

        if not package and not module:
            raise ParamError("Neither package nor module is found"
                             " in config file.", error_no="03201")
        timeout_config = get_config_value("shell-timeout",
                                          driver_config, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def _get_runner_config(self, json_config):
        """Pass 'test-timeout', 'testcase-timeout' and 'compile-mode' on.

        The two timeouts become the runner arguments "wait_time" and
        "timeout"; the compile mode is stored on the runner.
        """
        driver_config = json_config.get_driver()
        test_timeout = get_config_value('test-timeout', driver_config, False)
        if test_timeout:
            self.runner.add_arg("wait_time", int(test_timeout))

        testcase_timeout = get_config_value('testcase-timeout',
                                            driver_config, False)
        if testcase_timeout:
            self.runner.add_arg("timeout", int(testcase_timeout))
        self.runner.compile_mode = get_config_value(
            'compile-mode', driver_config, False)

    def _do_test_run(self, listener):
        """Run the module once, or with reruns when enabled.

        Reruns are only used when the dry run returned tests and
        ``self.rerun`` is set.
        """
        test_to_run = self._collect_test_to_run()
        LOG.info("Collected suite count is: {}, test count is: {}".
                 format(len(self.runner.expect_tests_dict.keys()),
                        len(test_to_run) if test_to_run else 0))
        if not test_to_run or not self.rerun:
            self.runner.run(listener)
            self.runner.notify_finished()
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """Return the tests reported by the runner's dry run."""
        return self.runner.dry_run()

    def _run_tests(self, listener):
        """Run once and return the results seen by a CollectingPassListener.

        The tracking listener is appended to a copy of *listener*, so the
        caller's list is left untouched.
        """
        test_tracker = CollectingPassListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        return test_tracker.get_current_run_results()

    def _add_class_filter(self, tests):
        """Limit the next run to *tests* through the runner "class" arg."""
        self.runner.add_arg(
            "class",
            ",".join("%s#%s" % (test.class_name, test.test_name)
                     for test in tests))

    def _run_with_rerun(self, listener, expected_tests):
        """Run all tests, then rerun those missing from the results."""
        LOG.debug("Ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        # Count once and guard against an empty result; the old expression
        # logged a bare 0 instead of the message in that case.
        has_run = len(test_run) if test_run else 0
        LOG.debug("Run with rerun, has run: %s" % has_run)
        if has_run < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run twice,please check")
                self.runner.notify_finished()
            else:
                self._rerun_twice(expected_tests, listener)
        else:
            LOG.debug("Rerun once success")
            self.runner.notify_finished()

    def _rerun_twice(self, expected_tests, listener):
        """Second attempt: run only *expected_tests*, then maybe a third."""
        self._add_class_filter(expected_tests)
        LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        LOG.debug("Rerun twice, has run: %s" % len(test_run))
        if len(test_run) < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run third,please check")
                self.runner.notify_finished()
            else:
                self._rerun_third(expected_tests, listener)
        else:
            LOG.debug("Rerun twice success")
            self.runner.notify_finished()

    def _rerun_third(self, expected_tests, listener):
        """Third and last attempt: run only *expected_tests*."""
        self._add_class_filter(expected_tests)
        LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
        self._run_tests(listener)
        LOG.debug("Rerun third success")
        self.runner.notify_finished()

    def _make_exclude_list_file(self, request):
        """Turn "all-test-file-exclude-filter" into a "notClass" test arg.

        The filter entry is removed from ``testargs``.  Its first value is
        read as a JSON file; the entries stored there under the JS unit test
        type for this module, plus any existing "notClass" values, become
        the new "notClass" argument.  Nothing is changed when the value is
        empty or does not name a file.
        """
        filter_key = "all-test-file-exclude-filter"
        if filter_key not in self.config.testargs:
            return
        json_file_list = self.config.testargs.pop(filter_key)
        if not json_file_list:
            LOG.warning("all-test-file-exclude-filter value is empty!")
            return
        exclude_file = json_file_list[0]
        if not os.path.isfile(exclude_file):
            LOG.warning(
                "[{}] is not a valid file".format(exclude_file))
            return
        file_open = os.open(exclude_file, os.O_RDONLY,
                            stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(file_open, "r") as file_handler:
            json_data = json.load(file_handler)
        exclude_list = json_data.get(
            DeviceTestType.oh_jsunit_test, [])
        module_name = request.get_module_name()
        filter_list = []
        for exclude in exclude_list:
            if module_name not in exclude:
                continue
            filter_list.extend(exclude.get(module_name))
        if not isinstance(self.config.testargs, dict):
            return
        if 'notClass' in self.config.testargs.keys():
            filter_list.extend(self.config.testargs.get('notClass', []))
        self.config.testargs.update({"notClass": filter_list})

    def _do_test_retry(self, listener, testargs):
        """Run only the "Class#method" cases listed under testargs["test"].

        Entries that do not split into exactly two parts on "#" are skipped.
        The "test" entry is removed from ``self.config.testargs`` before the
        run.
        """
        tests_dict = dict()
        case_list = list()
        for test in testargs.get("test"):
            test_item = test.split("#")
            if len(test_item) != 2:
                continue
            case_list.append(test)
            tests_dict.setdefault(test_item[0], []).append(
                TestDescription(test_item[0], test_item[1]))
        self.runner.add_arg("class", ",".join(case_list))
        self.runner.expect_tests_dict = tests_dict
        self.config.testargs.pop("test")
        self.runner.run(listener)
        self.runner.notify_finished()

    def _do_tf_suite(self):
        """Use the cases of ``config.tf_suite`` as the "class" test arg."""
        if not hasattr(self.config, "tf_suite"):
            return
        # Read through the attribute like the rest of the class does instead
        # of subscripting the config object.
        case_list = self.config.tf_suite.get("cases", [])
        if case_list:
            self.config.testargs.update({"class": case_list})

    def _start_smart_perf(self):
        """Put the smartperf kit first in ``self.kits`` when it is enabled.

        Does nothing unless the config lists smartperf among the kits of
        the module.
        """
        if not hasattr(self.config, ConfigConst.kits_in_module):
            return
        if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
            return
        sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
        sp_kits.target_name = self.config.bundle_name
        param_config = self.config.get(ConfigConst.kits_params).get(
            CKit.smartperf, "")
        sp_kits.__check_config__(param_config)
        self.kits.insert(0, sp_kits)

    def _handle_logs(self, request):
        """Collect crash logs when enabled and stop the log capture."""
        serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
        # ":" in the serial is replaced to build the log archive name.
        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
        log_collector = self.config.device.device_log_collector
        # NOTE(review): hasattr is checked on the device although the method
        # is called on its device_log_collector -- confirm.
        if hasattr(self.config, ConfigConst.device_log) and \
                self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
                and hasattr(self.config.device, "start_get_crash_log"):
            log_collector.start_get_crash_log(
                log_tar_file_name, module_name=request.get_module_name())
        log_collector.remove_log_address(self.device_log, self.hilog)
        log_collector.stop_catch_device_log(self.log_proc)
        log_collector.stop_catch_device_log(self.hilog_proc)

    def __result__(self):
        """Return the result file path, or "" when the file does not exist."""
        return self.result if os.path.exists(self.result) else ""
597
598
class OHJSUnitTestRunner:
    """Builds and executes the ``aa test`` commands of a JS unit module."""

    MAX_RETRY_TIMES = 3

    def __init__(self, config):
        self.arg_list = {}
        self.suites_name = None
        self.config = config
        self.rerun_attemp = 3
        self.suite_recorder = {}
        self.finished = False
        self.expect_tests_dict = dict()
        self.finished_observer = None
        self.retry_times = 1
        self.compile_mode = ""

    def dry_run(self):
        """Run the dry-run command and return the tests the parser found.

        ``expect_tests_dict`` is replaced by the parser's ``tests_dict``.
        """
        plugins = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
        if plugins:
            # Only the first registered parser is used.
            plugins = plugins[:1]
        list_parsers = []
        for plugin in plugins:
            list_parsers.append(plugin.__class__())
        shell_receiver = ShellHandler(list_parsers)
        dry_command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            dry_command, timeout=self.config.timeout,
            receiver=shell_receiver, retry=0)
        first_parser = list_parsers[0]
        self.expect_tests_dict = first_parser.tests_dict
        return first_parser.tests

    def run(self, listener):
        """Execute the test command on the device without retrying it."""
        shell_receiver = self._get_shell_handler(listener)
        run_command = self._get_run_command()
        self.config.device.execute_shell_command(
            run_command, timeout=self.config.timeout,
            receiver=shell_receiver, retry=0)

    def notify_finished(self):
        """Tell the last parser the task is done and use up one retry."""
        observer = self.finished_observer
        if observer:
            observer.notify_task_finished()
        self.retry_times -= 1

    def _get_shell_handler(self, listener):
        """Return a ShellHandler wrapping at most one JS unit parser.

        The created parser is also remembered as ``finished_observer``.
        """
        plugins = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
        if plugins:
            plugins = plugins[:1]
        result_parsers = []
        for plugin in plugins:
            result_parser = plugin.__class__()
            result_parser.suites_name = self.suites_name
            result_parser.listeners = listener
            result_parser.runner = self
            result_parsers.append(result_parser)
            self.finished_observer = result_parser
        return ShellHandler(result_parsers)

    def add_arg(self, name, value):
        """Store *value* under *name*; falsy names or values are ignored."""
        if name and value:
            self.arg_list[name] = value

    def remove_arg(self, name):
        """Drop the argument *name* when it is present."""
        if name:
            self.arg_list.pop(name, None)

    def get_args_command(self):
        """Return the option string built from ``self.arg_list``.

        "wait_time" becomes " -w <value> "; every other entry becomes
        " -s <key> <value> ".
        """
        pieces = []
        for key, value in self.arg_list.items():
            if key == "wait_time":
                pieces.append(" -w %s " % (value,))
            else:
                pieces.append(" -s %s %s " % (key, value))
        return "".join(pieces)

    def _compose_command(self, tail=""):
        """Build the ``aa test`` command line followed by *tail*.

        The package name is preferred over the module name; an empty string
        is returned when neither is configured.
        """
        if self.config.package_name:
            # aa test -p ${packageName} -b ${bundleName}-s
            # unittest OpenHarmonyTestRunner
            target = "-p {}".format(self.config.package_name)
            bundle_name = self.config.bundle_name
            runner_name = "OpenHarmonyTestRunner"
        elif self.config.module_name:
            #  aa test -m ${moduleName}  -b ${bundleName}
            #  -s unittest OpenHarmonyTestRunner
            target = "-m {}".format(self.config.module_name)
            bundle_name = self.config.bundle_name
            runner_name = self.get_oh_test_runner_path()
        else:
            return ""
        return "aa test {} -b {} -s unittest {} {}{}".format(
            target, bundle_name, runner_name, self.get_args_command(), tail)

    def _get_run_command(self):
        """Return the command that executes the tests."""
        return self._compose_command()

    def _get_dry_run_command(self):
        """Return the command that only lists the tests (dryRun)."""
        return self._compose_command(" -s dryRun true")

    def get_oh_test_runner_path(self):
        """Return the test runner name for the configured compile mode."""
        esmodule = self.compile_mode == "esmodule"
        return ("/ets/testrunner/OpenHarmonyTestRunner" if esmodule
                else "OpenHarmonyTestRunner")
713
714
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
class OHRustTestDriver(IDriver):
    """Driver that pushes a rust test binary to the device and executes it.

    Attributes:
        result: path of the result report ("" until a run has started).
        error_message: exception captured by ``__execute__`` ("" if none).
        config: the request config, bound at the start of ``__execute__``.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.config = None

    def __check_environment__(self, device_options):
        """No environment validation is performed by this driver."""

    def __check_config__(self, config):
        """No config validation is performed by this driver."""

    def __execute__(self, request):
        """Run the rust test suite referenced by ``request``.

        Exceptions are caught, tagged with error number "03409" when they
        carry none, logged and stored in ``error_message``; they are not
        re-raised. The hilog task is always stopped and the result report
        is always checked, whether or not the run succeeded.

        Args:
            request: test request providing ``config``, ``root.source``,
                ``listeners`` and ``get_module_name()``.
        """
        try:
            LOG.debug("Start to execute open harmony rust test")
            self.config = request.config
            self.config.device = request.config.environment.devices[0]
            self.config.target_test_path = "/system/bin"

            suite_file = request.root.source.source_file
            LOG.debug("Testsuite filepath:{}".format(suite_file))

            if not suite_file:
                LOG.error("test source '{}' not exists".format(
                    request.root.source.source_string))
                return

            self.result = "{}.xml".format(
                os.path.join(request.config.report_path,
                             "result", request.get_module_name()))
            self.config.device.set_device_report_path(request.config.report_path)
            self.config.device.device_log_collector.start_hilog_task()
            self._init_oh_rust()
            self._run_oh_rust(suite_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
        finally:
            # The device may be missing if setup failed before it was bound;
            # touching it here would hide the original error behind an
            # AttributeError.
            device = getattr(self.config, "device", None)
            if device is not None:
                serial = "{}_{}".format(str(device.__get_serial__()),
                                        time.time_ns())
                log_tar_file_name = serial.replace(":", "_")
                device.device_log_collector.stop_hilog_task(
                    log_tar_file_name, module_name=request.get_module_name())
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _init_oh_rust(self):
        """Remount the device file system read-write before pushing files."""
        self.config.device.connector_command("target mount")
        self.config.device.execute_shell_command(
            "mount -o rw,remount,rw /")

    def _run_oh_rust(self, suite_file, request=None):
        """Push the suite and its resources, run it and clean up.

        Args:
            suite_file: local path of the test binary to push and execute.
            request: test request; despite the ``None`` default it is
                dereferenced (listeners, module name), so it must be given.
        """
        # push testsuite file
        self.config.device.push_file(suite_file, self.config.target_test_path)
        # push resource file
        resource_manager = ResourceManager()
        resource_data_dict, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dict,
                                               resource_dir,
                                               self.config.device)
        try:
            for listener in request.listeners:
                listener.device_sn = self.config.device.device_sn

            parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
            if parsers:
                # only the first registered parser is used
                parsers = parsers[:1]
            parser_instances = []
            for parser in parsers:
                parser_instance = parser.__class__()
                parser_instance.suite_name = request.get_module_name()
                parser_instance.listeners = request.listeners
                parser_instances.append(parser_instance)
            handler = ShellHandler(parser_instances)

            command = "cd {}; chmod +x *; ./{}".format(
                self.config.target_test_path, os.path.basename(suite_file))
            self.config.device.execute_shell_command(
                command, timeout=TIME_OUT, receiver=handler, retry=0)
        finally:
            # prepared resources must be cleaned even when the run fails
            resource_manager.process_cleaner_data(resource_data_dict,
                                                  resource_dir,
                                                  self.config.device)

    def __result__(self):
        """Return the report path if it exists on disk, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
802
803
class OHYaraConfig(Enum):
    """String constants shared by the yara test driver."""

    # keys of the tool hap settings
    HAP_FILE = "hap-file"
    BUNDLE_NAME = "bundle-name"
    CLEANUP_APPS = "cleanup-apps"

    # key of the version mapping file
    OS_FULLNAME_LIST = "osFullNameList"
    # keys of the vulnerability info file
    VULNERABILITIES = "vulnerabilities"
    VUL_ID = "vul_id"
    OPENHARMONY_SA = "openharmony-sa"
    AFFECTED_VERSION = "affected_versions"
    MONTH = "month"
    SEVERITY = "severity"
    VUL_DESCRIPTION = "vul_description"
    DISCLOSURE = "disclosure"
    AFFECTED_FILES = "affected_files"
    YARA_RULES = "yara_rules"

    # possible values of a vulnerability item's final risk
    PASS = "pass"
    FAIL = "fail"
    BLOCK = "block"

    # trace messages attached to vulnerability items
    ERROR_MSG_001 = (
        "The patch label is longer than two months (60 days), "
        "which violates the OHCA agreement [https://compatibility.openharmony.cn/]."
    )
    ERROR_MSG_002 = (
        "This test case is beyond the patch label scope and does not need to be executed."
    )
    ERROR_MSG_003 = "Modify the code according to the patch requirements: "
829
830
class VulItem:
    """One vulnerability entry together with the outcome of its scan."""

    # Class-level defaults are kept so attribute lookups on the class itself
    # keep working; __init__ replaces the mutable ones per instance.
    vul_id = ""
    month = ""
    severity = ""
    vul_description = dict()
    disclosure = dict()
    affected_files = ""
    affected_versions = ""
    yara_rules = ""
    trace = ""
    final_risk = OHYaraConfig.PASS.value
    complete = False

    def __init__(self):
        # Give every item its own dicts: mutating a class-level dict in
        # place would silently change every other item as well.
        self.vul_description = dict()
        self.disclosure = dict()
843
844
845@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test)
846class OHYaraTestDriver(IDriver):
847    def __init__(self):
848        self.result = ""
849        self.error_message = ""
850        self.config = None
851        self.tool_hap_info = dict()
852        self.security_patch = None
853        self.system_version = None
854
855    def __check_environment__(self, device_options):
856        pass
857
858    def __check_config__(self, config):
859        pass
860
861    def __execute__(self, request):
862        try:
863            LOG.debug("Start to execute open harmony yara test")
864            self.result = os.path.join(
865                request.config.report_path, "result",
866                '.'.join((request.get_module_name(), "xml")))
867            self.config = request.config
868            self.config.device = request.config.environment.devices[0]
869
870            config_file = request.root.source.config_file
871            suite_file = request.root.source.source_file
872
873            if not suite_file:
874                raise ParamError(
875                    "test source '%s' not exists" %
876                    request.root.source.source_string, error_no="00110")
877            LOG.debug("Test case file path: %s" % suite_file)
878            self.config.device.set_device_report_path(request.config.report_path)
879            self._run_oh_yara(config_file, request)
880
881        except Exception as exception:
882            self.error_message = exception
883            if not getattr(exception, "error_no", ""):
884                setattr(exception, "error_no", "03409")
885            LOG.exception(self.error_message, exc_info=False, error_no="03409")
886        finally:
887            if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value):
888                cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)]
889                result = self.config.device.connector_command(cmd)
890                LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format(
891                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result))
892
893            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
894                                    time.time_ns())
895            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
896            self.config.device.device_log_collector.stop_hilog_task(
897                log_tar_file_name, module_name=request.get_module_name())
898
899            self.result = check_result_report(
900                request.config.report_path, self.result, self.error_message)
901
902    def _get_driver_config(self, json_config):
903        yara_bin = get_config_value('yara-bin',
904                                    json_config.get_driver(), False)
905        version_mapping_file = get_config_value('version-mapping-file',
906                                                json_config.get_driver(), False)
907        vul_info_file = get_config_value('vul-info-file',
908                                         json_config.get_driver(), False)
909        # get absolute file path
910        self.config.yara_bin = get_file_absolute_path(yara_bin)
911        self.config.version_mapping_file = get_file_absolute_path(version_mapping_file)
912        self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
913
914        # get tool hap info
915        # default value
916        self.tool_hap_info = {
917            OHYaraConfig.HAP_FILE.value: "sststool.hap",
918            OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool",
919            OHYaraConfig.CLEANUP_APPS.value: "true"
920        }
921        tool_hap_info = get_config_value('tools-hap-info',
922                                         json_config.get_driver(), False)
923        if tool_hap_info:
924            self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \
925                tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap")
926            self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \
927                tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool")
928            self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \
929                tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true")
930
931    def _run_oh_yara(self, config_file, request=None):
932        message_list = list()
933
934        json_config = JsonParser(config_file)
935        self._get_driver_config(json_config)
936
937        # get device info
938        self.security_patch = self.config.device.execute_shell_command(
939            "param get const.ohos.version.security_patch").strip()
940        self.system_version = self.config.device.execute_shell_command(
941            "param get const.ohos.fullname").strip()
942
943        if "fail" in self.system_version:
944            self._get_full_name_by_tool_hap()
945
946        vul_items = self._get_vul_items()
947        # if security patch expire, case fail
948        current_date_str = datetime.now().strftime('%Y-%m')
949        if self._check_if_expire_or_risk(current_date_str):
950            LOG.info("Security patch has expired. Set all case fail.")
951            for _, item in enumerate(vul_items):
952                item.complete = True
953                item.final_risk = OHYaraConfig.FAIL.value
954                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
955        else:
956            LOG.info("Security patch is shorter than two months. Start yara test.")
957            # parse version mapping file
958            mapping_info = self._do_parse_json(self.config.version_mapping_file)
959            os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
960            # check if system version in version mapping list
961            vul_version = os_full_name_list.get(self.system_version, None)
962            # not in the maintenance scope, skip all case
963            if vul_version is None:
964                LOG.debug("The system version is not in the maintenance scope, skip it. "
965                          "system versions is {}".format(self.system_version))
966            else:
967                for _, item in enumerate(vul_items):
968                    LOG.debug("Affected files: {}".format(item.affected_files))
969                    for index, affected_file in enumerate(item.affected_files):
970                        has_inter = False
971                        for i, _ in enumerate(item.affected_versions):
972                            if self._check_if_intersection(vul_version, item.affected_versions[i]):
973                                has_inter = True
974                                break
975                        if not has_inter:
976                            LOG.debug("Yara rule [{}] affected versions has no intersection "
977                                      "in mapping version, skip it. Mapping version is {}, "
978                                      "affected versions is {}".format(item.vul_id, vul_version,
979                                                                       item.affected_versions))
980                            continue
981                        local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
982                                                  request.get_module_name(), item.yara_rules[index].split('.')[0])
983                        if not os.path.exists(local_path):
984                            os.makedirs(local_path)
985                        yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
986                        self.config.device.pull_file(affected_file, local_path)
987                        affected_file = os.path.join(local_path, os.path.basename(affected_file))
988                        if not os.path.exists(affected_file):
989                            LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
990                            item.final_risk = OHYaraConfig.PASS.value
991                            continue
992                        cmd = [self.config.yara_bin, yara_file, affected_file]
993                        result = exec_cmd(cmd)
994                        LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
995                        if "testcase pass" in result:
996                            item.final_risk = OHYaraConfig.PASS.value
997                            break
998                        else:
999                            if self._check_if_expire_or_risk(item.month, check_risk=True):
1000                                item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
1001                                                           item.disclosure.get("zh", ""))
1002                                item.final_risk = OHYaraConfig.FAIL.value
1003                            else:
1004                                item.final_risk = OHYaraConfig.BLOCK.value
1005                                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
1006                        # if no risk delete files, if rule has risk keep it
1007                        if item.final_risk != OHYaraConfig.FAIL.value:
1008                            local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
1009                                                      request.get_module_name(), item.yara_rules[index].split('.')[0])
1010                            if os.path.exists(local_path):
1011                                LOG.debug(
1012                                    "Yara rule [{}] has no risk, remove affected files.".format(
1013                                        item.yara_rules[index]))
1014                                shutil.rmtree(local_path)
1015                    item.complete = True
1016        self._generate_yara_report(request, vul_items, message_list)
1017        self._generate_xml_report(request, vul_items, message_list)
1018
1019    def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False):
1020        from dateutil.relativedelta import relativedelta
1021        self.security_patch = self.security_patch.replace(' ', '')
1022        self.security_patch = self.security_patch.replace('/', '-')
1023        # get current date
1024        source_date = datetime.strptime(date_str, '%Y-%m')
1025        security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m')
1026        # check if expire 2 months
1027        rd = relativedelta(source_date, security_patch_date)
1028        months = rd.months + (rd.years * 12)
1029        if check_risk:
1030            # vul time before security patch time no risk
1031            LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}"
1032                      .format(self.security_patch[:-3], date_str, months))
1033            if months > 0:
1034                return False
1035            else:
1036                return True
1037        else:
1038            # check if security patch time expire current time 2 months
1039            LOG.debug("Security patch time: {}, current time: {}, delta_months: {}"
1040                      .format(self.security_patch[:-3], date_str, months))
1041            if months > expire_time:
1042                return True
1043            else:
1044                return False
1045
1046    @staticmethod
1047    def _check_if_intersection(source_version, dst_version):
1048        # para dst_less_sor control if dst less than source
1049        def _do_check(soruce, dst, dst_less_sor=True):
1050            if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \
1051                    re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst):
1052                source_vers = soruce.split(".")
1053                dst_vers = dst.split(".")
1054                for index, _ in enumerate(source_vers):
1055                    if dst_less_sor:
1056                        # check if all source number less than dst number
1057                        if int(source_vers[index]) < int(dst_vers[index]):
1058                            return False
1059                    else:
1060                        # check if all source number larger than dst number
1061                        if int(source_vers[index]) > int(dst_vers[index]):
1062                            return False
1063                return True
1064            return False
1065
1066        source_groups = source_version.split("-")
1067        dst_groups = dst_version.split("-")
1068        if source_version == dst_version:
1069            return True
1070        elif len(source_groups) == 1 and len(dst_groups) == 1:
1071            return source_version == dst_version
1072        elif len(source_groups) == 1 and len(dst_groups) == 2:
1073            return _do_check(source_groups[0], dst_groups[0]) and \
1074                   _do_check(source_groups[0], dst_groups[1], dst_less_sor=False)
1075        elif len(source_groups) == 2 and len(dst_groups) == 1:
1076            return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \
1077                   _do_check(source_groups[1], dst_groups[0])
1078        elif len(source_groups) == 2 and len(dst_groups) == 2:
1079            return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \
1080                   _do_check(source_groups[1], dst_groups[0])
1081        return False
1082
1083    def _get_vul_items(self):
1084        vul_items = list()
1085        vul_info = self._do_parse_json(self.config.vul_info_file)
1086        vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, [])
1087        for _, vul in enumerate(vulnerabilities):
1088            affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
1089            item = VulItem()
1090            item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "")
1091            item.affected_versions = affected_versions
1092            item.month = vul.get(OHYaraConfig.MONTH.value, "")
1093            item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
1094            item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
1095            item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
1096            item.affected_files = \
1097                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1098                    OHYaraConfig.AFFECTED_FILES.value, [])
1099            item.yara_rules = \
1100                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1101                    OHYaraConfig.YARA_RULES.value, [])
1102            vul_items.append(item)
1103        LOG.debug("Vul size is {}".format(len(vul_items)))
1104        return vul_items
1105
1106    @staticmethod
1107    def _do_parse_json(file_path):
1108        json_content = None
1109        if not os.path.exists(file_path):
1110            raise ParamError("The json file {} does not exist".format(
1111                file_path), error_no="00110")
1112        flags = os.O_RDONLY
1113        modes = stat.S_IWUSR | stat.S_IRUSR
1114        with os.fdopen(os.open(file_path, flags, modes),
1115                       "r", encoding="utf-8") as file_content:
1116            json_content = json.load(file_content)
1117        if json_content is None:
1118            raise ParamError("The json file {} parse error".format(
1119                file_path), error_no="00110")
1120        return json_content
1121
1122    def _get_full_name_by_tool_hap(self):
1123        # check if tool hap has installed
1124        result = self.config.device.execute_shell_command(
1125            "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1126        LOG.debug(result)
1127        if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result:
1128            hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value))
1129            self.config.device.push_file(hap_path, "/data/local/tmp")
1130            result = self.config.device.execute_shell_command(
1131                "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path)))
1132            LOG.debug(result)
1133            self.config.device.execute_shell_command(
1134                "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format(
1135                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1136        self.config.device.execute_shell_command(
1137            "aa start -a {}.MainAbility -b {}".format(
1138                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value),
1139                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1140        time.sleep(1)
1141        self.system_version = self.config.device.execute_shell_command(
1142            "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format(
1143                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '')
1144        LOG.debug(self.system_version)
1145
1146    def _generate_yara_report(self, request, vul_items, result_message):
1147        import csv
1148        result_message.clear()
1149        yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv"
1150                                   .format(request.config.device.device_sn))
1151        if os.path.exists(yara_report):
1152            data = []
1153        else:
1154            data = [
1155                ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch],
1156                ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"]
1157            ]
1158        fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
1159        for _, item in enumerate(vul_items):
1160            data.append([item.vul_id, item.severity,
1161                         item.month, item.final_risk,
1162                         item.disclosure.get("zh", ""), item.vul_description.get("zh", "")])
1163            result = "{}|{}|{}|{}|{}|{}|{}\n".format(
1164                item.vul_id, item.severity,
1165                item.month, item.final_risk,
1166                item.disclosure.get("zh", ""), item.vul_description.get("zh", ""),
1167                item.trace)
1168            result_message.append(result)
1169        with os.fdopen(fd, "a", newline='') as file_handler:
1170            writer = csv.writer(file_handler)
1171            writer.writerows(data)
1172
1173    def _generate_xml_report(self, request, vul_items, message_list):
1174        result_message = "".join(message_list)
1175        listener_copy = request.listeners.copy()
1176        parsers = get_plugin(
1177            Plugin.PARSER, CommonParserType.oh_yara)
1178        if parsers:
1179            parsers = parsers[:1]
1180        for listener in listener_copy:
1181            listener.device_sn = self.config.device.device_sn
1182        parser_instances = []
1183        for parser in parsers:
1184            parser_instance = parser.__class__()
1185            parser_instance.suites_name = request.get_module_name()
1186            parser_instance.vul_items = vul_items
1187            parser_instance.listeners = listener_copy
1188            parser_instances.append(parser_instance)
1189        handler = ShellHandler(parser_instances)
1190        process_command_ret(result_message, handler)
1191
1192    def __result__(self):
1193        return self.result if os.path.exists(self.result) else ""
1194
1195    @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test)
1196    class ValidatorTestDriver(IDriver):
1197
1198        def __init__(self):
1199            self.error_message = ""
1200            self.xml_path = ""
1201            self.result = ""
1202            self.config = None
1203            self.kits = []
1204
1205        def __check_environment__(self, device_options):
1206            pass
1207
1208        def __check_config__(self, config):
1209            pass
1210
1211        def __execute__(self, request):
1212            try:
1213                self.result = os.path.join(
1214                    request.config.report_path, "result",
1215                    ".".join((request.get_module_name(), "xml")))
1216                self.config = request.config
1217                self.config.device = request.config.environment.devices[0]
1218                config_file = request.root.source.config_file
1219                self._run_validate_test(config_file, request)
1220            except Exception as exception:
1221                self.error_message = exception
1222                if not getattr(exception, "error_no", ""):
1223                    setattr(exception, "error_no", "03409")
1224                LOG.exception(self.error_message, exc_info=True, error_no="03409")
1225                raise exception
1226            finally:
1227                self.result = check_result_report(request.config.report_path,
1228                                                  self.result, self.error_message)
1229
1230        def _run_validate_test(self, config_file, request):
1231            is_update = False
1232            try:
1233                if "update" in self.config.testargs.keys():
1234                    if dict(self.config.testargs).get("update")[0] == "true":
1235                        is_update = True
1236                json_config = JsonParser(config_file)
1237                self.kits = get_kit_instances(json_config, self.config.resource_path,
1238                                              self.config.testcases_path)
1239                self._get_driver_config(json_config)
1240                if is_update:
1241                    do_module_kit_setup(request, self.kits)
1242                while True:
1243                    print("Is test finished?Y/N")
1244                    usr_input = input(">>>")
1245                    if usr_input == "Y" or usr_input == "y":
1246                        LOG.debug("Finish current test")
1247                        break
1248                    else:
1249                        print("continue")
1250                        LOG.debug("Your input is:{}, continue".format(usr_input))
1251                if self.xml_path:
1252                    result_dir = os.path.join(request.config.report_path, "result")
1253                    if not os.path.exists(result_dir):
1254                        os.makedirs(result_dir)
1255                    self.config.device.pull_file(self.xml_path, self.result)
1256            finally:
1257                if is_update:
1258                    do_module_kit_teardown(request)
1259
1260        def _get_driver_config(self, json_config):
1261            self.xml_path = get_config_value("xml_path", json_config.get_driver(), False)
1262        def __result__(self):
1263            return self.result if os.path.exists(self.result) else ""
1264