1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21import json 22import stat 23import shutil 24import re 25from datetime import datetime 26from enum import Enum 27 28from xdevice import ConfigConst 29from xdevice import ParamError 30from xdevice import IDriver 31from xdevice import platform_logger 32from xdevice import Plugin 33from xdevice import get_plugin 34from xdevice import JsonParser 35from xdevice import ShellHandler 36from xdevice import driver_output_method 37from xdevice import TestDescription 38from xdevice import get_device_log_file 39from xdevice import check_result_report 40from xdevice import get_kit_instances 41from xdevice import get_config_value 42from xdevice import do_module_kit_setup 43from xdevice import do_module_kit_teardown 44from xdevice import DeviceTestType 45from xdevice import CommonParserType 46from xdevice import FilePermission 47from xdevice import ResourceManager 48from xdevice import get_file_absolute_path 49from xdevice import exec_cmd 50 51from ohos.executor.listener import CollectingPassListener 52from ohos.constants import CKit 53from ohos.environment.dmlib import process_command_ret 54 55__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver", 56 "OHYaraTestDriver", "oh_jsunit_para_parse"] 57 58TIME_OUT = 300 * 1000 59 60LOG = platform_logger("OpenHarmony") 61 62 63def oh_jsunit_para_parse(runner, junit_paras): 64 junit_paras = dict(junit_paras) 65 test_type_list = ["function", "performance", "reliability", "security"] 66 size_list = ["small", "medium", "large"] 67 level_list = ["0", "1", "2", "3"] 68 for para_name in junit_paras.keys(): 69 para_name = para_name.strip() 70 para_values = junit_paras.get(para_name, []) 71 if para_name == "class": 72 runner.add_arg(para_name, ",".join(para_values)) 73 elif para_name == "notClass": 74 runner.add_arg(para_name, ",".join(para_values)) 75 elif para_name == "testType": 76 if para_values[0] not in test_type_list: 77 continue 78 # function/performance/reliability/security 79 runner.add_arg(para_name, para_values[0]) 80 elif para_name == "size": 81 if para_values[0] not in size_list: 82 continue 83 # size small/medium/large 84 runner.add_arg(para_name, para_values[0]) 85 elif para_name == "level": 86 if para_values[0] not in level_list: 87 continue 88 # 0/1/2/3/4 89 runner.add_arg(para_name, para_values[0]) 90 elif para_name == "stress": 91 runner.add_arg(para_name, para_values[0]) 92 93 94@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test) 95class OHKernelTestDriver(IDriver): 96 """ 97 OpenHarmonyKernelTest 98 """ 99 def __init__(self): 100 self.timeout = 30 * 1000 101 self.result = "" 102 self.error_message = "" 103 self.kits = [] 104 self.config = None 105 self.runner = None 106 # log 107 self.device_log = None 108 self.hilog = None 109 self.log_proc = None 110 self.hilog_proc = None 111 112 def __check_environment__(self, device_options): 113 pass 114 115 def __check_config__(self, config): 116 pass 117 118 def __execute__(self, request): 119 try: 120 LOG.debug("Start to Execute OpenHarmony Kernel Test") 121 122 self.config = request.config 123 self.config.device = request.config.environment.devices[0] 124 125 config_file = request.root.source.config_file 126 127 self.result = "%s.xml" % \ 128 os.path.join(request.config.report_path, 129 "result", request.get_module_name()) 130 self.device_log = get_device_log_file( 131 request.config.report_path, 132 request.config.device.__get_serial__(), 133 "device_log", 134 module_name=request.get_module_name()) 135 136 self.hilog = get_device_log_file( 137 request.config.report_path, 138 request.config.device.__get_serial__(), 139 "device_hilog", 140 module_name=request.get_module_name()) 141 142 device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT | 143 os.O_APPEND, FilePermission.mode_755) 144 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 145 FilePermission.mode_755) 146 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 147 with os.fdopen(device_log_open, "a") as log_file_pipe, \ 148 os.fdopen(hilog_open, "a") as hilog_file_pipe: 149 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 150 start_catch_device_log(log_file_pipe, hilog_file_pipe) 151 self._run_oh_kernel(config_file, request.listeners, request) 152 log_file_pipe.flush() 153 hilog_file_pipe.flush() 154 except Exception as exception: 155 self.error_message = exception 156 if not getattr(exception, "error_no", ""): 157 setattr(exception, "error_no", "03409") 158 LOG.exception(self.error_message, exc_info=False, error_no="03409") 159 raise exception 160 finally: 161 do_module_kit_teardown(request) 162 self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog) 163 self.config.device.device_log_collector.stop_catch_device_log(self.log_proc) 164 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 165 self.result = check_result_report( 166 request.config.report_path, self.result, self.error_message) 167 168 def _run_oh_kernel(self, config_file, listeners=None, request=None): 169 try: 170 json_config = JsonParser(config_file) 171 kits = get_kit_instances(json_config, self.config.resource_path, 172 self.config.testcases_path) 173 self._get_driver_config(json_config) 174 do_module_kit_setup(request, kits) 175 self.runner = OHKernelTestRunner(self.config) 176 self.runner.suite_name = request.get_module_name() 177 self.runner.run(listeners) 178 finally: 179 do_module_kit_teardown(request) 180 181 def _get_driver_config(self, json_config): 182 target_test_path = get_config_value('native-test-device-path', 183 json_config.get_driver(), False) 184 test_suite_name = get_config_value('test-suite-name', 185 json_config.get_driver(), False) 186 test_suites_list = get_config_value('test-suites-list', 187 json_config.get_driver(), False) 188 timeout_limit = get_config_value('timeout-limit', 189 json_config.get_driver(), False) 190 conf_file = get_config_value('conf-file', 191 json_config.get_driver(), False) 192 self.config.arg_list = {} 193 if target_test_path: 194 self.config.target_test_path = target_test_path 195 if test_suite_name: 196 self.config.arg_list["test-suite-name"] = test_suite_name 197 if test_suites_list: 198 self.config.arg_list["test-suites-list"] = test_suites_list 199 if timeout_limit: 200 self.config.arg_list["timeout-limit"] = timeout_limit 201 if conf_file: 202 self.config.arg_list["conf-file"] = conf_file 203 timeout_config = get_config_value('shell-timeout', 204 json_config.get_driver(), False) 205 if timeout_config: 206 self.config.timeout = int(timeout_config) 207 else: 208 self.config.timeout = TIME_OUT 209 210 def __result__(self): 211 return self.result if os.path.exists(self.result) else "" 212 213 214class OHKernelTestRunner: 215 def __init__(self, config): 216 self.suite_name = None 217 self.config = config 218 self.arg_list = config.arg_list 219 220 def run(self, listeners): 221 handler = self._get_shell_handler(listeners) 222 # hdc shell cd /data/local/tmp/OH_kernel_test; 223 # sh runtest test -t OpenHarmony_RK3568_config 224 # -n OpenHarmony_RK3568_skiptest -l 60 225 command = "cd %s; chmod +x *; sh runtest test %s" % ( 226 self.config.target_test_path, self.get_args_command()) 227 self.config.device.execute_shell_command( 228 command, timeout=self.config.timeout, receiver=handler, retry=0) 229 230 def _get_shell_handler(self, listeners): 231 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test) 232 if parsers: 233 parsers = parsers[:1] 234 parser_instances = [] 235 for parser in parsers: 236 parser_instance = parser.__class__() 237 parser_instance.suites_name = self.suite_name 238 parser_instance.listeners = listeners 239 parser_instances.append(parser_instance) 240 handler = ShellHandler(parser_instances) 241 return handler 242 243 def get_args_command(self): 244 args_commands = "" 245 for key, value in self.arg_list.items(): 246 if key == "test-suite-name" or key == "test-suites-list": 247 args_commands = "%s -t %s" % (args_commands, value) 248 elif key == "conf-file": 249 args_commands = "%s -n %s" % (args_commands, value) 250 elif key == "timeout-limit": 251 args_commands = "%s -l %s" % (args_commands, value) 252 return args_commands 253 254 255@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test) 256class OHJSUnitTestDriver(IDriver): 257 """ 258 OHJSUnitTestDriver is a Test that runs a native test package on 259 given device. 260 """ 261 262 def __init__(self): 263 self.timeout = 80 * 1000 264 self.start_time = None 265 self.result = "" 266 self.error_message = "" 267 self.kits = [] 268 self.config = None 269 self.runner = None 270 self.rerun = True 271 self.rerun_all = True 272 # log 273 self.device_log = None 274 self.hilog = None 275 self.log_proc = None 276 self.hilog_proc = None 277 278 def __check_environment__(self, device_options): 279 pass 280 281 def __check_config__(self, config): 282 pass 283 284 def __execute__(self, request): 285 try: 286 LOG.debug("Start execute OpenHarmony JSUnitTest") 287 self.result = os.path.join( 288 request.config.report_path, "result", 289 '.'.join((request.get_module_name(), "xml"))) 290 self.config = request.config 291 self.config.device = request.config.environment.devices[0] 292 293 config_file = request.root.source.config_file 294 suite_file = request.root.source.source_file 295 296 if not suite_file: 297 raise ParamError( 298 "test source '%s' not exists" % 299 request.root.source.source_string, error_no="00110") 300 LOG.debug("Test case file path: %s" % suite_file) 301 self.config.device.set_device_report_path(request.config.report_path) 302 self.hilog = get_device_log_file( 303 request.config.report_path, 304 request.config.device.__get_serial__(), 305 "device_hilog", 306 module_name=request.get_module_name()) 307 308 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 309 0o755) 310 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 311 self.config.device.execute_shell_command(command="hilog -r") 312 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 313 if hasattr(self.config, ConfigConst.device_log) \ 314 and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 315 and hasattr(self.config.device, "clear_crash_log"): 316 self.config.device.device_log_collector.clear_crash_log() 317 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 318 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 319 self._run_oh_jsunit(config_file, request) 320 except Exception as exception: 321 self.error_message = exception 322 if not getattr(exception, "error_no", ""): 323 setattr(exception, "error_no", "03409") 324 LOG.exception(self.error_message, exc_info=True, error_no="03409") 325 raise exception 326 finally: 327 try: 328 self._handle_logs(request) 329 finally: 330 self.result = check_result_report( 331 request.config.report_path, self.result, self.error_message) 332 333 def __dry_run_execute__(self, request): 334 LOG.debug("Start dry run xdevice JSUnit Test") 335 self.config = request.config 336 self.config.device = request.config.environment.devices[0] 337 config_file = request.root.source.config_file 338 suite_file = request.root.source.source_file 339 340 if not suite_file: 341 raise ParamError( 342 "test source '%s' not exists" % 343 request.root.source.source_string, error_no="00110") 344 LOG.debug("Test case file path: %s" % suite_file) 345 self._dry_run_oh_jsunit(config_file, request) 346 347 def _dry_run_oh_jsunit(self, config_file, request): 348 try: 349 if not os.path.exists(config_file): 350 LOG.error("Error: Test cases don't exist %s." % config_file) 351 raise ParamError( 352 "Error: Test cases don't exist %s." % config_file, 353 error_no="00102") 354 json_config = JsonParser(config_file) 355 self.kits = get_kit_instances(json_config, 356 self.config.resource_path, 357 self.config.testcases_path) 358 359 self._get_driver_config(json_config) 360 self.config.device.connector_command("target mount") 361 do_module_kit_setup(request, self.kits) 362 self.runner = OHJSUnitTestRunner(self.config) 363 self.runner.suites_name = request.get_module_name() 364 # execute test case 365 self._get_runner_config(json_config) 366 oh_jsunit_para_parse(self.runner, self.config.testargs) 367 368 test_to_run = self._collect_test_to_run() 369 LOG.info("Collected suite count is: {}, test count is: {}". 370 format(len(self.runner.expect_tests_dict.keys()), 371 len(test_to_run) if test_to_run else 0)) 372 finally: 373 do_module_kit_teardown(request) 374 375 def _run_oh_jsunit(self, config_file, request): 376 try: 377 if not os.path.exists(config_file): 378 LOG.error("Error: Test cases don't exist %s." % config_file) 379 raise ParamError( 380 "Error: Test cases don't exist %s." % config_file, 381 error_no="00102") 382 json_config = JsonParser(config_file) 383 self.kits = get_kit_instances(json_config, 384 self.config.resource_path, 385 self.config.testcases_path) 386 387 self._get_driver_config(json_config) 388 self.config.device.connector_command("target mount") 389 self._start_smart_perf() 390 do_module_kit_setup(request, self.kits) 391 self.runner = OHJSUnitTestRunner(self.config) 392 self.runner.suites_name = request.get_module_name() 393 self._get_runner_config(json_config) 394 if hasattr(self.config, "history_report_path") and \ 395 self.config.testargs.get("test"): 396 self._do_test_retry(request.listeners, self.config.testargs) 397 else: 398 if self.rerun: 399 self.runner.retry_times = self.runner.MAX_RETRY_TIMES 400 # execute test case 401 self._do_tf_suite() 402 self._make_exclude_list_file(request) 403 oh_jsunit_para_parse(self.runner, self.config.testargs) 404 self._do_test_run(listener=request.listeners) 405 406 finally: 407 do_module_kit_teardown(request) 408 409 def _get_driver_config(self, json_config): 410 package = get_config_value('package-name', 411 json_config.get_driver(), False) 412 module = get_config_value('module-name', 413 json_config.get_driver(), False) 414 bundle = get_config_value('bundle-name', 415 json_config. get_driver(), False) 416 is_rerun = get_config_value('rerun', json_config.get_driver(), False) 417 418 self.config.package_name = package 419 self.config.module_name = module 420 self.config.bundle_name = bundle 421 self.rerun = True if is_rerun == 'true' else False 422 423 if not package and not module: 424 raise ParamError("Neither package nor module is found" 425 " in config file.", error_no="03201") 426 timeout_config = get_config_value("shell-timeout", 427 json_config.get_driver(), False) 428 if timeout_config: 429 self.config.timeout = int(timeout_config) 430 else: 431 self.config.timeout = TIME_OUT 432 433 def _get_runner_config(self, json_config): 434 test_timeout = get_config_value('test-timeout', 435 json_config.get_driver(), False) 436 if test_timeout: 437 self.runner.add_arg("wait_time", int(test_timeout)) 438 439 testcase_timeout = get_config_value('testcase-timeout', 440 json_config.get_driver(), False) 441 if testcase_timeout: 442 self.runner.add_arg("timeout", int(testcase_timeout)) 443 self.runner.compile_mode = get_config_value( 444 'compile-mode', json_config.get_driver(), False) 445 446 def _do_test_run(self, listener): 447 test_to_run = self._collect_test_to_run() 448 LOG.info("Collected suite count is: {}, test count is: {}". 449 format(len(self.runner.expect_tests_dict.keys()), 450 len(test_to_run) if test_to_run else 0)) 451 if not test_to_run or not self.rerun: 452 self.runner.run(listener) 453 self.runner.notify_finished() 454 else: 455 self._run_with_rerun(listener, test_to_run) 456 457 def _collect_test_to_run(self): 458 run_results = self.runner.dry_run() 459 return run_results 460 461 def _run_tests(self, listener): 462 test_tracker = CollectingPassListener() 463 listener_copy = listener.copy() 464 listener_copy.append(test_tracker) 465 self.runner.run(listener_copy) 466 test_run = test_tracker.get_current_run_results() 467 return test_run 468 469 def _run_with_rerun(self, listener, expected_tests): 470 LOG.debug("Ready to run with rerun, expect run: %s" 471 % len(expected_tests)) 472 test_run = self._run_tests(listener) 473 self.runner.retry_times -= 1 474 LOG.debug("Run with rerun, has run: %s" % len(test_run) 475 if test_run else 0) 476 if len(test_run) < len(expected_tests): 477 expected_tests = TestDescription.remove_test(expected_tests, 478 test_run) 479 if not expected_tests: 480 LOG.debug("No tests to re-run twice,please check") 481 self.runner.notify_finished() 482 else: 483 self._rerun_twice(expected_tests, listener) 484 else: 485 LOG.debug("Rerun once success") 486 self.runner.notify_finished() 487 488 def _rerun_twice(self, expected_tests, listener): 489 tests = [] 490 for test in expected_tests: 491 tests.append("%s#%s" % (test.class_name, test.test_name)) 492 self.runner.add_arg("class", ",".join(tests)) 493 LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests)) 494 test_run = self._run_tests(listener) 495 self.runner.retry_times -= 1 496 LOG.debug("Rerun twice, has run: %s" % len(test_run)) 497 if len(test_run) < len(expected_tests): 498 expected_tests = TestDescription.remove_test(expected_tests, 499 test_run) 500 if not expected_tests: 501 LOG.debug("No tests to re-run third,please check") 502 self.runner.notify_finished() 503 else: 504 self._rerun_third(expected_tests, listener) 505 else: 506 LOG.debug("Rerun twice success") 507 self.runner.notify_finished() 508 509 def _rerun_third(self, expected_tests, listener): 510 tests = [] 511 for test in expected_tests: 512 tests.append("%s#%s" % (test.class_name, test.test_name)) 513 self.runner.add_arg("class", ",".join(tests)) 514 LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests)) 515 self._run_tests(listener) 516 LOG.debug("Rerun third success") 517 self.runner.notify_finished() 518 519 def _make_exclude_list_file(self, request): 520 if "all-test-file-exclude-filter" in self.config.testargs: 521 json_file_list = self.config.testargs.get( 522 "all-test-file-exclude-filter") 523 self.config.testargs.pop("all-test-file-exclude-filter") 524 if not json_file_list: 525 LOG.warning("all-test-file-exclude-filter value is empty!") 526 else: 527 if not os.path.isfile(json_file_list[0]): 528 LOG.warning( 529 "[{}] is not a valid file".format(json_file_list[0])) 530 return 531 file_open = os.open(json_file_list[0], os.O_RDONLY, 532 stat.S_IWUSR | stat.S_IRUSR) 533 with os.fdopen(file_open, "r") as file_handler: 534 json_data = json.load(file_handler) 535 exclude_list = json_data.get( 536 DeviceTestType.oh_jsunit_test, []) 537 filter_list = [] 538 for exclude in exclude_list: 539 if request.get_module_name() not in exclude: 540 continue 541 filter_list.extend(exclude.get(request.get_module_name())) 542 if not isinstance(self.config.testargs, dict): 543 return 544 if 'notClass' in self.config.testargs.keys(): 545 filter_list.extend(self.config.testargs.get('notClass', [])) 546 self.config.testargs.update({"notClass": filter_list}) 547 548 def _do_test_retry(self, listener, testargs): 549 tests_dict = dict() 550 case_list = list() 551 for test in testargs.get("test"): 552 test_item = test.split("#") 553 if len(test_item) != 2: 554 continue 555 case_list.append(test) 556 if test_item[0] not in tests_dict: 557 tests_dict.update({test_item[0] : []}) 558 tests_dict.get(test_item[0]).append( 559 TestDescription(test_item[0], test_item[1])) 560 self.runner.add_arg("class", ",".join(case_list)) 561 self.runner.expect_tests_dict = tests_dict 562 self.config.testargs.pop("test") 563 self.runner.run(listener) 564 self.runner.notify_finished() 565 566 def _do_tf_suite(self): 567 if hasattr(self.config, "tf_suite") and \ 568 self.config.tf_suite.get("cases", []): 569 case_list = self.config["tf_suite"]["cases"] 570 self.config.testargs.update({"class": case_list}) 571 572 def _start_smart_perf(self): 573 if not hasattr(self.config, ConfigConst.kits_in_module): 574 return 575 if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module): 576 return 577 sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0] 578 sp_kits.target_name = self.config.bundle_name 579 param_config = self.config.get(ConfigConst.kits_params).get( 580 CKit.smartperf, "") 581 sp_kits.__check_config__(param_config) 582 self.kits.insert(0, sp_kits) 583 584 def _handle_logs(self, request): 585 serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns()) 586 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 587 if hasattr(self.config, ConfigConst.device_log) and \ 588 self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 589 and hasattr(self.config.device, "start_get_crash_log"): 590 self.config.device.device_log_collector.\ 591 start_get_crash_log(log_tar_file_name, module_name=request.get_module_name()) 592 self.config.device.device_log_collector.\ 593 remove_log_address(self.device_log, self.hilog) 594 self.config.device.device_log_collector.\ 595 stop_catch_device_log(self.log_proc) 596 self.config.device.device_log_collector.\ 597 stop_catch_device_log(self.hilog_proc) 598 599 def __result__(self): 600 return self.result if os.path.exists(self.result) else "" 601 602 603class OHJSUnitTestRunner: 604 MAX_RETRY_TIMES = 3 605 606 def __init__(self, config): 607 self.arg_list = {} 608 self.suites_name = None 609 self.config = config 610 self.rerun_attemp = 3 611 self.suite_recorder = {} 612 self.finished = False 613 self.expect_tests_dict = dict() 614 self.finished_observer = None 615 self.retry_times = 1 616 self.compile_mode = "" 617 618 def dry_run(self): 619 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list) 620 if parsers: 621 parsers = parsers[:1] 622 parser_instances = [] 623 for parser in parsers: 624 parser_instance = parser.__class__() 625 parser_instances.append(parser_instance) 626 handler = ShellHandler(parser_instances) 627 handler.add_process_method(driver_output_method) 628 command = self._get_dry_run_command() 629 self.config.device.execute_shell_command( 630 command, timeout=self.config.timeout, receiver=handler, retry=0) 631 self.expect_tests_dict = parser_instances[0].tests_dict 632 return parser_instances[0].tests 633 634 def run(self, listener): 635 handler = self._get_shell_handler(listener) 636 command = self._get_run_command() 637 self.config.device.execute_shell_command( 638 command, timeout=self.config.timeout, receiver=handler, retry=0) 639 640 def notify_finished(self): 641 if self.finished_observer: 642 self.finished_observer.notify_task_finished() 643 self.retry_times -= 1 644 645 def _get_shell_handler(self, listener): 646 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit) 647 if parsers: 648 parsers = parsers[:1] 649 parser_instances = [] 650 for parser in parsers: 651 parser_instance = parser.__class__() 652 parser_instance.suites_name = self.suites_name 653 parser_instance.listeners = listener 654 parser_instance.runner = self 655 parser_instances.append(parser_instance) 656 self.finished_observer = parser_instance 657 handler = ShellHandler(parser_instances) 658 return handler 659 660 def add_arg(self, name, value): 661 if not name or not value: 662 return 663 self.arg_list[name] = value 664 665 def remove_arg(self, name): 666 if not name: 667 return 668 if name in self.arg_list: 669 del self.arg_list[name] 670 671 def get_args_command(self): 672 args_commands = "" 673 for key, value in self.arg_list.items(): 674 if "wait_time" == key: 675 args_commands = "%s -w %s " % (args_commands, value) 676 else: 677 args_commands = "%s -s %s %s " % (args_commands, key, value) 678 return args_commands 679 680 def _get_run_command(self): 681 command = "" 682 if self.config.package_name: 683 # aa test -p ${packageName} -b ${bundleName}-s 684 # unittest OpenHarmonyTestRunner 685 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 686 " {}".format(self.config.package_name, 687 self.config.bundle_name, 688 self.get_args_command()) 689 elif self.config.module_name: 690 # aa test -m ${moduleName} -b ${bundleName} 691 # -s unittest OpenHarmonyTestRunner 692 command = "aa test -m {} -b {} -s unittest {} {}".format( 693 self.config.module_name, self.config.bundle_name, 694 self.get_oh_test_runner_path(), self.get_args_command()) 695 return command 696 697 def _get_dry_run_command(self): 698 command = "" 699 if self.config.package_name: 700 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 701 " {} -s dryRun true".format(self.config.package_name, 702 self.config.bundle_name, 703 self.get_args_command()) 704 elif self.config.module_name: 705 command = "aa test -m {} -b {} -s unittest {}" \ 706 " {} -s dryRun true".format(self.config.module_name, 707 self.config.bundle_name, 708 self.get_oh_test_runner_path(), 709 self.get_args_command()) 710 711 return command 712 713 def get_oh_test_runner_path(self): 714 if self.compile_mode == "esmodule": 715 return "/ets/testrunner/OpenHarmonyTestRunner" 716 else: 717 return "OpenHarmonyTestRunner" 718 719 720@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 721class OHRustTestDriver(IDriver): 722 def __init__(self): 723 self.result = "" 724 self.error_message = "" 725 self.config = None 726 727 def __check_environment__(self, device_options): 728 pass 729 730 def __check_config__(self, config): 731 pass 732 733 def __execute__(self, request): 734 try: 735 LOG.debug("Start to execute open harmony rust test") 736 self.config = request.config 737 self.config.device = request.config.environment.devices[0] 738 self.config.target_test_path = "/system/bin" 739 740 suite_file = request.root.source.source_file 741 LOG.debug("Testsuite filepath:{}".format(suite_file)) 742 743 if not suite_file: 744 LOG.error("test source '{}' not exists".format( 745 request.root.source.source_string)) 746 return 747 748 self.result = "{}.xml".format( 749 os.path.join(request.config.report_path, 750 "result", request.get_module_name())) 751 self.config.device.set_device_report_path(request.config.report_path) 752 self.config.device.device_log_collector.start_hilog_task() 753 self._init_oh_rust() 754 self._run_oh_rust(suite_file, request) 755 except Exception as exception: 756 self.error_message = exception 757 if not getattr(exception, "error_no", ""): 758 setattr(exception, "error_no", "03409") 759 LOG.exception(self.error_message, exc_info=False, error_no="03409") 760 finally: 761 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 762 time.time_ns()) 763 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 764 self.config.device.device_log_collector.stop_hilog_task( 765 log_tar_file_name, module_name=request.get_module_name()) 766 self.result = check_result_report( 767 request.config.report_path, self.result, self.error_message) 768 769 def _init_oh_rust(self): 770 self.config.device.connector_command("target mount") 771 self.config.device.execute_shell_command( 772 "mount -o rw,remount,rw /") 773 774 def _run_oh_rust(self, suite_file, request=None): 775 # push testsuite file 776 self.config.device.push_file(suite_file, self.config.target_test_path) 777 # push resource file 778 resource_manager = ResourceManager() 779 resource_data_dict, resource_dir = \ 780 resource_manager.get_resource_data_dic(suite_file) 781 resource_manager.process_preparer_data(resource_data_dict, 782 resource_dir, 783 self.config.device) 784 for listener in request.listeners: 785 listener.device_sn = self.config.device.device_sn 786 787 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 788 if parsers: 789 parsers = parsers[:1] 790 parser_instances = [] 791 for parser in parsers: 792 parser_instance = parser.__class__() 793 parser_instance.suite_name = request.get_module_name() 794 parser_instance.listeners = request.listeners 795 parser_instances.append(parser_instance) 796 handler = ShellHandler(parser_instances) 797 798 command = "cd {}; chmod +x *; ./{}".format( 799 self.config.target_test_path, os.path.basename(suite_file)) 800 self.config.device.execute_shell_command( 801 command, timeout=TIME_OUT, receiver=handler, retry=0) 802 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 803 self.config.device) 804 805 def __result__(self): 806 return self.result if os.path.exists(self.result) else "" 807 808 809class OHYaraConfig(Enum): 810 HAP_FILE = "hap-file" 811 BUNDLE_NAME = "bundle-name" 812 CLEANUP_APPS = "cleanup-apps" 813 814 OS_FULLNAME_LIST = "osFullNameList" 815 VULNERABILITIES = "vulnerabilities" 816 VUL_ID = "vul_id" 817 OPENHARMONY_SA = "openharmony-sa" 818 CVE = "cve" 819 AFFECTED_VERSION = "affected_versions" 820 MONTH = "month" 821 SEVERITY = "severity" 822 VUL_DESCRIPTION = "vul_description" 823 DISCLOSURE = "disclosure" 824 OBJECT_TYPE = "object_type" 825 AFFECTED_FILES = "affected_files" 826 YARA_RULES = "yara_rules" 827 828 PASS = "pass" 829 FAIL = "fail" 830 BLOCK = "block" 831 832 ERROR_MSG_001 = "The patch label is longer than two months (60 days), which violates the OHCA agreement." 833 ERROR_MSG_002 = "This test case is beyond the patch label scope and does not need to be executed." 834 ERROR_MSG_003 = "Modify the code according to the patch requirements: " 835 836 837class VulItem: 838 vul_id = "" 839 month = "" 840 severity = "" 841 vul_description = dict() 842 disclosure = dict() 843 object_type = "" 844 affected_files = "" 845 affected_versions = "" 846 yara_rules = "" 847 trace = "" 848 final_risk = OHYaraConfig.PASS.value 849 complete = False 850 851 852@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test) 853class OHYaraTestDriver(IDriver): 854 def __init__(self): 855 self.result = "" 856 self.error_message = "" 857 self.config = None 858 self.tool_hap_info = dict() 859 self.security_patch = None 860 self.system_version = None 861 862 def __check_environment__(self, device_options): 863 pass 864 865 def __check_config__(self, config): 866 pass 867 868 def __execute__(self, request): 869 try: 870 LOG.debug("Start to execute open harmony yara test") 871 self.result = os.path.join( 872 request.config.report_path, "result", 873 '.'.join((request.get_module_name(), "xml"))) 874 self.config = request.config 875 self.config.device = request.config.environment.devices[0] 876 877 config_file = request.root.source.config_file 878 suite_file = request.root.source.source_file 879 880 if not suite_file: 881 raise ParamError( 882 "test source '%s' not exists" % 883 request.root.source.source_string, error_no="00110") 884 LOG.debug("Test case file path: %s" % suite_file) 885 self.config.device.set_device_report_path(request.config.report_path) 886 self._run_oh_yara(config_file, request) 887 888 except Exception as exception: 889 self.error_message = exception 890 if not getattr(exception, "error_no", ""): 891 setattr(exception, "error_no", "03409") 892 LOG.exception(self.error_message, exc_info=False, error_no="03409") 893 finally: 894 if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value): 895 cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)] 896 result = self.config.device.connector_command(cmd) 897 LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format( 898 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result)) 899 900 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 901 time.time_ns()) 902 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 903 self.config.device.device_log_collector.stop_hilog_task( 904 log_tar_file_name, module_name=request.get_module_name()) 905 906 self.result = check_result_report( 907 request.config.report_path, self.result, self.error_message) 908 909 def _get_driver_config(self, json_config): 910 yara_bin = get_config_value('yara-bin', 911 json_config.get_driver(), False) 912 vmlinux_to_elf_bin = get_config_value('vmlinux-to-elf-bin', 913 json_config.get_driver(), False) 914 version_mapping_file = get_config_value('version-mapping-file', 915 json_config.get_driver(), False) 916 vul_info_file = get_config_value('vul-info-file', 917 json_config.get_driver(), False) 918 # get absolute file path 919 self.config.yara_bin = get_file_absolute_path(yara_bin) 920 self.config.vmlinux_to_elf_bin = get_file_absolute_path(vmlinux_to_elf_bin) 921 self.config.version_mapping_file = get_file_absolute_path(version_mapping_file) 922 if vul_info_file != "vul_info_patch_label_test": 923 self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path]) 924 925 # get tool hap info 926 # default value 927 self.tool_hap_info = { 928 OHYaraConfig.HAP_FILE.value: "sststool.hap", 929 OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool", 930 OHYaraConfig.CLEANUP_APPS.value: "true" 931 } 932 tool_hap_info = get_config_value('tools-hap-info', 933 json_config.get_driver(), False) 934 if tool_hap_info: 935 self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \ 936 tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap") 937 self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \ 938 tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool") 939 self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \ 940 tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true") 941 942 def _run_oh_yara(self, config_file, request=None): 943 message_list = list() 944 945 json_config = JsonParser(config_file) 946 self._get_driver_config(json_config) 947 # get device info 948 self.security_patch = self.config.device.execute_shell_command( 949 "param get const.ohos.version.security_patch").strip() 950 self.system_version = self.config.device.execute_shell_command( 951 "param get const.ohos.fullname").strip() 952 953 if "fail" in self.system_version: 954 self._get_full_name_by_tool_hap() 955 956 vul_info_file = get_config_value('vul-info-file', json_config.get_driver(), False) 957 # Extract patch labels into separate testcase 958 if vul_info_file == "vul_info_patch_label_test": 959 vul_items = list() 960 item = VulItem() 961 item.vul_id = "Patch-label-test" 962 item.month = "Patch-label-test" 963 964 # security patch verify 965 current_date_str = datetime.now().strftime('%Y-%m') 966 if self._check_if_expire_or_risk(current_date_str): 967 LOG.info("Security patch has expired.") 968 item.final_risk = OHYaraConfig.FAIL.value 969 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value) 970 else: 971 LOG.info("Security patch is shorter than two months.") 972 item.final_risk = OHYaraConfig.PASS.value 973 item.complete = True 974 vul_items.append(item) 975 976 else: 977 vul_items = self._get_vul_items() 978 # parse version mapping file 979 mapping_info = self._do_parse_json(self.config.version_mapping_file) 980 os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None) 981 982 # check if system version in version mapping list 983 vul_version = os_full_name_list.get(self.system_version, None) 984 # not in the maintenance scope, skip all case 985 if not vul_version and "OpenHarmony" in self.system_version: 986 vul_version_list = self.system_version.split("-")[-1].split(".")[:2] 987 vul_version_list.append("0") 988 vul_version = ".".join(vul_version_list) 989 if vul_version is None: 990 LOG.debug("The system version is not in the maintenance scope, skip it. " 991 "system versions is {}".format(self.system_version)) 992 else: 993 for _, item in enumerate(vul_items): 994 LOG.debug("Affected files: {}".format(item.affected_files)) 995 LOG.debug("Object type: {}".format(item.object_type)) 996 for index, affected_file in enumerate(item.affected_files): 997 has_inter = False 998 for i, _ in enumerate(item.affected_versions): 999 if self._check_if_intersection(vul_version, item.affected_versions[i]): 1000 has_inter = True 1001 break 1002 if not has_inter: 1003 LOG.debug("Yara rule [{}] affected versions has no intersection " 1004 "in mapping version, skip it. Mapping version is {}, " 1005 "affected versions is {}".format(item.vul_id, vul_version, 1006 item.affected_versions)) 1007 continue 1008 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 1009 request.get_module_name(), item.yara_rules[index].split('.')[0]) 1010 if not os.path.exists(local_path): 1011 os.makedirs(local_path) 1012 if item.object_type == "kernel_linux": 1013 img_file = "/data/local/tmp/boot_linux.img" 1014 package_file = self.kernel_packing(affected_file, img_file) 1015 if not package_file: 1016 LOG.error("Execute failed. Not found file named {}, " 1017 "please check the input".format(affected_file)) 1018 item.final_risk = OHYaraConfig.FAIL.value 1019 item.trace = "Failed to pack the kernel file." 1020 continue 1021 self.config.device.pull_file(package_file, local_path) 1022 affected_file = os.path.join(local_path, os.path.basename(package_file)) 1023 else: 1024 self.config.device.pull_file(affected_file, local_path) 1025 affected_file = os.path.join(local_path, os.path.basename(affected_file)) 1026 1027 if not os.path.exists(affected_file): 1028 LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index])) 1029 item.final_risk = OHYaraConfig.PASS.value 1030 continue 1031 yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path]) 1032 if item.object_type == "kernel_linux": 1033 affected_file_processed = self.file_process_kernel(affected_file, local_path) 1034 if not affected_file_processed: 1035 item.final_risk = OHYaraConfig.FAIL.value 1036 item.trace = "Kernel file extraction error" 1037 continue 1038 cmd = [self.config.yara_bin, yara_file, affected_file_processed] 1039 else: 1040 cmd = [self.config.yara_bin, yara_file, affected_file] 1041 result = exec_cmd(cmd) 1042 LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index])) 1043 if "testcase pass" in result: 1044 item.final_risk = OHYaraConfig.PASS.value 1045 break 1046 else: 1047 if self._check_if_expire_or_risk(item.month, check_risk=True): 1048 item.final_risk = OHYaraConfig.FAIL.value 1049 item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value, 1050 item.disclosure.get("zh", "")) 1051 else: 1052 item.final_risk = OHYaraConfig.BLOCK.value 1053 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value) 1054 # if no risk delete files, if rule has risk keep it 1055 if item.final_risk != OHYaraConfig.FAIL.value: 1056 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 1057 request.get_module_name(), item.yara_rules[index].split('.')[0]) 1058 if os.path.exists(local_path): 1059 LOG.debug( 1060 "Yara rule [{}] has no risk, remove affected files.".format( 1061 item.yara_rules[index])) 1062 shutil.rmtree(local_path) 1063 item.complete = True 1064 self._generate_yara_report(request, vul_items, message_list) 1065 self._generate_xml_report(request, vul_items, message_list) 1066 1067 def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False): 1068 from dateutil.relativedelta import relativedelta 1069 self.security_patch = self.security_patch.replace(' ', '') 1070 self.security_patch = self.security_patch.replace('/', '-') 1071 # get current date 1072 source_date = datetime.strptime(date_str, '%Y-%m') 1073 security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m') 1074 # check if expire 2 months 1075 rd = relativedelta(source_date, security_patch_date) 1076 months = rd.months + (rd.years * 12) 1077 if check_risk: 1078 # vul time before security patch time no risk 1079 LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}" 1080 .format(self.security_patch[:-3], date_str, months)) 1081 if months > 0: 1082 return False 1083 else: 1084 return True 1085 else: 1086 # check if security patch time expire current time 2 months 1087 LOG.debug("Security patch time: {}, current time: {}, delta_months: {}" 1088 .format(self.security_patch[:-3], date_str, months)) 1089 if months > expire_time: 1090 return True 1091 else: 1092 return False 1093 1094 @staticmethod 1095 def _check_if_intersection(source_version, dst_version): 1096 # para dst_less_sor control if dst less than source 1097 def _do_check(soruce, dst, dst_less_sor=True): 1098 if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \ 1099 re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst): 1100 source_vers = soruce.split(".") 1101 dst_vers = dst.split(".") 1102 for index, _ in enumerate(source_vers): 1103 if dst_less_sor: 1104 # check if all source number less than dst number 1105 if int(source_vers[index]) < int(dst_vers[index]): 1106 return False 1107 else: 1108 # check if all source number larger than dst number 1109 if int(source_vers[index]) > int(dst_vers[index]): 1110 return False 1111 return True 1112 return False 1113 1114 source_groups = source_version.split("-") 1115 dst_groups = dst_version.split("-") 1116 if source_version == dst_version: 1117 return True 1118 elif len(source_groups) == 1 and len(dst_groups) == 1: 1119 return source_version == dst_version 1120 elif len(source_groups) == 1 and len(dst_groups) == 2: 1121 return _do_check(source_groups[0], dst_groups[0]) and \ 1122 _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) 1123 elif len(source_groups) == 2 and len(dst_groups) == 1: 1124 return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \ 1125 _do_check(source_groups[1], dst_groups[0]) 1126 elif len(source_groups) == 2 and len(dst_groups) == 2: 1127 return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \ 1128 _do_check(source_groups[1], dst_groups[0]) 1129 return False 1130 1131 def kernel_packing(self, affected_file, img_file): 1132 cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip() 1133 LOG.debug("kernel file detail: {}".format(cmd_result)) 1134 if "No such file or directory" in cmd_result or "Not a directory" in cmd_result: 1135 affected_file = self.config.device.execute_shell_command("find /dev/block/platform " 1136 "-name boot_linux").strip() 1137 LOG.info("kernel path is : {}".format(affected_file)) 1138 cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip() 1139 if "No such file or directory" in cmd_result: 1140 return False 1141 link_file = cmd_result.split(" ")[-1] 1142 pack_result = self.config.device.execute_shell_command(f"dd if={link_file} of={img_file}") 1143 LOG.debug("kernel package detail: {}".format(pack_result)) 1144 if "No such file or directory" in pack_result: 1145 return False 1146 return img_file 1147 1148 def file_process_kernel(self, affected_file, local_path): 1149 # 内核文件解析慢,解析过一次放到公共目录下,该月份下用例共用 1150 dir_path = os.path.dirname(local_path) 1151 processed_file = os.path.join(dir_path, "vmlinux.elf") 1152 if os.path.exists(processed_file): 1153 LOG.debug("The kernel file has been extracted, will reuse the previous pasing file.") 1154 return processed_file 1155 # 1 解压 1156 try: 1157 exec_cmd("7z") 1158 except (OSError, NameError): 1159 LOG.error("Please install the command of 7z before running.") 1160 return False 1161 decompress_result = exec_cmd(f"7z x {affected_file} -o{local_path}") 1162 LOG.debug("kernel file decompress detail: {}".format(decompress_result)) 1163 # 2 解析 1164 print("Kernel file extraction will take a few minutes, please wait patiently...") 1165 input_file = os.path.join(local_path, "extlinux", "Image") 1166 output_file = processed_file 1167 if not input_file: 1168 LOG.error("An error occurred when decompressing the kernel file.") 1169 return False 1170 parse_cmd = [self.config.vmlinux_to_elf_bin, input_file, output_file] 1171 parse_result = exec_cmd(parse_cmd) 1172 if "error" in parse_result: 1173 LOG.error("An error occurred when pasing the kernel file.") 1174 return False 1175 LOG.info("Kernel file extraction successful.") 1176 return output_file 1177 1178 def _get_vul_items(self): 1179 vul_items = list() 1180 vul_info = self._do_parse_json(self.config.vul_info_file) 1181 vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, []) 1182 for _, vul in enumerate(vulnerabilities): 1183 affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, []) 1184 item = VulItem() 1185 item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.CVE.value, "") 1186 item.affected_versions = affected_versions 1187 item.month = vul.get(OHYaraConfig.MONTH.value, "") 1188 item.severity = vul.get(OHYaraConfig.SEVERITY.value, "") 1189 item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "") 1190 item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "") 1191 item.object_type = vul.get(OHYaraConfig.OBJECT_TYPE.value, "") 1192 item.affected_files = \ 1193 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1194 OHYaraConfig.AFFECTED_FILES.value, []) 1195 item.yara_rules = \ 1196 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1197 OHYaraConfig.YARA_RULES.value, []) 1198 vul_items.append(item) 1199 LOG.debug("Vul size is {}".format(len(vul_items))) 1200 return vul_items 1201 1202 @staticmethod 1203 def _do_parse_json(file_path): 1204 json_content = None 1205 if not os.path.exists(file_path): 1206 raise ParamError("The json file {} does not exist".format( 1207 file_path), error_no="00110") 1208 flags = os.O_RDONLY 1209 modes = stat.S_IWUSR | stat.S_IRUSR 1210 with os.fdopen(os.open(file_path, flags, modes), 1211 "r", encoding="utf-8") as file_content: 1212 json_content = json.load(file_content) 1213 if json_content is None: 1214 raise ParamError("The json file {} parse error".format( 1215 file_path), error_no="00110") 1216 return json_content 1217 1218 def _get_full_name_by_tool_hap(self): 1219 # check if tool hap has installed 1220 result = self.config.device.execute_shell_command( 1221 "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1222 LOG.debug(result) 1223 if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result: 1224 hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value)) 1225 self.config.device.push_file(hap_path, "/data/local/tmp") 1226 result = self.config.device.execute_shell_command( 1227 "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path))) 1228 LOG.debug(result) 1229 self.config.device.execute_shell_command( 1230 "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format( 1231 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1232 self.config.device.execute_shell_command( 1233 "aa start -a {}.MainAbility -b {}".format( 1234 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), 1235 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1236 time.sleep(1) 1237 self.system_version = self.config.device.execute_shell_command( 1238 "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format( 1239 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '') 1240 LOG.debug(self.system_version) 1241 1242 def _generate_yara_report(self, request, vul_items, result_message): 1243 import csv 1244 result_message.clear() 1245 yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv" 1246 .format(request.config.device.device_sn)) 1247 if os.path.exists(yara_report): 1248 data = [] 1249 else: 1250 data = [ 1251 ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch], 1252 ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"] 1253 ] 1254 fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755) 1255 for _, item in enumerate(vul_items): 1256 data.append([item.vul_id, item.severity, 1257 item.month, item.final_risk, 1258 item.disclosure.get("zh", ""), item.vul_description.get("zh", "")]) 1259 result = "{}|{}|{}|{}|{}|{}|{}\n".format( 1260 item.vul_id, item.severity, 1261 item.month, item.final_risk, 1262 item.disclosure.get("zh", ""), item.vul_description.get("zh", ""), 1263 item.trace) 1264 result_message.append(result) 1265 with os.fdopen(fd, "a", newline='') as file_handler: 1266 writer = csv.writer(file_handler) 1267 writer.writerows(data) 1268 1269 def _generate_xml_report(self, request, vul_items, message_list): 1270 result_message = "".join(message_list) 1271 listener_copy = request.listeners.copy() 1272 parsers = get_plugin( 1273 Plugin.PARSER, CommonParserType.oh_yara) 1274 if parsers: 1275 parsers = parsers[:1] 1276 for listener in listener_copy: 1277 listener.device_sn = self.config.device.device_sn 1278 parser_instances = [] 1279 for parser in parsers: 1280 parser_instance = parser.__class__() 1281 parser_instance.suites_name = request.get_module_name() 1282 parser_instance.vul_items = vul_items 1283 parser_instance.listeners = listener_copy 1284 parser_instances.append(parser_instance) 1285 handler = ShellHandler(parser_instances) 1286 process_command_ret(result_message, handler) 1287 1288 def __result__(self): 1289 return self.result if os.path.exists(self.result) else "" 1290 1291 @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test) 1292 class ValidatorTestDriver(IDriver): 1293 1294 def __init__(self): 1295 self.error_message = "" 1296 self.xml_path = "" 1297 self.result = "" 1298 self.config = None 1299 self.kits = [] 1300 1301 def __check_environment__(self, device_options): 1302 pass 1303 1304 def __check_config__(self, config): 1305 pass 1306 1307 def __execute__(self, request): 1308 try: 1309 self.result = os.path.join( 1310 request.config.report_path, "result", 1311 ".".join((request.get_module_name(), "xml"))) 1312 self.config = request.config 1313 self.config.device = request.config.environment.devices[0] 1314 config_file = request.root.source.config_file 1315 self._run_validate_test(config_file, request) 1316 except Exception as exception: 1317 self.error_message = exception 1318 if not getattr(exception, "error_no", ""): 1319 setattr(exception, "error_no", "03409") 1320 LOG.exception(self.error_message, exc_info=True, error_no="03409") 1321 raise exception 1322 finally: 1323 self.result = check_result_report(request.config.report_path, 1324 self.result, self.error_message) 1325 1326 def _run_validate_test(self, config_file, request): 1327 is_update = False 1328 try: 1329 if "update" in self.config.testargs.keys(): 1330 if dict(self.config.testargs).get("update")[0] == "true": 1331 is_update = True 1332 json_config = JsonParser(config_file) 1333 self.kits = get_kit_instances(json_config, self.config.resource_path, 1334 self.config.testcases_path) 1335 self._get_driver_config(json_config) 1336 if is_update: 1337 do_module_kit_setup(request, self.kits) 1338 while True: 1339 print("Is test finished?Y/N") 1340 usr_input = input(">>>") 1341 if usr_input == "Y" or usr_input == "y": 1342 LOG.debug("Finish current test") 1343 break 1344 else: 1345 print("continue") 1346 LOG.debug("Your input is:{}, continue".format(usr_input)) 1347 if self.xml_path: 1348 result_dir = os.path.join(request.config.report_path, "result") 1349 if not os.path.exists(result_dir): 1350 os.makedirs(result_dir) 1351 self.config.device.pull_file(self.xml_path, self.result) 1352 finally: 1353 if is_update: 1354 do_module_kit_teardown(request) 1355 1356 def _get_driver_config(self, json_config): 1357 self.xml_path = get_config_value("xml_path", json_config.get_driver(), False) 1358 def __result__(self): 1359 return self.result if os.path.exists(self.result) else "" 1360