• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21import json
22import stat
23import shutil
24import re
25from datetime import datetime
26from enum import Enum
27
28from xdevice import ConfigConst
29from xdevice import ParamError
30from xdevice import IDriver
31from xdevice import platform_logger
32from xdevice import Plugin
33from xdevice import get_plugin
34from xdevice import JsonParser
35from xdevice import ShellHandler
36from xdevice import driver_output_method
37from xdevice import TestDescription
38from xdevice import get_device_log_file
39from xdevice import check_result_report
40from xdevice import get_kit_instances
41from xdevice import get_config_value
42from xdevice import do_module_kit_setup
43from xdevice import do_module_kit_teardown
44from xdevice import DeviceTestType
45from xdevice import CommonParserType
46from xdevice import FilePermission
47from xdevice import ResourceManager
48from xdevice import get_file_absolute_path
49from xdevice import exec_cmd
50
51from ohos.executor.listener import CollectingPassListener
52from ohos.constants import CKit
53from ohos.environment.dmlib import process_command_ret
54
55__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver",
56           "OHYaraTestDriver", "oh_jsunit_para_parse"]
57
58TIME_OUT = 300 * 1000
59
60LOG = platform_logger("OpenHarmony")
61
62
63def oh_jsunit_para_parse(runner, junit_paras):
64    junit_paras = dict(junit_paras)
65    test_type_list = ["function", "performance", "reliability", "security"]
66    size_list = ["small", "medium", "large"]
67    level_list = ["0", "1", "2", "3"]
68    for para_name in junit_paras.keys():
69        para_name = para_name.strip()
70        para_values = junit_paras.get(para_name, [])
71        if para_name == "class":
72            runner.add_arg(para_name, ",".join(para_values))
73        elif para_name == "notClass":
74            runner.add_arg(para_name, ",".join(para_values))
75        elif para_name == "testType":
76            if para_values[0] not in test_type_list:
77                continue
78            # function/performance/reliability/security
79            runner.add_arg(para_name, para_values[0])
80        elif para_name == "size":
81            if para_values[0] not in size_list:
82                continue
83            # size small/medium/large
84            runner.add_arg(para_name, para_values[0])
85        elif para_name == "level":
86            if para_values[0] not in level_list:
87                continue
88            # 0/1/2/3/4
89            runner.add_arg(para_name, para_values[0])
90        elif para_name == "stress":
91            runner.add_arg(para_name, para_values[0])
92
93
94@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
95class OHKernelTestDriver(IDriver):
96    """
97        OpenHarmonyKernelTest
98    """
99    def __init__(self):
100        self.timeout = 30 * 1000
101        self.result = ""
102        self.error_message = ""
103        self.kits = []
104        self.config = None
105        self.runner = None
106        # log
107        self.device_log = None
108        self.hilog = None
109        self.log_proc = None
110        self.hilog_proc = None
111
112    def __check_environment__(self, device_options):
113        pass
114
115    def __check_config__(self, config):
116        pass
117
118    def __execute__(self, request):
119        try:
120            LOG.debug("Start to Execute OpenHarmony Kernel Test")
121
122            self.config = request.config
123            self.config.device = request.config.environment.devices[0]
124
125            config_file = request.root.source.config_file
126
127            self.result = "%s.xml" % \
128                          os.path.join(request.config.report_path,
129                                       "result", request.get_module_name())
130            self.device_log = get_device_log_file(
131                request.config.report_path,
132                request.config.device.__get_serial__(),
133                "device_log",
134                module_name=request.get_module_name())
135
136            self.hilog = get_device_log_file(
137                request.config.report_path,
138                request.config.device.__get_serial__(),
139                "device_hilog",
140                module_name=request.get_module_name())
141
142            device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT |
143                                      os.O_APPEND, FilePermission.mode_755)
144            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
145                                 FilePermission.mode_755)
146            self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
147            with os.fdopen(device_log_open, "a") as log_file_pipe, \
148                    os.fdopen(hilog_open, "a") as hilog_file_pipe:
149                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
150                    start_catch_device_log(log_file_pipe, hilog_file_pipe)
151                self._run_oh_kernel(config_file, request.listeners, request)
152                log_file_pipe.flush()
153                hilog_file_pipe.flush()
154        except Exception as exception:
155            self.error_message = exception
156            if not getattr(exception, "error_no", ""):
157                setattr(exception, "error_no", "03409")
158            LOG.exception(self.error_message, exc_info=False, error_no="03409")
159            raise exception
160        finally:
161            do_module_kit_teardown(request)
162            self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog)
163            self.config.device.device_log_collector.stop_catch_device_log(self.log_proc)
164            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
165            self.result = check_result_report(
166                request.config.report_path, self.result, self.error_message)
167
168    def _run_oh_kernel(self, config_file, listeners=None, request=None):
169        try:
170            json_config = JsonParser(config_file)
171            kits = get_kit_instances(json_config, self.config.resource_path,
172                                     self.config.testcases_path)
173            self._get_driver_config(json_config)
174            do_module_kit_setup(request, kits)
175            self.runner = OHKernelTestRunner(self.config)
176            self.runner.suite_name = request.get_module_name()
177            self.runner.run(listeners)
178        finally:
179            do_module_kit_teardown(request)
180
181    def _get_driver_config(self, json_config):
182        target_test_path = get_config_value('native-test-device-path',
183                                            json_config.get_driver(), False)
184        test_suite_name = get_config_value('test-suite-name',
185                                           json_config.get_driver(), False)
186        test_suites_list = get_config_value('test-suites-list',
187                                            json_config.get_driver(), False)
188        timeout_limit = get_config_value('timeout-limit',
189                                         json_config.get_driver(), False)
190        conf_file = get_config_value('conf-file',
191                                     json_config.get_driver(), False)
192        self.config.arg_list = {}
193        if target_test_path:
194            self.config.target_test_path = target_test_path
195        if test_suite_name:
196            self.config.arg_list["test-suite-name"] = test_suite_name
197        if test_suites_list:
198            self.config.arg_list["test-suites-list"] = test_suites_list
199        if timeout_limit:
200            self.config.arg_list["timeout-limit"] = timeout_limit
201        if conf_file:
202            self.config.arg_list["conf-file"] = conf_file
203        timeout_config = get_config_value('shell-timeout',
204                                          json_config.get_driver(), False)
205        if timeout_config:
206            self.config.timeout = int(timeout_config)
207        else:
208            self.config.timeout = TIME_OUT
209
210    def __result__(self):
211        return self.result if os.path.exists(self.result) else ""
212
213
214class OHKernelTestRunner:
215    def __init__(self, config):
216        self.suite_name = None
217        self.config = config
218        self.arg_list = config.arg_list
219
220    def run(self, listeners):
221        handler = self._get_shell_handler(listeners)
222        # hdc shell cd /data/local/tmp/OH_kernel_test;
223        # sh runtest test -t OpenHarmony_RK3568_config
224        # -n OpenHarmony_RK3568_skiptest -l 60
225        command = "cd %s; chmod +x *; sh runtest test %s" % (
226            self.config.target_test_path, self.get_args_command())
227        self.config.device.execute_shell_command(
228            command, timeout=self.config.timeout, receiver=handler, retry=0)
229
230    def _get_shell_handler(self, listeners):
231        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
232        if parsers:
233            parsers = parsers[:1]
234        parser_instances = []
235        for parser in parsers:
236            parser_instance = parser.__class__()
237            parser_instance.suites_name = self.suite_name
238            parser_instance.listeners = listeners
239            parser_instances.append(parser_instance)
240        handler = ShellHandler(parser_instances)
241        return handler
242
243    def get_args_command(self):
244        args_commands = ""
245        for key, value in self.arg_list.items():
246            if key == "test-suite-name" or key == "test-suites-list":
247                args_commands = "%s -t %s" % (args_commands, value)
248            elif key == "conf-file":
249                args_commands = "%s -n %s" % (args_commands, value)
250            elif key == "timeout-limit":
251                args_commands = "%s -l %s" % (args_commands, value)
252        return args_commands
253
254
255@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
256class OHJSUnitTestDriver(IDriver):
257    """
258       OHJSUnitTestDriver is a Test that runs a native test package on
259       given device.
260    """
261
262    def __init__(self):
263        self.timeout = 80 * 1000
264        self.start_time = None
265        self.result = ""
266        self.error_message = ""
267        self.kits = []
268        self.config = None
269        self.runner = None
270        self.rerun = True
271        self.rerun_all = True
272        # log
273        self.device_log = None
274        self.hilog = None
275        self.log_proc = None
276        self.hilog_proc = None
277
278    def __check_environment__(self, device_options):
279        pass
280
281    def __check_config__(self, config):
282        pass
283
284    def __execute__(self, request):
285        try:
286            LOG.debug("Start execute OpenHarmony JSUnitTest")
287            self.result = os.path.join(
288                request.config.report_path, "result",
289                '.'.join((request.get_module_name(), "xml")))
290            self.config = request.config
291            self.config.device = request.config.environment.devices[0]
292
293            config_file = request.root.source.config_file
294            suite_file = request.root.source.source_file
295
296            if not suite_file:
297                raise ParamError(
298                    "test source '%s' not exists" %
299                    request.root.source.source_string, error_no="00110")
300            LOG.debug("Test case file path: %s" % suite_file)
301            self.config.device.set_device_report_path(request.config.report_path)
302            self.hilog = get_device_log_file(
303                request.config.report_path,
304                request.config.device.__get_serial__(),
305                "device_hilog",
306                module_name=request.get_module_name())
307
308            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
309                                 0o755)
310            self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
311            self.config.device.execute_shell_command(command="hilog -r")
312            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
313                if hasattr(self.config, ConfigConst.device_log) \
314                        and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
315                        and hasattr(self.config.device, "clear_crash_log"):
316                    self.config.device.device_log_collector.clear_crash_log()
317                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
318                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
319                self._run_oh_jsunit(config_file, request)
320        except Exception as exception:
321            self.error_message = exception
322            if not getattr(exception, "error_no", ""):
323                setattr(exception, "error_no", "03409")
324            LOG.exception(self.error_message, exc_info=True, error_no="03409")
325            raise exception
326        finally:
327            try:
328                self._handle_logs(request)
329            finally:
330                self.result = check_result_report(
331                    request.config.report_path, self.result, self.error_message)
332
333    def __dry_run_execute__(self, request):
334        LOG.debug("Start dry run xdevice JSUnit Test")
335        self.config = request.config
336        self.config.device = request.config.environment.devices[0]
337        config_file = request.root.source.config_file
338        suite_file = request.root.source.source_file
339
340        if not suite_file:
341            raise ParamError(
342                "test source '%s' not exists" %
343                request.root.source.source_string, error_no="00110")
344        LOG.debug("Test case file path: %s" % suite_file)
345        self._dry_run_oh_jsunit(config_file, request)
346
347    def _dry_run_oh_jsunit(self, config_file, request):
348        try:
349            if not os.path.exists(config_file):
350                LOG.error("Error: Test cases don't exist %s." % config_file)
351                raise ParamError(
352                    "Error: Test cases don't exist %s." % config_file,
353                    error_no="00102")
354            json_config = JsonParser(config_file)
355            self.kits = get_kit_instances(json_config,
356                                          self.config.resource_path,
357                                          self.config.testcases_path)
358
359            self._get_driver_config(json_config)
360            self.config.device.connector_command("target mount")
361            do_module_kit_setup(request, self.kits)
362            self.runner = OHJSUnitTestRunner(self.config)
363            self.runner.suites_name = request.get_module_name()
364            # execute test case
365            self._get_runner_config(json_config)
366            oh_jsunit_para_parse(self.runner, self.config.testargs)
367
368            test_to_run = self._collect_test_to_run()
369            LOG.info("Collected suite count is: {}, test count is: {}".
370                     format(len(self.runner.expect_tests_dict.keys()),
371                            len(test_to_run) if test_to_run else 0))
372        finally:
373            do_module_kit_teardown(request)
374
375    def _run_oh_jsunit(self, config_file, request):
376        try:
377            if not os.path.exists(config_file):
378                LOG.error("Error: Test cases don't exist %s." % config_file)
379                raise ParamError(
380                    "Error: Test cases don't exist %s." % config_file,
381                    error_no="00102")
382            json_config = JsonParser(config_file)
383            self.kits = get_kit_instances(json_config,
384                                          self.config.resource_path,
385                                          self.config.testcases_path)
386
387            self._get_driver_config(json_config)
388            self.config.device.connector_command("target mount")
389            self._start_smart_perf()
390            do_module_kit_setup(request, self.kits)
391            self.runner = OHJSUnitTestRunner(self.config)
392            self.runner.suites_name = request.get_module_name()
393            self._get_runner_config(json_config)
394            if hasattr(self.config, "history_report_path") and \
395                    self.config.testargs.get("test"):
396                self._do_test_retry(request.listeners, self.config.testargs)
397            else:
398                if self.rerun:
399                    self.runner.retry_times = self.runner.MAX_RETRY_TIMES
400                    # execute test case
401                self._do_tf_suite()
402                self._make_exclude_list_file(request)
403                oh_jsunit_para_parse(self.runner, self.config.testargs)
404                self._do_test_run(listener=request.listeners)
405
406        finally:
407            do_module_kit_teardown(request)
408
409    def _get_driver_config(self, json_config):
410        package = get_config_value('package-name',
411                                   json_config.get_driver(), False)
412        module = get_config_value('module-name',
413                                  json_config.get_driver(), False)
414        bundle = get_config_value('bundle-name',
415                                  json_config. get_driver(), False)
416        is_rerun = get_config_value('rerun', json_config.get_driver(), False)
417
418        self.config.package_name = package
419        self.config.module_name = module
420        self.config.bundle_name = bundle
421        self.rerun = True if is_rerun == 'true' else False
422
423        if not package and not module:
424            raise ParamError("Neither package nor module is found"
425                             " in config file.", error_no="03201")
426        timeout_config = get_config_value("shell-timeout",
427                                          json_config.get_driver(), False)
428        if timeout_config:
429            self.config.timeout = int(timeout_config)
430        else:
431            self.config.timeout = TIME_OUT
432
433    def _get_runner_config(self, json_config):
434        test_timeout = get_config_value('test-timeout',
435                                        json_config.get_driver(), False)
436        if test_timeout:
437            self.runner.add_arg("wait_time", int(test_timeout))
438
439        testcase_timeout = get_config_value('testcase-timeout',
440                                            json_config.get_driver(), False)
441        if testcase_timeout:
442            self.runner.add_arg("timeout", int(testcase_timeout))
443        self.runner.compile_mode = get_config_value(
444            'compile-mode', json_config.get_driver(), False)
445
446    def _do_test_run(self, listener):
447        test_to_run = self._collect_test_to_run()
448        LOG.info("Collected suite count is: {}, test count is: {}".
449                 format(len(self.runner.expect_tests_dict.keys()),
450                        len(test_to_run) if test_to_run else 0))
451        if not test_to_run or not self.rerun:
452            self.runner.run(listener)
453            self.runner.notify_finished()
454        else:
455            self._run_with_rerun(listener, test_to_run)
456
457    def _collect_test_to_run(self):
458        run_results = self.runner.dry_run()
459        return run_results
460
461    def _run_tests(self, listener):
462        test_tracker = CollectingPassListener()
463        listener_copy = listener.copy()
464        listener_copy.append(test_tracker)
465        self.runner.run(listener_copy)
466        test_run = test_tracker.get_current_run_results()
467        return test_run
468
469    def _run_with_rerun(self, listener, expected_tests):
470        LOG.debug("Ready to run with rerun, expect run: %s"
471                  % len(expected_tests))
472        test_run = self._run_tests(listener)
473        self.runner.retry_times -= 1
474        LOG.debug("Run with rerun, has run: %s" % len(test_run)
475                  if test_run else 0)
476        if len(test_run) < len(expected_tests):
477            expected_tests = TestDescription.remove_test(expected_tests,
478                                                         test_run)
479            if not expected_tests:
480                LOG.debug("No tests to re-run twice,please check")
481                self.runner.notify_finished()
482            else:
483                self._rerun_twice(expected_tests, listener)
484        else:
485            LOG.debug("Rerun once success")
486            self.runner.notify_finished()
487
488    def _rerun_twice(self, expected_tests, listener):
489        tests = []
490        for test in expected_tests:
491            tests.append("%s#%s" % (test.class_name, test.test_name))
492        self.runner.add_arg("class", ",".join(tests))
493        LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
494        test_run = self._run_tests(listener)
495        self.runner.retry_times -= 1
496        LOG.debug("Rerun twice, has run: %s" % len(test_run))
497        if len(test_run) < len(expected_tests):
498            expected_tests = TestDescription.remove_test(expected_tests,
499                                                         test_run)
500            if not expected_tests:
501                LOG.debug("No tests to re-run third,please check")
502                self.runner.notify_finished()
503            else:
504                self._rerun_third(expected_tests, listener)
505        else:
506            LOG.debug("Rerun twice success")
507            self.runner.notify_finished()
508
509    def _rerun_third(self, expected_tests, listener):
510        tests = []
511        for test in expected_tests:
512            tests.append("%s#%s" % (test.class_name, test.test_name))
513        self.runner.add_arg("class", ",".join(tests))
514        LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
515        self._run_tests(listener)
516        LOG.debug("Rerun third success")
517        self.runner.notify_finished()
518
519    def _make_exclude_list_file(self, request):
520        if "all-test-file-exclude-filter" in self.config.testargs:
521            json_file_list = self.config.testargs.get(
522                "all-test-file-exclude-filter")
523            self.config.testargs.pop("all-test-file-exclude-filter")
524            if not json_file_list:
525                LOG.warning("all-test-file-exclude-filter value is empty!")
526            else:
527                if not os.path.isfile(json_file_list[0]):
528                    LOG.warning(
529                        "[{}] is not a valid file".format(json_file_list[0]))
530                    return
531                file_open = os.open(json_file_list[0], os.O_RDONLY,
532                                    stat.S_IWUSR | stat.S_IRUSR)
533                with os.fdopen(file_open, "r") as file_handler:
534                    json_data = json.load(file_handler)
535                exclude_list = json_data.get(
536                    DeviceTestType.oh_jsunit_test, [])
537                filter_list = []
538                for exclude in exclude_list:
539                    if request.get_module_name() not in exclude:
540                        continue
541                    filter_list.extend(exclude.get(request.get_module_name()))
542                if not isinstance(self.config.testargs, dict):
543                    return
544                if 'notClass' in self.config.testargs.keys():
545                    filter_list.extend(self.config.testargs.get('notClass', []))
546                self.config.testargs.update({"notClass": filter_list})
547
548    def _do_test_retry(self, listener, testargs):
549        tests_dict = dict()
550        case_list = list()
551        for test in testargs.get("test"):
552            test_item = test.split("#")
553            if len(test_item) != 2:
554                continue
555            case_list.append(test)
556            if test_item[0] not in tests_dict:
557                tests_dict.update({test_item[0] : []})
558            tests_dict.get(test_item[0]).append(
559                TestDescription(test_item[0], test_item[1]))
560        self.runner.add_arg("class", ",".join(case_list))
561        self.runner.expect_tests_dict = tests_dict
562        self.config.testargs.pop("test")
563        self.runner.run(listener)
564        self.runner.notify_finished()
565
566    def _do_tf_suite(self):
567        if hasattr(self.config, "tf_suite") and \
568                self.config.tf_suite.get("cases", []):
569            case_list = self.config["tf_suite"]["cases"]
570            self.config.testargs.update({"class": case_list})
571
572    def _start_smart_perf(self):
573        if not hasattr(self.config, ConfigConst.kits_in_module):
574            return
575        if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
576            return
577        sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
578        sp_kits.target_name = self.config.bundle_name
579        param_config = self.config.get(ConfigConst.kits_params).get(
580            CKit.smartperf, "")
581        sp_kits.__check_config__(param_config)
582        self.kits.insert(0, sp_kits)
583
584    def _handle_logs(self, request):
585        serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
586        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
587        if hasattr(self.config, ConfigConst.device_log) and \
588                self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
589                and hasattr(self.config.device, "start_get_crash_log"):
590            self.config.device.device_log_collector.\
591                start_get_crash_log(log_tar_file_name, module_name=request.get_module_name())
592        self.config.device.device_log_collector.\
593            remove_log_address(self.device_log, self.hilog)
594        self.config.device.device_log_collector.\
595            stop_catch_device_log(self.log_proc)
596        self.config.device.device_log_collector.\
597            stop_catch_device_log(self.hilog_proc)
598
599    def __result__(self):
600        return self.result if os.path.exists(self.result) else ""
601
602
603class OHJSUnitTestRunner:
604    MAX_RETRY_TIMES = 3
605
606    def __init__(self, config):
607        self.arg_list = {}
608        self.suites_name = None
609        self.config = config
610        self.rerun_attemp = 3
611        self.suite_recorder = {}
612        self.finished = False
613        self.expect_tests_dict = dict()
614        self.finished_observer = None
615        self.retry_times = 1
616        self.compile_mode = ""
617
618    def dry_run(self):
619        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
620        if parsers:
621            parsers = parsers[:1]
622        parser_instances = []
623        for parser in parsers:
624            parser_instance = parser.__class__()
625            parser_instances.append(parser_instance)
626        handler = ShellHandler(parser_instances)
627        handler.add_process_method(driver_output_method)
628        command = self._get_dry_run_command()
629        self.config.device.execute_shell_command(
630            command, timeout=self.config.timeout, receiver=handler, retry=0)
631        self.expect_tests_dict = parser_instances[0].tests_dict
632        return parser_instances[0].tests
633
634    def run(self, listener):
635        handler = self._get_shell_handler(listener)
636        command = self._get_run_command()
637        self.config.device.execute_shell_command(
638            command, timeout=self.config.timeout, receiver=handler, retry=0)
639
640    def notify_finished(self):
641        if self.finished_observer:
642            self.finished_observer.notify_task_finished()
643        self.retry_times -= 1
644
645    def _get_shell_handler(self, listener):
646        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
647        if parsers:
648            parsers = parsers[:1]
649        parser_instances = []
650        for parser in parsers:
651            parser_instance = parser.__class__()
652            parser_instance.suites_name = self.suites_name
653            parser_instance.listeners = listener
654            parser_instance.runner = self
655            parser_instances.append(parser_instance)
656            self.finished_observer = parser_instance
657        handler = ShellHandler(parser_instances)
658        return handler
659
660    def add_arg(self, name, value):
661        if not name or not value:
662            return
663        self.arg_list[name] = value
664
665    def remove_arg(self, name):
666        if not name:
667            return
668        if name in self.arg_list:
669            del self.arg_list[name]
670
671    def get_args_command(self):
672        args_commands = ""
673        for key, value in self.arg_list.items():
674            if "wait_time" == key:
675                args_commands = "%s -w %s " % (args_commands, value)
676            else:
677                args_commands = "%s -s %s %s " % (args_commands, key, value)
678        return args_commands
679
680    def _get_run_command(self):
681        command = ""
682        if self.config.package_name:
683            # aa test -p ${packageName} -b ${bundleName}-s
684            # unittest OpenHarmonyTestRunner
685            command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
686                      " {}".format(self.config.package_name,
687                                   self.config.bundle_name,
688                                   self.get_args_command())
689        elif self.config.module_name:
690            #  aa test -m ${moduleName}  -b ${bundleName}
691            #  -s unittest OpenHarmonyTestRunner
692            command = "aa test -m {} -b {} -s unittest {} {}".format(
693                self.config.module_name, self.config.bundle_name,
694                self.get_oh_test_runner_path(), self.get_args_command())
695        return command
696
697    def _get_dry_run_command(self):
698        command = ""
699        if self.config.package_name:
700            command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
701                      " {} -s dryRun true".format(self.config.package_name,
702                                                  self.config.bundle_name,
703                                                  self.get_args_command())
704        elif self.config.module_name:
705            command = "aa test -m {} -b {} -s unittest {}" \
706                      " {} -s dryRun true".format(self.config.module_name,
707                                                  self.config.bundle_name,
708                                                  self.get_oh_test_runner_path(),
709                                                  self.get_args_command())
710
711        return command
712
713    def get_oh_test_runner_path(self):
714        if self.compile_mode == "esmodule":
715            return "/ets/testrunner/OpenHarmonyTestRunner"
716        else:
717            return "OpenHarmonyTestRunner"
718
719
720@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
721class OHRustTestDriver(IDriver):
722    def __init__(self):
723        self.result = ""
724        self.error_message = ""
725        self.config = None
726
727    def __check_environment__(self, device_options):
728        pass
729
730    def __check_config__(self, config):
731        pass
732
733    def __execute__(self, request):
734        try:
735            LOG.debug("Start to execute open harmony rust test")
736            self.config = request.config
737            self.config.device = request.config.environment.devices[0]
738            self.config.target_test_path = "/system/bin"
739
740            suite_file = request.root.source.source_file
741            LOG.debug("Testsuite filepath:{}".format(suite_file))
742
743            if not suite_file:
744                LOG.error("test source '{}' not exists".format(
745                    request.root.source.source_string))
746                return
747
748            self.result = "{}.xml".format(
749                os.path.join(request.config.report_path,
750                             "result", request.get_module_name()))
751            self.config.device.set_device_report_path(request.config.report_path)
752            self.config.device.device_log_collector.start_hilog_task()
753            self._init_oh_rust()
754            self._run_oh_rust(suite_file, request)
755        except Exception as exception:
756            self.error_message = exception
757            if not getattr(exception, "error_no", ""):
758                setattr(exception, "error_no", "03409")
759            LOG.exception(self.error_message, exc_info=False, error_no="03409")
760        finally:
761            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
762                                    time.time_ns())
763            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
764            self.config.device.device_log_collector.stop_hilog_task(
765                log_tar_file_name, module_name=request.get_module_name())
766            self.result = check_result_report(
767                request.config.report_path, self.result, self.error_message)
768
769    def _init_oh_rust(self):
770        self.config.device.connector_command("target mount")
771        self.config.device.execute_shell_command(
772            "mount -o rw,remount,rw /")
773
774    def _run_oh_rust(self, suite_file, request=None):
775        # push testsuite file
776        self.config.device.push_file(suite_file, self.config.target_test_path)
777        # push resource file
778        resource_manager = ResourceManager()
779        resource_data_dict, resource_dir = \
780            resource_manager.get_resource_data_dic(suite_file)
781        resource_manager.process_preparer_data(resource_data_dict,
782                                               resource_dir,
783                                               self.config.device)
784        for listener in request.listeners:
785            listener.device_sn = self.config.device.device_sn
786
787        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
788        if parsers:
789            parsers = parsers[:1]
790        parser_instances = []
791        for parser in parsers:
792            parser_instance = parser.__class__()
793            parser_instance.suite_name = request.get_module_name()
794            parser_instance.listeners = request.listeners
795            parser_instances.append(parser_instance)
796        handler = ShellHandler(parser_instances)
797
798        command = "cd {}; chmod +x *; ./{}".format(
799            self.config.target_test_path, os.path.basename(suite_file))
800        self.config.device.execute_shell_command(
801            command, timeout=TIME_OUT, receiver=handler, retry=0)
802        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
803                                              self.config.device)
804
805    def __result__(self):
806        return self.result if os.path.exists(self.result) else ""
807
808
809class OHYaraConfig(Enum):
810    HAP_FILE = "hap-file"
811    BUNDLE_NAME = "bundle-name"
812    CLEANUP_APPS = "cleanup-apps"
813
814    OS_FULLNAME_LIST = "osFullNameList"
815    VULNERABILITIES = "vulnerabilities"
816    VUL_ID = "vul_id"
817    OPENHARMONY_SA = "openharmony-sa"
818    CVE = "cve"
819    AFFECTED_VERSION = "affected_versions"
820    MONTH = "month"
821    SEVERITY = "severity"
822    VUL_DESCRIPTION = "vul_description"
823    DISCLOSURE = "disclosure"
824    OBJECT_TYPE = "object_type"
825    AFFECTED_FILES = "affected_files"
826    YARA_RULES = "yara_rules"
827
828    PASS = "pass"
829    FAIL = "fail"
830    BLOCK = "block"
831
832    ERROR_MSG_001 = "The patch label is longer than two months (60 days), which violates the OHCA agreement."
833    ERROR_MSG_002 = "This test case is beyond the patch label scope and does not need to be executed."
834    ERROR_MSG_003 = "Modify the code according to the patch requirements: "
835
836
837class VulItem:
838    vul_id = ""
839    month = ""
840    severity = ""
841    vul_description = dict()
842    disclosure = dict()
843    object_type = ""
844    affected_files = ""
845    affected_versions = ""
846    yara_rules = ""
847    trace = ""
848    final_risk = OHYaraConfig.PASS.value
849    complete = False
850
851
852@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test)
853class OHYaraTestDriver(IDriver):
854    def __init__(self):
855        self.result = ""
856        self.error_message = ""
857        self.config = None
858        self.tool_hap_info = dict()
859        self.security_patch = None
860        self.system_version = None
861
862    def __check_environment__(self, device_options):
863        pass
864
865    def __check_config__(self, config):
866        pass
867
868    def __execute__(self, request):
869        try:
870            LOG.debug("Start to execute open harmony yara test")
871            self.result = os.path.join(
872                request.config.report_path, "result",
873                '.'.join((request.get_module_name(), "xml")))
874            self.config = request.config
875            self.config.device = request.config.environment.devices[0]
876
877            config_file = request.root.source.config_file
878            suite_file = request.root.source.source_file
879
880            if not suite_file:
881                raise ParamError(
882                    "test source '%s' not exists" %
883                    request.root.source.source_string, error_no="00110")
884            LOG.debug("Test case file path: %s" % suite_file)
885            self.config.device.set_device_report_path(request.config.report_path)
886            self._run_oh_yara(config_file, request)
887
888        except Exception as exception:
889            self.error_message = exception
890            if not getattr(exception, "error_no", ""):
891                setattr(exception, "error_no", "03409")
892            LOG.exception(self.error_message, exc_info=False, error_no="03409")
893        finally:
894            if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value):
895                cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)]
896                result = self.config.device.connector_command(cmd)
897                LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format(
898                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result))
899
900            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
901                                    time.time_ns())
902            log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
903            self.config.device.device_log_collector.stop_hilog_task(
904                log_tar_file_name, module_name=request.get_module_name())
905
906            self.result = check_result_report(
907                request.config.report_path, self.result, self.error_message)
908
909    def _get_driver_config(self, json_config):
910        yara_bin = get_config_value('yara-bin',
911                                    json_config.get_driver(), False)
912        vmlinux_to_elf_bin = get_config_value('vmlinux-to-elf-bin',
913                                              json_config.get_driver(), False)
914        version_mapping_file = get_config_value('version-mapping-file',
915                                                json_config.get_driver(), False)
916        vul_info_file = get_config_value('vul-info-file',
917                                         json_config.get_driver(), False)
918        # get absolute file path
919        self.config.yara_bin = get_file_absolute_path(yara_bin)
920        self.config.vmlinux_to_elf_bin = get_file_absolute_path(vmlinux_to_elf_bin)
921        self.config.version_mapping_file = get_file_absolute_path(version_mapping_file)
922        if vul_info_file != "vul_info_patch_label_test":
923            self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
924
925        # get tool hap info
926        # default value
927        self.tool_hap_info = {
928            OHYaraConfig.HAP_FILE.value: "sststool.hap",
929            OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool",
930            OHYaraConfig.CLEANUP_APPS.value: "true"
931        }
932        tool_hap_info = get_config_value('tools-hap-info',
933                                         json_config.get_driver(), False)
934        if tool_hap_info:
935            self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \
936                tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap")
937            self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \
938                tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool")
939            self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \
940                tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true")
941
942    def _run_oh_yara(self, config_file, request=None):
943        message_list = list()
944
945        json_config = JsonParser(config_file)
946        self._get_driver_config(json_config)
947        # get device info
948        self.security_patch = self.config.device.execute_shell_command(
949            "param get const.ohos.version.security_patch").strip()
950        self.system_version = self.config.device.execute_shell_command(
951            "param get const.ohos.fullname").strip()
952
953        if "fail" in self.system_version:
954            self._get_full_name_by_tool_hap()
955
956        vul_info_file = get_config_value('vul-info-file', json_config.get_driver(), False)
957        # Extract patch labels into separate testcase
958        if vul_info_file == "vul_info_patch_label_test":
959            vul_items = list()
960            item = VulItem()
961            item.vul_id = "Patch-label-test"
962            item.month = "Patch-label-test"
963
964            # security patch verify
965            current_date_str = datetime.now().strftime('%Y-%m')
966            if self._check_if_expire_or_risk(current_date_str):
967                LOG.info("Security patch has expired.")
968                item.final_risk = OHYaraConfig.FAIL.value
969                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
970            else:
971                LOG.info("Security patch is shorter than two months.")
972                item.final_risk = OHYaraConfig.PASS.value
973            item.complete = True
974            vul_items.append(item)
975
976        else:
977            vul_items = self._get_vul_items()
978            # parse version mapping file
979            mapping_info = self._do_parse_json(self.config.version_mapping_file)
980            os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
981
982            # check if system version in version mapping list
983            vul_version = os_full_name_list.get(self.system_version, None)
984            # not in the maintenance scope, skip all case
985            if not vul_version and "OpenHarmony" in self.system_version:
986                vul_version_list = self.system_version.split("-")[-1].split(".")[:2]
987                vul_version_list.append("0")
988                vul_version = ".".join(vul_version_list)
989            if vul_version is None:
990                LOG.debug("The system version is not in the maintenance scope, skip it. "
991                          "system versions is {}".format(self.system_version))
992            else:
993                for _, item in enumerate(vul_items):
994                    LOG.debug("Affected files: {}".format(item.affected_files))
995                    LOG.debug("Object type: {}".format(item.object_type))
996                    for index, affected_file in enumerate(item.affected_files):
997                        has_inter = False
998                        for i, _ in enumerate(item.affected_versions):
999                            if self._check_if_intersection(vul_version, item.affected_versions[i]):
1000                                has_inter = True
1001                                break
1002                        if not has_inter:
1003                            LOG.debug("Yara rule [{}] affected versions has no intersection "
1004                                      "in mapping version, skip it. Mapping version is {}, "
1005                                      "affected versions is {}".format(item.vul_id, vul_version,
1006                                                                       item.affected_versions))
1007                            continue
1008                        local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
1009                                                  request.get_module_name(), item.yara_rules[index].split('.')[0])
1010                        if not os.path.exists(local_path):
1011                            os.makedirs(local_path)
1012                        if item.object_type == "kernel_linux":
1013                            img_file = "/data/local/tmp/boot_linux.img"
1014                            package_file = self.kernel_packing(affected_file, img_file)
1015                            if not package_file:
1016                                LOG.error("Execute failed. Not found file named {}, "
1017                                          "please check the input".format(affected_file))
1018                                item.final_risk = OHYaraConfig.FAIL.value
1019                                item.trace = "Failed to pack the kernel file."
1020                                continue
1021                            self.config.device.pull_file(package_file, local_path)
1022                            affected_file = os.path.join(local_path, os.path.basename(package_file))
1023                        else:
1024                            self.config.device.pull_file(affected_file, local_path)
1025                            affected_file = os.path.join(local_path, os.path.basename(affected_file))
1026
1027                        if not os.path.exists(affected_file):
1028                            LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
1029                            item.final_risk = OHYaraConfig.PASS.value
1030                            continue
1031                        yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
1032                        if item.object_type == "kernel_linux":
1033                            affected_file_processed = self.file_process_kernel(affected_file, local_path)
1034                            if not affected_file_processed:
1035                                item.final_risk = OHYaraConfig.FAIL.value
1036                                item.trace = "Kernel file extraction error"
1037                                continue
1038                            cmd = [self.config.yara_bin, yara_file, affected_file_processed]
1039                        else:
1040                            cmd = [self.config.yara_bin, yara_file, affected_file]
1041                        result = exec_cmd(cmd)
1042                        LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
1043                        if "testcase pass" in result:
1044                            item.final_risk = OHYaraConfig.PASS.value
1045                            break
1046                        else:
1047                            if self._check_if_expire_or_risk(item.month, check_risk=True):
1048                                item.final_risk = OHYaraConfig.FAIL.value
1049                                item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
1050                                                           item.disclosure.get("zh", ""))
1051                            else:
1052                                item.final_risk = OHYaraConfig.BLOCK.value
1053                                item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
1054                        # if no risk delete files, if rule has risk keep it
1055                        if item.final_risk != OHYaraConfig.FAIL.value:
1056                            local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
1057                                                      request.get_module_name(), item.yara_rules[index].split('.')[0])
1058                            if os.path.exists(local_path):
1059                                LOG.debug(
1060                                    "Yara rule [{}] has no risk, remove affected files.".format(
1061                                        item.yara_rules[index]))
1062                                shutil.rmtree(local_path)
1063                    item.complete = True
1064        self._generate_yara_report(request, vul_items, message_list)
1065        self._generate_xml_report(request, vul_items, message_list)
1066
1067    def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False):
1068        from dateutil.relativedelta import relativedelta
1069        self.security_patch = self.security_patch.replace(' ', '')
1070        self.security_patch = self.security_patch.replace('/', '-')
1071        # get current date
1072        source_date = datetime.strptime(date_str, '%Y-%m')
1073        security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m')
1074        # check if expire 2 months
1075        rd = relativedelta(source_date, security_patch_date)
1076        months = rd.months + (rd.years * 12)
1077        if check_risk:
1078            # vul time before security patch time no risk
1079            LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}"
1080                      .format(self.security_patch[:-3], date_str, months))
1081            if months > 0:
1082                return False
1083            else:
1084                return True
1085        else:
1086            # check if security patch time expire current time 2 months
1087            LOG.debug("Security patch time: {}, current time: {}, delta_months: {}"
1088                      .format(self.security_patch[:-3], date_str, months))
1089            if months > expire_time:
1090                return True
1091            else:
1092                return False
1093
1094    @staticmethod
1095    def _check_if_intersection(source_version, dst_version):
1096        # para dst_less_sor control if dst less than source
1097        def _do_check(soruce, dst, dst_less_sor=True):
1098            if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \
1099                    re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst):
1100                source_vers = soruce.split(".")
1101                dst_vers = dst.split(".")
1102                for index, _ in enumerate(source_vers):
1103                    if dst_less_sor:
1104                        # check if all source number less than dst number
1105                        if int(source_vers[index]) < int(dst_vers[index]):
1106                            return False
1107                    else:
1108                        # check if all source number larger than dst number
1109                        if int(source_vers[index]) > int(dst_vers[index]):
1110                            return False
1111                return True
1112            return False
1113
1114        source_groups = source_version.split("-")
1115        dst_groups = dst_version.split("-")
1116        if source_version == dst_version:
1117            return True
1118        elif len(source_groups) == 1 and len(dst_groups) == 1:
1119            return source_version == dst_version
1120        elif len(source_groups) == 1 and len(dst_groups) == 2:
1121            return _do_check(source_groups[0], dst_groups[0]) and \
1122                   _do_check(source_groups[0], dst_groups[1], dst_less_sor=False)
1123        elif len(source_groups) == 2 and len(dst_groups) == 1:
1124            return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \
1125                   _do_check(source_groups[1], dst_groups[0])
1126        elif len(source_groups) == 2 and len(dst_groups) == 2:
1127            return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \
1128                   _do_check(source_groups[1], dst_groups[0])
1129        return False
1130
1131    def kernel_packing(self, affected_file, img_file):
1132        cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip()
1133        LOG.debug("kernel file detail: {}".format(cmd_result))
1134        if "No such file or directory" in cmd_result or "Not a directory" in cmd_result:
1135            affected_file = self.config.device.execute_shell_command("find /dev/block/platform "
1136                                                                     "-name boot_linux").strip()
1137            LOG.info("kernel path is : {}".format(affected_file))
1138            cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip()
1139            if "No such file or directory" in cmd_result:
1140                return False
1141        link_file = cmd_result.split(" ")[-1]
1142        pack_result = self.config.device.execute_shell_command(f"dd if={link_file} of={img_file}")
1143        LOG.debug("kernel package detail: {}".format(pack_result))
1144        if "No such file or directory" in pack_result:
1145            return False
1146        return img_file
1147
1148    def file_process_kernel(self, affected_file, local_path):
1149        # 内核文件解析慢,解析过一次放到公共目录下,该月份下用例共用
1150        dir_path = os.path.dirname(local_path)
1151        processed_file = os.path.join(dir_path, "vmlinux.elf")
1152        if os.path.exists(processed_file):
1153            LOG.debug("The kernel file has been extracted, will reuse the previous pasing file.")
1154            return processed_file
1155        # 1 解压
1156        try:
1157            exec_cmd("7z")
1158        except (OSError, NameError):
1159            LOG.error("Please install the command of 7z before running.")
1160            return False
1161        decompress_result = exec_cmd(f"7z x {affected_file} -o{local_path}")
1162        LOG.debug("kernel file decompress detail: {}".format(decompress_result))
1163        # 2 解析
1164        print("Kernel file extraction will take a few minutes, please wait patiently...")
1165        input_file = os.path.join(local_path, "extlinux", "Image")
1166        output_file = processed_file
1167        if not input_file:
1168            LOG.error("An error occurred when decompressing the kernel file.")
1169            return False
1170        parse_cmd = [self.config.vmlinux_to_elf_bin, input_file, output_file]
1171        parse_result = exec_cmd(parse_cmd)
1172        if "error" in parse_result:
1173            LOG.error("An error occurred when pasing the kernel file.")
1174            return False
1175        LOG.info("Kernel file extraction successful.")
1176        return output_file
1177
1178    def _get_vul_items(self):
1179        vul_items = list()
1180        vul_info = self._do_parse_json(self.config.vul_info_file)
1181        vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, [])
1182        for _, vul in enumerate(vulnerabilities):
1183            affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
1184            item = VulItem()
1185            item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.CVE.value, "")
1186            item.affected_versions = affected_versions
1187            item.month = vul.get(OHYaraConfig.MONTH.value, "")
1188            item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
1189            item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
1190            item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
1191            item.object_type = vul.get(OHYaraConfig.OBJECT_TYPE.value, "")
1192            item.affected_files = \
1193                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1194                    OHYaraConfig.AFFECTED_FILES.value, [])
1195            item.yara_rules = \
1196                vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
1197                    OHYaraConfig.YARA_RULES.value, [])
1198            vul_items.append(item)
1199        LOG.debug("Vul size is {}".format(len(vul_items)))
1200        return vul_items
1201
1202    @staticmethod
1203    def _do_parse_json(file_path):
1204        json_content = None
1205        if not os.path.exists(file_path):
1206            raise ParamError("The json file {} does not exist".format(
1207                file_path), error_no="00110")
1208        flags = os.O_RDONLY
1209        modes = stat.S_IWUSR | stat.S_IRUSR
1210        with os.fdopen(os.open(file_path, flags, modes),
1211                       "r", encoding="utf-8") as file_content:
1212            json_content = json.load(file_content)
1213        if json_content is None:
1214            raise ParamError("The json file {} parse error".format(
1215                file_path), error_no="00110")
1216        return json_content
1217
1218    def _get_full_name_by_tool_hap(self):
1219        # check if tool hap has installed
1220        result = self.config.device.execute_shell_command(
1221            "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1222        LOG.debug(result)
1223        if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result:
1224            hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value))
1225            self.config.device.push_file(hap_path, "/data/local/tmp")
1226            result = self.config.device.execute_shell_command(
1227                "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path)))
1228            LOG.debug(result)
1229            self.config.device.execute_shell_command(
1230                "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format(
1231                    self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1232        self.config.device.execute_shell_command(
1233            "aa start -a {}.MainAbility -b {}".format(
1234                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value),
1235                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
1236        time.sleep(1)
1237        self.system_version = self.config.device.execute_shell_command(
1238            "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format(
1239                self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '')
1240        LOG.debug(self.system_version)
1241
1242    def _generate_yara_report(self, request, vul_items, result_message):
1243        import csv
1244        result_message.clear()
1245        yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv"
1246                                   .format(request.config.device.device_sn))
1247        if os.path.exists(yara_report):
1248            data = []
1249        else:
1250            data = [
1251                ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch],
1252                ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"]
1253            ]
1254        fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
1255        for _, item in enumerate(vul_items):
1256            data.append([item.vul_id, item.severity,
1257                         item.month, item.final_risk,
1258                         item.disclosure.get("zh", ""), item.vul_description.get("zh", "")])
1259            result = "{}|{}|{}|{}|{}|{}|{}\n".format(
1260                item.vul_id, item.severity,
1261                item.month, item.final_risk,
1262                item.disclosure.get("zh", ""), item.vul_description.get("zh", ""),
1263                item.trace)
1264            result_message.append(result)
1265        with os.fdopen(fd, "a", newline='') as file_handler:
1266            writer = csv.writer(file_handler)
1267            writer.writerows(data)
1268
1269    def _generate_xml_report(self, request, vul_items, message_list):
1270        result_message = "".join(message_list)
1271        listener_copy = request.listeners.copy()
1272        parsers = get_plugin(
1273            Plugin.PARSER, CommonParserType.oh_yara)
1274        if parsers:
1275            parsers = parsers[:1]
1276        for listener in listener_copy:
1277            listener.device_sn = self.config.device.device_sn
1278        parser_instances = []
1279        for parser in parsers:
1280            parser_instance = parser.__class__()
1281            parser_instance.suites_name = request.get_module_name()
1282            parser_instance.vul_items = vul_items
1283            parser_instance.listeners = listener_copy
1284            parser_instances.append(parser_instance)
1285        handler = ShellHandler(parser_instances)
1286        process_command_ret(result_message, handler)
1287
1288    def __result__(self):
1289        return self.result if os.path.exists(self.result) else ""
1290
1291    @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test)
1292    class ValidatorTestDriver(IDriver):
1293
1294        def __init__(self):
1295            self.error_message = ""
1296            self.xml_path = ""
1297            self.result = ""
1298            self.config = None
1299            self.kits = []
1300
1301        def __check_environment__(self, device_options):
1302            pass
1303
1304        def __check_config__(self, config):
1305            pass
1306
1307        def __execute__(self, request):
1308            try:
1309                self.result = os.path.join(
1310                    request.config.report_path, "result",
1311                    ".".join((request.get_module_name(), "xml")))
1312                self.config = request.config
1313                self.config.device = request.config.environment.devices[0]
1314                config_file = request.root.source.config_file
1315                self._run_validate_test(config_file, request)
1316            except Exception as exception:
1317                self.error_message = exception
1318                if not getattr(exception, "error_no", ""):
1319                    setattr(exception, "error_no", "03409")
1320                LOG.exception(self.error_message, exc_info=True, error_no="03409")
1321                raise exception
1322            finally:
1323                self.result = check_result_report(request.config.report_path,
1324                                                  self.result, self.error_message)
1325
1326        def _run_validate_test(self, config_file, request):
1327            is_update = False
1328            try:
1329                if "update" in self.config.testargs.keys():
1330                    if dict(self.config.testargs).get("update")[0] == "true":
1331                        is_update = True
1332                json_config = JsonParser(config_file)
1333                self.kits = get_kit_instances(json_config, self.config.resource_path,
1334                                              self.config.testcases_path)
1335                self._get_driver_config(json_config)
1336                if is_update:
1337                    do_module_kit_setup(request, self.kits)
1338                while True:
1339                    print("Is test finished?Y/N")
1340                    usr_input = input(">>>")
1341                    if usr_input == "Y" or usr_input == "y":
1342                        LOG.debug("Finish current test")
1343                        break
1344                    else:
1345                        print("continue")
1346                        LOG.debug("Your input is:{}, continue".format(usr_input))
1347                if self.xml_path:
1348                    result_dir = os.path.join(request.config.report_path, "result")
1349                    if not os.path.exists(result_dir):
1350                        os.makedirs(result_dir)
1351                    self.config.device.pull_file(self.xml_path, self.result)
1352            finally:
1353                if is_update:
1354                    do_module_kit_teardown(request)
1355
1356        def _get_driver_config(self, json_config):
1357            self.xml_path = get_config_value("xml_path", json_config.get_driver(), False)
1358        def __result__(self):
1359            return self.result if os.path.exists(self.result) else ""
1360