#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import shutil
import stat
import time

from xdevice import ParamError
from xdevice import get_device_log_file
from xdevice import check_result_report
from xdevice import get_kit_instances
from xdevice import do_module_kit_setup
from xdevice import do_module_kit_teardown
from xdevice import get_config_value
from xdevice import Plugin
from xdevice import DeviceTestType
from xdevice import IDriver
from xdevice import get_plugin
from xdevice import CommonParserType
from xdevice import ShellHandler
from xdevice import ConfigConst
from xdevice import JsonParser
from xdevice import TestDescription
from xdevice import platform_logger

from ohos.constants import CKit
from ohos.executor.listener import CollectingPassListener
from core.driver.drivers import update_xml
from core.driver.drivers import get_result_savepath

__all__ = ["OHJSUnitTestDriver", "oh_jsunit_para_parse"]

LOG = platform_logger("Drivers")
# Default shell timeout in milliseconds (5 minutes).
TIME_OUT = 300 * 1000


def oh_jsunit_para_parse(runner, junit_paras):
    """Translate recognized test arguments into runner shell arguments.

    Only a fixed set of parameter names is handled; for the enumerated
    parameters (testType/size/level) values outside the candidate lists
    are silently skipped, matching the framework's lenient behavior.

    Args:
        runner: OHJSUnitTestRunner collecting ``-s key value`` arguments.
        junit_paras: mapping of parameter name -> list of string values.
    """
    junit_paras = dict(junit_paras)
    test_type_list = ["function", "performance", "reliability", "security"]
    size_list = ["small", "medium", "large"]
    level_list = ["0", "1", "2", "3"]
    for para_name in junit_paras.keys():
        para_name = para_name.strip()
        para_values = junit_paras.get(para_name, [])
        # Fix: guard empty value lists before indexing para_values[0];
        # previously e.g. {"testType": []} raised IndexError.
        if not para_values:
            continue
        if para_name == "class":
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name == "notClass":
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name == "testType":
            # function/performance/reliability/security
            if para_values[0] not in test_type_list:
                continue
            runner.add_arg(para_name, para_values[0])
        elif para_name == "size":
            # size small/medium/large
            if para_values[0] not in size_list:
                continue
            runner.add_arg(para_name, para_values[0])
        elif para_name == "level":
            # 0/1/2/3 (fixed stale comment: "4" is not accepted)
            if para_values[0] not in level_list:
                continue
            runner.add_arg(para_name, para_values[0])
        elif para_name == "stress":
            runner.add_arg(para_name, para_values[0])


class OHJSUnitTestRunner:
    """Builds and executes ``aa test`` shell commands for OH JSUnit suites."""

    MAX_RETRY_TIMES = 3

    def __init__(self, config):
        # key -> value for "-s key value" shell arguments.
        self.arg_list = {}
        self.suites_name = None
        self.config = config
        self.rerun_attemp = 3
        self.suite_recorder = {}
        self.finished = False
        # Expected tests (from dry run), keyed by class name.
        self.expect_tests_dict = dict()
        # Parser instance notified when the whole task is finished.
        self.finished_observer = None
        self.retry_times = 1
        self.compile_mode = ""

    def dry_run(self):
        """Collect the tests the suite would run, without executing them.

        Executes the runner with "-s dryRun true" and feeds the output to
        the oh_jsunit_list parser.

        Returns:
            The list of tests reported by the parser.
        """
        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
        if parsers:
            parsers = parsers[:1]
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)
        self.expect_tests_dict = parser_instances[0].tests_dict
        return parser_instances[0].tests

    def run(self, listener):
        """Execute the test command, forwarding output to *listener*."""
        handler = self._get_shell_handler(listener)
        command = self._get_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    def notify_finished(self):
        """Signal the observing parser that the task ended; burn one retry."""
        if self.finished_observer:
            self.finished_observer.notify_task_finished()
        self.retry_times -= 1

    def _get_shell_handler(self, listener):
        """Create a ShellHandler wired to the oh_jsunit parser(s)."""
        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
        if parsers:
            parsers = parsers[:1]
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = self.suites_name
            parser_instance.listeners = listener
            parser_instance.runner = self
            parser_instances.append(parser_instance)
            self.finished_observer = parser_instance
        handler = ShellHandler(parser_instances)
        return handler

    def add_arg(self, name, value):
        """Record a "-s name value" argument; empty name/value is ignored."""
        if not name or not value:
            return
        self.arg_list[name] = value

    def remove_arg(self, name):
        """Drop a previously recorded argument, if present."""
        if not name:
            return
        if name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """Render recorded arguments as a shell fragment.

        "wait_time" maps to the special "-w" flag; everything else becomes
        "-s key value".
        """
        args_commands = ""
        for key, value in self.arg_list.items():
            if key == "wait_time":
                args_commands = "%s -w %s " % (args_commands, value)
            else:
                args_commands = "%s -s %s %s " % (args_commands, key, value)
        return args_commands

    def _get_run_command(self):
        """Build the ``aa test`` command for a package- or module-based run."""
        command = ""
        if self.config.package_name:
            # aa test -p ${packageName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
                      " {}".format(self.config.package_name,
                                   self.config.bundle_name,
                                   self.get_args_command())
        elif self.config.module_name:
            # aa test -m ${moduleName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            command = "aa test -m {} -b {} -s unittest {} {}".format(
                self.config.module_name, self.config.bundle_name,
                self.get_oh_test_runner_path(), self.get_args_command())
        return command

    def _get_dry_run_command(self):
        """Build the run command with the extra "-s dryRun true" switch."""
        command = ""
        if self.config.package_name:
            command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
                      " {} -s dryRun true".format(self.config.package_name,
                                                  self.config.bundle_name,
                                                  self.get_args_command())
        elif self.config.module_name:
            command = "aa test -m {} -b {} -s unittest {}" \
                      " {} -s dryRun true".format(self.config.module_name,
                                                  self.config.bundle_name,
                                                  self.get_oh_test_runner_path(),
                                                  self.get_args_command())

        return command

    def get_oh_test_runner_path(self):
        """Return the test-runner reference, path-qualified for esmodule."""
        if self.compile_mode == "esmodule":
            return "/ets/testrunner/OpenHarmonyTestRunner"
        else:
            return "OpenHarmonyTestRunner"


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
    """
    OHJSUnitTestDriver is a Test that runs a native test package on
    given device.
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        self.rerun = True
        self.rerun_all = True
        # log
        self.device_log = None
        self.hilog = None
        self.log_proc = None
        self.hilog_proc = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """Run the JSUnit module described by *request* and collect results."""
        try:
            LOG.debug("Developer_test Start execute OpenHarmony JSUnitTest")
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file
            result_save_path = get_result_savepath(suite_file,
                                                   self.config.report_path)
            self.result = os.path.join(result_save_path,
                                       "%s.xml" % request.get_module_name())
            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")
            LOG.debug("Test case file path: %s" % suite_file)
            self.config.device.set_device_report_path(request.config.report_path)
            self.hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" +
                request.get_module_name(),
                "device_hilog")

            # Fix: log file only needs rw permission (was 0o755, which
            # marked the hilog file executable).
            hilog_open = os.open(self.hilog,
                                 os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o644)
            self.config.device.device_log_collector.add_log_address(
                self.device_log, self.hilog)
            # Clear existing device hilog buffer before catching logs.
            self.config.device.execute_shell_command(command="hilog -r")
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                if hasattr(self.config, ConfigConst.device_log) \
                        and self.config.device_log.get(ConfigConst.tag_enable) \
                        == ConfigConst.device_log_on \
                        and hasattr(self.config.device, "clear_crash_log"):
                    self.config.device.device_log_collector.clear_crash_log()
                self.log_proc, self.hilog_proc = \
                    self.config.device.device_log_collector.\
                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
                self._run_oh_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise exception
        finally:
            try:
                self._handle_logs(request)
            finally:
                xml_path = os.path.join(
                    request.config.report_path, "result",
                    '.'.join((request.get_module_name(), "xml")))
                # Fix: only move the framework report when it exists;
                # otherwise shutil.move raised FileNotFoundError here and
                # masked the original test exception.
                if os.path.exists(xml_path):
                    shutil.move(xml_path, self.result)
                self.result = check_result_report(
                    request.config.report_path, self.result,
                    self.error_message)
                update_xml(request.root.source.source_file, self.result)

    def _run_oh_jsunit(self, config_file, request):
        """Set up kits, build the runner from json config and run the tests."""
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.connector_command("target mount")
            self._start_smart_perf()
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suites_name = request.get_module_name()
            self._get_runner_config(json_config)
            if hasattr(self.config, "history_report_path") and \
                    self.config.testargs.get("test"):
                # Retry mode: only re-run the cases listed in testargs.
                self._do_test_retry(request.listeners, self.config.testargs)
            else:
                if self.rerun:
                    self.runner.retry_times = self.runner.MAX_RETRY_TIMES
                # execute test case
                self._do_tf_suite()
                self._make_exclude_list_file(request)
                oh_jsunit_para_parse(self.runner, self.config.testargs)
                self._do_test_run(listener=request.listeners)

        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """Read package/module/bundle/rerun/timeout from driver config.

        Raises:
            ParamError: when neither package-name nor module-name is set.
        """
        package = get_config_value('package-name',
                                   json_config.get_driver(), False)
        module = get_config_value('module-name',
                                  json_config.get_driver(), False)
        bundle = get_config_value('bundle-name',
                                  json_config.get_driver(), False)
        is_rerun = get_config_value('rerun', json_config.get_driver(), False)

        self.config.package_name = package
        self.config.module_name = module
        self.config.bundle_name = bundle
        self.rerun = True if is_rerun == 'true' else False

        if not package and not module:
            raise ParamError("Neither package nor module is found"
                             " in config file.", error_no="03201")
        timeout_config = get_config_value("shell-timeout",
                                          json_config.get_driver(), False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def _get_runner_config(self, json_config):
        """Forward timeout and compile-mode settings to the runner."""
        test_timeout = get_config_value('test-timeout',
                                        json_config.get_driver(), False)
        if test_timeout:
            self.runner.add_arg("wait_time", int(test_timeout))

        testcase_timeout = get_config_value('testcase-timeout',
                                            json_config.get_driver(), False)
        if testcase_timeout:
            self.runner.add_arg("timeout", int(testcase_timeout))
        self.runner.compile_mode = get_config_value(
            'compile-mode', json_config.get_driver(), False)

    def _do_test_run(self, listener):
        """Dry-run to collect tests, then run once or with rerun support."""
        test_to_run = self._collect_test_to_run()
        LOG.info("Collected suite count is: {}, test count is: {}".
                 format(len(self.runner.expect_tests_dict.keys()),
                        len(test_to_run) if test_to_run else 0))
        if not test_to_run or not self.rerun:
            self.runner.run(listener)
            self.runner.notify_finished()
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """Return the tests expected to run, as reported by a dry run."""
        run_results = self.runner.dry_run()
        return run_results

    def _run_tests(self, listener):
        """Run once, tracking passed tests via CollectingPassListener."""
        test_tracker = CollectingPassListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        test_run = test_tracker.get_current_run_results()
        return test_run

    def _run_with_rerun(self, listener, expected_tests):
        """Run, and re-run up to twice more for tests that did not execute."""
        LOG.debug("Developer_test Ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        # Fix: parenthesize the conditional expression; previously the
        # precedence made the whole argument "0" when test_run was empty,
        # so the debug line logged the bare integer 0.
        LOG.debug("Run with rerun, has run: %s"
                  % (len(test_run) if test_run else 0))
        if len(test_run) < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run twice,please check")
                self.runner.notify_finished()
            else:
                self._rerun_twice(expected_tests, listener)
        else:
            LOG.debug("Rerun once success")
            self.runner.notify_finished()

    def _rerun_twice(self, expected_tests, listener):
        """Second attempt: restrict the run to the still-missing tests."""
        tests = []
        for test in expected_tests:
            tests.append("%s#%s" % (test.class_name, test.test_name))
        self.runner.add_arg("class", ",".join(tests))
        LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        LOG.debug("Rerun twice, has run: %s" % len(test_run))
        if len(test_run) < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run third,please check")
                self.runner.notify_finished()
            else:
                self._rerun_third(expected_tests, listener)
        else:
            LOG.debug("Rerun twice success")
            self.runner.notify_finished()

    def _rerun_third(self, expected_tests, listener):
        """Final attempt: run the remaining tests and finish regardless."""
        tests = []
        for test in expected_tests:
            tests.append("%s#%s" % (test.class_name, test.test_name))
        self.runner.add_arg("class", ",".join(tests))
        LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
        self._run_tests(listener)
        LOG.debug("Rerun third success")
        self.runner.notify_finished()

    def _make_exclude_list_file(self, request):
        """Merge the module's exclude filters into testargs["notClass"]."""
        if "all-test-file-exclude-filter" in self.config.testargs:
            json_file_list = self.config.testargs.get(
                "all-test-file-exclude-filter")
            self.config.testargs.pop("all-test-file-exclude-filter")
            if not json_file_list:
                LOG.warning("all-test-file-exclude-filter value is empty!")
            else:
                if not os.path.isfile(json_file_list[0]):
                    LOG.warning(
                        "[{}] is not a valid file".format(json_file_list[0]))
                    return
                file_open = os.open(json_file_list[0], os.O_RDONLY,
                                    stat.S_IWUSR | stat.S_IRUSR)
                with os.fdopen(file_open, "r") as file_handler:
                    json_data = json.load(file_handler)
                exclude_list = json_data.get(
                    DeviceTestType.oh_jsunit_test, [])
                filter_list = []
                for exclude in exclude_list:
                    if request.get_module_name() not in exclude:
                        continue
                    filter_list.extend(exclude.get(request.get_module_name()))
                if not isinstance(self.config.testargs, dict):
                    return
                if 'notClass' in self.config.testargs.keys():
                    filter_list.extend(self.config.testargs.get('notClass', []))
                self.config.testargs.update({"notClass": filter_list})

    def _do_test_retry(self, listener, testargs):
        """Re-run only the "class#method" cases listed in testargs["test"]."""
        tests_dict = dict()
        case_list = list()
        for test in testargs.get("test"):
            test_item = test.split("#")
            if len(test_item) != 2:
                continue
            case_list.append(test)
            if test_item[0] not in tests_dict:
                tests_dict.update({test_item[0]: []})
            tests_dict.get(test_item[0]).append(
                TestDescription(test_item[0], test_item[1]))
        self.runner.add_arg("class", ",".join(case_list))
        self.runner.expect_tests_dict = tests_dict
        self.config.testargs.pop("test")
        self.runner.run(listener)
        self.runner.notify_finished()

    def _do_tf_suite(self):
        """Seed testargs["class"] from a configured tf_suite case list."""
        if hasattr(self.config, "tf_suite") and \
                self.config.tf_suite.get("cases", []):
            case_list = self.config["tf_suite"]["cases"]
            self.config.testargs.update({"class": case_list})

    def _start_smart_perf(self):
        """Prepend the smartperf kit when it is configured for this module."""
        if not hasattr(self.config, ConfigConst.kits_in_module):
            return
        if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
            return
        sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
        sp_kits.target_name = self.config.bundle_name
        param_config = self.config.get(ConfigConst.kits_params).get(
            CKit.smartperf, "")
        sp_kits.__check_config__(param_config)
        self.kits.insert(0, sp_kits)

    def _handle_logs(self, request):
        """Collect crash logs (if enabled) and stop device log capture."""
        serial = "{}_{}".format(str(self.config.device.__get_serial__()),
                                time.time_ns())
        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
        if hasattr(self.config, ConfigConst.device_log) and \
                self.config.device_log.get(ConfigConst.tag_enable) \
                == ConfigConst.device_log_on \
                and hasattr(self.config.device, "start_get_crash_log"):
            self.config.device.device_log_collector.\
                start_get_crash_log(log_tar_file_name,
                                    module_name=request.get_module_name())
        self.config.device.device_log_collector.\
            remove_log_address(self.device_log, self.hilog)
        self.config.device.device_log_collector.\
            stop_catch_device_log(self.log_proc)
        self.config.device.device_log_collector.\
            stop_catch_device_log(self.hilog_proc)

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""