1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21import json 22import stat 23import shutil 24import re 25from datetime import datetime 26from enum import Enum 27 28from xdevice import ConfigConst 29from xdevice import ParamError 30from xdevice import IDriver 31from xdevice import platform_logger 32from xdevice import Plugin 33from xdevice import get_plugin 34from xdevice import JsonParser 35from xdevice import ShellHandler 36from xdevice import TestDescription 37from xdevice import get_device_log_file 38from xdevice import check_result_report 39from xdevice import get_kit_instances 40from xdevice import get_config_value 41from xdevice import do_module_kit_setup 42from xdevice import do_module_kit_teardown 43from xdevice import DeviceTestType 44from xdevice import CommonParserType 45from xdevice import FilePermission 46from xdevice import ResourceManager 47from xdevice import get_file_absolute_path 48from xdevice import exec_cmd 49 50from ohos.executor.listener import CollectingPassListener 51from ohos.constants import CKit 52from ohos.environment.dmlib import process_command_ret 53 54__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver", 55 "OHYaraTestDriver", "oh_jsunit_para_parse"] 56 57TIME_OUT = 300 * 1000 58 59LOG = platform_logger("OpenHarmony") 60 61 62def oh_jsunit_para_parse(runner, junit_paras): 63 junit_paras = dict(junit_paras) 64 test_type_list = ["function", "performance", "reliability", "security"] 65 size_list = ["small", "medium", "large"] 66 level_list = ["0", "1", "2", "3"] 67 for para_name in junit_paras.keys(): 68 para_name = para_name.strip() 69 para_values = junit_paras.get(para_name, []) 70 if para_name == "class": 71 runner.add_arg(para_name, ",".join(para_values)) 72 elif para_name == "notClass": 73 runner.add_arg(para_name, ",".join(para_values)) 74 elif para_name == "testType": 75 if para_values[0] not in test_type_list: 76 continue 77 # function/performance/reliability/security 78 runner.add_arg(para_name, para_values[0]) 79 elif para_name == "size": 80 if para_values[0] not in size_list: 81 continue 82 # size small/medium/large 83 runner.add_arg(para_name, para_values[0]) 84 elif para_name == "level": 85 if para_values[0] not in level_list: 86 continue 87 # 0/1/2/3/4 88 runner.add_arg(para_name, para_values[0]) 89 elif para_name == "stress": 90 runner.add_arg(para_name, para_values[0]) 91 92 93@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test) 94class OHKernelTestDriver(IDriver): 95 """ 96 OpenHarmonyKernelTest 97 """ 98 def __init__(self): 99 self.timeout = 30 * 1000 100 self.result = "" 101 self.error_message = "" 102 self.kits = [] 103 self.config = None 104 self.runner = None 105 # log 106 self.device_log = None 107 self.hilog = None 108 self.log_proc = None 109 self.hilog_proc = None 110 111 def __check_environment__(self, device_options): 112 pass 113 114 def __check_config__(self, config): 115 pass 116 117 def __execute__(self, request): 118 try: 119 LOG.debug("Start to Execute OpenHarmony Kernel Test") 120 121 self.config = request.config 122 self.config.device = request.config.environment.devices[0] 123 124 config_file = request.root.source.config_file 125 126 self.result = "%s.xml" % \ 127 os.path.join(request.config.report_path, 128 "result", request.get_module_name()) 129 self.device_log = get_device_log_file( 130 request.config.report_path, 131 request.config.device.__get_serial__(), 132 "device_log") 133 134 self.hilog = get_device_log_file( 135 request.config.report_path, 136 request.config.device.__get_serial__(), 137 "device_hilog") 138 139 device_log_open = os.open(self.device_log, os.O_WRONLY | os.O_CREAT | 140 os.O_APPEND, FilePermission.mode_755) 141 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 142 FilePermission.mode_755) 143 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 144 with os.fdopen(device_log_open, "a") as log_file_pipe, \ 145 os.fdopen(hilog_open, "a") as hilog_file_pipe: 146 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 147 start_catch_device_log(log_file_pipe, hilog_file_pipe) 148 self._run_oh_kernel(config_file, request.listeners, request) 149 log_file_pipe.flush() 150 hilog_file_pipe.flush() 151 except Exception as exception: 152 self.error_message = exception 153 if not getattr(exception, "error_no", ""): 154 setattr(exception, "error_no", "03409") 155 LOG.exception(self.error_message, exc_info=False, error_no="03409") 156 raise exception 157 finally: 158 do_module_kit_teardown(request) 159 self.config.device.device_log_collector.remove_log_address(self.device_log, self.hilog) 160 self.config.device.device_log_collector.stop_catch_device_log(self.log_proc) 161 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 162 self.result = check_result_report( 163 request.config.report_path, self.result, self.error_message) 164 165 def _run_oh_kernel(self, config_file, listeners=None, request=None): 166 try: 167 json_config = JsonParser(config_file) 168 kits = get_kit_instances(json_config, self.config.resource_path, 169 self.config.testcases_path) 170 self._get_driver_config(json_config) 171 do_module_kit_setup(request, kits) 172 self.runner = OHKernelTestRunner(self.config) 173 self.runner.suite_name = request.get_module_name() 174 self.runner.run(listeners) 175 finally: 176 do_module_kit_teardown(request) 177 178 def _get_driver_config(self, json_config): 179 target_test_path = get_config_value('native-test-device-path', 180 json_config.get_driver(), False) 181 test_suite_name = get_config_value('test-suite-name', 182 json_config.get_driver(), False) 183 test_suites_list = get_config_value('test-suites-list', 184 json_config.get_driver(), False) 185 timeout_limit = get_config_value('timeout-limit', 186 json_config.get_driver(), False) 187 conf_file = get_config_value('conf-file', 188 json_config.get_driver(), False) 189 self.config.arg_list = {} 190 if target_test_path: 191 self.config.target_test_path = target_test_path 192 if test_suite_name: 193 self.config.arg_list["test-suite-name"] = test_suite_name 194 if test_suites_list: 195 self.config.arg_list["test-suites-list"] = test_suites_list 196 if timeout_limit: 197 self.config.arg_list["timeout-limit"] = timeout_limit 198 if conf_file: 199 self.config.arg_list["conf-file"] = conf_file 200 timeout_config = get_config_value('shell-timeout', 201 json_config.get_driver(), False) 202 if timeout_config: 203 self.config.timeout = int(timeout_config) 204 else: 205 self.config.timeout = TIME_OUT 206 207 def __result__(self): 208 return self.result if os.path.exists(self.result) else "" 209 210 211class OHKernelTestRunner: 212 def __init__(self, config): 213 self.suite_name = None 214 self.config = config 215 self.arg_list = config.arg_list 216 217 def run(self, listeners): 218 handler = self._get_shell_handler(listeners) 219 # hdc shell cd /data/local/tmp/OH_kernel_test; 220 # sh runtest test -t OpenHarmony_RK3568_config 221 # -n OpenHarmony_RK3568_skiptest -l 60 222 command = "cd %s; chmod +x *; sh runtest test %s" % ( 223 self.config.target_test_path, self.get_args_command()) 224 self.config.device.execute_shell_command( 225 command, timeout=self.config.timeout, receiver=handler, retry=0) 226 227 def _get_shell_handler(self, listeners): 228 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test) 229 if parsers: 230 parsers = parsers[:1] 231 parser_instances = [] 232 for parser in parsers: 233 parser_instance = parser.__class__() 234 parser_instance.suites_name = self.suite_name 235 parser_instance.listeners = listeners 236 parser_instances.append(parser_instance) 237 handler = ShellHandler(parser_instances) 238 return handler 239 240 def get_args_command(self): 241 args_commands = "" 242 for key, value in self.arg_list.items(): 243 if key == "test-suite-name" or key == "test-suites-list": 244 args_commands = "%s -t %s" % (args_commands, value) 245 elif key == "conf-file": 246 args_commands = "%s -n %s" % (args_commands, value) 247 elif key == "timeout-limit": 248 args_commands = "%s -l %s" % (args_commands, value) 249 return args_commands 250 251 252@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test) 253class OHJSUnitTestDriver(IDriver): 254 """ 255 OHJSUnitTestDriver is a Test that runs a native test package on 256 given device. 257 """ 258 259 def __init__(self): 260 self.timeout = 80 * 1000 261 self.start_time = None 262 self.result = "" 263 self.error_message = "" 264 self.kits = [] 265 self.config = None 266 self.runner = None 267 self.rerun = True 268 self.rerun_all = True 269 # log 270 self.device_log = None 271 self.hilog = None 272 self.log_proc = None 273 self.hilog_proc = None 274 275 def __check_environment__(self, device_options): 276 pass 277 278 def __check_config__(self, config): 279 pass 280 281 def __execute__(self, request): 282 try: 283 LOG.debug("Start execute OpenHarmony JSUnitTest") 284 self.result = os.path.join( 285 request.config.report_path, "result", 286 '.'.join((request.get_module_name(), "xml"))) 287 self.config = request.config 288 self.config.device = request.config.environment.devices[0] 289 290 config_file = request.root.source.config_file 291 suite_file = request.root.source.source_file 292 293 if not suite_file: 294 raise ParamError( 295 "test source '%s' not exists" % 296 request.root.source.source_string, error_no="00110") 297 LOG.debug("Test case file path: %s" % suite_file) 298 self.config.device.set_device_report_path(request.config.report_path) 299 self.hilog = get_device_log_file(request.config.report_path, 300 request.config.device.__get_serial__() + "_" + request. 301 get_module_name(), 302 "device_hilog") 303 304 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 305 0o755) 306 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 307 self.config.device.execute_shell_command(command="hilog -r") 308 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 309 if hasattr(self.config, ConfigConst.device_log) \ 310 and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 311 and hasattr(self.config.device, "clear_crash_log"): 312 self.config.device.device_log_collector.clear_crash_log() 313 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 314 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 315 self._run_oh_jsunit(config_file, request) 316 except Exception as exception: 317 self.error_message = exception 318 if not getattr(exception, "error_no", ""): 319 setattr(exception, "error_no", "03409") 320 LOG.exception(self.error_message, exc_info=True, error_no="03409") 321 raise exception 322 finally: 323 try: 324 self._handle_logs(request) 325 finally: 326 self.result = check_result_report( 327 request.config.report_path, self.result, self.error_message) 328 329 def __dry_run_execute__(self, request): 330 LOG.debug("Start dry run xdevice JSUnit Test") 331 self.config = request.config 332 self.config.device = request.config.environment.devices[0] 333 config_file = request.root.source.config_file 334 suite_file = request.root.source.source_file 335 336 if not suite_file: 337 raise ParamError( 338 "test source '%s' not exists" % 339 request.root.source.source_string, error_no="00110") 340 LOG.debug("Test case file path: %s" % suite_file) 341 self._dry_run_oh_jsunit(config_file, request) 342 343 def _dry_run_oh_jsunit(self, config_file, request): 344 try: 345 if not os.path.exists(config_file): 346 LOG.error("Error: Test cases don't exist %s." % config_file) 347 raise ParamError( 348 "Error: Test cases don't exist %s." % config_file, 349 error_no="00102") 350 json_config = JsonParser(config_file) 351 self.kits = get_kit_instances(json_config, 352 self.config.resource_path, 353 self.config.testcases_path) 354 355 self._get_driver_config(json_config) 356 self.config.device.connector_command("target mount") 357 do_module_kit_setup(request, self.kits) 358 self.runner = OHJSUnitTestRunner(self.config) 359 self.runner.suites_name = request.get_module_name() 360 # execute test case 361 self._get_runner_config(json_config) 362 oh_jsunit_para_parse(self.runner, self.config.testargs) 363 364 test_to_run = self._collect_test_to_run() 365 LOG.info("Collected suite count is: {}, test count is: {}". 366 format(len(self.runner.expect_tests_dict.keys()), 367 len(test_to_run) if test_to_run else 0)) 368 finally: 369 do_module_kit_teardown(request) 370 371 def _run_oh_jsunit(self, config_file, request): 372 try: 373 if not os.path.exists(config_file): 374 LOG.error("Error: Test cases don't exist %s." % config_file) 375 raise ParamError( 376 "Error: Test cases don't exist %s." % config_file, 377 error_no="00102") 378 json_config = JsonParser(config_file) 379 self.kits = get_kit_instances(json_config, 380 self.config.resource_path, 381 self.config.testcases_path) 382 383 self._get_driver_config(json_config) 384 self.config.device.connector_command("target mount") 385 self._start_smart_perf() 386 do_module_kit_setup(request, self.kits) 387 self.runner = OHJSUnitTestRunner(self.config) 388 self.runner.suites_name = request.get_module_name() 389 self._get_runner_config(json_config) 390 if hasattr(self.config, "history_report_path") and \ 391 self.config.testargs.get("test"): 392 self._do_test_retry(request.listeners, self.config.testargs) 393 else: 394 if self.rerun: 395 self.runner.retry_times = self.runner.MAX_RETRY_TIMES 396 # execute test case 397 self._do_tf_suite() 398 self._make_exclude_list_file(request) 399 oh_jsunit_para_parse(self.runner, self.config.testargs) 400 self._do_test_run(listener=request.listeners) 401 402 finally: 403 do_module_kit_teardown(request) 404 405 def _get_driver_config(self, json_config): 406 package = get_config_value('package-name', 407 json_config.get_driver(), False) 408 module = get_config_value('module-name', 409 json_config.get_driver(), False) 410 bundle = get_config_value('bundle-name', 411 json_config. get_driver(), False) 412 is_rerun = get_config_value('rerun', json_config.get_driver(), False) 413 414 self.config.package_name = package 415 self.config.module_name = module 416 self.config.bundle_name = bundle 417 self.rerun = True if is_rerun == 'true' else False 418 419 if not package and not module: 420 raise ParamError("Neither package nor module is found" 421 " in config file.", error_no="03201") 422 timeout_config = get_config_value("shell-timeout", 423 json_config.get_driver(), False) 424 if timeout_config: 425 self.config.timeout = int(timeout_config) 426 else: 427 self.config.timeout = TIME_OUT 428 429 def _get_runner_config(self, json_config): 430 test_timeout = get_config_value('test-timeout', 431 json_config.get_driver(), False) 432 if test_timeout: 433 self.runner.add_arg("wait_time", int(test_timeout)) 434 435 testcase_timeout = get_config_value('testcase-timeout', 436 json_config.get_driver(), False) 437 if testcase_timeout: 438 self.runner.add_arg("timeout", int(testcase_timeout)) 439 self.runner.compile_mode = get_config_value( 440 'compile-mode', json_config.get_driver(), False) 441 442 def _do_test_run(self, listener): 443 test_to_run = self._collect_test_to_run() 444 LOG.info("Collected suite count is: {}, test count is: {}". 445 format(len(self.runner.expect_tests_dict.keys()), 446 len(test_to_run) if test_to_run else 0)) 447 if not test_to_run or not self.rerun: 448 self.runner.run(listener) 449 self.runner.notify_finished() 450 else: 451 self._run_with_rerun(listener, test_to_run) 452 453 def _collect_test_to_run(self): 454 run_results = self.runner.dry_run() 455 return run_results 456 457 def _run_tests(self, listener): 458 test_tracker = CollectingPassListener() 459 listener_copy = listener.copy() 460 listener_copy.append(test_tracker) 461 self.runner.run(listener_copy) 462 test_run = test_tracker.get_current_run_results() 463 return test_run 464 465 def _run_with_rerun(self, listener, expected_tests): 466 LOG.debug("Ready to run with rerun, expect run: %s" 467 % len(expected_tests)) 468 test_run = self._run_tests(listener) 469 self.runner.retry_times -= 1 470 LOG.debug("Run with rerun, has run: %s" % len(test_run) 471 if test_run else 0) 472 if len(test_run) < len(expected_tests): 473 expected_tests = TestDescription.remove_test(expected_tests, 474 test_run) 475 if not expected_tests: 476 LOG.debug("No tests to re-run twice,please check") 477 self.runner.notify_finished() 478 else: 479 self._rerun_twice(expected_tests, listener) 480 else: 481 LOG.debug("Rerun once success") 482 self.runner.notify_finished() 483 484 def _rerun_twice(self, expected_tests, listener): 485 tests = [] 486 for test in expected_tests: 487 tests.append("%s#%s" % (test.class_name, test.test_name)) 488 self.runner.add_arg("class", ",".join(tests)) 489 LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests)) 490 test_run = self._run_tests(listener) 491 self.runner.retry_times -= 1 492 LOG.debug("Rerun twice, has run: %s" % len(test_run)) 493 if len(test_run) < len(expected_tests): 494 expected_tests = TestDescription.remove_test(expected_tests, 495 test_run) 496 if not expected_tests: 497 LOG.debug("No tests to re-run third,please check") 498 self.runner.notify_finished() 499 else: 500 self._rerun_third(expected_tests, listener) 501 else: 502 LOG.debug("Rerun twice success") 503 self.runner.notify_finished() 504 505 def _rerun_third(self, expected_tests, listener): 506 tests = [] 507 for test in expected_tests: 508 tests.append("%s#%s" % (test.class_name, test.test_name)) 509 self.runner.add_arg("class", ",".join(tests)) 510 LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests)) 511 self._run_tests(listener) 512 LOG.debug("Rerun third success") 513 self.runner.notify_finished() 514 515 def _make_exclude_list_file(self, request): 516 if "all-test-file-exclude-filter" in self.config.testargs: 517 json_file_list = self.config.testargs.get( 518 "all-test-file-exclude-filter") 519 self.config.testargs.pop("all-test-file-exclude-filter") 520 if not json_file_list: 521 LOG.warning("all-test-file-exclude-filter value is empty!") 522 else: 523 if not os.path.isfile(json_file_list[0]): 524 LOG.warning( 525 "[{}] is not a valid file".format(json_file_list[0])) 526 return 527 file_open = os.open(json_file_list[0], os.O_RDONLY, 528 stat.S_IWUSR | stat.S_IRUSR) 529 with os.fdopen(file_open, "r") as file_handler: 530 json_data = json.load(file_handler) 531 exclude_list = json_data.get( 532 DeviceTestType.oh_jsunit_test, []) 533 filter_list = [] 534 for exclude in exclude_list: 535 if request.get_module_name() not in exclude: 536 continue 537 filter_list.extend(exclude.get(request.get_module_name())) 538 if not isinstance(self.config.testargs, dict): 539 return 540 if 'notClass' in self.config.testargs.keys(): 541 filter_list.extend(self.config.testargs.get('notClass', [])) 542 self.config.testargs.update({"notClass": filter_list}) 543 544 def _do_test_retry(self, listener, testargs): 545 tests_dict = dict() 546 case_list = list() 547 for test in testargs.get("test"): 548 test_item = test.split("#") 549 if len(test_item) != 2: 550 continue 551 case_list.append(test) 552 if test_item[0] not in tests_dict: 553 tests_dict.update({test_item[0] : []}) 554 tests_dict.get(test_item[0]).append( 555 TestDescription(test_item[0], test_item[1])) 556 self.runner.add_arg("class", ",".join(case_list)) 557 self.runner.expect_tests_dict = tests_dict 558 self.config.testargs.pop("test") 559 self.runner.run(listener) 560 self.runner.notify_finished() 561 562 def _do_tf_suite(self): 563 if hasattr(self.config, "tf_suite") and \ 564 self.config.tf_suite.get("cases", []): 565 case_list = self.config["tf_suite"]["cases"] 566 self.config.testargs.update({"class": case_list}) 567 568 def _start_smart_perf(self): 569 if not hasattr(self.config, ConfigConst.kits_in_module): 570 return 571 if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module): 572 return 573 sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0] 574 sp_kits.target_name = self.config.bundle_name 575 param_config = self.config.get(ConfigConst.kits_params).get( 576 CKit.smartperf, "") 577 sp_kits.__check_config__(param_config) 578 self.kits.insert(0, sp_kits) 579 580 def _handle_logs(self, request): 581 serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns()) 582 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 583 if hasattr(self.config, ConfigConst.device_log) and \ 584 self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 585 and hasattr(self.config.device, "start_get_crash_log"): 586 self.config.device.device_log_collector.\ 587 start_get_crash_log(log_tar_file_name, module_name=request.get_module_name()) 588 self.config.device.device_log_collector.\ 589 remove_log_address(self.device_log, self.hilog) 590 self.config.device.device_log_collector.\ 591 stop_catch_device_log(self.log_proc) 592 self.config.device.device_log_collector.\ 593 stop_catch_device_log(self.hilog_proc) 594 595 def __result__(self): 596 return self.result if os.path.exists(self.result) else "" 597 598 599class OHJSUnitTestRunner: 600 MAX_RETRY_TIMES = 3 601 602 def __init__(self, config): 603 self.arg_list = {} 604 self.suites_name = None 605 self.config = config 606 self.rerun_attemp = 3 607 self.suite_recorder = {} 608 self.finished = False 609 self.expect_tests_dict = dict() 610 self.finished_observer = None 611 self.retry_times = 1 612 self.compile_mode = "" 613 614 def dry_run(self): 615 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list) 616 if parsers: 617 parsers = parsers[:1] 618 parser_instances = [] 619 for parser in parsers: 620 parser_instance = parser.__class__() 621 parser_instances.append(parser_instance) 622 handler = ShellHandler(parser_instances) 623 command = self._get_dry_run_command() 624 self.config.device.execute_shell_command( 625 command, timeout=self.config.timeout, receiver=handler, retry=0) 626 self.expect_tests_dict = parser_instances[0].tests_dict 627 return parser_instances[0].tests 628 629 def run(self, listener): 630 handler = self._get_shell_handler(listener) 631 command = self._get_run_command() 632 self.config.device.execute_shell_command( 633 command, timeout=self.config.timeout, receiver=handler, retry=0) 634 635 def notify_finished(self): 636 if self.finished_observer: 637 self.finished_observer.notify_task_finished() 638 self.retry_times -= 1 639 640 def _get_shell_handler(self, listener): 641 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit) 642 if parsers: 643 parsers = parsers[:1] 644 parser_instances = [] 645 for parser in parsers: 646 parser_instance = parser.__class__() 647 parser_instance.suites_name = self.suites_name 648 parser_instance.listeners = listener 649 parser_instance.runner = self 650 parser_instances.append(parser_instance) 651 self.finished_observer = parser_instance 652 handler = ShellHandler(parser_instances) 653 return handler 654 655 def add_arg(self, name, value): 656 if not name or not value: 657 return 658 self.arg_list[name] = value 659 660 def remove_arg(self, name): 661 if not name: 662 return 663 if name in self.arg_list: 664 del self.arg_list[name] 665 666 def get_args_command(self): 667 args_commands = "" 668 for key, value in self.arg_list.items(): 669 if "wait_time" == key: 670 args_commands = "%s -w %s " % (args_commands, value) 671 else: 672 args_commands = "%s -s %s %s " % (args_commands, key, value) 673 return args_commands 674 675 def _get_run_command(self): 676 command = "" 677 if self.config.package_name: 678 # aa test -p ${packageName} -b ${bundleName}-s 679 # unittest OpenHarmonyTestRunner 680 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 681 " {}".format(self.config.package_name, 682 self.config.bundle_name, 683 self.get_args_command()) 684 elif self.config.module_name: 685 # aa test -m ${moduleName} -b ${bundleName} 686 # -s unittest OpenHarmonyTestRunner 687 command = "aa test -m {} -b {} -s unittest {} {}".format( 688 self.config.module_name, self.config.bundle_name, 689 self.get_oh_test_runner_path(), self.get_args_command()) 690 return command 691 692 def _get_dry_run_command(self): 693 command = "" 694 if self.config.package_name: 695 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 696 " {} -s dryRun true".format(self.config.package_name, 697 self.config.bundle_name, 698 self.get_args_command()) 699 elif self.config.module_name: 700 command = "aa test -m {} -b {} -s unittest {}" \ 701 " {} -s dryRun true".format(self.config.module_name, 702 self.config.bundle_name, 703 self.get_oh_test_runner_path(), 704 self.get_args_command()) 705 706 return command 707 708 def get_oh_test_runner_path(self): 709 if self.compile_mode == "esmodule": 710 return "/ets/testrunner/OpenHarmonyTestRunner" 711 else: 712 return "OpenHarmonyTestRunner" 713 714 715@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 716class OHRustTestDriver(IDriver): 717 def __init__(self): 718 self.result = "" 719 self.error_message = "" 720 self.config = None 721 722 def __check_environment__(self, device_options): 723 pass 724 725 def __check_config__(self, config): 726 pass 727 728 def __execute__(self, request): 729 try: 730 LOG.debug("Start to execute open harmony rust test") 731 self.config = request.config 732 self.config.device = request.config.environment.devices[0] 733 self.config.target_test_path = "/system/bin" 734 735 suite_file = request.root.source.source_file 736 LOG.debug("Testsuite filepath:{}".format(suite_file)) 737 738 if not suite_file: 739 LOG.error("test source '{}' not exists".format( 740 request.root.source.source_string)) 741 return 742 743 self.result = "{}.xml".format( 744 os.path.join(request.config.report_path, 745 "result", request.get_module_name())) 746 self.config.device.set_device_report_path(request.config.report_path) 747 self.config.device.device_log_collector.start_hilog_task() 748 self._init_oh_rust() 749 self._run_oh_rust(suite_file, request) 750 except Exception as exception: 751 self.error_message = exception 752 if not getattr(exception, "error_no", ""): 753 setattr(exception, "error_no", "03409") 754 LOG.exception(self.error_message, exc_info=False, error_no="03409") 755 finally: 756 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 757 time.time_ns()) 758 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 759 self.config.device.device_log_collector.stop_hilog_task( 760 log_tar_file_name, module_name=request.get_module_name()) 761 self.result = check_result_report( 762 request.config.report_path, self.result, self.error_message) 763 764 def _init_oh_rust(self): 765 self.config.device.connector_command("target mount") 766 self.config.device.execute_shell_command( 767 "mount -o rw,remount,rw /") 768 769 def _run_oh_rust(self, suite_file, request=None): 770 # push testsuite file 771 self.config.device.push_file(suite_file, self.config.target_test_path) 772 # push resource file 773 resource_manager = ResourceManager() 774 resource_data_dict, resource_dir = \ 775 resource_manager.get_resource_data_dic(suite_file) 776 resource_manager.process_preparer_data(resource_data_dict, 777 resource_dir, 778 self.config.device) 779 for listener in request.listeners: 780 listener.device_sn = self.config.device.device_sn 781 782 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 783 if parsers: 784 parsers = parsers[:1] 785 parser_instances = [] 786 for parser in parsers: 787 parser_instance = parser.__class__() 788 parser_instance.suite_name = request.get_module_name() 789 parser_instance.listeners = request.listeners 790 parser_instances.append(parser_instance) 791 handler = ShellHandler(parser_instances) 792 793 command = "cd {}; chmod +x *; ./{}".format( 794 self.config.target_test_path, os.path.basename(suite_file)) 795 self.config.device.execute_shell_command( 796 command, timeout=TIME_OUT, receiver=handler, retry=0) 797 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 798 self.config.device) 799 800 def __result__(self): 801 return self.result if os.path.exists(self.result) else "" 802 803 804class OHYaraConfig(Enum): 805 HAP_FILE = "hap-file" 806 BUNDLE_NAME = "bundle-name" 807 CLEANUP_APPS = "cleanup-apps" 808 809 OS_FULLNAME_LIST = "osFullNameList" 810 VULNERABILITIES = "vulnerabilities" 811 VUL_ID = "vul_id" 812 OPENHARMONY_SA = "openharmony-sa" 813 AFFECTED_VERSION = "affected_versions" 814 MONTH = "month" 815 SEVERITY = "severity" 816 VUL_DESCRIPTION = "vul_description" 817 DISCLOSURE = "disclosure" 818 AFFECTED_FILES = "affected_files" 819 YARA_RULES = "yara_rules" 820 821 PASS = "pass" 822 FAIL = "fail" 823 BLOCK = "block" 824 825 ERROR_MSG_001 = "The patch label is longer than two months (60 days), " \ 826 "which violates the OHCA agreement [https://compatibility.openharmony.cn/]." 827 ERROR_MSG_002 = "This test case is beyond the patch label scope and does not need to be executed." 828 ERROR_MSG_003 = "Modify the code according to the patch requirements: " 829 830 831class VulItem: 832 vul_id = "" 833 month = "" 834 severity = "" 835 vul_description = dict() 836 disclosure = dict() 837 affected_files = "" 838 affected_versions = "" 839 yara_rules = "" 840 trace = "" 841 final_risk = OHYaraConfig.PASS.value 842 complete = False 843 844 845@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test) 846class OHYaraTestDriver(IDriver): 847 def __init__(self): 848 self.result = "" 849 self.error_message = "" 850 self.config = None 851 self.tool_hap_info = dict() 852 self.security_patch = None 853 self.system_version = None 854 855 def __check_environment__(self, device_options): 856 pass 857 858 def __check_config__(self, config): 859 pass 860 861 def __execute__(self, request): 862 try: 863 LOG.debug("Start to execute open harmony yara test") 864 self.result = os.path.join( 865 request.config.report_path, "result", 866 '.'.join((request.get_module_name(), "xml"))) 867 self.config = request.config 868 self.config.device = request.config.environment.devices[0] 869 870 config_file = request.root.source.config_file 871 suite_file = request.root.source.source_file 872 873 if not suite_file: 874 raise ParamError( 875 "test source '%s' not exists" % 876 request.root.source.source_string, error_no="00110") 877 LOG.debug("Test case file path: %s" % suite_file) 878 self.config.device.set_device_report_path(request.config.report_path) 879 self._run_oh_yara(config_file, request) 880 881 except Exception as exception: 882 self.error_message = exception 883 if not getattr(exception, "error_no", ""): 884 setattr(exception, "error_no", "03409") 885 LOG.exception(self.error_message, exc_info=False, error_no="03409") 886 finally: 887 if self.tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value): 888 cmd = ["uninstall", self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)] 889 result = self.config.device.connector_command(cmd) 890 LOG.debug("Try uninstall tools hap, bundle name is {}, result is {}".format( 891 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), result)) 892 893 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 894 time.time_ns()) 895 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 896 self.config.device.device_log_collector.stop_hilog_task( 897 log_tar_file_name, module_name=request.get_module_name()) 898 899 self.result = check_result_report( 900 request.config.report_path, self.result, self.error_message) 901 902 def _get_driver_config(self, json_config): 903 yara_bin = get_config_value('yara-bin', 904 json_config.get_driver(), False) 905 version_mapping_file = get_config_value('version-mapping-file', 906 json_config.get_driver(), False) 907 vul_info_file = get_config_value('vul-info-file', 908 json_config.get_driver(), False) 909 # get absolute file path 910 self.config.yara_bin = get_file_absolute_path(yara_bin) 911 self.config.version_mapping_file = get_file_absolute_path(version_mapping_file) 912 self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path]) 913 914 # get tool hap info 915 # default value 916 self.tool_hap_info = { 917 OHYaraConfig.HAP_FILE.value: "sststool.hap", 918 OHYaraConfig.BUNDLE_NAME.value: "com.example.sststool", 919 OHYaraConfig.CLEANUP_APPS.value: "true" 920 } 921 tool_hap_info = get_config_value('tools-hap-info', 922 json_config.get_driver(), False) 923 if tool_hap_info: 924 self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \ 925 tool_hap_info.get(OHYaraConfig.HAP_FILE.value, "sststool.hap") 926 self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \ 927 tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, "com.example.sststool") 928 self.tool_hap_info[OHYaraConfig.CLEANUP_APPS.value] = \ 929 tool_hap_info.get(OHYaraConfig.CLEANUP_APPS.value, "true") 930 931 def _run_oh_yara(self, config_file, request=None): 932 message_list = list() 933 934 json_config = JsonParser(config_file) 935 self._get_driver_config(json_config) 936 937 # get device info 938 self.security_patch = self.config.device.execute_shell_command( 939 "param get const.ohos.version.security_patch").strip() 940 self.system_version = self.config.device.execute_shell_command( 941 "param get const.ohos.fullname").strip() 942 943 if "fail" in self.system_version: 944 self._get_full_name_by_tool_hap() 945 946 vul_items = self._get_vul_items() 947 # if security patch expire, case fail 948 current_date_str = datetime.now().strftime('%Y-%m') 949 if self._check_if_expire_or_risk(current_date_str): 950 LOG.info("Security patch has expired. Set all case fail.") 951 for _, item in enumerate(vul_items): 952 item.complete = True 953 item.final_risk = OHYaraConfig.FAIL.value 954 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value) 955 else: 956 LOG.info("Security patch is shorter than two months. Start yara test.") 957 # parse version mapping file 958 mapping_info = self._do_parse_json(self.config.version_mapping_file) 959 os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None) 960 # check if system version in version mapping list 961 vul_version = os_full_name_list.get(self.system_version, None) 962 # not in the maintenance scope, skip all case 963 if vul_version is None: 964 LOG.debug("The system version is not in the maintenance scope, skip it. " 965 "system versions is {}".format(self.system_version)) 966 else: 967 for _, item in enumerate(vul_items): 968 LOG.debug("Affected files: {}".format(item.affected_files)) 969 for index, affected_file in enumerate(item.affected_files): 970 has_inter = False 971 for i, _ in enumerate(item.affected_versions): 972 if self._check_if_intersection(vul_version, item.affected_versions[i]): 973 has_inter = True 974 break 975 if not has_inter: 976 LOG.debug("Yara rule [{}] affected versions has no intersection " 977 "in mapping version, skip it. Mapping version is {}, " 978 "affected versions is {}".format(item.vul_id, vul_version, 979 item.affected_versions)) 980 continue 981 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 982 request.get_module_name(), item.yara_rules[index].split('.')[0]) 983 if not os.path.exists(local_path): 984 os.makedirs(local_path) 985 yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path]) 986 self.config.device.pull_file(affected_file, local_path) 987 affected_file = os.path.join(local_path, os.path.basename(affected_file)) 988 if not os.path.exists(affected_file): 989 LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index])) 990 item.final_risk = OHYaraConfig.PASS.value 991 continue 992 cmd = [self.config.yara_bin, yara_file, affected_file] 993 result = exec_cmd(cmd) 994 LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index])) 995 if "testcase pass" in result: 996 item.final_risk = OHYaraConfig.PASS.value 997 break 998 else: 999 if self._check_if_expire_or_risk(item.month, check_risk=True): 1000 item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value, 1001 item.disclosure.get("zh", "")) 1002 item.final_risk = OHYaraConfig.FAIL.value 1003 else: 1004 item.final_risk = OHYaraConfig.BLOCK.value 1005 item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value) 1006 # if no risk delete files, if rule has risk keep it 1007 if item.final_risk != OHYaraConfig.FAIL.value: 1008 local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value, 1009 request.get_module_name(), item.yara_rules[index].split('.')[0]) 1010 if os.path.exists(local_path): 1011 LOG.debug( 1012 "Yara rule [{}] has no risk, remove affected files.".format( 1013 item.yara_rules[index])) 1014 shutil.rmtree(local_path) 1015 item.complete = True 1016 self._generate_yara_report(request, vul_items, message_list) 1017 self._generate_xml_report(request, vul_items, message_list) 1018 1019 def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False): 1020 from dateutil.relativedelta import relativedelta 1021 self.security_patch = self.security_patch.replace(' ', '') 1022 self.security_patch = self.security_patch.replace('/', '-') 1023 # get current date 1024 source_date = datetime.strptime(date_str, '%Y-%m') 1025 security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m') 1026 # check if expire 2 months 1027 rd = relativedelta(source_date, security_patch_date) 1028 months = rd.months + (rd.years * 12) 1029 if check_risk: 1030 # vul time before security patch time no risk 1031 LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}" 1032 .format(self.security_patch[:-3], date_str, months)) 1033 if months > 0: 1034 return False 1035 else: 1036 return True 1037 else: 1038 # check if security patch time expire current time 2 months 1039 LOG.debug("Security patch time: {}, current time: {}, delta_months: {}" 1040 .format(self.security_patch[:-3], date_str, months)) 1041 if months > expire_time: 1042 return True 1043 else: 1044 return False 1045 1046 @staticmethod 1047 def _check_if_intersection(source_version, dst_version): 1048 # para dst_less_sor control if dst less than source 1049 def _do_check(soruce, dst, dst_less_sor=True): 1050 if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \ 1051 re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst): 1052 source_vers = soruce.split(".") 1053 dst_vers = dst.split(".") 1054 for index, _ in enumerate(source_vers): 1055 if dst_less_sor: 1056 # check if all source number less than dst number 1057 if int(source_vers[index]) < int(dst_vers[index]): 1058 return False 1059 else: 1060 # check if all source number larger than dst number 1061 if int(source_vers[index]) > int(dst_vers[index]): 1062 return False 1063 return True 1064 return False 1065 1066 source_groups = source_version.split("-") 1067 dst_groups = dst_version.split("-") 1068 if source_version == dst_version: 1069 return True 1070 elif len(source_groups) == 1 and len(dst_groups) == 1: 1071 return source_version == dst_version 1072 elif len(source_groups) == 1 and len(dst_groups) == 2: 1073 return _do_check(source_groups[0], dst_groups[0]) and \ 1074 _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) 1075 elif len(source_groups) == 2 and len(dst_groups) == 1: 1076 return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \ 1077 _do_check(source_groups[1], dst_groups[0]) 1078 elif len(source_groups) == 2 and len(dst_groups) == 2: 1079 return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \ 1080 _do_check(source_groups[1], dst_groups[0]) 1081 return False 1082 1083 def _get_vul_items(self): 1084 vul_items = list() 1085 vul_info = self._do_parse_json(self.config.vul_info_file) 1086 vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, []) 1087 for _, vul in enumerate(vulnerabilities): 1088 affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, []) 1089 item = VulItem() 1090 item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "") 1091 item.affected_versions = affected_versions 1092 item.month = vul.get(OHYaraConfig.MONTH.value, "") 1093 item.severity = vul.get(OHYaraConfig.SEVERITY.value, "") 1094 item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "") 1095 item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "") 1096 item.affected_files = \ 1097 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1098 OHYaraConfig.AFFECTED_FILES.value, []) 1099 item.yara_rules = \ 1100 vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get( 1101 OHYaraConfig.YARA_RULES.value, []) 1102 vul_items.append(item) 1103 LOG.debug("Vul size is {}".format(len(vul_items))) 1104 return vul_items 1105 1106 @staticmethod 1107 def _do_parse_json(file_path): 1108 json_content = None 1109 if not os.path.exists(file_path): 1110 raise ParamError("The json file {} does not exist".format( 1111 file_path), error_no="00110") 1112 flags = os.O_RDONLY 1113 modes = stat.S_IWUSR | stat.S_IRUSR 1114 with os.fdopen(os.open(file_path, flags, modes), 1115 "r", encoding="utf-8") as file_content: 1116 json_content = json.load(file_content) 1117 if json_content is None: 1118 raise ParamError("The json file {} parse error".format( 1119 file_path), error_no="00110") 1120 return json_content 1121 1122 def _get_full_name_by_tool_hap(self): 1123 # check if tool hap has installed 1124 result = self.config.device.execute_shell_command( 1125 "bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1126 LOG.debug(result) 1127 if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result: 1128 hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value)) 1129 self.config.device.push_file(hap_path, "/data/local/tmp") 1130 result = self.config.device.execute_shell_command( 1131 "bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path))) 1132 LOG.debug(result) 1133 self.config.device.execute_shell_command( 1134 "mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format( 1135 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1136 self.config.device.execute_shell_command( 1137 "aa start -a {}.MainAbility -b {}".format( 1138 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value), 1139 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))) 1140 time.sleep(1) 1141 self.system_version = self.config.device.execute_shell_command( 1142 "cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format( 1143 self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '') 1144 LOG.debug(self.system_version) 1145 1146 def _generate_yara_report(self, request, vul_items, result_message): 1147 import csv 1148 result_message.clear() 1149 yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv" 1150 .format(request.config.device.device_sn)) 1151 if os.path.exists(yara_report): 1152 data = [] 1153 else: 1154 data = [ 1155 ["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch], 1156 ["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"] 1157 ] 1158 fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755) 1159 for _, item in enumerate(vul_items): 1160 data.append([item.vul_id, item.severity, 1161 item.month, item.final_risk, 1162 item.disclosure.get("zh", ""), item.vul_description.get("zh", "")]) 1163 result = "{}|{}|{}|{}|{}|{}|{}\n".format( 1164 item.vul_id, item.severity, 1165 item.month, item.final_risk, 1166 item.disclosure.get("zh", ""), item.vul_description.get("zh", ""), 1167 item.trace) 1168 result_message.append(result) 1169 with os.fdopen(fd, "a", newline='') as file_handler: 1170 writer = csv.writer(file_handler) 1171 writer.writerows(data) 1172 1173 def _generate_xml_report(self, request, vul_items, message_list): 1174 result_message = "".join(message_list) 1175 listener_copy = request.listeners.copy() 1176 parsers = get_plugin( 1177 Plugin.PARSER, CommonParserType.oh_yara) 1178 if parsers: 1179 parsers = parsers[:1] 1180 for listener in listener_copy: 1181 listener.device_sn = self.config.device.device_sn 1182 parser_instances = [] 1183 for parser in parsers: 1184 parser_instance = parser.__class__() 1185 parser_instance.suites_name = request.get_module_name() 1186 parser_instance.vul_items = vul_items 1187 parser_instance.listeners = listener_copy 1188 parser_instances.append(parser_instance) 1189 handler = ShellHandler(parser_instances) 1190 process_command_ret(result_message, handler) 1191 1192 def __result__(self): 1193 return self.result if os.path.exists(self.result) else "" 1194 1195 @Plugin(type=Plugin.DRIVER, id=DeviceTestType.validator_test) 1196 class ValidatorTestDriver(IDriver): 1197 1198 def __init__(self): 1199 self.error_message = "" 1200 self.xml_path = "" 1201 self.result = "" 1202 self.config = None 1203 self.kits = [] 1204 1205 def __check_environment__(self, device_options): 1206 pass 1207 1208 def __check_config__(self, config): 1209 pass 1210 1211 def __execute__(self, request): 1212 try: 1213 self.result = os.path.join( 1214 request.config.report_path, "result", 1215 ".".join((request.get_module_name(), "xml"))) 1216 self.config = request.config 1217 self.config.device = request.config.environment.devices[0] 1218 config_file = request.root.source.config_file 1219 self._run_validate_test(config_file, request) 1220 except Exception as exception: 1221 self.error_message = exception 1222 if not getattr(exception, "error_no", ""): 1223 setattr(exception, "error_no", "03409") 1224 LOG.exception(self.error_message, exc_info=True, error_no="03409") 1225 raise exception 1226 finally: 1227 self.result = check_result_report(request.config.report_path, 1228 self.result, self.error_message) 1229 1230 def _run_validate_test(self, config_file, request): 1231 is_update = False 1232 try: 1233 if "update" in self.config.testargs.keys(): 1234 if dict(self.config.testargs).get("update")[0] == "true": 1235 is_update = True 1236 json_config = JsonParser(config_file) 1237 self.kits = get_kit_instances(json_config, self.config.resource_path, 1238 self.config.testcases_path) 1239 self._get_driver_config(json_config) 1240 if is_update: 1241 do_module_kit_setup(request, self.kits) 1242 while True: 1243 print("Is test finished?Y/N") 1244 usr_input = input(">>>") 1245 if usr_input == "Y" or usr_input == "y": 1246 LOG.debug("Finish current test") 1247 break 1248 else: 1249 print("continue") 1250 LOG.debug("Your input is:{}, continue".format(usr_input)) 1251 if self.xml_path: 1252 result_dir = os.path.join(request.config.report_path, "result") 1253 if not os.path.exists(result_dir): 1254 os.makedirs(result_dir) 1255 self.config.device.pull_file(self.xml_path, self.result) 1256 finally: 1257 if is_update: 1258 do_module_kit_teardown(request) 1259 1260 def _get_driver_config(self, json_config): 1261 self.xml_path = get_config_value("xml_path", json_config.get_driver(), False) 1262 def __result__(self): 1263 return self.result if os.path.exists(self.result) else "" 1264