1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import shutil 21import time 22from dataclasses import dataclass 23 24from xdevice import DeviceTestType 25from xdevice import IDriver 26from xdevice import Plugin 27from xdevice import platform_logger 28from xdevice import DeviceLabelType 29from xdevice import ComType 30from xdevice import ParserType 31from xdevice import ShellHandler 32from xdevice import ExecuteTerminate 33from xdevice import LiteDeviceExecuteCommandError 34from xdevice import get_plugin 35from xdevice import JsonParser 36from xdevice import get_config_value 37from xdevice import get_kit_instances 38from xdevice import check_result_report 39from xdevice import get_device_log_file 40from xdevice import get_test_component_version 41from xdevice import ParamError 42from core.utils import get_filename_extension 43from core.testkit.kit_lite import DeployKit 44from core.config.resource_manager import ResourceManager 45from core.config.config_manager import UserConfigManager 46 47__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"] 48LOG = platform_logger("LiteUnitTest") 49 50 51def get_level_para_string(level_string): 52 level_list = list(set(level_string.split(","))) 53 level_para_string = "" 54 for item in level_list: 55 if not item.isdigit(): 56 continue 57 item = item.strip(" ") 58 level_para_string += ("Level%s," % item) 59 level_para_string = level_para_string.strip(",") 60 return level_para_string 61 62 63@dataclass 64class GTestConst(object): 65 exec_para_filter = "--gtest_filter" 66 exec_para_level = "--gtest_testsize" 67 68 69@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test) 70class LiteUnitTest(IDriver): 71 """ 72 lite gtest test driver for L1 73 """ 74 config = None 75 log = platform_logger("LiteUnitTest") 76 nfs_dir = "" 77 mnt_cmd = "" 78 lite_device = None 79 result = None 80 81 def __check_failed__(self, msg): 82 self.log.error("check failed {}".format(msg)) 83 return None 84 85 def __check_environment__(self, device_options): 86 pass 87 88 def __check_config__(self, config): 89 """ 90 1. check serial protocol 91 2. login device 92 3. NFS is available 93 :param config: serial device 94 :return: 95 """ 96 self.log.error("Lite driver check config:{}".format(config)) 97 98 def __execute__(self, request): 99 """ 100 101 1. select test case by subsystem, module, suite 102 2. open test dir 103 3、execute single test case, eg. ./test_demo 104 :param request: contains test condition, sub_system 105 module_name,test_suit, 106 test_case,test_level,test_case_dir 107 :return: 108 """ 109 self.log.debug("Test suite FilePath: %s" % 110 request.root.source.source_file) 111 self.lite_device = request.config.environment.devices[0] 112 self.lite_device.connect() 113 if not self._before_execute_test(): 114 self.log.error("open test dir failed") 115 return 116 self.log.debug("open test dir success") 117 if self._execute_test(request) == "": 118 self.log.error("execute test command failed") 119 return 120 self.log.info("execute test command success") 121 if not self._after_execute_test(request): 122 self.log.error("after execute test failed") 123 return 124 self.log.info("lite device execute request success") 125 126 def _mount_nfs_server(self): 127 #before execute each suits bin, mount nfs 128 self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config( 129 "NFS").get("mnt_cmd")) 130 if self.mnt_cmd == "mount ": 131 self.log.error("no configure for mount command") 132 return 133 134 filter_result, status, _ = \ 135 self.lite_device.execute_command_with_timeout( 136 self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3) 137 if "already mounted" in filter_result: 138 self.log.info("nfs has been mounted") 139 return 140 141 for i in range(0, 2): 142 if status: 143 self.log.info("execute mount command success") 144 return 145 self.log.info("try mount %d" % (i + 2)) 146 _, status, _ = self.lite_device.execute_command_with_timeout( 147 self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, 148 timeout=3) 149 150 self.log.error("execute mount command failed") 151 152 def _before_execute_test(self): 153 """ 154 need copy test case to nfs dir 155 :param request: nfs dir, test case path 156 :return: 157 """ 158 self.nfs_dir = \ 159 UserConfigManager().get_user_config("NFS").get("host_dir") 160 if self.nfs_dir == "": 161 self.log.error("no configure for nfs directory") 162 return False 163 self._mount_nfs_server() 164 _, status, _ = \ 165 self.lite_device.execute_command_with_timeout("cd /{}".format( 166 UserConfigManager().get_user_config("NFS").get("board_dir")), 167 case_type=DeviceTestType.lite_cpp_test) 168 if not status: 169 self.log.error("pre execute command failed") 170 return False 171 self.log.info("pre execute command success") 172 return True 173 174 def _execute_test(self, request): 175 test_case = request.root.source.source_file 176 self.config = request.config 177 test_para = self._get_test_para(self.config.testcase, 178 self.config.testlevel) 179 case_name = os.path.basename(test_case) 180 if os.path.exists(os.path.join(self.nfs_dir, case_name)): 181 os.remove(os.path.join(self.nfs_dir, case_name)) 182 result_name = case_name + ".xml" 183 result_file = os.path.join(self.nfs_dir, result_name) 184 if os.path.exists(result_file): 185 os.remove(result_file) 186 shutil.copyfile(test_case, os.path.join(self.nfs_dir, case_name)) 187 # push resource files 188 resource_manager = ResourceManager() 189 resource_data_dic, resource_dir = \ 190 resource_manager.get_resource_data_dic(test_case) 191 resource_manager.lite_process_preparer_data(resource_data_dic, resource_dir) 192 self.lite_device.execute_command_with_timeout( 193 "chmod 777 {}".format(case_name), 194 case_type=DeviceTestType.lite_cpp_test) 195 test_command = "./%s %s" % (case_name, test_para) 196 case_result, status, _ = \ 197 self.lite_device.execute_command_with_timeout( 198 test_command, case_type=DeviceTestType.lite_cpp_test) 199 if status: 200 self.log.info("test case result:\n %s" % case_result) 201 return 202 self.log.error("failed case: %s" % test_case) 203 204 def _get_test_para(self, testcase, testlevel): 205 if "" != testcase and "" == testlevel: 206 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 207 elif "" == testcase and "" != testlevel: 208 level_para = get_level_para_string(testlevel) 209 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 210 else: 211 test_para = "" 212 return test_para 213 214 def _after_execute_test(self, request): 215 """ 216 copy test result to result dir 217 :param request: 218 :return: 219 """ 220 if request.config is None: 221 self.log.error("test config is null") 222 return False 223 report_path = request.config.report_path 224 test_result = os.path.join(report_path, "result") 225 test_case = request.root.source.source_file 226 case_name = os.path.basename(test_case) 227 if not os.path.exists(test_result): 228 os.mkdir(test_result) 229 sub_system_module = test_case.split( 230 "unittest" + os.sep)[1].split(os.sep + "bin")[0] 231 if os.sep in sub_system_module: 232 sub_system = sub_system_module.split(os.sep)[0] 233 module_name = sub_system_module.split(os.sep)[1] 234 subsystem_dir = os.path.join(test_result, sub_system) 235 if not os.path.exists(subsystem_dir): 236 os.mkdir(subsystem_dir) 237 module_dir = os.path.join(subsystem_dir, module_name) 238 if not os.path.exists(module_dir): 239 os.mkdir(module_dir) 240 test_result = module_dir 241 else: 242 if sub_system_module != "": 243 test_result = os.path.join(test_result, sub_system_module) 244 if not os.path.exists(test_result): 245 os.mkdir(test_result) 246 result_name = case_name + ".xml" 247 result_file = os.path.join(self.nfs_dir, result_name) 248 if not self._check_xml_exist(result_name): 249 self.log.error("result xml file %s not exist." % result_name) 250 if not os.path.exists(result_file): 251 self.log.error("file %s not exist." % result_file) 252 self._clear_nfs_space() 253 return False 254 file_name = os.path.basename(result_file) 255 final_result = os.path.join(test_result, file_name) 256 shutil.copyfile(result_file, 257 final_result) 258 self.log.info("after execute test") 259 self._clear_nfs_space() 260 self.lite_device.close() 261 return True 262 263 def _clear_nfs_space(self): 264 _, status, _ = \ 265 self.lite_device.execute_command_with_timeout( 266 "cd ..", 267 case_type=DeviceTestType.lite_cpp_test) 268 _, status, _ = \ 269 self.lite_device.execute_command_with_timeout( 270 "umount %s" % UserConfigManager().get_user_config("NFS").get("board_dir"), 271 case_type=DeviceTestType.lite_cpp_test) 272 shutil.rmtree(self.nfs_dir) 273 os.mkdir(self.nfs_dir) 274 275 def _check_xml_exist(self, xml_file, timeout=10): 276 ls_command = \ 277 "ls /%s" % \ 278 UserConfigManager().get_user_config("NFS").get("board_dir") 279 start_time = time.time() 280 while time.time()-start_time < timeout: 281 result, _, _ = self.lite_device.execute_command_with_timeout( 282 command=ls_command, case_type=DeviceTestType.cpp_test_lite, 283 timeout=5, receiver=None) 284 if xml_file in result: 285 return True 286 time.sleep(5) 287 return False 288 289 def show_help_info(self): 290 """ 291 show help info. 292 """ 293 self.log.info("this is test driver for cpp test") 294 return None 295 296 def show_driver_info(self): 297 """ 298 show driver info. 299 """ 300 self.log.info("this is test driver for cpp test") 301 return None 302 303 def __result__(self): 304 pass 305 306 307@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) 308class CTestDriver(IDriver): 309 """ 310 CTest is a test that runs a native test package on given lite device. 311 """ 312 config = None 313 result = "" 314 error_message = "" 315 version_cmd = "AT+CSV" 316 317 def __init__(self): 318 self.file_name = "" 319 320 def __check_environment__(self, device_options): 321 if len(device_options) != 1 or \ 322 device_options[0].label != DeviceLabelType.wifiiot: 323 self.error_message = "check environment failed" 324 return False 325 return True 326 327 def __check_config__(self, config=None): 328 del config 329 self.config = None 330 331 def __execute__(self, request): 332 from xdevice import Variables 333 try: 334 self.config = request.config 335 self.config.device = request.config.environment.devices[0] 336 337 if request.config.resource_path: 338 current_dir = request.config.resource_path 339 else: 340 current_dir = Variables.exec_dir 341 342 config_file = request.root.source.config_file.strip() 343 if config_file: 344 source = os.path.join(current_dir, config_file) 345 self.file_name = os.path.basename(config_file).split(".")[0] 346 else: 347 source = request.root.source.source_string.strip() 348 349 self._run_ctest(source=source, request=request) 350 351 except (LiteDeviceExecuteCommandError, Exception) as exception: 352 LOG.error(exception, error_no=getattr(exception, "error_no", 353 "00000")) 354 self.error_message = exception 355 finally: 356 if request.root.source.test_name.startswith("{"): 357 report_name = "report" 358 else: 359 report_name = get_filename_extension( 360 request.root.source.test_name)[0] 361 362 self.result = check_result_report(request.config.report_path, 363 self.result, 364 self.error_message, 365 report_name) 366 367 def _run_ctest(self, source=None, request=None): 368 if not source: 369 LOG.error("Error: %s don't exist." % source, error_no="00101") 370 return 371 372 try: 373 parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) 374 version = get_test_component_version(self.config) 375 parser_instances = [] 376 for parser in parsers: 377 parser_instance = parser.__class__() 378 parser_instance.suites_name = self.file_name 379 parser_instance.product_info.setdefault("Version", version) 380 parser_instance.listeners = request.listeners 381 parser_instances.append(parser_instance) 382 handler = ShellHandler(parser_instances) 383 384 reset_cmd = self._reset_device(request, source) 385 self.result = "%s.xml" % os.path.join(request.config.report_path, 386 "result", self.file_name) 387 388 self.config.device.device.com_dict.get( 389 ComType.deploy_com).connect() 390 391 result, _, error = self.config.device.device. \ 392 execute_command_with_timeout( 393 command=reset_cmd, 394 case_type=DeviceTestType.ctest_lite, 395 key=ComType.deploy_com, 396 timeout=90, 397 receiver=handler) 398 399 device_log_file = get_device_log_file(request.config.report_path, 400 request.config.device. 401 __get_serial__()) 402 403 device_log_file_open = os.open(device_log_file, os.O_WRONLY | 404 os.O_CREAT | os.O_APPEND, 0o755) 405 with os.fdopen(device_log_file_open, "a") as file_name: 406 file_name.write("{}{}".format( 407 "\n".join(result.split("\n")[0:-1]), "\n")) 408 file_name.flush() 409 finally: 410 self.config.device.device.com_dict.get(ComType.deploy_com).close() 411 412 def _reset_device(self, request, source): 413 json_config = JsonParser(source) 414 reset_cmd = [] 415 kit_instances = get_kit_instances(json_config, 416 request.config.resource_path, 417 request.config.testcases_path) 418 from xdevice import Scheduler 419 420 for (kit_instance, kit_info) in zip(kit_instances, 421 json_config.get_kits()): 422 if not isinstance(kit_instance, DeployKit): 423 continue 424 if not self.file_name: 425 self.file_name = get_config_value( 426 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] 427 reset_cmd = kit_instance.burn_command 428 if not Scheduler.is_execute: 429 raise ExecuteTerminate("ExecuteTerminate", error_no="00300") 430 431 kit_instance.__setup__(self.config.device, 432 source_file=request.root.source.source_file.strip()) 433 434 reset_cmd = [int(item, 16) for item in reset_cmd] 435 return reset_cmd 436 437 def __result__(self): 438 return self.result if os.path.exists(self.result) else "" 439 440 441 442@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite) 443class JSUnitTestLiteDriver(IDriver): 444 """ 445 JSUnitTestDriver is a Test that runs a native test package on given device. 446 """ 447 448 def __init__(self): 449 self.result = "" 450 self.error_message = "" 451 self.kits = [] 452 self.config = None 453 454 def __check_environment__(self, device_options): 455 pass 456 457 def __check_config__(self, config): 458 pass 459 460 def _get_driver_config(self, json_config): 461 bundle_name = get_config_value('bundle-name', 462 json_config.get_driver(), False) 463 if not bundle_name: 464 raise ParamError("Can't find bundle-name in config file.", 465 error_no="00108") 466 else: 467 self.config.bundle_name = bundle_name 468 469 ability = get_config_value('ability', 470 json_config.get_driver(), False) 471 if not ability: 472 self.config.ability = "default" 473 else: 474 self.config.ability = ability 475 476 def __execute__(self, request): 477 try: 478 LOG.debug("Start execute xdevice extension JSUnit Test") 479 480 self.config = request.config 481 self.config.device = request.config.environment.devices[0] 482 483 config_file = request.root.source.config_file 484 suite_file = request.root.source.source_file 485 486 if not suite_file: 487 raise ParamError( 488 "test source '%s' not exists" % 489 request.root.source.source_string, error_no="00101") 490 491 if not os.path.exists(config_file): 492 LOG.error("Error: Test cases don't exist %s." % config_file, 493 error_no="00101") 494 raise ParamError( 495 "Error: Test cases don't exist %s." % config_file, 496 error_no="00101") 497 498 self.file_name = os.path.basename( 499 request.root.source.source_file.strip()).split(".")[0] 500 501 self.result = "%s.xml" % os.path.join( 502 request.config.report_path, "result", self.file_name) 503 504 json_config = JsonParser(config_file) 505 self.kits = get_kit_instances(json_config, 506 self.config.resource_path, 507 self.config.testcases_path) 508 509 self._get_driver_config(json_config) 510 511 from xdevice import Scheduler 512 for kit in self.kits: 513 if not Scheduler.is_execute: 514 raise ExecuteTerminate("ExecuteTerminate", 515 error_no="00300") 516 if kit.__class__.__name__ == CKit.liteinstall: 517 kit.bundle_name = self.config.bundle_name 518 kit.__setup__(self.config.device, request=request) 519 520 self._run_jsunit(request) 521 522 except Exception as exception: 523 self.error_message = exception 524 finally: 525 report_name = "report" if request.root.source. \ 526 test_name.startswith("{") else get_filename_extension( 527 request.root.source.test_name)[0] 528 529 self.result = check_result_report( 530 request.config.report_path, self.result, self.error_message, 531 report_name) 532 533 for kit in self.kits: 534 kit.__teardown__(self.config.device) 535 536 self.config.device.close() 537 538 def _run_jsunit(self, request): 539 parser_instances = [] 540 parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite) 541 for parser in parsers: 542 parser_instance = parser.__class__() 543 parser_instance.suites_name = self.file_name 544 parser_instance.listeners = request.listeners 545 parser_instances.append(parser_instance) 546 handler = ShellHandler(parser_instances) 547 548 command = "./bin/aa start -p %s -n %s" % \ 549 (self.config.bundle_name, self.config.ability) 550 result, _, error = self.config.device.execute_command_with_timeout( 551 command=command, 552 timeout=300, 553 receiver=handler) 554 555 device_log_file = get_device_log_file(request.config.report_path, 556 request.config.device. 557 __get_serial__()) 558 559 device_log_file_open = os.open(device_log_file, os.O_WRONLY | 560 os.O_CREAT | os.O_APPEND, 0o755) 561 with os.fdopen(device_log_file_open, "a") as file_name: 562 file_name.write("{}{}".format( 563 "\n".join(result.split("\n")[0:-1]), "\n")) 564 file_name.flush() 565 566 def __result__(self): 567 return self.result if os.path.exists(self.result) else ""