1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21import json 22import stat 23import shutil 24import re 25from datetime import datetime 26from enum import Enum 27 28from xdevice import ConfigConst 29from xdevice import ParamError 30from xdevice import IDriver 31from xdevice import platform_logger 32from xdevice import Plugin 33from xdevice import get_plugin 34from xdevice import JsonParser 35from xdevice import ShellHandler 36from xdevice import TestDescription 37from xdevice import get_device_log_file 38from xdevice import check_result_report 39from xdevice import get_kit_instances 40from xdevice import get_config_value 41from xdevice import do_module_kit_setup 42from xdevice import do_module_kit_teardown 43from xdevice import DeviceTestType 44from xdevice import CommonParserType 45from xdevice import FilePermission 46from xdevice import ResourceManager 47from xdevice import get_file_absolute_path 48from xdevice import exec_cmd 49 50from ohos.executor.listener import CollectingPassListener 51from ohos.constants import CKit 52from ohos.environment.dmlib import process_command_ret 53 54__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver", 55 "OHYaraTestDriver", "oh_jsunit_para_parse"] 56 57TIME_OUT = 300 * 1000 58 59LOG = platform_logger("OpenHarmony") 60 61 62def oh_jsunit_para_parse(runner, junit_paras): 63 junit_paras = dict(junit_paras) 64 test_type_list = ["function", "performance", "reliability", "security"] 65 size_list = ["small", "medium", "large"] 66 level_list = ["0", "1", "2", "3"] 67 for para_name in junit_paras.keys(): 68 para_name = para_name.strip() 69 para_values = junit_paras.get(para_name, []) 70 if para_name == "class": 71 runner.add_arg(para_name, ",".join(para_values)) 72 elif para_name == "notClass": 73 runner.add_arg(para_name, ",".join(para_values)) 74 elif para_name == "testType": 75 if para_values[0] not in test_type_list: 76 continue 77 # function/performance/reliability/security 78 runner.add_arg(para_name, para_values[0]) 79 elif para_name == "size": 80 if para_values[0] not in size_list: 81 continue 82 # size small/medium/large 83 runner.add_arg(para_name, para_values[0]) 84 elif para_name == "level": 85 if para_values[0] not in level_list: 86 continue 87 # 0/1/2/3/4 88 runner.add_arg(para_name, para_values[0]) 89 elif para_name == "stress": 90 runner.add_arg(para_name, para_values[0]) 91 92 93@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test) 94class OHKernelTestDriver(IDriver): 95 """ 96 OpenHarmonyKernelTest 97 """ 98 def __init__(self): 99 self.timeout = 30 * 1000 100 self.result = "" 101 self.error_message = "" 102 self.kits = [] 103 self.config = None 104 self.runner = None 105 # log 106 self.device_log = None 107 self.hilog = None 108 self.log_proc = None 109 self.hilog_proc = None 110 111 def __check_environment__(self, device_options): 112 pass 113 114 def __check_config__(self, config): 115 pass 116 117 def __execute__(self, request): 118 try: 119 LOG.debug("Start to Execute OpenHarmony Kernel Test") 120 121 self.config = request.config 122 self.config.device = request.config.environment.devices[0] 123 124 config_file = request.root.source.config_file 125 126 self.result = "%s.xml" % \ 127 os.path.join(request.config.report_path, 128 "result", request.get_module_name()) 129 self.device_log = get_device_log_file( 130 request.config.report_path, 131 request.config.device.__get_serial__(), 132 "device_log") 133 134 self.hilog = get_device_log_file( 135 request.config.report_path, 136 request.config.device.__get_serial__(), 137 "device_hilog") 138 139 device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT | 140 os.O_APPEND, FilePermission.mode_755) 141 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 142 FilePermission.mode_755) 143 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 144 with os.fdopen(device_log_open, "a") as log_file_pipe, \ 145 os.fdopen(hilog_open, "a") as hilog_file_pipe: 146 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 147 start_catch_device_log(log_file_pipe, hilog_file_pipe) 148 self._run_oh_kernel(config_file, request.listeners, request) 149 log_file_pipe.flush() 150 hilog_file_pipe.flush() 151 except Exception as exception: 152 self.error_message = exception 153 if not getattr(exception, "error_no", ""): 154 setattr(exception, "error_no", "03409") 155 LOG.exception(self.error_message, exc_info=False, error_no="03409") 156 raise exception 157 finally: 158 do_module_kit_teardown(request) 159 self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog) 160 self.config.device.device_log_collector.stop_catch_device_log(self.log_proc) 161 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 162 self.result = check_result_report( 163 request.config.report_path, self.result, self.error_message) 164 165 def _run_oh_kernel(self, config_file, listeners=None, request=None): 166 try: 167 json_config = JsonParser(config_file) 168 kits = get_kit_instances(json_config, self.config.resource_path, 169 self.config.testcases_path) 170 self._get_driver_config(json_config) 171 do_module_kit_setup(request, kits) 172 self.runner = OHKernelTestRunner(self.config) 173 self.runner.suite_name = request.get_module_name() 174 self.runner.run(listeners) 175 finally: 176 do_module_kit_teardown(request) 177 178 def _get_driver_config(self, json_config): 179 target_test_path = get_config_value('native-test-device-path', 180 json_config.get_driver(), False) 181 test_suite_name = get_config_value('test-suite-name', 182 json_config.get_driver(), False) 183 test_suites_list = get_config_value('test-suites-list', 184 json_config.get_driver(), False) 185 timeout_limit = get_config_value('timeout-limit', 186 json_config.get_driver(), False) 187 conf_file = get_config_value('conf-file', 188 json_config.get_driver(), False) 189 self.config.arg_list = {} 190 if target_test_path: 191 self.config.target_test_path = target_test_path 192 if test_suite_name: 193 self.config.arg_list["test-suite-name"] = test_suite_name 194 if test_suites_list: 195 self.config.arg_list["test-suites-list"] = test_suites_list 196 if timeout_limit: 197 self.config.arg_list["timeout-limit"] = timeout_limit 198 if conf_file: 199 self.config.arg_list["conf-file"] = conf_file 200 timeout_config = get_config_value('shell-timeout', 201 json_config.get_driver(), False) 202 if timeout_config: 203 self.config.timeout = int(timeout_config) 204 else: 205 self.config.timeout = TIME_OUT 206 207 def __result__(self): 208 return self.result if os.path.exists(self.result) else "" 209 210 211class OHKernelTestRunner: 212 def __init__(self, config): 213 self.suite_name = None 214 self.config = config 215 self.arg_list = config.arg_list 216 217 def run(self, listeners): 218 handler = self._get_shell_handler(listeners) 219 # hdc shell cd /data/local/tmp/OH_kernel_test; 220 # sh runtest test -t OpenHarmony_RK3568_config 221 # -n OpenHarmony_RK3568_skiptest -l 60 222 command = "cd %s; chmod +x *; sh runtest test %s" % ( 223 self.config.target_test_path, self.get_args_command()) 224 self.config.device.execute_shell_command( 225 command, timeout=self.config.timeout, receiver=handler, retry=0) 226 227 def _get_shell_handler(self, listeners): 228 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test) 229 if parsers: 230 parsers = parsers[:1] 231 parser_instances = [] 232 for parser in parsers: 233 parser_instance = parser.__class__() 234 parser_instance.suites_name = self.suite_name 235 parser_instance.listeners = listeners 236 parser_instances.append(parser_instance) 237 handler = ShellHandler(parser_instances) 238 return handler 239 240 def get_args_command(self): 241 args_commands = "" 242 for key, value in self.arg_list.items(): 243 if key == "test-suite-name" or key == "test-suites-list": 244 args_commands = "%s -t %s" % (args_commands, value) 245 elif key == "conf-file": 246 args_commands = "%s -n %s" % (args_commands, value) 247 elif key == "timeout-limit": 248 args_commands = "%s -l %s" % (args_commands, value) 249 return args_commands 250 251 252@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test) 253class OHJSUnitTestDriver(IDriver): 254 """ 255 OHJSUnitTestDriver is a Test that runs a native test package on 256 given device. 257 """ 258 259 def __init__(self): 260 self.timeout = 80 * 1000 261 self.start_time = None 262 self.result = "" 263 self.error_message = "" 264 self.kits = [] 265 self.config = None 266 self.runner = None 267 self.rerun = True 268 self.rerun_all = True 269 # log 270 self.device_log = None 271 self.hilog = None 272 self.log_proc = None 273 self.hilog_proc = None 274 275 def __check_environment__(self, device_options): 276 pass 277 278 def __check_config__(self, config): 279 pass 280 281 def __execute__(self, request): 282 try: 283 LOG.debug("Start execute OpenHarmony JSUnitTest") 284 self.result = os.path.join( 285 request.config.report_path, "result", 286 '.'.join((request.get_module_name(), "xml"))) 287 self.config = request.config 288 self.config.device = request.config.environment.devices[0] 289 290 config_file = request.root.source.config_file 291 suite_file = request.root.source.source_file 292 293 if not suite_file: 294 raise ParamError( 295 "test source '%s' not exists" % 296 request.root.source.source_string, error_no="00110") 297 LOG.debug("Test case file path: %s" % suite_file) 298 self.config.device.set_device_report_path(request.config.report_path) 299 self.hilog = get_device_log_file(request.config.report_path, 300 request.config.device.__get_serial__() + "_" + request. 301 get_module_name(), 302 "device_hilog") 303 304 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 305 0o755) 306 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 307 self.config.device.execute_shell_command(command="hilog -r") 308 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 309 if hasattr(self.config, ConfigConst.device_log) \ 310 and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 311 and hasattr(self.config.device, "clear_crash_log"): 312 self.config.device.device_log_collector.clear_crash_log() 313 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 314 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 315 self._run_oh_jsunit(config_file, request) 316 except Exception as exception: 317 self.error_message = exception 318 if not getattr(exception, "error_no", ""): 319 setattr(exception, "error_no", "03409") 320 LOG.exception(self.error_message, exc_info=True, error_no="03409") 321 raise exception 322 finally: 323 try: 324 self._handle_logs(request) 325 finally: 326 self.result = check_result_report( 327 request.config.report_path, self.result, self.error_message) 328 329 def __dry_run_execute__(self, request): 330 LOG.debug("Start dry run xdevice JSUnit Test") 331 self.config = request.config 332 self.config.device = request.config.environment.devices[0] 333 config_file = request.root.source.config_file 334 suite_file = request.root.source.source_file 335 336 if not suite_file: 337 raise ParamError( 338 "test source '%s' not exists" % 339 request.root.source.source_string, error_no="00110") 340 LOG.debug("Test case file path: %s" % suite_file) 341 self._dry_run_oh_jsunit(config_file, request) 342 343 def _dry_run_oh_jsunit(self, config_file, request): 344 try: 345 if not os.path.exists(config_file): 346 LOG.error("Error: Test cases don't exist %s." % config_file) 347 raise ParamError( 348 "Error: Test cases don't exist %s." % config_file, 349 error_no="00102") 350 json_config = JsonParser(config_file) 351 self.kits = get_kit_instances(json_config, 352 self.config.resource_path, 353 self.config.testcases_path) 354 355 self._get_driver_config(json_config) 356 self.config.device.connector_command("target mount") 357 do_module_kit_setup(request, self.kits) 358 self.runner = OHJSUnitTestRunner(self.config) 359 self.runner.suites_name = request.get_module_name() 360 # execute test case 361 self._get_runner_config(json_config) 362 oh_jsunit_para_parse(self.runner, self.config.testargs) 363 364 test_to_run = self._collect_test_to_run() 365 LOG.info("Collected suite count is: {}, test count is: {}". 366 format(len(self.runner.expect_tests_dict.keys()), 367 len(test_to_run) if test_to_run else 0)) 368 finally: 369 do_module_kit_teardown(request) 370 371 def _run_oh_jsunit(self, config_file, request): 372 try: 373 if not os.path.exists(config_file): 374 LOG.error("Error: Test cases don't exist %s." % config_file) 375 raise ParamError( 376 "Error: Test cases don't exist %s." % config_file, 377 error_no="00102") 378 json_config = JsonParser(config_file) 379 self.kits = get_kit_instances(json_config, 380 self.config.resource_path, 381 self.config.testcases_path) 382 383 self._get_driver_config(json_config) 384 self.config.device.connector_command("target mount") 385 self._start_smart_perf() 386 do_module_kit_setup(request, self.kits) 387 self.runner = OHJSUnitTestRunner(self.config) 388 self.runner.suites_name = request.get_module_name() 389 self._get_runner_config(json_config) 390 if hasattr(self.config, "history_report_path") and \ 391 self.config.testargs.get("test"): 392 self._do_test_retry(request.listeners, self.config.testargs) 393 else: 394 if self.rerun: 395 self.runner.retry_times = self.runner.MAX_RETRY_TIMES 396 # execute test case 397 self._do_tf_suite() 398 self._make_exclude_list_file(request) 399 oh_jsunit_para_parse(self.runner, self.config.testargs) 400 self._do_test_run(listener=request.listeners) 401 402 finally: 403 do_module_kit_teardown(request) 404 405 def _get_driver_config(self, json_config): 406 package = get_config_value('package-name', 407 json_config.get_driver(), False) 408 module = get_config_value('module-name', 409 json_config.get_driver(), False) 410 bundle = get_config_value('bundle-name', 411 json_config. get_driver(), False) 412 is_rerun = get_config_value('rerun', json_config.get_driver(), False) 413 414 self.config.package_name = package 415 self.config.module_name = module 416 self.config.bundle_name = bundle 417 self.rerun = True if is_rerun == 'true' else False 418 419 if not package and not module: 420 raise ParamError("Neither package nor module is found" 421 " in config file.", error_no="03201") 422 timeout_config = get_config_value("shell-timeout", 423 json_config.get_driver(), False) 424 if timeout_config: 425 self.config.timeout = int(timeout_config) 426 else: 427 self.config.timeout = TIME_OUT 428 429 def _get_runner_config(self, json_config): 430 test_timeout = get_config_value('test-timeout', 431 json_config.get_driver(), False) 432 if test_timeout: 433 self.runner.add_arg("wait_time", int(test_timeout)) 434 435 testcase_timeout = get_config_value('testcase-timeout', 436 json_config.get_driver(), False) 437 if testcase_timeout: 438 self.runner.add_arg("timeout", int(testcase_timeout)) 439 self.runner.compile_mode = get_config_value( 440 'compile-mode', json_config.get_driver(), False) 441 442 def _do_test_run(self, listener): 443 test_to_run = self._collect_test_to_run() 444 LOG.info("Collected suite count is: {}, test count is: {}". 445 format(len(self.runner.expect_tests_dict.keys()), 446 len(test_to_run) if test_to_run else 0)) 447 if not test_to_run or not self.rerun: 448 self.runner.run(listener) 449 self.runner.notify_finished() 450 else: 451 self._run_with_rerun(listener, test_to_run) 452 453 def _collect_test_to_run(self): 454 run_results = self.runner.dry_run() 455 return run_results 456 457 def _run_tests(self, listener): 458 test_tracker = CollectingPassListener() 459 listener_copy = listener.copy() 460 listener_copy.append(test_tracker) 461 self.runner.run(listener_copy) 462 test_run = test_tracker.get_current_run_results() 463 return test_run 464 465 def _run_with_rerun(self, listener, expected_tests): 466 LOG.debug("Ready to run with rerun, expect run: %s" 467 % len(expected_tests)) 468 test_run = self._run_tests(listener) 469 self.runner.retry_times -= 1 470 LOG.debug("Run with rerun, has run: %s" % len(test_run) 471 if test_run else 0) 472 if len(test_run) < len(expected_tests): 473 expected_tests = TestDescription.remove_test(expected_tests, 474 test_run) 475 if not expected_tests: 476 LOG.debug("No tests to re-run twice,please check") 477 self.runner.notify_finished() 478 else: 479 self._rerun_twice(expected_tests, listener) 480 else: 481 LOG.debug("Rerun once success") 482 self.runner.notify_finished() 483 484 def _rerun_twice(self, expected_tests, listener): 485 tests = [] 486 for test in expected_tests: 487 tests.append("%s#%s" % (test.class_name, test.test_name)) 488 self.runner.add_arg("class", ",".join(tests)) 489 LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests)) 490 test_run = self._run_tests(listener) 491 self.runner.retry_times -= 1 492 LOG.debug("Rerun twice, has run: %s" % len(test_run)) 493 if len(test_run) < len(expected_tests): 494 expected_tests = TestDescription.remove_test(expected_tests, 495 test_run) 496 if not expected_tests: 497 LOG.debug("No tests to re-run third,please check") 498 self.runner.notify_finished() 499 else: 500 self._rerun_third(expected_tests, listener) 501 else: 502 LOG.debug("Rerun twice success") 503 self.runner.notify_finished() 504 505 def _rerun_third(self, expected_tests, listener): 506 tests = [] 507 for test in expected_tests: 508 tests.append("%s#%s" % (test.class_name, test.test_name)) 509 self.runner.add_arg("class", ",".join(tests)) 510 LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests)) 511 self._run_tests(listener) 512 LOG.debug("Rerun third success") 513 self.runner.notify_finished() 514 515 def _make_exclude_list_file(self, request): 516 if "all-test-file-exclude-filter" in self.config.testargs: 517 json_file_list = self.config.testargs.get( 518 "all-test-file-exclude-filter") 519 self.config.testargs.pop("all-test-file-exclude-filter") 520 if not json_file_list: 521 LOG.warning("all-test-file-exclude-filter value is empty!") 522 else: 523 if not os.path.isfile(json_file_list[0]): 524 LOG.warning( 525 "[{}] is not a valid file".format(json_file_list[0])) 526 return 527 file_open = os.open(json_file_list[0], os.O_RDONLY, 528 stat.S_IWUSR | stat.S_IRUSR) 529 with os.fdopen(file_open, "r") as file_handler: 530 json_data = json.load(file_handler) 531 exclude_list = json_data.get( 532 DeviceTestType.oh_jsunit_test, []) 533 filter_list = [] 534 for exclude in exclude_list: 535 if request.get_module_name() not in exclude: 536 continue 537 filter_list.extend(exclude.get(request.get_module_name())) 538 if not isinstance(self.config.testargs, dict): 539 return 540 if 'notClass' in self.config.testargs.keys(): 541 filter_list.extend(self.config.testargs.get('notClass', [])) 542 self.config.testargs.update({"notClass": filter_list}) 543 544 def _do_test_retry(self, listener, testargs): 545 tests_dict = dict() 546 case_list = list() 547 for test in testargs.get("test"): 548 test_item = test.split("#") 549 if len(test_item) != 2: 550 continue 551 case_list.append(test) 552 if test_item[0] not in tests_dict: 553 tests_dict.update({test_item[0] : []}) 554 tests_dict.get(test_item[0]).append( 555 TestDescription(test_item[0], test_item[1])) 556 self.runner.add_arg("class", ",".join(case_list)) 557 self.runner.expect_tests_dict = tests_dict 558 self.config.testargs.pop("test") 559 self.runner.run(listener) 560 self.runner.notify_finished() 561 562 def _do_tf_suite(self): 563 if hasattr(self.config, "tf_suite") and \ 564 self.config.tf_suite.get("cases", []): 565 case_list = self.config["tf_suite"]["cases"] 566 self.config.testargs.update({"class": case_list}) 567 568 def _start_smart_perf(self): 569 if not hasattr(self.config, ConfigConst.kits_in_module): 570 return 571 if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module): 572 return 573 sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0] 574 sp_kits.target_name = self.config.bundle_name 575 param_config = self.config.get(ConfigConst.kits_params).get( 576 CKit.smartperf, "") 577 sp_kits.__check_config__(param_config) 578 self.kits.insert(0, sp_kits) 579 580 def _handle_logs(self, request): 581 serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns()) 582 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 583 if hasattr(self.config, ConfigConst.device_log) and \ 584 self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 585 and hasattr(self.config.device, "start_get_crash_log"): 586 self.config.device.device_log_collector.\ 587 start_get_crash_log(log_tar_file_name, module_name=request.get_module_name()) 588 self.config.device.device_log_collector.\ 589 remove_log_address(self.device_log, self.hilog) 590 self.config.device.device_log_collector.\ 591 stop_catch_device_log(self.log_proc) 592 self.config.device.device_log_collector.\ 593 stop_catch_device_log(self.hilog_proc) 594 595 def __result__(self): 596 return self.result if os.path.exists(self.result) else "" 597 598 599class OHJSUnitTestRunner: 600 MAX_RETRY_TIMES = 3 601 602 def __init__(self, config): 603 self.arg_list = {} 604 self.suites_name = None 605 self.config = config 606 self.rerun_attemp = 3 607 self.suite_recorder = {} 608 self.finished = False 609 self.expect_tests_dict = dict() 610 self.finished_observer = None 611 self.retry_times = 1 612 self.compile_mode = "" 613 614 def dry_run(self): 615 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list) 616 if parsers: 617 parsers = parsers[:1] 618 parser_instances = [] 619 for parser in parsers: 620 parser_instance = parser.__class__() 621 parser_instances.append(parser_instance) 622 handler = ShellHandler(parser_instances) 623 command = self._get_dry_run_command() 624 self.config.device.execute_shell_command( 625 command, timeout=self.config.timeout, receiver=handler, retry=0) 626 self.expect_tests_dict = parser_instances[0].tests_dict 627 return parser_instances[0].tests 628 629 def run(self, listener): 630 handler = self._get_shell_handler(listener) 631 command = self._get_run_command() 632 self.config.device.execute_shell_command( 633 command, timeout=self.config.timeout, receiver=handler, retry=0) 634 635 def notify_finished(self): 636 if self.finished_observer: 637 self.finished_observer.notify_task_finished() 638 self.retry_times -= 1 639 640 def _get_shell_handler(self, listener): 641 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit) 642 if parsers: 643 parsers = parsers[:1] 644 parser_instances = [] 645 for parser in parsers: 646 parser_instance = parser.__class__() 647 parser_instance.suites_name = self.suites_name 648 parser_instance.listeners = listener 649 parser_instance.runner = self 650 parser_instances.append(parser_instance) 651 self.finished_observer = parser_instance 652 handler = ShellHandler(parser_instances) 653 return handler 654 655 def add_arg(self, name, value): 656 if not name or not value: 657 return 658 self.arg_list[name] = value 659 660 def remove_arg(self, name): 661 if not name: 662 return 663 if name in self.arg_list: 664 del self.arg_list[name] 665 666 def get_args_command(self): 667 args_commands = "" 668 for key, value in self.arg_list.items(): 669 if "wait_time" == key: 670 args_commands = "%s -w %s " % (args_commands, value) 671 else: 672 args_commands = "%s -s %s %s " % (args_commands, key, value) 673 return args_commands 674 675 def _get_run_command(self): 676 command = "" 677 if self.config.package_name: 678 # aa test -p ${packageName} -b ${bundleName}-s 679 # unittest OpenHarmonyTestRunner 680 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 681 " {}".format(self.config.package_name, 682 self.config.bundle_name, 683 self.get_args_command()) 684 elif self.config.module_name: 685 # aa test -m ${moduleName} -b ${bundleName} 686 # -s unittest OpenHarmonyTestRunner 687 command = "aa test -m {} -b {} -s unittest {} {}".format( 688 self.config.module_name, self.config.bundle_name, 689 self.get_oh_test_runner_path(), self.get_args_command()) 690 return command 691 692 def _get_dry_run_command(self): 693 command = "" 694 if self.config.package_name: 695 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 696 " {} -s dryRun true".format(self.config.package_name, 697 self.config.bundle_name, 698 self.get_args_command()) 699 elif self.config.module_name: 700 command = "aa test -m {} -b {} -s unittest {}" \ 701 " {} -s dryRun true".format(self.config.module_name, 702 self.config.bundle_name, 703 self.get_oh_test_runner_path(), 704 self.get_args_command()) 705 706 return command 707 708 def get_oh_test_runner_path(self): 709 if self.compile_mode == "esmodule": 710 return "/ets/testrunner/OpenHarmonyTestRunner" 711 else: 712 return "OpenHarmonyTestRunner" 713 714 715@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 716class OHRustTestDriver(IDriver): 717 def __init__(self): 718 self.result = "" 719 self.error_message = "" 720 self.config = None 721 722 def __check_environment__(self, device_options): 723 pass 724 725 def __check_config__(self, config): 726 pass 727 728 def __execute__(self, request): 729 try: 730 LOG.debug("Start to execute open harmony rust test") 731 self.config = request.config 732 self.config.device = request.config.environment.devices[0] 733 self.config.target_test_path = "/system/bin" 734 735 suite_file = request.root.source.source_file 736 LOG.debug("Testsuite filepath:{}".format(suite_file)) 737 738 if not suite_file: 739 LOG.error("test source '{}' not exists".format( 740 request.root.source.source_string)) 741 return 742 743 self.result = "{}.xml".format( 744 os.path.join(request.config.report_path, 745 "result", request.get_module_name())) 746 self.config.device.set_device_report_path(request.config.report_path) 747 self.config.device.device_log_collector.start_hilog_task() 748 self._init_oh_rust() 749 self._run_oh_rust(suite_file, request) 750 except Exception as exception: 751 self.error_message = exception 752 if not getattr(exception, "error_no", ""): 753 setattr(exception, "error_no", "03409") 754 LOG.exception(self.error_message, exc_info=False, error_no="03409") 755 finally: 756 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 757 time.time_ns()) 758 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 759 self.config.device.device_log_collector.stop_hilog_task( 760 log_tar_file_name, module_name=request.get_module_name()) 761 self.result = check_result_report( 762 request.config.report_path, self.result, self.error_message) 763 764 def _init_oh_rust(self): 765 self.config.device.connector_command("target mount") 766 self.config.device.execute_shell_command( 767 "mount -o rw,remount,rw /") 768 769 def _run_oh_rust(self, suite_file, request=None): 770 # push testsuite file 771 self.config.device.push_file(suite_file, self.config.target_test_path) 772 # push resource file 773 resource_manager = ResourceManager() 774 resource_data_dict, resource_dir = \ 775 resource_manager.get_resource_data_dic(suite_file) 776 resource_manager.process_preparer_data(resource_data_dict, 777 resource_dir, 778 self.config.device) 779 for listener in request.listeners: 780 listener.device_sn = self.config.device.device_sn 781 782 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 783 if parsers: 784 parsers = parsers[:1] 785 parser_instances = [] 786 for parser in parsers: 787 parser_instance = parser.__class__() 788 parser_instance.suite_name = request.get_module_name() 789 parser_instance.listeners = request.listeners 790 parser_instances.append(parser_instance) 791 handler = ShellHandler(parser_instances) 792 793 command = "cd {}; chmod +x *; ./{}".format( 794 self.config.target_test_path, os.path.basename(suite_file)) 795 self.config.device.execute_shell_command( 796 command, timeout=TIME_OUT, receiver=handler, retry=0) 797 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 798 self.config.device) 799 800 def __result__(self): 801 return self.result if os.path.exists(self.result) else "" 802 803 804class OHYaraConfig(Enum): 805 HAP_FILE = "hap-file" 806 BUNDLE_NAME = "bundle-name" 807 CLEANUP_APPS = "cleanup-apps" 808 809 OS_FULLNAME_LIST = "osFullNameList" 810 VULNERABILITIES = "vulnerabilities" 811 VUL_ID = "vul_id" 812 OPENHARMONY_SA = "openharmony-sa" 813 AFFECTED_VERSION = "affected_versions" 814 MONTH = "month" 815 SEVERITY = "severity" 816 VUL_DESCRIPTION = "vul_description" 817 DISCLOSURE = "disclosure" 818 AFFECTED_FILES = "affected_files" 819 YARA_RULES = "yara_rules" 820 821 PASS = "pass" 822 FAIL = "fail" 823 BLOCK = "block" 824 825 ERROR_MSG_001 = "The patch label is longer than two months (60 days), which violates the OHCA agreement." 826 ERROR_MSG_002 = "This test case is beyond the patch label scope and does not need to be executed." 827 ERROR_MSG_003 = "Modify the code according to the patch requirements: " 828 829 830class VulItem: 831 vul_id = "" 832 month = "" 833 severity = "" 834 vul_description = dict() 835 disclosure = dict() 836 affected_files = "" 837 affected_versions = "" 838 yara_rules = "" 839 trace = "" 840 final_risk = OHYaraConfig.PASS.value 841 complete = False 842 843 844@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test) 845class OHYaraTestDriver(IDriver): 846 def __init__(self): 847 self.result = "" 848 self.error_message = "" 849 self.config = None 850 self.tool_hap_info = dict() 851 self.security_patch = None 852 self.system_version = None 853 854 def __check_environment__(self, device_options): 855 pass 856 857 def __check_config__(self, config): 858 pass 859 860 def __execute__(self, request): 861 try: 862 LOG.debug("Start to execute open harmony yara test") 863 self.result = os.path.join( 864 request.config.report_path, "result", 865 '.'.join((request.get_module_name(), "xml"))) 866 self.config = request.config 867 self.config.device = request.config.environment.devices[0] 868 869 config_file = request.root.source.config_file 870 suite_file = request.root.source.source_file 871 872 if not suite_file: 873 raise ParamError( 874 "test source '%s' not exists" % 875 request.root.source.source_string, error_no="00110") 876 LOG.debug("Test case file path: %s" % suite_file) 877 self.config.device.set_device_report_path(request.config.report_path) 878 self._run_oh_yara(config_file, request) 879 880 except Exception as exception: 881 self.error_message = exception 882 if not getattr(exception, "error_no", ""): 883 setattr(exception, "error_no", "03409") 884 LOG.exception(self.error_message, exc_info=False, error_no="03409") 885 finally: 886 if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value): 887 cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)] 888 result = self.config.device.connector_command(cmd) 889 LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format( 890 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result)) 891 892 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 893 time.time_ns()) 894 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 895 self.config.device.device_log_collector.stop_hilog_task( 896 log_tar_file_name, module_name=request.get_module_name()) 897 898 self.result = check_result_report( 899 request.config.report_path, self.result, self.error_message) 900 901 def _get_driver_config(self, json_config): 902 yara_bin = get_config_value('yara-bin', 903 json_config.get_driver(), False) 904 version_mapping_file = get_config_value('version-mapping-file', 905 json_config.get_driver(), False) 906 vul_info_file = get_config_value('vul-info-file', 907 json_config.get_driver(), False) 908 # get absolute file path 909 self.config.yara_bin = get_file_absolute_path(yara_bin) 910 self.config.version_mapping_file = get_file_absolute_path(version_mapping_file) 911 self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path]) 912 913 # get tool hap info 914 # default value 915 self.tool_hap_info = { 916 OHYaraConfig.HAP_FILE.value: "sststool.hap", 917 OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool", 918 OHYaraConfig.CLEANUP_APPS.value: "true" 919 } 920 tool_hap_info = get_config_value('tools-hap-info', 921 json_config.get_driver(), False) 922 if tool_hap_info: 923 self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \ 924 tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap") 925 self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \ 926 tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool") 927 self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \ 928 tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true") 929 930 def _run_oh_yara(self, config_file, request=None): 931 message_list = list() 932 933 json_config = JsonParser(config_file) 934 self._get_driver_config(json_config) 935 936 # get device info 937 self.security_patch = self.config.device.execute_shell_command( 938 "param get const.ohos.version.security_patch").strip() 939 self.system_version = self.config.device.execute_shell_command( 940 "param get const.ohos.fullname").strip() 941 942 if "fail" in self.system_version: 943 self._get_full_name_by_tool_hap() 944 945 vul_items = self._get_vul_items() 946 # if security patch expire, case fail 947 current_date_str = datetime.now().strftime('%Y-%m') 948 if self._check_if_expire_or_risk(current_date_str): 949 LOG.info("Security patch has expired. Set all case fail.") 950 for _, item in enumerate(vul_items): 951 item.complete = True 952 item.final_risk = OHYaraConfig.FAIL.value 953 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value) 954 else: 955 LOG.info("Security patch is shorter than two months. Start yara test.") 956 # parse version mapping file 957 mapping_info = self._do_parse_json(self.config.version_mapping_file) 958 os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None) 959 # check if system version in version mapping list 960 vul_version = os_full_name_list.get(self.system_version, None) 961 # not in the maintenance scope, skip all case 962 if vul_version is None: 963 LOG.debug("The system version is not in the maintenance scope, skip it. " 964 "system versions is {}".format(self.system_version)) 965 else: 966 for _, item in enumerate(vul_items): 967 LOG.debug("Affected files: {}".format(item.affected_files)) 968 for index, affected_file in enumerate(item.affected_files): 969 has_inter = False 970 for i, _ in enumerate(item.affected_versions): 971 if self._check_if_intersection(vul_version, item.affected_versions[i]): 972 has_inter = True 973 break 974 if not has_inter: 975 LOG.debug("Yara rule [{}] affected versions has no intersection " 976 "in mapping version, skip it. Mapping version is {}, " 977 "affected versions is {}".format(item.vul_id, vul_version, 978 item.affected_versions)) 979 continue 980 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 981 request.get_module_name(), item.yara_rules[index].split('.')[0]) 982 if not os.path.exists(local_path): 983 os.makedirs(local_path) 984 yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path]) 985 self.config.device.pull_file(affected_file, local_path) 986 affected_file = os.path.join(local_path, os.path.basename(affected_file)) 987 if not os.path.exists(affected_file): 988 LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index])) 989 item.final_risk = OHYaraConfig.PASS.value 990 continue 991 cmd = [self.config.yara_bin, yara_file, affected_file] 992 result = exec_cmd(cmd) 993 LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index])) 994 if "testcase pass" in result: 995 item.final_risk = OHYaraConfig.PASS.value 996 break 997 else: 998 if self._check_if_expire_or_risk(item.month, check_risk=True): 999 item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value, 1000 item.disclosure.get("zh", "")) 1001 item.final_risk = OHYaraConfig.FAIL.value 1002 else: 1003 item.final_risk = OHYaraConfig.BLOCK.value 1004 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value) 1005 # if no risk delete files, if rule has risk keep it 1006 if item.final_risk != OHYaraConfig.FAIL.value: 1007 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 1008 request.get_module_name(), item.yara_rules[index].split('.')[0]) 1009 if os.path.exists(local_path): 1010 LOG.debug( 1011 "Yara rule [{}] has no risk, remove affected files.".format( 1012 item.yara_rules[index])) 1013 shutil.rmtree(local_path) 1014 item.complete = True 1015 self._generate_yara_report(request, vul_items, message_list) 1016 self._generate_xml_report(request, vul_items, message_list) 1017 1018 def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False): 1019 from dateutil.relativedelta import relativedelta 1020 self.security_patch = self.security_patch.replace(' ', '') 1021 self.security_patch = self.security_patch.replace('/', '-') 1022 # get current date 1023 source_date = datetime.strptime(date_str, '%Y-%m') 1024 security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m') 1025 # check if expire 2 months 1026 rd = relativedelta(source_date, security_patch_date) 1027 months = rd.months + (rd.years * 12) 1028 if check_risk: 1029 # vul time before security patch time no risk 1030 LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}" 1031 .format(self.security_patch[:-3], date_str, months)) 1032 if months > 0: 1033 return False 1034 else: 1035 return True 1036 else: 1037 # check if security patch time expire current time 2 months 1038 LOG.debug("Security patch time: {}, current time: {}, delta_months: {}" 1039 .format(self.security_patch[:-3], date_str, months)) 1040 if months > expire_time: 1041 return True 1042 else: 1043 return False 1044 1045 @staticmethod 1046 def _check_if_intersection(source_version, dst_version): 1047 # para dst_less_sor control if dst less than source 1048 def _do_check(soruce, dst, dst_less_sor=True): 1049 if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \ 1050 re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst): 1051 source_vers = soruce.split(".") 1052 dst_vers = dst.split(".") 1053 for index, _ in enumerate(source_vers): 1054 if dst_less_sor: 1055 # check if all source number less than dst number 1056 if int(source_vers[index]) < int(dst_vers[index]): 1057 return False 1058 else: 1059 # check if all source number larger than dst number 1060 if int(source_vers[index]) > int(dst_vers[index]): 1061 return False 1062 return True 1063 return False 1064 1065 source_groups = source_version.split("-") 1066 dst_groups = dst_version.split("-") 1067 if source_version == dst_version: 1068 return True 1069 elif len(source_groups) == 1 and len(dst_groups) == 1: 1070 return source_version == dst_version 1071 elif len(source_groups) == 1 and len(dst_groups) == 2: 1072 return _do_check(source_groups[0], dst_groups[0]) and \ 1073 _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) 1074 elif len(source_groups) == 2 and len(dst_groups) == 1: 1075 return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \ 1076 _do_check(source_groups[1], dst_groups[0]) 1077 elif len(source_groups) == 2 and len(dst_groups) == 2: 1078 return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \ 1079 _do_check(source_groups[1], dst_groups[0]) 1080 return False 1081 1082 def _get_vul_items(self): 1083 vul_items = list() 1084 vul_info = self._do_parse_json(self.config.vul_info_file) 1085 vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, []) 1086 for _, vul in enumerate(vulnerabilities): 1087 affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, []) 1088 item = VulItem() 1089 item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "") 1090 item.affected_versions = affected_versions 1091 item.month = vul.get(OHYaraConfig.MONTH.value, "") 1092 item.severity = vul.get(OHYaraConfig.SEVERITY.value, "") 1093 item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "") 1094 item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "") 1095 item.affected_files = \ 1096 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1097 OHYaraConfig.AFFECTED_FILES.value, []) 1098 item.yara_rules = \ 1099 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1100 OHYaraConfig.YARA_RULES.value, []) 1101 vul_items.append(item) 1102 LOG.debug("Vul size is {}".format(len(vul_items))) 1103 return vul_items 1104 1105 @staticmethod 1106 def _do_parse_json(file_path): 1107 json_content = None 1108 if not os.path.exists(file_path): 1109 raise ParamError("The json file {} does not exist".format( 1110 file_path), error_no="00110") 1111 flags = os.O_RDONLY 1112 modes = stat.S_IWUSR | stat.S_IRUSR 1113 with os.fdopen(os.open(file_path, flags, modes), 1114 "r", encoding="utf-8") as file_content: 1115 json_content = json.load(file_content) 1116 if json_content is None: 1117 raise ParamError("The json file {} parse error".format( 1118 file_path), error_no="00110") 1119 return json_content 1120 1121 def _get_full_name_by_tool_hap(self): 1122 # check if tool hap has installed 1123 result = self.config.device.execute_shell_command( 1124 "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1125 LOG.debug(result) 1126 if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result: 1127 hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value)) 1128 self.config.device.push_file(hap_path, "/data/local/tmp") 1129 result = self.config.device.execute_shell_command( 1130 "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path))) 1131 LOG.debug(result) 1132 self.config.device.execute_shell_command( 1133 "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format( 1134 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1135 self.config.device.execute_shell_command( 1136 "aa start -a {}.MainAbility -b {}".format( 1137 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), 1138 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1139 time.sleep(1) 1140 self.system_version = self.config.device.execute_shell_command( 1141 "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format( 1142 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '') 1143 LOG.debug(self.system_version) 1144 1145 def _generate_yara_report(self, request, vul_items, result_message): 1146 import csv 1147 result_message.clear() 1148 yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv" 1149 .format(request.config.device.device_sn)) 1150 if os.path.exists(yara_report): 1151 data = [] 1152 else: 1153 data = [ 1154 ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch], 1155 ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"] 1156 ] 1157 fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755) 1158 for _, item in enumerate(vul_items): 1159 data.append([item.vul_id, item.severity, 1160 item.month, item.final_risk, 1161 item.disclosure.get("zh", ""), item.vul_description.get("zh", "")]) 1162 result = "{}|{}|{}|{}|{}|{}|{}\n".format( 1163 item.vul_id, item.severity, 1164 item.month, item.final_risk, 1165 item.disclosure.get("zh", ""), item.vul_description.get("zh", ""), 1166 item.trace) 1167 result_message.append(result) 1168 with os.fdopen(fd, "a", newline='') as file_handler: 1169 writer = csv.writer(file_handler) 1170 writer.writerows(data) 1171 1172 def _generate_xml_report(self, request, vul_items, message_list): 1173 result_message = "".join(message_list) 1174 listener_copy = request.listeners.copy() 1175 parsers = get_plugin( 1176 Plugin.PARSER, CommonParserType.oh_yara) 1177 if parsers: 1178 parsers = parsers[:1] 1179 for listener in listener_copy: 1180 listener.device_sn = self.config.device.device_sn 1181 parser_instances = [] 1182 for parser in parsers: 1183 parser_instance = parser.__class__() 1184 parser_instance.suites_name = request.get_module_name() 1185 parser_instance.vul_items = vul_items 1186 parser_instance.listeners = listener_copy 1187 parser_instances.append(parser_instance) 1188 handler = ShellHandler(parser_instances) 1189 process_command_ret(result_message, handler) 1190 1191 def __result__(self): 1192 return self.result if os.path.exists(self.result) else "" 1193 1194 @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test) 1195 class ValidatorTestDriver(IDriver): 1196 1197 def __init__(self): 1198 self.error_message = "" 1199 self.xml_path = "" 1200 self.result = "" 1201 self.config = None 1202 self.kits = [] 1203 1204 def __check_environment__(self, device_options): 1205 pass 1206 1207 def __check_config__(self, config): 1208 pass 1209 1210 def __execute__(self, request): 1211 try: 1212 self.result = os.path.join( 1213 request.config.report_path, "result", 1214 ".".join((request.get_module_name(), "xml"))) 1215 self.config = request.config 1216 self.config.device = request.config.environment.devices[0] 1217 config_file = request.root.source.config_file 1218 self._run_validate_test(config_file, request) 1219 except Exception as exception: 1220 self.error_message = exception 1221 if not getattr(exception, "error_no", ""): 1222 setattr(exception, "error_no", "03409") 1223 LOG.exception(self.error_message, exc_info=True, error_no="03409") 1224 raise exception 1225 finally: 1226 self.result = check_result_report(request.config.report_path, 1227 self.result, self.error_message) 1228 1229 def _run_validate_test(self, config_file, request): 1230 is_update = False 1231 try: 1232 if "update" in self.config.testargs.keys(): 1233 if dict(self.config.testargs).get("update")[0] == "true": 1234 is_update = True 1235 json_config = JsonParser(config_file) 1236 self.kits = get_kit_instances(json_config, self.config.resource_path, 1237 self.config.testcases_path) 1238 self._get_driver_config(json_config) 1239 if is_update: 1240 do_module_kit_setup(request, self.kits) 1241 while True: 1242 print("Is test finished?Y/N") 1243 usr_input = input(">>>") 1244 if usr_input == "Y" or usr_input == "y": 1245 LOG.debug("Finish current test") 1246 break 1247 else: 1248 print("continue") 1249 LOG.debug("Your input is:{}, continue".format(usr_input)) 1250 if self.xml_path: 1251 result_dir = os.path.join(request.config.report_path, "result") 1252 if not os.path.exists(result_dir): 1253 os.makedirs(result_dir) 1254 self.config.device.pull_file(self.xml_path, self.result) 1255 finally: 1256 if is_update: 1257 do_module_kit_teardown(request) 1258 1259 def _get_driver_config(self, json_config): 1260 self.xml_path = get_config_value("xml_path", json_config.get_driver(), False) 1261 def __result__(self): 1262 return self.result if os.path.exists(self.result) else "" 1263