1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21import json 22import shutil 23import zipfile 24import tempfile 25import stat 26import re 27from dataclasses import dataclass 28 29from xdevice import ParamError 30from xdevice import ExecuteTerminate 31from xdevice import IDriver 32from xdevice import platform_logger 33from xdevice import Plugin 34from xdevice import get_plugin 35from xdevice import JsonParser 36from xdevice import ShellHandler 37from xdevice import TestDescription 38from xdevice import ResourceManager 39from xdevice import get_device_log_file 40from xdevice import check_result_report 41from xdevice import get_kit_instances 42from xdevice import get_config_value 43from xdevice import do_module_kit_setup 44from xdevice import do_module_kit_teardown 45 46from xdevice_extension._core.constants import DeviceTestType 47from xdevice_extension._core.constants import DeviceConnectorType 48from xdevice_extension._core.constants import CommonParserType 49from xdevice_extension._core.constants import FilePermission 50from xdevice_extension._core.environment.dmlib import DisplayOutputReceiver 51from xdevice_extension._core.exception import ShellCommandUnresponsiveException 52from xdevice_extension._core.exception import HapNotSupportTest 53from xdevice_extension._core.exception import HdcCommandRejectedException 54from xdevice_extension._core.exception import HdcError 55from xdevice_extension._core.testkit.kit import junit_para_parse 56from xdevice_extension._core.testkit.kit import reset_junit_para 57from xdevice_extension._core.utils import get_filename_extension 58from xdevice_extension._core.utils import start_standing_subprocess 59from xdevice_extension._core.executor.listener import CollectingTestListener 60from xdevice_extension._core.testkit.kit import gtest_para_parse 61from xdevice_extension._core.environment.dmlib import process_command_ret 62 63 64__all__ = ["CppTestDriver", "HapTestDriver", "OHKernelTestDriver", 65 "JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner", 66 "RemoteDexRunner", "disable_keyguard"] 67LOG = platform_logger("Drivers") 68DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 69ON_DEVICE_TEST_DIR_LOCATION = "/%s/%s/%s/" % ("data", "local", "tmp") 70 71FAILED_RUN_TEST_ATTEMPTS = 3 72TIME_OUT = 900 * 1000 73 74 75@dataclass 76class ZunitConst(object): 77 z_unit_app = "ohos.unittest.App" 78 output_dir = "OUTPUT_DIR=" 79 output_file = "OUTPUT_FILE=" 80 test_class = "TEST_CLASS=" 81 exec_class = "EXEC_CLASS=" 82 exec_method = "EXEC_METHOD=" 83 exec_level = "EXEC_LEVEL=" 84 jacoco_exec_file = "JACOCO_EXEC_FILE=" 85 jtest_status_filename = "jtest_status.txt" 86 remote_command_dir = "commandtmp" 87 88 89def get_level_para_string(level_string): 90 level_list = list(set(level_string.split(","))) 91 level_para_string = "" 92 for item in level_list: 93 if not item.isdigit(): 94 continue 95 item = item.strip(" ") 96 level_para_string = "%sLevel%s," % (level_para_string, item) 97 level_para_string = level_para_string.strip(",") 98 return level_para_string 99 100 101def get_execute_java_test_files(suite_file): 102 java_test_file = "" 103 test_info_file = "%s.info" % suite_file[:suite_file.rfind(".")] 104 if not os.path.exists(test_info_file): 105 return java_test_file 106 try: 107 test_info_file_open = os.open(test_info_file, os.O_RDWR, 108 stat.S_IWUSR | stat.S_IRUSR) 109 with os.fdopen(test_info_file_open, "r") as file_desc: 110 lines = file_desc.readlines() 111 for line in lines: 112 class_name, _ = line.split(',', 1) 113 class_name = class_name.strip() 114 if not class_name.endswith("Test"): 115 continue 116 java_test_file = "%s%s," % (java_test_file, class_name) 117 except(IOError, ValueError) as err_msg: 118 LOG.exception("Error to read info file: ", err_msg, exc_info=False) 119 if java_test_file != "": 120 java_test_file = java_test_file[:-1] 121 return java_test_file 122 123 124def get_java_test_para(testcase, testlevel): 125 exec_class = "*" 126 exec_method = "*" 127 exec_level = "" 128 129 if "" != testcase and "" == testlevel: 130 pos = testcase.rfind(".") 131 if pos != -1: 132 exec_class = testcase[0:pos] 133 exec_method = testcase[pos + 1:] 134 exec_level = "" 135 else: 136 exec_class = "*" 137 exec_method = testcase 138 exec_level = "" 139 elif "" == testcase and "" != testlevel: 140 exec_class = "*" 141 exec_method = "*" 142 exec_level = get_level_para_string(testlevel) 143 144 return exec_class, exec_method, exec_level 145 146 147def get_xml_output(config, json_config): 148 xml_output = config.testargs.get("xml-output") 149 if not xml_output: 150 if get_config_value('xml-output', json_config.get_driver(), False): 151 xml_output = get_config_value('xml-output', 152 json_config.get_driver(), False) 153 else: 154 xml_output = "false" 155 else: 156 xml_output = xml_output[0] 157 xml_output = str(xml_output).lower() 158 return xml_output 159 160 161def get_result_savepath(testsuit_path, result_rootpath): 162 findkey = "%stests%s" % (os.sep, os.sep) 163 filedir, _ = os.path.split(testsuit_path) 164 pos = filedir.find(findkey) 165 if -1 != pos: 166 subpath = filedir[pos + len(findkey):] 167 pos1 = subpath.find(os.sep) 168 if -1 != pos1: 169 subpath = subpath[pos1 + len(os.sep):] 170 result_path = os.path.join(result_rootpath, "result", subpath) 171 else: 172 result_path = os.path.join(result_rootpath, "result") 173 else: 174 result_path = os.path.join(result_rootpath, "result") 175 176 if not os.path.exists(result_path): 177 os.makedirs(result_path) 178 179 LOG.info("result_savepath = %s" % result_path) 180 return result_path 181 182 183# all testsuit common Unavailable test result xml 184def _create_empty_result_file(filepath, filename, error_message): 185 error_message = str(error_message) 186 error_message = error_message.replace("\"", """) 187 error_message = error_message.replace("<", "<") 188 error_message = error_message.replace(">", ">") 189 error_message = error_message.replace("&", "&") 190 if filename.endswith(".hap"): 191 filename = filename.split(".")[0] 192 if not os.path.exists(filepath): 193 file_open = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 194 0o755) 195 with os.fdopen(file_open, "w") as file_desc: 196 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 197 time.localtime()) 198 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 199 file_desc.write('<testsuites tests="0" failures="0" ' 200 'disabled="0" errors="0" timestamp="%s" ' 201 'time="0" name="AllTests">\n' % time_stamp) 202 file_desc.write( 203 ' <testsuite name="%s" tests="0" failures="0" ' 204 'disabled="0" errors="0" time="0.0" ' 205 'unavailable="1" message="%s">\n' % 206 (filename, error_message)) 207 file_desc.write(' </testsuite>\n') 208 file_desc.write('</testsuites>\n') 209 file_desc.flush() 210 return 211 212 213class ResultManager(object): 214 def __init__(self, testsuit_path, result_rootpath, device, 215 device_testpath): 216 self.testsuite_path = testsuit_path 217 self.result_rootpath = result_rootpath 218 self.device = device 219 self.device_testpath = device_testpath 220 self.testsuite_name = os.path.basename(self.testsuite_path) 221 self.is_coverage = False 222 223 def set_is_coverage(self, is_coverage): 224 self.is_coverage = is_coverage 225 226 def get_test_results(self, error_message=""): 227 # Get test result files 228 filepath = self.obtain_test_result_file() 229 if not os.path.exists(filepath): 230 _create_empty_result_file(filepath, self.testsuite_name, 231 error_message) 232 233 # Get coverage data files 234 if self.is_coverage: 235 self.obtain_coverage_data() 236 237 return filepath 238 239 def obtain_test_result_file(self): 240 result_savepath = get_result_savepath(self.testsuite_path, 241 self.result_rootpath) 242 if self.testsuite_path.endswith('.hap'): 243 filepath = os.path.join(result_savepath, "%s.xml" % str( 244 self.testsuite_name).split(".")[0]) 245 246 remote_result_name = "" 247 if self.device.is_file_exist(os.path.join(self.device_testpath, 248 "testcase_result.xml")): 249 remote_result_name = "testcase_result.xml" 250 elif self.device.is_file_exist(os.path.join(self.device_testpath, 251 "report.xml")): 252 remote_result_name = "report.xml" 253 254 if remote_result_name: 255 self.device.pull_file( 256 os.path.join(self.device_testpath, remote_result_name), 257 filepath) 258 else: 259 LOG.error("%s no report file", self.device_testpath) 260 261 else: 262 filepath = os.path.join(result_savepath, "%s.xml" % 263 self.testsuite_name) 264 remote_result_file = os.path.join(self.device_testpath, 265 "%s.xml" % self.testsuite_name) 266 267 if self.device.is_file_exist(remote_result_file): 268 self.device.pull_file(remote_result_file, result_savepath) 269 else: 270 LOG.error("%s not exists", remote_result_file) 271 return filepath 272 273 def is_exist_target_in_device(self, path, target): 274 command = "ls -l %s | grep %s" % (path, target) 275 276 check_result = False 277 stdout_info = self.device.execute_shell_command(command) 278 if stdout_info != "" and stdout_info.find(target) != -1: 279 check_result = True 280 return check_result 281 282 def obtain_coverage_data(self): 283 java_cov_path = os.path.abspath( 284 os.path.join(self.result_rootpath, "..", "coverage/data/exec")) 285 dst_target_name = "%s.exec" % self.testsuite_name 286 src_target_name = "jacoco.exec" 287 if self.is_exist_target_in_device(self.device_testpath, 288 src_target_name): 289 if not os.path.exists(java_cov_path): 290 os.makedirs(java_cov_path) 291 self.device.pull_file( 292 os.path.join(self.device_testpath, src_target_name), 293 os.path.join(java_cov_path, dst_target_name)) 294 295 cxx_cov_path = os.path.abspath( 296 os.path.join(self.result_rootpath, "..", "coverage/data/cxx", 297 self.testsuite_name)) 298 target_name = "obj" 299 if self.is_exist_target_in_device(self.device_testpath, target_name): 300 if not os.path.exists(cxx_cov_path): 301 os.makedirs(cxx_cov_path) 302 src_file = os.path.join(self.device_testpath, target_name) 303 self.device.pull_file(src_file, cxx_cov_path) 304 305 306@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 307class CppTestDriver(IDriver): 308 """ 309 CppTestDriver is a Test that runs a native test package on given device. 310 """ 311 312 def __init__(self): 313 self.result = "" 314 self.error_message = "" 315 self.config = None 316 self.rerun = True 317 self.rerun_all = True 318 self.runner = None 319 320 def __check_environment__(self, device_options): 321 pass 322 323 def __check_config__(self, config): 324 pass 325 326 def __execute__(self, request): 327 try: 328 LOG.debug("Start execute xdevice extension CppTest") 329 330 self.config = request.config 331 self.config.device = request.config.environment.devices[0] 332 333 config_file = request.root.source.config_file 334 self.result = "%s.xml" % \ 335 os.path.join(request.config.report_path, 336 "result", request.root.source.test_name) 337 338 hilog = get_device_log_file( 339 request.config.report_path, 340 request.config.device.__get_serial__(), 341 "device_hilog") 342 343 hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 344 0o755) 345 self.config.device.hdc_command("shell hilog -r") 346 self._run_cpp_test(config_file, listeners=request.listeners, 347 request=request) 348 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 349 self.config.device.start_catch_device_log(hilog_file_pipe) 350 time.sleep(30) 351 hilog_file_pipe.flush() 352 353 except Exception as exception: 354 self.error_message = exception 355 if not getattr(exception, "error_no", ""): 356 setattr(exception, "error_no", "03404") 357 LOG.exception(self.error_message, exc_info=False, error_no="03404") 358 raise exception 359 360 finally: 361 self.config.device.stop_catch_device_log() 362 self.result = check_result_report( 363 request.config.report_path, self.result, self.error_message) 364 365 def _run_cpp_test(self, config_file, listeners=None, request=None): 366 try: 367 if not os.path.exists(config_file): 368 LOG.error("Error: Test cases don't exit %s." % config_file, 369 error_no="00102") 370 raise ParamError( 371 "Error: Test cases don't exit %s." % config_file, 372 error_no="00102") 373 374 json_config = JsonParser(config_file) 375 kits = get_kit_instances(json_config, self.config.resource_path, 376 self.config.testcases_path) 377 378 for listener in listeners: 379 listener.device_sn = self.config.device.device_sn 380 381 self._get_driver_config(json_config) 382 do_module_kit_setup(request, kits) 383 self.runner = RemoteCppTestRunner(self.config) 384 self.runner.suite_name = request.root.source.test_name 385 386 if hasattr(self.config, "history_report_path") and \ 387 self.config.testargs.get("test"): 388 self._do_test_retry(listeners, self.config.testargs) 389 else: 390 gtest_para_parse(self.config.testargs, self.runner) 391 self._do_test_run(listeners) 392 393 finally: 394 do_module_kit_teardown(request) 395 396 def _do_test_retry(self, listener, testargs): 397 for test in testargs.get("test"): 398 test_item = test.split("#") 399 if len(test_item) != 2: 400 continue 401 self.runner.add_instrumentation_arg( 402 "gtest_filter", "%s.%s" % (test_item[0], test_item[1])) 403 self.runner.run(listener) 404 405 def _do_test_run(self, listener): 406 test_to_run = self._collect_test_to_run() 407 LOG.debug("collected test count is: %s" % len(test_to_run)) 408 if not test_to_run: 409 self.runner.run(listener) 410 else: 411 self._run_with_rerun(listener, test_to_run) 412 413 def _collect_test_to_run(self): 414 if self.rerun: 415 self.runner.add_instrumentation_arg("gtest_list_tests", True) 416 run_results = self.runner.dry_run() 417 self.runner.remove_instrumentation_arg("gtest_list_tests") 418 return run_results 419 return None 420 421 def _run_tests(self, listener): 422 test_tracker = CollectingTestListener() 423 listener_copy = listener.copy() 424 listener_copy.append(test_tracker) 425 self.runner.run(listener_copy) 426 test_run = test_tracker.get_current_run_results() 427 return test_run 428 429 def _run_with_rerun(self, listener, expected_tests): 430 LOG.debug("ready to run with rerun, expect run: %s" 431 % len(expected_tests)) 432 test_run = self._run_tests(listener) 433 LOG.debug("run with rerun, has run: %s" % len(expected_tests)) 434 if len(test_run) < len(expected_tests): 435 expected_tests = TestDescription.remove_test(expected_tests, 436 test_run) 437 if not expected_tests: 438 LOG.debug("No tests to re-run, all tests executed at least " 439 "once.") 440 if self.rerun_all: 441 self._rerun_all(expected_tests, listener) 442 else: 443 self._rerun_serially(expected_tests, listener) 444 445 def _rerun_all(self, expected_tests, listener): 446 tests = [] 447 for test in expected_tests: 448 tests.append("%s.%s" % (test.class_name, test.test_name)) 449 self.runner.add_instrumentation_arg("gtest_filter", ":".join(tests)) 450 LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests)) 451 test_run = self._run_tests(listener) 452 LOG.debug("rerun file, has run: %s" % len(test_run)) 453 if len(test_run) < len(expected_tests): 454 expected_tests = TestDescription.remove_test(expected_tests, 455 test_run) 456 if not expected_tests: 457 LOG.debug("Rerun textFile success") 458 self._rerun_serially(expected_tests, listener) 459 460 def _rerun_serially(self, expected_tests, listener): 461 LOG.debug("rerun serially, expected run: %s" % len(expected_tests)) 462 for test in expected_tests: 463 self.runner.add_instrumentation_arg( 464 "gtest_filter", "%s.%s" % (test.class_name, test.test_name)) 465 self.runner.rerun(listener, test) 466 self.runner.remove_instrumentation_arg("gtest_filter") 467 468 def _get_driver_config(self, json_config): 469 target_test_path = get_config_value('native-test-device-path', 470 json_config.get_driver(), False) 471 if target_test_path: 472 self.config.target_test_path = target_test_path 473 else: 474 self.config.target_test_path = DEFAULT_TEST_PATH 475 476 self.config.module_name = get_config_value( 477 'module-name', json_config.get_driver(), False) 478 479 timeout_config = get_config_value('native-test-timeout', 480 json_config.get_driver(), False) 481 if timeout_config: 482 self.config.timeout = int(timeout_config) 483 else: 484 self.config.timeout = TIME_OUT 485 486 rerun = get_config_value('rerun', json_config.get_driver(), False) 487 if isinstance(rerun, bool): 488 self.rerun = rerun 489 elif str(rerun).lower() == "false": 490 self.rerun = False 491 else: 492 self.rerun = True 493 494 def __result__(self): 495 return self.result if os.path.exists(self.result) else "" 496 497 498class RemoteCppTestRunner: 499 def __init__(self, config): 500 self.arg_list = {} 501 self.suite_name = None 502 self.config = config 503 self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS 504 505 def dry_run(self): 506 parsers = get_plugin(Plugin.PARSER, CommonParserType.cpptest_list) 507 if parsers: 508 parsers = parsers[:1] 509 parser_instances = [] 510 for parser in parsers: 511 parser_instance = parser.__class__() 512 parser_instances.append(parser_instance) 513 handler = ShellHandler(parser_instances) 514 # apply execute right 515 chmod_cmd = "shell chmod 777 {}/{}".format( 516 self.config.target_test_path, self.config.module_name) 517 LOG.info("The apply execute command is {}".format(chmod_cmd)) 518 self.config.device.hdc_command(chmod_cmd, timeout=30 * 1000) 519 # dry run command 520 dry_command = "{}/{} {}".format(self.config.target_test_path, 521 self.config.module_name, 522 self.get_args_command()) 523 pre_cmd = "hdc_std -t %s shell " % self.config.device.device_sn 524 command = "%s %s" % (pre_cmd, dry_command) 525 LOG.info("The dry_command execute command is {}".format(command)) 526 output = start_standing_subprocess(command.split(), 527 return_result=True) 528 LOG.debug("dryrun output ---") 529 output = output.replace("\r", "") 530 handler.__read__(output) 531 handler.__done__() 532 return parser_instances[0].tests 533 534 def run(self, listener): 535 handler = self._get_shell_handler(listener) 536 pre_cmd = "hdc_std -t %s shell " % self.config.device.device_sn 537 command = "{}/{} {}".format(self.config.target_test_path, 538 self.config.module_name, 539 self.get_args_command()) 540 command = "%s %s" % (pre_cmd, command) 541 LOG.debug("run command is: %s" % command) 542 output = start_standing_subprocess(command.split(), return_result=True) 543 LOG.debug("run output ---") 544 output = output.replace("\r", "") 545 handler.__read__(output) 546 handler.__done__() 547 548 def rerun(self, listener, test): 549 if self.rerun_attempt: 550 test_tracker = CollectingTestListener() 551 listener_copy = listener.copy() 552 listener_copy.append(test_tracker) 553 handler = self._get_shell_handler(listener_copy) 554 try: 555 pre_cmd = "hdc_std -t %s shell " % self.config.device.device_sn 556 command = "{}/{} {}".format(self.config.target_test_path, 557 self.config.module_name, 558 self.get_args_command()) 559 command = "%s %s" % (pre_cmd, command) 560 LOG.debug("rerun command is: %s" % command) 561 output = start_standing_subprocess(command.split(), 562 return_result=True) 563 LOG.debug("run output ---") 564 output = output.replace("\r", "") 565 handler.__read__(output) 566 handler.__done__() 567 568 except ShellCommandUnresponsiveException: 569 LOG.debug("Exception: ShellCommandUnresponsiveException") 570 finally: 571 if not len(test_tracker.get_current_run_results()): 572 LOG.debug("No test case is obtained finally") 573 self.rerun_attempt -= 1 574 handler.parsers[0].mark_test_as_blocked(test) 575 else: 576 LOG.debug("not execute and mark as blocked finally") 577 handler = self._get_shell_handler(listener) 578 handler.parsers[0].mark_test_as_blocked(test) 579 580 def add_instrumentation_arg(self, name, value): 581 if not name or not value: 582 return 583 self.arg_list[name] = value 584 585 def remove_instrumentation_arg(self, name): 586 if not name: 587 return 588 if name in self.arg_list: 589 del self.arg_list[name] 590 591 def get_args_command(self): 592 args_commands = "" 593 for key, value in self.arg_list.items(): 594 if key == "gtest_list_tests": 595 args_commands = "%s --%s" % (args_commands, key) 596 else: 597 args_commands = "%s --%s=%s" % (args_commands, key, value) 598 return args_commands 599 600 def _get_shell_handler(self, listener): 601 parsers = get_plugin(Plugin.PARSER, CommonParserType.cpptest) 602 if parsers: 603 parsers = parsers[:1] 604 parser_instances = [] 605 for parser in parsers: 606 parser_instance = parser.__class__() 607 parser_instance.suite_name = self.suite_name 608 parser_instance.listeners = listener 609 parser_instances.append(parser_instance) 610 handler = ShellHandler(parser_instances) 611 return handler 612 613 614@Plugin(type=Plugin.DRIVER, id=DeviceTestType.junit_test) 615class JUnitTestDriver(IDriver): 616 """ 617 JUnitTestDriver is a Test that runs a native test package on given device. 618 """ 619 620 def __init__(self): 621 self.result = "" 622 self.error_message = "" 623 self.kits = [] 624 self.config = None 625 self.rerun = True 626 self.runner = None 627 self.rerun_using_test_file = True 628 self.temp_file_list = [] 629 self.is_no_test = False 630 631 def __check_environment__(self, device_options): 632 pass 633 634 def __check_config__(self, config): 635 if hasattr(config, "devices") and len(config.devices) > 1: 636 for device in config.devices: 637 device_name = device.get("name") 638 if not device_name: 639 self.error_message = "JUnitTest Load Error(03100)" 640 raise ParamError("device name not set in config file", 641 error_no="03100") 642 643 def __execute__(self, request): 644 try: 645 LOG.debug("Start execute xdevice extension JUnit Test") 646 647 self.config = request.config 648 self.config.device = request.get_devices()[0] 649 self.config.devices = request.get_devices() 650 651 config_file = request.get_config_file() 652 LOG.info("config file: %s", config_file) 653 self.result = os.path.join(request.get("report_path"), "result", 654 ".".join((request.get_test_name(), 655 "xml"))) 656 self.__check_config__(self.config) 657 658 device_log_pipes = [] 659 try: 660 for device in self.config.devices: 661 device_name = device.get("name", "") 662 hilog = get_device_log_file( 663 request.config.report_path, device.__get_serial__(), 664 "device_hilog", device_name) 665 hilog_open = os.open(hilog, os.O_WRONLY | 666 os.O_CREAT | os.O_APPEND, 0o755) 667 hilog_pipe = os.fdopen(hilog_open, "a") 668 device.start_catch_device_log(hilog_pipe) 669 device_log_pipes.extend([hilog_pipe]) 670 671 self._run_junit(config_file, listeners=request.listeners, 672 request=request) 673 finally: 674 for device_log_pipe in device_log_pipes: 675 device_log_pipe.flush() 676 device_log_pipe.close() 677 for device in self.config.devices: 678 device.stop_catch_device_log() 679 680 except Exception as exception: 681 if not getattr(exception, "error_no", ""): 682 setattr(exception, "error_no", "03405") 683 LOG.exception(self.error_message, exc_info=False, error_no="03405") 684 raise exception 685 686 finally: 687 if not self._is_ignore_report(): 688 self.result = check_result_report( 689 request.config.report_path, self.result, 690 self.error_message) 691 else: 692 LOG.debug("hide result and not generate report") 693 694 def _run_junit(self, config_file, listeners, request): 695 try: 696 if not os.path.exists(config_file): 697 error_msg = "Error: Test cases %s don't exist." % config_file 698 LOG.error(error_msg, error_no="00102") 699 raise ParamError(error_msg, error_no="00102") 700 701 for device in self.config.devices: 702 cmd = "target mount" \ 703 if device.usb_type == DeviceConnectorType.hdc \ 704 else "remount" 705 device.hdc_command(cmd) 706 json_config = JsonParser(config_file) 707 self.kits = get_kit_instances(json_config, 708 self.config.resource_path, 709 self.config.testcases_path) 710 711 for listener in listeners: 712 listener.device_sn = self.config.device.device_sn 713 714 self._get_driver_config(json_config) 715 do_module_kit_setup(request, self.kits) 716 self.runner = RemoteTestRunner(self.config) 717 self.runner.suite_name = request.get_test_name() 718 self.runner.suite_file = "%s.hap" % \ 719 get_filename_extension(config_file)[0] 720 721 self._get_runner_config(json_config) 722 if hasattr(self.config, "history_report_path") and \ 723 self.config.testargs.get("test"): 724 self._do_test_retry(listeners, self.config.testargs) 725 else: 726 self._do_include_tests() 727 self.runner.junit_para = junit_para_parse( 728 self.config.device, self.config.testargs, "-s") 729 self._do_test_run(listeners) 730 731 finally: 732 do_module_kit_teardown(request) 733 if self.runner and self.runner.junit_para and ( 734 self.runner.junit_para.find("testFile") != -1 735 or self.runner.junit_para.find("notTestFile") != -1): 736 self._junit_clear() 737 738 def _junit_clear(self): 739 self.config.device.execute_shell_command( 740 "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")) 741 for temp_file in self.temp_file_list: 742 if os.path.exists(temp_file): 743 os.remove(temp_file) 744 self.temp_file_list.clear() 745 746 def _get_driver_config(self, json_config): 747 package = get_config_value('package', json_config.get_driver(), False) 748 runner = "ohos.testkit.runner.Runner" 749 include_tests = get_config_value("include-tests", 750 json_config.get_driver(), True, []) 751 if not package: 752 raise ParamError("Can't find package in config file.") 753 self.config.package = package 754 self.config.runner = runner 755 self.config.include_tests = include_tests 756 757 self.config.xml_output = get_xml_output(self.config, json_config) 758 759 timeout_config = get_config_value('shell-timeout', 760 json_config.get_driver(), False) 761 if timeout_config: 762 self.config.timeout = int(timeout_config) 763 else: 764 self.config.timeout = TIME_OUT 765 766 nohup = get_config_value('nohup', json_config.get_driver(), False) 767 if nohup and (nohup == "true" or nohup == "True"): 768 self.config.nohup = True 769 else: 770 self.config.nohup = False 771 772 def _get_runner_config(self, json_config): 773 test_timeout = get_config_value('test-timeout', 774 json_config.get_driver(), False) 775 if test_timeout: 776 self.runner.add_instrumentation_arg("timeout_sec", 777 int(test_timeout)) 778 779 def _do_test_retry(self, listener, testargs): 780 for test in testargs.get("test"): 781 test_item = test.split("#") 782 if len(test_item) != 2: 783 continue 784 self.runner.class_name = test_item[0] 785 self.runner.test_name = test_item[1] 786 self.runner.run(listener) 787 788 def _do_test_run(self, listener): 789 if not self._check_package(): 790 LOG.error("%s is not supported test" % self.config.package) 791 raise HapNotSupportTest("%s is not supported test" % 792 self.config.package) 793 test_to_run = self._collect_test_to_run() 794 LOG.debug("collected test count is: %s" % len(test_to_run)) 795 if not test_to_run: 796 self.is_no_test = True 797 self.runner.run(listener) 798 else: 799 self._run_with_rerun(listener, test_to_run) 800 801 def _check_package(self): 802 command = '''systemdumper -s 401 -a "-bundle %s"''' % \ 803 self.config.package 804 output = self.config.device.execute_shell_command(command) 805 LOG.debug("systemdumper output: %s" % output) 806 if output and "ohos.testkit.runner.EntryAbility" in output: 807 return True 808 else: 809 LOG.info("try hidumper command to check package") 810 command = '''hidumper -s 401 -a "-bundle %s"''' % \ 811 self.config.package 812 output = self.config.device.execute_shell_command(command) 813 LOG.debug("hidumper output: %s" % output) 814 if output and "ohos.testkit.runner.EntryAbility" in output: 815 return True 816 return False 817 818 def _run_tests(self, listener): 819 test_tracker = CollectingTestListener() 820 listener_copy = listener.copy() 821 listener_copy.append(test_tracker) 822 self.runner.run(listener_copy) 823 test_run = test_tracker.get_current_run_results() 824 return test_run 825 826 def _run_with_rerun(self, listener, expected_tests): 827 LOG.debug("ready to run with rerun, expect run: %s" 828 % len(expected_tests)) 829 test_run = self._run_tests(listener) 830 LOG.debug("run with rerun, has run: %s" % len(expected_tests)) 831 if len(test_run) < len(expected_tests): 832 expected_tests = TestDescription.remove_test( 833 expected_tests, test_run) 834 if not expected_tests: 835 LOG.debug("No tests to re-run, all tests executed at least " 836 "once.") 837 if self.rerun_using_test_file: 838 self._rerun_file(expected_tests, listener) 839 else: 840 self._rerun_serially(expected_tests, listener) 841 842 def _make_test_file(self, expected_tests, test_file_path): 843 file_name = 'xdevice_testFile_%s.txt' % self.runner.suite_name 844 file_path = os.path.join(test_file_path, file_name) 845 try: 846 file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT | 847 os.O_APPEND, 0o755) 848 with os.fdopen(file_path_open, "a") as file_desc: 849 for test in expected_tests: 850 file_desc.write("%s#%s" % (test.class_name, 851 test.test_name)) 852 file_desc.write("\n") 853 file_desc.flush() 854 except(IOError, ValueError) as err_msg: 855 LOG.exception("Error for make long command file: ", err_msg, 856 exc_info=False, error_no="03200") 857 return file_name, file_path 858 859 def _rerun_file(self, expected_tests, listener): 860 test_file_path = tempfile.mkdtemp(prefix="test_file_", 861 dir=self.config.report_path) 862 file_name, file_path = self._make_test_file( 863 expected_tests, test_file_path) 864 self.config.device.push_file(file_path, ON_DEVICE_TEST_DIR_LOCATION) 865 file_path_on_device = ''.join((ON_DEVICE_TEST_DIR_LOCATION, file_name)) 866 self.runner.add_instrumentation_arg("testFile", file_path_on_device) 867 self.runner.junit_para = reset_junit_para(self.runner.junit_para, "-s") 868 LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests)) 869 test_run = self._run_tests(listener) 870 LOG.debug("rerun file, has run: %s" % len(test_run)) 871 self.config.device.execute_shell_command("rm %s" % file_path_on_device) 872 if len(test_run) < len(expected_tests): 873 expected_tests = TestDescription.remove_test(expected_tests, 874 test_run) 875 if not expected_tests: 876 LOG.debug("Rerun textFile success") 877 self._rerun_serially(expected_tests, listener) 878 shutil.rmtree(test_file_path) 879 880 def _rerun_serially(self, expected_tests, listener): 881 LOG.debug("rerun serially, expected run: %s" % len(expected_tests)) 882 self.runner.remove_instrumentation_arg("testFile") 883 for test in expected_tests: 884 self.runner.class_name = test.class_name 885 self.runner.test_name = test.test_name 886 self.runner.rerun(listener, test) 887 888 def _collect_test_to_run(self): 889 if self.rerun and self.config.xml_output == "false" and \ 890 not self.config.nohup: 891 self.runner.set_test_collection(True) 892 tests = self._collect_test_and_retry() 893 self.runner.set_test_collection(False) 894 return tests 895 return None 896 897 def _collect_test_and_retry(self): 898 collector = CollectingTestListener() 899 listener = [collector] 900 self.runner.run(listener) 901 run_results = collector.get_current_run_results() 902 return run_results 903 904 def _do_include_tests(self): 905 """ 906 handler the include-tests parameters in json file of current module. 907 the main approach is to inject new dict into "testargs". 908 """ 909 if not self.config.include_tests: 910 return 911 keys_list = [key.strip() for key in self.config.testargs.keys()] 912 if "test-file-include-filter" in keys_list: 913 return 914 test_filter = self._slice_include_tests() 915 if not test_filter: 916 LOG.error("invalid include-tests! please check json file.") 917 return 918 if "test" in keys_list or "class" in keys_list: 919 test_list = list() 920 if "test" in keys_list: 921 for element in self.config.testargs.get("test", []): 922 if self._filter_valid_test(element, test_filter): 923 test_list.append(element.strip()) 924 self.config.testargs.pop("test") 925 if "class" in keys_list: 926 for element in self.config.testargs.get("class", []): 927 if self._filter_valid_test(element, test_filter): 928 test_list.append(element.strip()) 929 self.config.testargs.pop("class") 930 else: 931 test_list = [ele.strip() for ele in self.config.include_tests] 932 if test_list: 933 import datetime 934 prefix = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f') 935 936 save_file = \ 937 os.path.join(self.config.report_path, "temp_%s.txt" % prefix) 938 save_file_open = os.open(save_file, os.O_WRONLY 939 | os.O_CREAT, 0o755) 940 with os.fdopen(save_file_open, "w") as save_handler: 941 for test in test_list: 942 save_handler.write("{}\n".format(test.strip())) 943 save_handler.flush() 944 self.temp_file_list.append(save_file) 945 include_tests_key = "test-file-include-filter" 946 self.config.testargs.update( 947 {include_tests_key: self.temp_file_list}) 948 LOG.debug("handle include-tests, write to %s, data length is %s" % 949 (self.temp_file_list[0], len(test_list))) 950 else: 951 msg = "there is any valid test after filter by 'include-tests'" 952 LOG.error(msg) 953 raise ParamError(msg) 954 955 def _slice_include_tests(self): 956 test_filter = dict() 957 for include_test in self.config.include_tests: 958 include_test = include_test.strip() 959 if include_test: 960 # element like 'class#method' 961 if "#" in include_test: 962 test_list = test_filter.get("test_in", []) 963 test_list.append(include_test) 964 test_filter.update({"test_in": test_list}) 965 # element like 'class' 966 else: 967 class_list = test_filter.get("class_in", []) 968 class_list.append(include_test) 969 test_filter.update({"class_in": class_list}) 970 else: 971 LOG.warning("there is empty element in include-tests") 972 if len([ele for test in test_filter.values() for ele in test]) > 0: 973 return test_filter 974 975 @classmethod 976 def _filter_valid_test(cls, element, test_filter): 977 element = element.strip() 978 # if element in the list which element like 'class#method' 979 if element in test_filter.get("test_in", []): 980 return element 981 # if element is the list which element like 'class' 982 element_items = element.split("#") 983 if element_items[0].strip() in test_filter.get("class_in", []): 984 return element 985 raise ParamError("{} not match 'include-tests'!".format(element)) 986 987 def _is_ignore_report(self): 988 if self.config.task and not self.config.testlist and \ 989 not self.config.testfile: 990 if self.is_no_test and self.config.testargs.get("level", None): 991 return True 992 993 def __result__(self): 994 return self.result if os.path.exists(self.result) else "" 995 996 997class RemoteTestRunner: 998 def __init__(self, config): 999 self.arg_list = {} 1000 self.suite_name = None 1001 self.suite_file = None 1002 self.class_name = None 1003 self.test_name = None 1004 self.junit_para = None 1005 self.config = config 1006 self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS 1007 1008 def __check_environment__(self, device_options): 1009 pass 1010 1011 def run(self, listener): 1012 handler = self._get_shell_handler(listener) 1013 # execute test case 1014 command = "aa start -p %s " \ 1015 "-n ohos.testkit.runner.EntryAbility" \ 1016 " -s unittest %s -s rawLog true %s %s" \ 1017 % (self.config.package, self.config.runner, self.junit_para, 1018 self.get_args_command()) 1019 1020 try: 1021 if self.config.nohup: 1022 nohup_command = "nohup %s &" % command 1023 result_value = self.config.device.execute_shell_cmd_background( 1024 nohup_command, timeout=self.config.timeout) 1025 elif self.config.xml_output == "true": 1026 result_value = self.config.device.execute_shell_command( 1027 command, timeout=self.config.timeout, 1028 retry=0) 1029 else: 1030 self.config.device.execute_shell_command( 1031 command, timeout=self.config.timeout, receiver=handler, 1032 retry=0) 1033 return 1034 except ShellCommandUnresponsiveException: 1035 LOG.debug("Exception: ShellCommandUnresponsiveException") 1036 else: 1037 self.config.target_test_path = "/%s/%s/%s/%s/%s/" % \ 1038 ("data", "user", "0", 1039 self.config.package, "cache") 1040 result = ResultManager(self.suite_file, self.config.report_path, 1041 self.config.device, 1042 self.config.target_test_path) 1043 result.get_test_results(result_value) 1044 1045 def rerun(self, listener, test): 1046 if self.rerun_attempt: 1047 listener_copy = listener.copy() 1048 test_tracker = CollectingTestListener() 1049 listener_copy.append(test_tracker) 1050 handler = self._get_shell_handler(listener_copy) 1051 try: 1052 command = "aa start -p %s " \ 1053 "-n ohos.testkit.runner.EntryAbility" \ 1054 " -s unittest %s -s rawLog true %s" \ 1055 % (self.config.package, self.config.runner, 1056 self.get_args_command()) 1057 1058 self.config.device.execute_shell_command( 1059 command, timeout=self.config.timeout, receiver=handler, 1060 retry=0) 1061 1062 except Exception as error: 1063 LOG.error("rerun error %s, %s" % (error, error.__class__)) 1064 finally: 1065 if not len(test_tracker.get_current_run_results()): 1066 LOG.debug("No test case is obtained finally") 1067 self.rerun_attempt -= 1 1068 handler.parsers[0].mark_test_as_blocked(test) 1069 else: 1070 LOG.debug("not execute and mark as blocked finally") 1071 handler = self._get_shell_handler(listener) 1072 handler.parsers[0].mark_test_as_blocked(test) 1073 1074 def set_test_collection(self, collect): 1075 if collect: 1076 self.add_instrumentation_arg("log", "true") 1077 else: 1078 self.remove_instrumentation_arg("log") 1079 1080 def add_instrumentation_arg(self, name, value): 1081 if not name or not value: 1082 return 1083 self.arg_list[name] = value 1084 1085 def remove_instrumentation_arg(self, name): 1086 if not name: 1087 return 1088 if name in self.arg_list: 1089 del self.arg_list[name] 1090 1091 def get_args_command(self): 1092 args_commands = "" 1093 for key, value in self.arg_list.items(): 1094 args_commands = "%s -s %s %s" % (args_commands, key, value) 1095 if self.class_name and self.test_name: 1096 args_commands = "%s -s class %s#%s" % (args_commands, 1097 self.class_name, 1098 self.test_name) 1099 elif self.class_name: 1100 args_commands = "%s -s class %s" % (args_commands, self.class_name) 1101 return args_commands 1102 1103 def _get_shell_handler(self, listener): 1104 parsers = get_plugin(Plugin.PARSER, CommonParserType.junit) 1105 if parsers: 1106 parsers = parsers[:1] 1107 parser_instances = [] 1108 for parser in parsers: 1109 parser_instance = parser.__class__() 1110 parser_instance.suite_name = self.suite_name 1111 parser_instance.listeners = listener 1112 parser_instances.append(parser_instance) 1113 handler = ShellHandler(parser_instances) 1114 return handler 1115 1116 1117class RemoteDexRunner: 1118 def __init__(self, config): 1119 self.arg_list = {} 1120 self.suite_name = None 1121 self.junit_para = "" 1122 self.config = config 1123 self.class_name = None 1124 self.test_name = None 1125 self.rerun_attempt = FAILED_RUN_TEST_ATTEMPTS 1126 1127 def run(self, listener): 1128 handler = self._get_shell_handler(listener) 1129 command = "export BOOTCLASSPATH=$BOOTCLASSPATH:" \ 1130 "{remote_path}/{module_name};cd {remote_path}; " \ 1131 "app_process -cp {remote_path}/{module_name} / " \ 1132 "ohos.testkit.runner.JUnitRunner {junit_para}{arg_list}" \ 1133 " --rawLog true --coverage false " \ 1134 "--classpathToScan {remote_path}/{module_name}".format( 1135 remote_path=self.config.remote_path, 1136 module_name=self.config.module_name, 1137 junit_para=self.junit_para, 1138 arg_list=self.get_args_command()) 1139 1140 try: 1141 self.config.device.execute_shell_command( 1142 command, timeout=self.config.timeout, 1143 receiver=handler, retry=0) 1144 except ConnectionResetError: 1145 if len(listener) == 1 and isinstance(listener[0], 1146 CollectingTestListener): 1147 LOG.info("try subprocess ") 1148 listener[0].tests.clear() 1149 command = ["shell", command] 1150 result = self.config.device.hdc_command( 1151 command, timeout=self.config.timeout, retry=0, 1152 join_result=True) 1153 handler.__read__(result) 1154 handler.__done__() 1155 LOG.info("get current testcase: %s " % 1156 len(listener[0].get_current_run_results())) 1157 1158 def rerun(self, listener, test): 1159 if self.rerun_attempt: 1160 listener_copy = listener.copy() 1161 test_tracker = CollectingTestListener() 1162 listener_copy.append(test_tracker) 1163 handler = self._get_shell_handler(listener_copy) 1164 try: 1165 command = "export BOOTCLASSPATH=$BOOTCLASSPATH:" \ 1166 "{remote_path}/{module_name};cd {remote_path}; " \ 1167 "app_process -cp {remote_path}/{module_name} / " \ 1168 "ohos.testkit.runner.JUnitRunner {arg_list} " \ 1169 "--rawLog true --coverage false " \ 1170 "--classpathToScan " \ 1171 "{remote_path}/{module_name}".format( 1172 remote_path=self.config.remote_path, 1173 module_name=self.config.module_name, 1174 arg_list=self.get_args_command()) 1175 self.config.device.execute_shell_command( 1176 command, timeout=self.config.timeout, 1177 receiver=handler, retry=0) 1178 1179 except ShellCommandUnresponsiveException as _: 1180 LOG.debug("Exception: ShellCommandUnresponsiveException") 1181 finally: 1182 if not len(test_tracker.get_current_run_results()): 1183 LOG.debug("No test case is obtained finally") 1184 self.rerun_attempt -= 1 1185 handler.parsers[0].mark_test_as_blocked(test) 1186 else: 1187 LOG.debug("not execute and mark as blocked finally") 1188 handler = self._get_shell_handler(listener) 1189 handler.parsers[0].mark_test_as_blocked(test) 1190 1191 def set_test_collection(self, collect): 1192 if collect: 1193 self.add_instrumentation_arg("log", "true") 1194 else: 1195 self.remove_instrumentation_arg("log") 1196 1197 def add_instrumentation_arg(self, name, value): 1198 if not name or not value: 1199 return 1200 self.arg_list[name] = value 1201 1202 def remove_instrumentation_arg(self, name): 1203 if not name: 1204 return 1205 if name in self.arg_list: 1206 del self.arg_list[name] 1207 1208 def get_args_command(self): 1209 args_commands = "" 1210 for key, value in self.arg_list.items(): 1211 args_commands = "%s --%s %s" % (args_commands, key, value) 1212 if self.class_name and self.test_name: 1213 args_commands = "%s --class %s#%s" % (args_commands, 1214 self.class_name, 1215 self.test_name) 1216 elif self.class_name: 1217 args_commands = "%s --class %s" % (args_commands, self.class_name) 1218 return args_commands 1219 1220 def _get_shell_handler(self, listener): 1221 parsers = get_plugin(Plugin.PARSER, CommonParserType.junit) 1222 if parsers: 1223 parsers = parsers[:1] 1224 parser_instances = [] 1225 for parser in parsers: 1226 parser_instance = parser.__class__() 1227 parser_instance.suite_name = self.suite_name 1228 parser_instance.listeners = listener 1229 parser_instances.append(parser_instance) 1230 handler = ShellHandler(parser_instances) 1231 return handler 1232 1233 1234@Plugin(type=Plugin.DRIVER, id=DeviceTestType.hap_test) 1235class HapTestDriver(IDriver): 1236 """ 1237 HapTestDriver is a Test that runs a native test package on given device. 1238 """ 1239 # test driver config 1240 config = None 1241 instrument_hap_file_suffix = '_ad.hap' 1242 result = "" 1243 1244 def __init__(self): 1245 self.ability_name = "" 1246 self.package_name = "" 1247 self.activity_name = "" 1248 1249 def __check_environment__(self, device_options): 1250 pass 1251 1252 def __check_config__(self, config): 1253 pass 1254 1255 def __execute__(self, request): 1256 try: 1257 LOG.debug("Start execute xdevice extension HapTest") 1258 1259 self.config = request.config 1260 self.config.target_test_path = DEFAULT_TEST_PATH 1261 self.config.device = request.config.environment.devices[0] 1262 1263 suite_file = request.root.source.source_file 1264 if not suite_file: 1265 LOG.error("test source '%s' not exists" % 1266 request.root.source.source_string, error_no="00110") 1267 return 1268 1269 LOG.debug("Testsuite FilePath: %s" % suite_file) 1270 package_name, ability_name = self._get_package_and_ability_name( 1271 suite_file) 1272 self.package_name = package_name 1273 self.ability_name = ability_name 1274 self.activity_name = "%s.MainAbilityShellActivity" % \ 1275 self.package_name 1276 self.config.test_hap_out_path = \ 1277 "/data/data/%s/files/test/result/" % self.package_name 1278 self.config.test_suite_timeout = 300 * 1000 1279 1280 serial = request.config.device.__get_serial__() 1281 device_log_file = get_device_log_file(request.config.report_path, 1282 serial) 1283 device_log_file_open = os.open( 1284 device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755) 1285 with os.fdopen(device_log_file_open, "a")as file_pipe: 1286 self.config.device.start_catch_device_log(file_pipe) 1287 self._init_junit_test() 1288 self._run_junit_test(suite_file) 1289 file_pipe.flush() 1290 finally: 1291 self.config.device.stop_catch_device_log() 1292 1293 def _init_junit_test(self): 1294 cmd = "target mount" \ 1295 if self.config.device.usb_type == DeviceConnectorType.hdc \ 1296 else "remount" 1297 self.config.device.hdc_command(cmd) 1298 self.config.device.execute_shell_command( 1299 "rm -rf %s" % self.config.target_test_path) 1300 self.config.device.execute_shell_command( 1301 "mkdir -p %s" % self.config.target_test_path) 1302 self.config.device.execute_shell_command( 1303 "mount -o rw,remount,rw /%s" % "system") 1304 1305 def _run_junit_test(self, suite_file): 1306 filename = os.path.basename(suite_file) 1307 suitefile_target_test_path = self.config.test_hap_out_path 1308 junit_test_para = self._get_junit_test_para(filename, suite_file) 1309 is_coverage_test = True if self.config.coverage == "coverage" else \ 1310 False 1311 1312 # push testsuite file 1313 self.config.device.push_file(suite_file, self.config.target_test_path) 1314 1315 resource_manager = ResourceManager() 1316 resource_data_dic, resource_dir = \ 1317 resource_manager.get_resource_data_dic(suite_file) 1318 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 1319 self.config.device) 1320 1321 # execute testcase 1322 install_result = self._install_hap(filename) 1323 result = ResultManager(suite_file, self.config.report_path, 1324 self.config.device, 1325 self.config.test_hap_out_path) 1326 result.set_is_coverage(is_coverage_test) 1327 if install_result: 1328 return_message = self._execute_suitefile_junittest( 1329 filename, junit_test_para, suitefile_target_test_path) 1330 1331 self.result = result.get_test_results(return_message) 1332 self._uninstall_hap(self.package_name) 1333 else: 1334 self.result = result.get_test_results("Error: install hap failed.") 1335 LOG.error("Error: install hap failed.", error_no="03204") 1336 1337 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, 1338 self.config.device) 1339 1340 def _get_junit_test_para(self, filename, suite_file): 1341 if not filename.endswith(self.instrument_hap_file_suffix): 1342 exec_class, exec_method, exec_level = get_java_test_para( 1343 self.config.testcase, self.config.testlevel) 1344 java_test_file = get_execute_java_test_files(suite_file) 1345 junit_test_para = self._get_hap_test_para(java_test_file, 1346 exec_class, exec_method, 1347 exec_level) 1348 else: 1349 junit_test_para = get_execute_java_test_files(suite_file) 1350 return junit_test_para 1351 1352 @staticmethod 1353 def _get_hap_test_para(java_test_file, exec_class, exec_method, 1354 exec_level): 1355 hap_test_para = "%s%s#%s%s#%s%s#%s%s" % ( 1356 ZunitConst.test_class, java_test_file, 1357 ZunitConst.exec_class, exec_class, 1358 ZunitConst.exec_method, exec_method, 1359 ZunitConst.exec_level, exec_level) 1360 return hap_test_para 1361 1362 def _execute_suitefile_junittest(self, filename, testpara, 1363 target_test_path): 1364 return_message = self._execute_hapfile_junittest(filename, testpara, 1365 target_test_path) 1366 return return_message 1367 1368 def _execute_hapfile_junittest(self, filename, testpara, target_test_path): 1369 _unlock_screen(self.config.device) 1370 _unlock_device(self.config.device) 1371 1372 try: 1373 if not filename.endswith(self.instrument_hap_file_suffix): 1374 return_message = self.start_hap_activity(testpara, filename) 1375 LOG.info("HAP Testcase is executing, please wait a moment...") 1376 if "Error" not in return_message: 1377 self._check_hap_finished(target_test_path) 1378 else: 1379 return_message = self.start_instrument_hap_activity(testpara) 1380 except (ExecuteTerminate, HdcCommandRejectedException, 1381 ShellCommandUnresponsiveException, HdcError) as exception: 1382 return_message = str(exception.args) 1383 if not getattr(exception, "error_no", ""): 1384 setattr(exception, "error_no", "03203") 1385 1386 _lock_screen(self.config.device) 1387 return return_message 1388 1389 def _init_hap_device(self): 1390 self.config.device.execute_shell_command( 1391 "rm -rf %s" % self.config.test_hap_out_path) 1392 self.config.device.execute_shell_command( 1393 "mkdir -p %s" % self.config.test_hap_out_path) 1394 1395 def _install_hap(self, filename): 1396 message = self.config.device.execute_shell_command( 1397 "bm install -p %s" % os.path.join(self.config.target_test_path, 1398 filename)) 1399 message = str(message).rstrip() 1400 if message == "" or "Success" in message: 1401 return_code = True 1402 if message != "": 1403 LOG.info(message) 1404 else: 1405 return_code = False 1406 if message != "": 1407 LOG.warning(message) 1408 1409 _sleep_according_to_result(return_code) 1410 return return_code 1411 1412 def start_hap_activity(self, testpara, filename): 1413 execute_para = testpara 1414 if self.config.coverage == "coverage": 1415 execute_para = ''.join((execute_para, ' ', 1416 ZunitConst.jacoco_exec_file, filename, 1417 ".exec")) 1418 try: 1419 display_receiver = DisplayOutputReceiver() 1420 self.config.device.execute_shell_command( 1421 "am start -S -n %s/%s --es param '%s'" % 1422 (self.package_name, self.activity_name, 1423 execute_para), receiver=display_receiver, 1424 timeout=self.config.test_suite_timeout) 1425 _sleep_according_to_result(display_receiver.output) 1426 return_message = display_receiver.output 1427 1428 except (ExecuteTerminate, HdcCommandRejectedException, 1429 ShellCommandUnresponsiveException, HdcError) as exception: 1430 return_message = exception.args 1431 if not getattr(exception, "error_no", ""): 1432 setattr(exception, "error_no", "03203") 1433 return return_message 1434 1435 def start_instrument_hap_activity(self, testpara): 1436 from xdevice import Variables 1437 try: 1438 display_receiver = DisplayOutputReceiver() 1439 if self.config.coverage != "coverage": 1440 self.config.device.execute_shell_command( 1441 "aa start -p %s -n %s -s AbilityTestCase %s -w %s" % 1442 (self.package_name, self.ability_name, testpara, 1443 str(self.config.test_suite_timeout)), 1444 receiver=display_receiver, 1445 timeout=self.config.test_suite_timeout) 1446 else: 1447 build_variant_outpath = os.path.join( 1448 Variables.source_code_rootpath, "out", 1449 self.config.build_variant) 1450 strip_num = len(build_variant_outpath.split(os.sep)) - 1 1451 self.config.device.execute_shell_command( 1452 "cd %s; export GCOV_PREFIX=%s; " 1453 "export GCOV_PREFIX_STRIP=%d; " 1454 "aa start -p %s -n %s -s AbilityTestCase %s -w %s" % 1455 (self.config.target_test_path, 1456 self.config.target_test_path, 1457 strip_num, self.package_name, self.ability_name, testpara, 1458 str(self.config.test_suite_timeout)), 1459 receiver=display_receiver, 1460 timeout=self.config.test_suite_timeout) 1461 _sleep_according_to_result(display_receiver.output) 1462 return_message = display_receiver.output 1463 except (ExecuteTerminate, HdcCommandRejectedException, 1464 ShellCommandUnresponsiveException, HdcError) as exception: 1465 return_message = exception.args 1466 if not getattr(exception, "error_no", ""): 1467 setattr(exception, "error_no", "03203") 1468 return return_message 1469 1470 def _check_hap_finished(self, target_test_path): 1471 run_timeout = True 1472 sleep_duration = 3 1473 target_file = os.path.join(target_test_path, 1474 ZunitConst.jtest_status_filename) 1475 for _ in range( 1476 int(self.config.test_suite_timeout / (1000 * sleep_duration))): 1477 check_value = self.config.device.is_file_exist(target_file) 1478 LOG.info("%s state: %s", self.config.device.device_sn, 1479 self.config.device.test_device_state.value) 1480 if not check_value: 1481 time.sleep(sleep_duration) 1482 continue 1483 run_timeout = False 1484 break 1485 if run_timeout: 1486 return_code = False 1487 LOG.error("HAP Testcase executed timeout or exception, please " 1488 "check detail information from system log", 1489 error_no="03205") 1490 else: 1491 return_code = True 1492 LOG.info("HAP Testcase executed finished") 1493 return return_code 1494 1495 def _uninstall_hap(self, package_name): 1496 return_message = self.config.device.execute_shell_command( 1497 "pm uninstall %s" % package_name) 1498 _sleep_according_to_result(return_message) 1499 return return_message 1500 1501 @staticmethod 1502 def _get_package_and_ability_name(hap_filepath): 1503 package_name = "" 1504 ability_name = "" 1505 1506 if os.path.exists(hap_filepath): 1507 filename = os.path.basename(hap_filepath) 1508 1509 # unzip the hap file 1510 hap_bak_path = os.path.abspath( 1511 os.path.join(os.path.dirname(hap_filepath), 1512 "%s.bak" % filename)) 1513 try: 1514 with zipfile.ZipFile(hap_filepath) as zf_desc: 1515 zf_desc.extractall(path=hap_bak_path) 1516 except RuntimeError as error: 1517 LOG.error(error, error_no="03206") 1518 1519 # verify config.json file 1520 app_profile_path = os.path.join(hap_bak_path, 1521 "config.json") 1522 if not os.path.exists(app_profile_path): 1523 LOG.info("file %s not exists" % app_profile_path) 1524 return package_name, ability_name 1525 1526 if os.path.isdir(app_profile_path): 1527 LOG.info("%s is a folder, and not a file" % app_profile_path) 1528 return package_name, ability_name 1529 1530 # get package_name and ability_name value. 1531 app_profile_path_open = os.open(app_profile_path, os.O_RDONLY, 1532 stat.S_IWUSR | stat.S_IRUSR) 1533 with os.fdopen(app_profile_path_open, 'r') as load_f: 1534 load_dict = json.load(load_f) 1535 profile_list = load_dict.values() 1536 for profile in profile_list: 1537 package_name = profile.get("package") 1538 if not package_name: 1539 continue 1540 abilities = profile.get("abilities") 1541 for abilitie in abilities: 1542 abilities_name = abilitie.get("name") 1543 if abilities_name.startswith("."): 1544 ability_name = ''.join( 1545 (package_name, 1546 abilities_name[abilities_name.find("."):])) 1547 else: 1548 ability_name = abilities_name 1549 break 1550 break 1551 1552 # delete hap_bak_path 1553 if os.path.exists(hap_bak_path): 1554 shutil.rmtree(hap_bak_path) 1555 else: 1556 LOG.info("file %s not exists" % hap_filepath) 1557 1558 return package_name, ability_name 1559 1560 def __result__(self): 1561 return self.result if os.path.exists(self.result) else "" 1562 1563 1564@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 1565class JSUnitTestDriver(IDriver): 1566 """ 1567 JSUnitTestDriver is a Test that runs a native test package on given device. 1568 """ 1569 1570 def __init__(self): 1571 self.xml_output = "false" 1572 self.timeout = 80 * 1000 1573 self.start_time = None 1574 self.result = "" 1575 self.error_message = "" 1576 self.kits = [] 1577 self.config = None 1578 1579 def __check_environment__(self, device_options): 1580 pass 1581 1582 def __check_config__(self, config): 1583 pass 1584 1585 def __execute__(self, request): 1586 try: 1587 LOG.debug("Start execute xdevice extension JSUnit Test") 1588 self.result = os.path.join( 1589 request.config.report_path, "result", 1590 '.'.join((request.get_module_name(), "xml"))) 1591 self.config = request.config 1592 self.config.device = request.config.environment.devices[0] 1593 1594 config_file = request.root.source.config_file 1595 suite_file = request.root.source.source_file 1596 1597 if not suite_file: 1598 raise ParamError( 1599 "test source '%s' not exists" % 1600 request.root.source.source_string, error_no="00110") 1601 1602 LOG.debug("Test case file path: %s" % suite_file) 1603 # avoid hilog service stuck issue 1604 self.config.device.hdc_command("shell stop_service hilogd", 1605 timeout=30 * 1000) 1606 self.config.device.hdc_command("shell start_service hilogd", 1607 timeout=30 * 1000) 1608 time.sleep(10) 1609 1610 self.config.device.hdc_command("shell hilog -r", timeout=30 * 1000) 1611 self._run_jsunit(config_file, request) 1612 except Exception as exception: 1613 self.error_message = exception 1614 if not getattr(exception, "error_no", ""): 1615 setattr(exception, "error_no", "03409") 1616 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1617 raise exception 1618 finally: 1619 self.config.device.stop_catch_device_log() 1620 self.result = check_result_report( 1621 request.config.report_path, self.result, self.error_message) 1622 1623 def generate_console_output(self, device_log_file, request, timeout): 1624 LOG.info("prepare to read device log, may wait some time") 1625 message_list = list() 1626 label_list, suite_info, is_suites_end = self.read_device_log_timeout( 1627 device_log_file, message_list, timeout) 1628 if not is_suites_end: 1629 message_list.append("app Log: [end] run suites end\n") 1630 LOG.warning("there is no suites end") 1631 if len(label_list[0]) > 0 and sum(label_list[0]) != 0: 1632 # the problem happened! when the sum of label list is not zero 1633 self._insert_suite_end(label_list, message_list) 1634 1635 result_message = "".join(message_list) 1636 message_list.clear() 1637 expect_tests_dict = self._parse_suite_info(suite_info) 1638 self._analyse_tests(request, result_message, expect_tests_dict) 1639 1640 @classmethod 1641 def _insert_suite_end(cls, label_list, message_list): 1642 for i in range(len(label_list[0])): 1643 if label_list[0][i] != 1: # skipp 1644 continue 1645 # check the start label, then peek next position 1646 if i + 1 == len(label_list[0]): # next position at the tail 1647 message_list.insert(-1, "app Log: [suite end]\n") 1648 LOG.warning("there is no suite end") 1649 continue 1650 if label_list[0][i + 1] != 1: # 0 present the end label 1651 continue 1652 message_list.insert(label_list[1][i + 1], 1653 "app Log: [suite end]\n") 1654 LOG.warning("there is no suite end") 1655 for j in range(i + 1, len(label_list[1])): 1656 label_list[1][j] += 1 # move the index to next 1657 1658 def _analyse_tests(self, request, result_message, expect_tests_dict): 1659 listener_copy = request.listeners.copy() 1660 parsers = get_plugin( 1661 Plugin.PARSER, CommonParserType.jsunit) 1662 if parsers: 1663 parsers = parsers[:1] 1664 for listener in listener_copy: 1665 listener.device_sn = self.config.device.device_sn 1666 parser_instances = [] 1667 for parser in parsers: 1668 parser_instance = parser.__class__() 1669 parser_instance.suites_name = request.get_module_name() 1670 parser_instance.listeners = listener_copy 1671 parser_instances.append(parser_instance) 1672 handler = ShellHandler(parser_instances) 1673 handler.parsers[0].expect_tests_dict = expect_tests_dict 1674 process_command_ret(result_message, handler) 1675 1676 @classmethod 1677 def _parse_suite_info(cls, suite_info): 1678 tests_dict = dict() 1679 test_count = 0 1680 if suite_info: 1681 LOG.debug("Suites info: %s" % suite_info) 1682 json_str = "".join(suite_info) 1683 try: 1684 suite_dict_list = json.loads(json_str).get("suites", []) 1685 for suite_dict in suite_dict_list: 1686 for class_name, test_name_dict_list in suite_dict.items(): 1687 tests_dict.update({class_name: []}) 1688 for test_name_dict in test_name_dict_list: 1689 for test_name in test_name_dict.values(): 1690 test = TestDescription(class_name, test_name) 1691 tests_dict.get(class_name).append(test) 1692 test_count += 1 1693 except json.decoder.JSONDecodeError as json_error: 1694 LOG.warning("Suites info is invalid: %s" % json_error) 1695 LOG.debug("Collect suite count is %s, test count is %s" % 1696 (len(tests_dict), test_count)) 1697 return tests_dict 1698 1699 def read_device_log_timeout(self, device_log_file, 1700 message_list, timeout): 1701 LOG.info("The timeout is {} seconds".format(timeout)) 1702 pattern = "^\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s+(\\d+)" 1703 while time.time() - self.start_time <= timeout: 1704 with open(device_log_file, "r", encoding='utf-8', 1705 errors='ignore') as file_read_pipe: 1706 pid = "" 1707 message_list.clear() 1708 label_list = [[], []] # [-1, 1 ..] [line1, line2 ..] 1709 suite_info = [] 1710 while True: 1711 try: 1712 line = file_read_pipe.readline() 1713 except UnicodeError as error: 1714 LOG.warning("While read log file: %s" % error) 1715 if not line: 1716 time.sleep(5) # wait for log write to file 1717 break 1718 if line.lower().find("jsapp:") != -1: 1719 if "[suites info]" in line: 1720 _, pos = re.match(".+\\[suites info]", line).span() 1721 suite_info.append(line[pos:].strip()) 1722 1723 if "[start] start run suites" in line: # 发现了任务开始标签 1724 pid, is_update = \ 1725 self._init_suites_start(line, pattern, pid) 1726 if is_update: 1727 message_list.clear() 1728 label_list[0].clear() 1729 label_list[1].clear() 1730 1731 if not pid or pid not in line: 1732 continue 1733 message_list.append(line) 1734 if "[suite end]" in line: 1735 label_list[0].append(-1) 1736 label_list[1].append(len(message_list) - 1) 1737 if "[suite start]" in line: 1738 label_list[0].append(1) 1739 label_list[1].append(len(message_list) - 1) 1740 if "[end] run suites end" in line: 1741 LOG.info("Find the end mark then analysis result") 1742 LOG.debug("current JSApp pid= %s" % pid) 1743 return label_list, suite_info, True 1744 else: 1745 LOG.error("Hjsunit run timeout {}s reached".format(timeout)) 1746 LOG.debug("current JSApp pid= %s" % pid) 1747 return label_list, suite_info, False 1748 1749 @classmethod 1750 def _init_suites_start(cls, line, pattern, pid): 1751 matcher = re.match(pattern, line.strip()) 1752 if matcher and matcher.group(1): 1753 pid = matcher.group(1) 1754 return pid, True 1755 return pid, False 1756 1757 def _run_jsunit(self, config_file, request): 1758 try: 1759 if not os.path.exists(config_file): 1760 LOG.error("Error: Test cases don't exist %s." % config_file) 1761 raise ParamError( 1762 "Error: Test cases don't exist %s." % config_file, 1763 error_no="00102") 1764 1765 json_config = JsonParser(config_file) 1766 self.kits = get_kit_instances(json_config, 1767 self.config.resource_path, 1768 self.config.testcases_path) 1769 1770 package, ability_name = self._get_driver_config(json_config) 1771 self.config.device.hdc_command("target mount") 1772 do_module_kit_setup(request, self.kits) 1773 1774 hilog = get_device_log_file( 1775 request.config.report_path, 1776 request.config.device.__get_serial__() + "_" + request. 1777 get_module_name(), 1778 "device_hilog") 1779 1780 hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 1781 0o755) 1782 1783 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 1784 self.config.device.start_catch_device_log(hilog_file_pipe) 1785 1786 # execute test case 1787 command = "shell aa start -d 123 -a %s -b %s" \ 1788 % (ability_name, package) 1789 1790 result_value = self.config.device.hdc_command(command) 1791 if result_value and "start ability successfully" in\ 1792 str(result_value).lower(): 1793 setattr(self, "start_success", True) 1794 LOG.info("execute %s's testcase success. result value=%s" 1795 % (package, result_value)) 1796 else: 1797 LOG.info("execute %s's testcase failed. result value=%s" 1798 % (package, result_value)) 1799 raise RuntimeError("hjsunit test run error happened!") 1800 1801 self.start_time = time.time() 1802 timeout_config = get_config_value('test-timeout', 1803 json_config.get_driver(), 1804 False, 60000) 1805 timeout = int(timeout_config) / 1000 1806 self.generate_console_output(hilog, request, timeout) 1807 finally: 1808 do_module_kit_teardown(request) 1809 1810 def _jsunit_clear(self): 1811 self.config.device.execute_shell_command( 1812 "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")) 1813 1814 def _get_driver_config(self, json_config): 1815 package = get_config_value('package', json_config.get_driver(), False) 1816 default_ability = "{}.MainAbility".format(package) 1817 ability_name = get_config_value('abilityName', json_config. 1818 get_driver(), False, default_ability) 1819 self.xml_output = get_xml_output(self.config, json_config) 1820 timeout_config = get_config_value('native-test-timeout', 1821 json_config.get_driver(), False) 1822 if timeout_config: 1823 self.timeout = int(timeout_config) 1824 1825 if not package: 1826 raise ParamError("Can't find package in config file.", 1827 error_no="03201") 1828 return package, ability_name 1829 1830 def __result__(self): 1831 return self.result if os.path.exists(self.result) else "" 1832 1833 1834@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ltp_posix_test) 1835class LTPPosixTestDriver(IDriver): 1836 def __init__(self): 1837 self.timeout = 80 * 1000 1838 self.start_time = None 1839 self.result = "" 1840 self.error_message = "" 1841 self.kits = [] 1842 self.config = None 1843 self.handler = None 1844 1845 def __check_environment__(self, device_options): 1846 pass 1847 1848 def __check_config__(self, config): 1849 pass 1850 1851 def __execute__(self, request): 1852 try: 1853 LOG.debug("Start execute xdevice extension LTP Posix Test") 1854 self.result = os.path.join( 1855 request.config.report_path, "result", 1856 '.'.join((request.get_module_name(), "xml"))) 1857 self.config = request.config 1858 self.config.device = request.config.environment.devices[0] 1859 1860 config_file = request.root.source.config_file 1861 suite_file = request.root.source.source_file 1862 1863 if not suite_file: 1864 raise ParamError( 1865 "test source '%s' not exists" % 1866 request.root.source.source_string, error_no="00110") 1867 1868 LOG.debug("Test case file path: %s" % suite_file) 1869 # avoid hilog service stuck issue 1870 self.config.device.hdc_command("shell stop_service hilogd", 1871 timeout=30 * 1000) 1872 self.config.device.hdc_command("shell start_service hilogd", 1873 timeout=30 * 1000) 1874 time.sleep(10) 1875 1876 self.config.device.hdc_command("shell hilog -r", timeout=30 * 1000) 1877 self._run_posix(config_file, request) 1878 except Exception as exception: 1879 self.error_message = exception 1880 if not getattr(exception, "error_no", ""): 1881 setattr(exception, "error_no", "03409") 1882 LOG.exception(self.error_message, exc_info=True, error_no="03409") 1883 raise exception 1884 finally: 1885 self.config.device.stop_catch_device_log() 1886 self.result = check_result_report( 1887 request.config.report_path, self.result, self.error_message) 1888 1889 def _run_posix(self, config_file, request): 1890 try: 1891 if not os.path.exists(config_file): 1892 LOG.error("Error: Test cases don't exist %s." % config_file) 1893 raise ParamError( 1894 "Error: Test cases don't exist %s." % config_file, 1895 error_no="00102") 1896 1897 json_config = JsonParser(config_file) 1898 self.kits = get_kit_instances(json_config, 1899 self.config.resource_path, 1900 self.config.testcases_path) 1901 self.config.device.hdc_command("target mount") 1902 test_list = None 1903 dst = None 1904 for kit in self.kits: 1905 test_list, dst = kit.__setup__(request.config.device, 1906 request=request) 1907 # apply execute right 1908 self.config.device.hdc_command("shell chmod -R 777 {}".format(dst)) 1909 1910 hilog = get_device_log_file( 1911 request.config.report_path, 1912 request.config.device.__get_serial__() + "_" + request. 1913 get_module_name(), 1914 "device_hilog") 1915 1916 hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 1917 0o755) 1918 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 1919 for test_bin in test_list: 1920 if not test_bin.endswith(".run-test"): 1921 continue 1922 listeners = request.listeners 1923 for listener in listeners: 1924 listener.device_sn = self.config.device.device_sn 1925 parsers = get_plugin(Plugin.PARSER, 1926 "OpenSourceTest") 1927 parser_instances = [] 1928 for parser in parsers: 1929 parser_instance = parser.__class__() 1930 parser_instance.suite_name = request.root.source.\ 1931 test_name 1932 parser_instance.test_name = test_bin.replace("./", "") 1933 parser_instance.listeners = listeners 1934 parser_instances.append(parser_instance) 1935 self.handler = ShellHandler(parser_instances) 1936 result_message = self.config.device.hdc_command( 1937 "shell {}".format(test_bin)) 1938 LOG.info("get result from command {}". 1939 format(result_message)) 1940 process_command_ret(result_message, self.handler) 1941 finally: 1942 do_module_kit_teardown(request) 1943 1944 def __result__(self): 1945 return self.result if os.path.exists(self.result) else "" 1946 1947 1948@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test) 1949class OHKernelTestDriver(IDriver): 1950 """ 1951 OpenHarmonyKernelTest 1952 """ 1953 def __init__(self): 1954 self.timeout = 30 * 1000 1955 self.result = "" 1956 self.error_message = "" 1957 self.kits = [] 1958 self.config = None 1959 self.runner = None 1960 1961 def __check_environment__(self, device_options): 1962 pass 1963 1964 def __check_config__(self, config): 1965 pass 1966 1967 def __execute__(self, request): 1968 try: 1969 LOG.debug("Start to Execute OpenHarmony Kernel Test") 1970 1971 self.config = request.config 1972 self.config.device = request.config.environment.devices[0] 1973 1974 config_file = request.root.source.config_file 1975 1976 self.result = "%s.xml" % \ 1977 os.path.join(request.config.report_path, 1978 "result", request.get_module_name()) 1979 hilog = get_device_log_file( 1980 request.config.report_path, 1981 request.config.device.__get_serial__(), 1982 "device_hilog") 1983 hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 1984 FilePermission.mode_755) 1985 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 1986 self.config.device.start_catch_device_log(hilog_file_pipe) 1987 self._run_oh_kernel(config_file, request.listeners, request) 1988 hilog_file_pipe.flush() 1989 except Exception as exception: 1990 self.error_message = exception 1991 if not getattr(exception, "error_no", ""): 1992 setattr(exception, "error_no", "03409") 1993 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1994 raise exception 1995 finally: 1996 do_module_kit_teardown(request) 1997 self.config.device.stop_catch_device_log() 1998 self.result = check_result_report( 1999 request.config.report_path, self.result, self.error_message) 2000 2001 def _run_oh_kernel(self, config_file, listeners=None, request=None): 2002 try: 2003 json_config = JsonParser(config_file) 2004 kits = get_kit_instances(json_config, self.config.resource_path, 2005 self.config.testcases_path) 2006 self._get_driver_config(json_config) 2007 do_module_kit_setup(request, kits) 2008 self.runner = OHKernelTestRunner(self.config) 2009 self.runner.suite_name = request.get_module_name() 2010 self.runner.run(listeners) 2011 finally: 2012 do_module_kit_teardown(request) 2013 2014 def _get_driver_config(self, json_config): 2015 target_test_path = get_config_value('native-test-device-path', 2016 json_config.get_driver(), False) 2017 test_suite_name = get_config_value('test-suite-name', 2018 json_config.get_driver(), False) 2019 test_suites_list = get_config_value('test-suites-list', 2020 json_config.get_driver(), False) 2021 timeout_limit = get_config_value('timeout-limit', 2022 json_config.get_driver(), False) 2023 conf_file = get_config_value('conf-file', 2024 json_config.get_driver(), False) 2025 self.config.arg_list = {} 2026 if target_test_path: 2027 self.config.target_test_path = target_test_path 2028 if test_suite_name: 2029 self.config.arg_list["test-suite-name"] = test_suite_name 2030 if test_suites_list: 2031 self.config.arg_list["test-suites-list"] = test_suites_list 2032 if timeout_limit: 2033 self.config.arg_list["timeout-limit"] = timeout_limit 2034 if conf_file: 2035 self.config.arg_list["conf-file"] = conf_file 2036 timeout_config = get_config_value('shell-timeout', 2037 json_config.get_driver(), False) 2038 if timeout_config: 2039 self.config.timeout = int(timeout_config) 2040 else: 2041 self.config.timeout = TIME_OUT 2042 2043 def __result__(self): 2044 return self.result if os.path.exists(self.result) else "" 2045 2046 2047class OHKernelTestRunner: 2048 def __init__(self, config): 2049 self.suite_name = None 2050 self.config = config 2051 self.arg_list = config.arg_list 2052 2053 def run(self, listeners): 2054 handler = self._get_shell_handler(listeners) 2055 # hdc shell cd /data/local/tmp/OH_kernel_test; 2056 # sh runtest test -t OpenHarmony_RK3568_config 2057 # -n OpenHarmony_RK3568_skiptest -l 60 2058 command = "cd %s; chmod +x *; sh runtest test %s" % ( 2059 self.config.target_test_path, self.get_args_command()) 2060 self.config.device.execute_shell_command( 2061 command, timeout=self.config.timeout, receiver=handler, retry=0) 2062 2063 def _get_shell_handler(self, listeners): 2064 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test) 2065 if parsers: 2066 parsers = parsers[:1] 2067 parser_instances = [] 2068 for parser in parsers: 2069 parser_instance = parser.__class__() 2070 parser_instance.suites_name = self.suite_name 2071 parser_instance.listeners = listeners 2072 parser_instances.append(parser_instance) 2073 handler = ShellHandler(parser_instances) 2074 return handler 2075 2076 def get_args_command(self): 2077 args_commands = "" 2078 for key, value in self.arg_list.items(): 2079 if key == "test-suite-name" or key == "test-suites-list": 2080 args_commands = "%s -t %s" % (args_commands, value) 2081 elif key == "conf-file": 2082 args_commands = "%s -n %s" % (args_commands, value) 2083 elif key == "timeout-limit": 2084 args_commands = "%s -l %s" % (args_commands, value) 2085 return args_commands 2086 2087 2088def disable_keyguard(device): 2089 _unlock_screen(device) 2090 _unlock_device(device) 2091 2092 2093def _unlock_screen(device): 2094 device.execute_shell_command("svc power stayon true") 2095 time.sleep(1) 2096 2097 2098def _unlock_device(device): 2099 device.execute_shell_command("input keyevent 82") 2100 time.sleep(1) 2101 device.execute_shell_command("wm dismiss-keyguard") 2102 time.sleep(1) 2103 2104 2105def _lock_screen(device): 2106 time.sleep(1) 2107 2108 2109def _sleep_according_to_result(result): 2110 if result: 2111 time.sleep(1) 2112