#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import time
from dataclasses import dataclass

from xdevice import DeviceTestType
from xdevice import IDriver
from xdevice import Plugin
from xdevice import platform_logger
from xdevice import DeviceLabelType
from xdevice import ShellHandler
from xdevice import ExecuteTerminate
from xdevice import get_plugin
from xdevice import JsonParser
from xdevice import get_config_value
from xdevice import get_kit_instances
from xdevice import check_result_report
from xdevice import get_device_log_file
from xdevice import get_test_component_version
from xdevice import ParamError
from ohos.constants import ParserType
from ohos.constants import ComType
from ohos.constants import CKit
from ohos.exception import LiteDeviceExecuteCommandError
from core.utils import get_filename_extension
from core.testkit.kit_lite import DeployKit
from core.config.resource_manager import ResourceManager
from core.config.config_manager import UserConfigManager

__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
LOG = platform_logger("LiteUnitTest")


def get_level_para_string(level_string):
    """
    Convert a comma separated level list into a gtest level argument value.

    Every entry is stripped of surrounding whitespace first; entries that
    are not purely digits are ignored. Duplicates are removed while keeping
    the order of first appearance, so the result is deterministic.

    :param level_string: e.g. "0, 1,2"
    :return: e.g. "Level0,Level1,Level2"; "" when no numeric level is given
    """
    stripped_levels = (item.strip() for item in level_string.split(","))
    # dict.fromkeys de-duplicates and preserves insertion order.
    unique_levels = dict.fromkeys(
        item for item in stripped_levels if item.isdigit())
    return ",".join(f"Level{item}" for item in unique_levels)


def _append_device_log(request, output):
    """
    Append command output to the device log file of the current report.

    The last "\\n"-separated segment of output is dropped before writing and
    a single trailing newline is added.

    :param request: test request providing config.report_path and
                    config.device
    :param output: text returned by the device command
    """
    device_log_file = get_device_log_file(
        request.config.report_path,
        request.config.device.__get_serial__())

    # NOTE(review): the log file is created with mode 0o755; confirm that
    # the executable bits are really wanted for a log file.
    log_fd = os.open(device_log_file,
                     os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
    with os.fdopen(log_fd, "a") as log_file:
        log_file.write("{}{}".format(
            "\n".join(output.split("\n")[0:-1]), "\n"))
        log_file.flush()


@dataclass
class GTestConst(object):
    """
    Command line option names passed to the test binary.
    """
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test)
class LiteUnitTest(IDriver):
    """
    lite gtest test driver for L1
    """
    config = None
    log = platform_logger("LiteUnitTest")
    nfs_dir = ""
    mnt_cmd = ""
    lite_device = None
    result = None

    def __check_failed__(self, msg):
        """
        Log a failed check.
        :param msg: description of the failure
        """
        self.log.error("check failed {}".format(msg))
        return

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        """
        Log the received config. No validation is performed here.
        :param config: serial device
        :return:
        """
        # NOTE(review): this is logged at error level although nothing has
        # failed; confirm whether debug/info was intended.
        self.log.error("Lite driver check config:{}".format(config))

    def __execute__(self, request):
        """
        Run one test suite binary on the lite device.

        1. connect the device, mount NFS and enter the board directory
        2. copy the test binary to the NFS directory and execute it,
           eg. ./test_demo
        3. copy the result xml into the report directory and clean up
        :param request: contains test condition, sub_system
                        module_name,test_suit,
                        test_case,test_level,test_case_dir
        :return:
        """
        self.log.debug("Test suite FilePath: %s" %
                       request.root.source.source_file)
        self.lite_device = request.config.environment.devices[0]
        self.lite_device.connect()
        try:
            if not self._before_execute_test():
                self.log.error("open test dir failed")
                return
            self.log.debug("open test dir success")

            executed = self._execute_test(request)
            if executed:
                self.log.info("execute test command success")
            else:
                self.log.error("execute test command failed")

            # Result collection and NFS cleanup run even when the command
            # failed, so that a partially written result is still picked up
            # and the board is left unmounted for the next suite.
            if not self._after_execute_test(request):
                self.log.error("after execute test failed")
                return
            if executed:
                self.log.info("lite device execute request success")
        finally:
            # Close the connection on every path, not only on success.
            self.lite_device.close()

    def _mount_nfs_server(self):
        """
        Mount the NFS server on the board, trying at most three times.

        The mount arguments come from the "mnt_cmd" entry of the "NFS" user
        config. Only logs the outcome; nothing is returned.
        """
        # before execute each suits bin, mount nfs
        mount_args = UserConfigManager().get_user_config(
            "NFS").get("mnt_cmd")
        self.mnt_cmd = "mount {}".format(mount_args or "")
        if not mount_args:
            self.log.error("no configure for mount command")
            return

        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            if attempt > 1:
                self.log.info("try mount %d" % attempt)
            filter_result, status, _ = \
                self.lite_device.execute_command_with_timeout(
                    self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
                    timeout=3)
            if "already mounted" in filter_result:
                self.log.info("nfs has been mounted")
                return
            if status:
                self.log.info("execute mount command success")
                return

        self.log.error("execute mount command failed")

    def _before_execute_test(self):
        """
        Read the NFS host directory from the user config, mount NFS on the
        board and change into the board directory.
        :return: True when the board directory could be entered
        """
        self.nfs_dir = \
            UserConfigManager().get_user_config("NFS").get("host_dir")
        if not self.nfs_dir:
            self.log.error("no configure for nfs directory")
            return False
        self._mount_nfs_server()
        _, status, _ = \
            self.lite_device.execute_command_with_timeout("cd /{}".format(
                UserConfigManager().get_user_config("NFS").get("board_dir")),
                case_type=DeviceTestType.lite_cpp_test)
        if not status:
            self.log.error("pre execute command failed")
            return False
        self.log.info("pre execute command success")
        return True

    def _execute_test(self, request):
        """
        Copy the test binary into the NFS directory and run it on the board.

        A stale binary or result xml with the same name is removed from the
        NFS directory first.
        :param request: provides the test binary path and the run config
        :return: True when the device reported the command as successful
        """
        test_case = request.root.source.source_file
        self.config = request.config
        test_para = self._get_test_para(self.config.testcase,
                                        self.config.testlevel)
        case_name = os.path.basename(test_case)
        nfs_case_file = os.path.join(self.nfs_dir, case_name)
        if os.path.exists(nfs_case_file):
            os.remove(nfs_case_file)
        result_file = os.path.join(self.nfs_dir, case_name + ".xml")
        if os.path.exists(result_file):
            os.remove(result_file)
        shutil.copyfile(test_case, nfs_case_file)
        # push resource files
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(test_case)
        resource_manager.lite_process_preparer_data(resource_data_dic,
                                                    resource_dir)
        self.lite_device.execute_command_with_timeout(
            "chmod 777 {}".format(case_name),
            case_type=DeviceTestType.lite_cpp_test)
        test_command = "./%s %s" % (case_name, test_para)
        case_result, status, _ = \
            self.lite_device.execute_command_with_timeout(
                test_command, case_type=DeviceTestType.lite_cpp_test)
        if status:
            self.log.info("test case result:\n %s" % case_result)
            return True
        self.log.error("failed case: %s" % test_case)
        return False

    def _get_test_para(self, testcase, testlevel):
        """
        Build the command line argument for the test binary.
        :param testcase: test case filter, "" when not set
        :param testlevel: comma separated levels, "" when not set
        :return: filter argument when only testcase is set, level argument
                 when only testlevel is set, otherwise ""
        """
        if "" != testcase and "" == testlevel:
            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
        elif "" == testcase and "" != testlevel:
            level_para = get_level_para_string(testlevel)
            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
        else:
            # Neither set, or both set: no argument is passed.
            test_para = ""
        return test_para

    def _after_execute_test(self, request):
        """
        copy test result to result dir

        The target is <report_path>/result/<subsystem>/<module>, where
        subsystem and module are the first two path components between
        "unittest/" and "/bin" of the test binary path. The NFS space is
        cleared afterwards. The device connection is left open here.
        :param request:
        :return: True when the result xml was copied
        """
        if request.config is None:
            self.log.error("test config is null")
            return False
        test_case = request.root.source.source_file
        case_name = os.path.basename(test_case)

        # A path without "unittest/" yields no sub directory.
        path_parts = test_case.split("unittest" + os.sep)
        if len(path_parts) > 1:
            sub_system_module = path_parts[1].split(os.sep + "bin")[0]
        else:
            sub_system_module = ""
        components = \
            sub_system_module.split(os.sep)[:2] if sub_system_module else []
        test_result = os.path.join(request.config.report_path, "result",
                                   *components)
        os.makedirs(test_result, exist_ok=True)

        result_name = case_name + ".xml"
        result_file = os.path.join(self.nfs_dir, result_name)
        if not self._check_xml_exist(result_name):
            self.log.error("result xml file %s not exist." % result_name)
            # The board listing may miss it; trust the host side copy.
            if not os.path.exists(result_file):
                self.log.error("file %s not exist." % result_file)
                self._clear_nfs_space()
                return False
        final_result = os.path.join(test_result,
                                    os.path.basename(result_file))
        shutil.copyfile(result_file, final_result)
        self.log.info("after execute test")
        self._clear_nfs_space()
        return True

    def _clear_nfs_space(self):
        """
        Leave the board directory, unmount it and recreate the host NFS
        directory empty.
        """
        self.lite_device.execute_command_with_timeout(
            "cd ..",
            case_type=DeviceTestType.lite_cpp_test)
        self.lite_device.execute_command_with_timeout(
            "umount %s" %
            UserConfigManager().get_user_config("NFS").get("board_dir"),
            case_type=DeviceTestType.lite_cpp_test)
        shutil.rmtree(self.nfs_dir)
        os.mkdir(self.nfs_dir)

    def _check_xml_exist(self, xml_file, timeout=10):
        """
        Poll the board directory listing until xml_file shows up.
        :param xml_file: file name to look for in the "ls" output
        :param timeout: seconds to keep polling (5 seconds between polls)
        :return: True when the name appeared in the listing
        """
        ls_command = \
            "ls /%s" % \
            UserConfigManager().get_user_config("NFS").get("board_dir")
        start_time = time.time()
        while time.time() - start_time < timeout:
            # NOTE(review): this call uses cpp_test_lite while the rest of
            # the class uses lite_cpp_test; confirm this is intended.
            result, _, _ = self.lite_device.execute_command_with_timeout(
                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
                timeout=5, receiver=None)
            if xml_file in result:
                return True
            time.sleep(5)
        return False

    def show_help_info(self):
        """
        show help info.
        """
        self.log.info("this is test driver for cpp test")
        return

    def show_driver_info(self):
        """
        show driver info.
        """
        self.log.info("this is test driver for cpp test")
        return

    def __result__(self):
        pass


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
class CTestDriver(IDriver):
    """
    CTest is a test that runs a native test package on given lite device.
    """
    config = None
    result = ""
    error_message = ""
    version_cmd = "AT+CSV"

    def __init__(self):
        self.file_name = ""

    def __check_environment__(self, device_options):
        """
        Accept exactly one device whose label is wifiiot.
        :param device_options: requested device options
        :return: True when the environment matches
        """
        if len(device_options) != 1 or \
                device_options[0].label != DeviceLabelType.wifiiot:
            self.error_message = "check environment failed"
            return False
        return True

    def __check_config__(self, config=None):
        del config
        self.config = None

    def __execute__(self, request):
        """
        Run the ctest described by the request and produce the report.

        Any exception is logged and stored in error_message; the report
        check runs in every case.
        :param request: test request
        """
        from xdevice import Variables
        try:
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            if request.config.resource_path:
                current_dir = request.config.resource_path
            else:
                current_dir = Variables.exec_dir

            config_file = request.root.source.config_file.strip()
            if config_file:
                source = os.path.join(current_dir, config_file)
                self.file_name = os.path.basename(config_file).split(".")[0]
            else:
                source = request.root.source.source_string.strip()

            self._run_ctest(source=source, request=request)

        except (LiteDeviceExecuteCommandError, Exception) as exception:
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            if request.root.source.test_name.startswith("{"):
                report_name = "report"
            else:
                report_name = get_filename_extension(
                    request.root.source.test_name)[0]

            self.result = check_result_report(request.config.report_path,
                                              self.result,
                                              self.error_message,
                                              report_name)

    def _run_ctest(self, source=None, request=None):
        """
        Set up the kits, send the reset command over the deploy com and
        append the device output to the device log.

        The deploy com is closed afterwards in every case.
        :param source: config file path or source string of the test
        :param request: test request
        """
        if not source:
            LOG.error("Error: %s don't exist." % source, error_no="00101")
            return

        try:
            parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
            version = get_test_component_version(self.config)
            parser_instances = []
            for parser in parsers:
                parser_instance = parser.__class__()
                parser_instance.suites_name = self.file_name
                parser_instance.product_info.setdefault("Version", version)
                parser_instance.listeners = request.listeners
                parser_instances.append(parser_instance)
            handler = ShellHandler(parser_instances)

            reset_cmd = self._reset_device(request, source)
            self.result = "%s.xml" % os.path.join(request.config.report_path,
                                                  "result", self.file_name)

            self.config.device.device.com_dict.get(
                ComType.deploy_com).connect()

            result, _, _ = \
                self.config.device.device.execute_command_with_timeout(
                    command=reset_cmd,
                    case_type=DeviceTestType.ctest_lite,
                    key=ComType.deploy_com,
                    timeout=90,
                    receiver=handler)

            _append_device_log(request, result)
        finally:
            self.config.device.device.com_dict.get(ComType.deploy_com).close()

    def _reset_device(self, request, source):
        """
        Run the setup of every DeployKit found in the test config and
        collect the burn command of the last one.
        :param request: test request
        :param source: test config source handed to JsonParser
        :return: burn command items converted from hex strings to int
        :raises ExecuteTerminate: when the scheduler has stopped executing
        """
        json_config = JsonParser(source)
        reset_cmd = []
        kit_instances = get_kit_instances(json_config,
                                          request.config.resource_path,
                                          request.config.testcases_path)
        from xdevice import Scheduler

        for (kit_instance, kit_info) in zip(kit_instances,
                                            json_config.get_kits()):
            if not isinstance(kit_instance, DeployKit):
                continue
            if not self.file_name:
                # Fall back to the burn file base name as suite name.
                self.file_name = get_config_value(
                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
            reset_cmd = kit_instance.burn_command
            if not Scheduler.is_execute:
                raise ExecuteTerminate("ExecuteTerminate", error_no="00300")

            kit_instance.__setup__(
                self.config.device,
                source_file=request.root.source.source_file.strip())

        reset_cmd = [int(item, 16) for item in reset_cmd]
        return reset_cmd

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
    """
    JSUnitTestDriver is a Test that runs a native test package on given device.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        # Initialised here so the attribute always exists.
        self.file_name = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def _get_driver_config(self, json_config):
        """
        Read bundle-name and ability from the driver section of the config.
        :param json_config: parsed test config
        :raises ParamError: when bundle-name is missing
        """
        bundle_name = get_config_value('bundle-name',
                                       json_config.get_driver(), False)
        if not bundle_name:
            raise ParamError("Can't find bundle-name in config file.",
                             error_no="00108")
        self.config.bundle_name = bundle_name

        ability = get_config_value('ability',
                                   json_config.get_driver(), False)
        self.config.ability = ability or "default"

    def __execute__(self, request):
        """
        Set up the kits, start the ability on the device and produce the
        report.

        Any exception is logged and stored in error_message; report check,
        kit teardown and device close run in every case.
        :param request: test request
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00101")

            if not os.path.exists(config_file):
                # Logged once by the handler below.
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00101")

            self.file_name = os.path.basename(
                request.root.source.source_file.strip()).split(".")[0]

            self.result = "%s.xml" % os.path.join(
                request.config.report_path, "result", self.file_name)

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)

            from xdevice import Scheduler
            for kit in self.kits:
                if not Scheduler.is_execute:
                    raise ExecuteTerminate("ExecuteTerminate",
                                           error_no="00300")
                if kit.__class__.__name__ == CKit.liteinstall:
                    kit.bundle_name = self.config.bundle_name
                kit.__setup__(self.config.device, request=request)

            self._run_jsunit(request)

        except Exception as exception:
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            if request.root.source.test_name.startswith("{"):
                report_name = "report"
            else:
                report_name = get_filename_extension(
                    request.root.source.test_name)[0]

            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message,
                report_name)

            for kit in self.kits:
                kit.__teardown__(self.config.device)

            self.config.device.close()

    def _run_jsunit(self, request):
        """
        Start the configured ability with "aa start" and append the device
        output to the device log.
        :param request: test request
        """
        parser_instances = []
        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = self.file_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)

        command = "./bin/aa start -p %s -n %s" % \
                  (self.config.bundle_name, self.config.ability)
        result, _, _ = self.config.device.execute_command_with_timeout(
            command=command,
            timeout=300,
            receiver=handler)

        _append_device_log(request, result)

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""