#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os

from xdevice import ParamError
from xdevice import IDriver
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import JsonParser
from xdevice import ShellHandler
from xdevice import TestDescription
from xdevice import get_device_log_file
from xdevice import check_result_report
from xdevice import get_kit_instances
from xdevice import get_config_value
from xdevice import do_module_kit_setup
from xdevice import do_module_kit_teardown

from xdevice_extension._core.constants import DeviceTestType
from xdevice_extension._core.constants import CommonParserType
from xdevice_extension._core.constants import FilePermission
from xdevice_extension._core.executor.listener import CollectingPassListener
from xdevice_extension._core.exception import ShellCommandUnresponsiveException
from xdevice_extension._core.testkit.kit import oh_jsunit_para_parse

__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver"]

# Fallback value for ``config.timeout`` when the driver section of the test
# config has no "shell-timeout" entry.
# NOTE(review): the unit is presumably milliseconds -- confirm against
# ``execute_shell_command``.
TIME_OUT = 300 * 1000

LOG = platform_logger("OpenHarmony")


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
class OHKernelTestDriver(IDriver):
    """
    Driver plugin for OpenHarmony kernel tests.

    Reads the module's JSON config, sets up the configured kits and runs the
    ``runtest`` script on the first device of the environment through
    OHKernelTestRunner, while the device log is captured into a file.
    """

    def __init__(self):
        self.timeout = 30 * 1000
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Run the kernel test described by ``request``.

        Any exception is logged, tagged with error number "03409" when it
        carries none, and re-raised. The result report is always checked in
        the ``finally`` clause.
        """
        try:
            LOG.debug("Start to Execute OpenHarmony Kernel Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file

            self.result = "%s.xml" % os.path.join(
                request.config.report_path, "result",
                request.get_module_name())
            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 FilePermission.mode_755)
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)
                self._run_oh_kernel(config_file, request.listeners, request)
                hilog_file_pipe.flush()
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
            raise
        finally:
            # NOTE(review): _run_oh_kernel already tears the kits down in its
            # own finally clause, so this is a second teardown on the normal
            # path -- confirm do_module_kit_teardown is safe to call twice.
            do_module_kit_teardown(request)
            # Setup may have failed before a device was attached to the
            # config; do not let an AttributeError here hide that failure.
            device = getattr(self.config, "device", None)
            if device is not None:
                device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_kernel(self, config_file, listeners=None, request=None):
        """
        Parse ``config_file``, set up the kits and run the kernel test.

        The kits are torn down in the ``finally`` clause whether or not the
        run succeeded.
        """
        try:
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)
            self._get_driver_config(json_config)
            do_module_kit_setup(request, self.kits)
            self.runner = OHKernelTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            self.runner.run(listeners)
        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """
        Copy the driver section of ``json_config`` onto ``self.config``.

        Sets ``target_test_path`` (only when configured), ``arg_list`` (only
        the keys that have a value) and ``timeout`` (falls back to TIME_OUT).
        """
        driver_config = json_config.get_driver()

        target_test_path = get_config_value('native-test-device-path',
                                            driver_config, False)
        if target_test_path:
            self.config.target_test_path = target_test_path

        self.config.arg_list = {}
        for key in ("test-suite-name", "test-suites-list",
                    "timeout-limit", "conf-file"):
            value = get_config_value(key, driver_config, False)
            if value:
                self.config.arg_list[key] = value

        timeout_config = get_config_value('shell-timeout',
                                          driver_config, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""


class OHKernelTestRunner:
    """
    Builds and executes the ``runtest`` shell command for a kernel test.
    """

    # Command-line flag emitted for each recognised key of ``arg_list``.
    _ARG_FLAGS = {
        "test-suite-name": "-t",
        "test-suites-list": "-t",
        "conf-file": "-n",
        "timeout-limit": "-l",
    }

    def __init__(self, config):
        self.suite_name = None
        self.config = config
        self.arg_list = config.arg_list

    def run(self, listeners):
        """
        Execute the test script on the device, feeding output to the parser.
        """
        handler = self._get_shell_handler(listeners)
        # hdc shell cd /data/local/tmp/OH_kernel_test;
        # sh runtest test -t OpenHarmony_RK3568_config
        # -n OpenHarmony_RK3568_skiptest -l 60
        command = "cd %s; chmod +x *; sh runtest test %s" % (
            self.config.target_test_path, self.get_args_command())
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    def _get_shell_handler(self, listeners):
        """
        Return a ShellHandler wrapping a fresh instance of the first
        registered kernel-test parser plugin.
        """
        plugins = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
        parser_instances = []
        for plugin in (plugins or [])[:1]:
            parser_instance = plugin.__class__()
            parser_instance.suites_name = self.suite_name
            parser_instance.listeners = listeners
            parser_instances.append(parser_instance)
        return ShellHandler(parser_instances)

    def get_args_command(self):
        """
        Return the argument string built from ``arg_list``.

        Each recognised key contributes " <flag> <value>"; unrecognised keys
        are ignored.
        """
        return "".join(
            " %s %s" % (self._ARG_FLAGS[key], value)
            for key, value in self.arg_list.items()
            if key in self._ARG_FLAGS)


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
    """
    Driver plugin for OpenHarmony JS unit tests.

    Runs the configured package or module through ``aa test`` on the first
    device of the environment. When a dry run yields a list of expected
    tests, tests that did not run are re-run.
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        self.rerun = True
        self.rerun_all = True

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Run the JS unit test described by ``request``.

        Raises ParamError when the request has no source file. Any exception
        is logged, tagged with error number "03409" when it carries none, and
        re-raised. The result report is always checked in ``finally``.
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")
            LOG.debug("Test case file path: %s" % suite_file)
            hilog = get_device_log_file(request.config.report_path,
                                        request.get_module_name(),
                                        "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)
            self.config.device.execute_shell_command(command="hilog -r")
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)
                self._run_oh_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise
        finally:
            # self.config is still None when the failure happened before it
            # was assigned; do not let an AttributeError hide that failure.
            device = getattr(self.config, "device", None)
            if device is not None:
                device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_jsunit(self, config_file, request):
        """
        Parse ``config_file``, set up the kits, build the runner and run.

        Raises ParamError when ``config_file`` does not exist. The kits are
        torn down in the ``finally`` clause.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.hdc_command("target mount")
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            # execute test case
            self._get_runner_config(json_config)
            oh_jsunit_para_parse(self.runner, self.config.testargs)
            self._do_test_run(listener=request.listeners)

        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """
        Copy package/module/bundle names and the shell timeout from the
        driver section of ``json_config`` onto ``self.config``.

        Raises ParamError when neither a package nor a module name is set.
        """
        driver_config = json_config.get_driver()
        package = get_config_value('package-name', driver_config, False)
        module = get_config_value('module-name', driver_config, False)
        bundle = get_config_value('bundle-name', driver_config, False)

        self.config.package_name = package
        self.config.module_name = module
        self.config.bundle_name = bundle

        if not package and not module:
            raise ParamError("Neither package nor module is found"
                             " in config file.", error_no="03201")
        timeout_config = get_config_value("shell-timeout",
                                          driver_config, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def _get_runner_config(self, json_config):
        """
        Forward the optional 'test-timeout' and 'testcase-timeout' driver
        settings to the runner as the "wait_time" and "timeout" arguments.
        """
        driver_config = json_config.get_driver()
        test_timeout = get_config_value('test-timeout', driver_config, False)
        if test_timeout:
            self.runner.add_arg("wait_time", int(test_timeout))

        testcase_timeout = get_config_value('testcase-timeout',
                                            driver_config, False)
        if testcase_timeout:
            self.runner.add_arg("timeout", int(testcase_timeout))

    def _do_test_run(self, listener):
        """
        Run the suite; use the rerun flow when a dry run listed any tests.
        """
        test_to_run = self._collect_test_to_run()
        LOG.info("Collected test count is: %s" % (len(test_to_run)
                                                  if test_to_run else 0))
        if not test_to_run:
            self.runner.run(listener)
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """
        Return the dry-run result when rerun is enabled, otherwise None.
        """
        if self.rerun:
            return self.runner.dry_run()
        return None

    def _run_tests(self, listener):
        """
        Run the suite with an extra CollectingPassListener appended to a copy
        of ``listener`` and return that listener's current run results.
        """
        test_tracker = CollectingPassListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        return test_tracker.get_current_run_results()

    def _run_with_rerun(self, listener, expected_tests):
        """
        Run the suite once, then re-run whichever expected tests are missing
        from the results (all together or one by one, per ``rerun_all``).
        """
        LOG.debug("Ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        # Compute the count first: "fmt % len(x) if x else 0" would apply
        # the conditional to the whole message and log a bare 0.
        run_count = len(test_run) if test_run else 0
        LOG.debug("Run with rerun, has run: %s" % run_count)
        if run_count >= len(expected_tests):
            return

        expected_tests = TestDescription.remove_test(expected_tests, test_run)
        if not expected_tests:
            LOG.debug("No tests to re-run, all tests executed at least "
                      "once.")
            # Without this return the rerun would start with no "class"
            # filter and execute the whole suite a second time.
            return
        if self.rerun_all:
            self._rerun_all(expected_tests, listener)
        else:
            self._rerun_serially(expected_tests, listener)

    def _rerun_all(self, expected_tests, listener):
        """
        Re-run all ``expected_tests`` in one invocation, then re-run any that
        are still missing one at a time.
        """
        tests = ["%s#%s" % (test.class_name, test.test_name)
                 for test in expected_tests]
        self.runner.add_arg("class", ",".join(tests))
        LOG.debug("Ready to rerun all, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        run_count = len(test_run) if test_run else 0
        LOG.debug("Rerun all, has run: %s" % run_count)
        if run_count >= len(expected_tests):
            return

        expected_tests = TestDescription.remove_test(expected_tests, test_run)
        if not expected_tests:
            LOG.debug("Rerun textFile success")
            return
        self._rerun_serially(expected_tests, listener)

    def _rerun_serially(self, expected_tests, listener):
        """
        Re-run each of ``expected_tests`` on its own, restricting the runner
        to that test through the "class" argument.
        """
        LOG.debug("Rerun serially, expected run: %s" % len(expected_tests))
        for test in expected_tests:
            self.runner.add_arg(
                "class", "%s#%s" % (test.class_name, test.test_name))
            self.runner.rerun(listener, test)
            self.runner.remove_arg("class")

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""


class OHJSUnitTestRunner:
    """
    Builds and executes ``aa test`` commands for a JS unit test suite.
    """

    def __init__(self, config):
        self.arg_list = {}
        self.suite_name = None
        self.config = config
        # Decremented each time rerun() obtains no result; once it reaches
        # zero, rerun() marks tests as blocked without executing them.
        self.rerun_attemp = 3

    @staticmethod
    def _new_parsers(parser_type):
        """Return fresh instances of at most the first parser plugin."""
        plugins = get_plugin(Plugin.PARSER, parser_type) or []
        return [plugin.__class__() for plugin in plugins[:1]]

    def _new_handler(self, parser_type, listeners):
        """Return a ShellHandler whose parser reports to ``listeners``."""
        parser_instances = self._new_parsers(parser_type)
        for parser_instance in parser_instances:
            parser_instance.suite_name = self.suite_name
            parser_instance.listeners = listeners
        return ShellHandler(parser_instances)

    @staticmethod
    def _mark_blocked(handler, test):
        """Mark ``test`` as blocked through the handler's first parser."""
        parsers = getattr(handler, "parsers", None)
        if not parsers:
            # No handler was built (setup failed) or no parser is registered.
            LOG.debug("No parser available to mark test as blocked")
            return
        parsers[0].mark_test_as_blocked(test)

    def dry_run(self):
        """
        Execute the dry-run command and return the ``tests`` attribute of the
        list parser, or an empty list when no list parser is registered.
        """
        parser_instances = self._new_parsers(CommonParserType.oh_jsunit_list)
        handler = ShellHandler(parser_instances)
        command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

        return parser_instances[0].tests if parser_instances else []

    def run(self, listener):
        """
        Execute the run command, feeding its output to the JS unit parser.
        """
        handler = self._new_handler(CommonParserType.oh_jsunit, listener)
        command = self._get_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    def rerun(self, listener, test):
        """
        Re-run with the current arguments and mark ``test`` as blocked when
        the run produced no result.

        Once ``rerun_attemp`` is exhausted nothing is executed and ``test``
        is marked as blocked straight away.
        """
        if not self.rerun_attemp:
            LOG.debug("Not execute and mark as blocked finally")
            handler = self._new_handler(CommonParserType.cpptest, listener)
            self._mark_blocked(handler, test)
            return

        handler = None
        test_tracker = CollectingPassListener()
        try:
            listener_copy = listener.copy()
            listener_copy.append(test_tracker)
            handler = self._new_handler(CommonParserType.oh_jsunit,
                                        listener_copy)
            command = self._get_run_command()
            self.config.device.execute_shell_command(
                command, timeout=self.config.timeout, receiver=handler,
                retry=0)
        except ShellCommandUnresponsiveException as _:
            LOG.debug("Exception: ShellCommandUnresponsiveException")
        finally:
            if not test_tracker.get_current_run_results():
                LOG.debug("No test case is obtained finally")
                self.rerun_attemp -= 1
                # handler is still None when the failure happened before it
                # was built; _mark_blocked tolerates that.
                self._mark_blocked(handler, test)

    def add_arg(self, name, value):
        """Store ``value`` under ``name``; ignored when either is falsy."""
        if not name or not value:
            return
        self.arg_list[name] = value

    def remove_arg(self, name):
        """Drop the argument ``name`` if it is set."""
        if not name:
            return
        if name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """
        Return the argument string built from ``arg_list``: "wait_time" maps
        to " -w <value> ", every other key to " -s <key> <value> ".
        """
        pieces = []
        for key, value in self.arg_list.items():
            if key == "wait_time":
                pieces.append(" -w %s " % value)
            else:
                pieces.append(" -s %s %s " % (key, value))
        return "".join(pieces)

    def _build_command(self, suffix=""):
        """
        Return the ``aa test`` command for the configured package (preferred)
        or module, followed by ``suffix``; "" when neither is configured.
        """
        if self.config.package_name:
            # aa test -p ${packageName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            target = "-p %s" % self.config.package_name
        elif self.config.module_name:
            # aa test -m ${moduleName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            target = "-m %s" % self.config.module_name
        else:
            return ""
        return "aa test %s -b %s -s unittest OpenHarmonyTestRunner %s%s" % (
            target, self.config.bundle_name, self.get_args_command(), suffix)

    def _get_run_command(self):
        """Return the command that executes the tests."""
        return self._build_command()

    def _get_dry_run_command(self):
        """Return the command that only lists the tests (dryRun true)."""
        return self._build_command(" -s dryRun true")