#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import platform
import random
import shutil
import subprocess
from pydoc import classname
import time
import os
import sys
import datetime
import xml.etree.ElementTree as ElementTree

from core.constants import SchedulerType
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType
from core.utils import get_build_output_path
from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.command.distribute_execute import DbinderTest
from core.testcase.testcase_manager import TestCaseManager
from core.config.config_manager import UserConfigManager
from core.config.parse_parts_config import ParsePartsConfig
from core.config.resource_manager import ResourceManager

LOG = platform_logger("Run")


class Run(object):
    """Implementation of the ``run`` command.

    ``process_command_run`` is the entry point. The remaining public
    methods resolve the output directories the command works on, and the
    underscore-prefixed methods are the individual steps of a run.
    """

    # Class-level list shared by every instance: one record per executed
    # command, oldest first, capped at _MAX_HISTORY entries.
    history_cmd_list = []

    # Upper bound for both the stored history and the "-rh" index.
    _MAX_HISTORY = 10

    @classmethod
    def get_history(cls):
        """Return the shared list of recorded command records."""
        return cls.history_cmd_list

    def process_command_run(self, command, options):
        """Execute one ``run`` command.

        Args:
            command: The command object handed to the scheduler plugin.
            options: Parsed command-line options. Several attributes are
                written back onto it (``testtype``, ``partname_list``,
                ``coverage_outpath``, ``testcases_path``, ...).

        Returns:
            None. Errors are logged and end the run early.
        """
        # Comma-joined form of the raw command, handed to the coverage
        # helper scripts as "command_str=...". It is computed before
        # ``options`` may be swapped for a history record further down.
        current_raw_cmd = ",".join(options.current_raw_cmd.split(" "))

        if self._coverage_enabled(options):
            self._prepare_coverage(options, current_raw_cmd)

        para = Parameter()
        test_type_list = para.get_testtype_list(options.testtype)
        if not test_type_list:
            LOG.error("The testtype parameter is incorrect.")
            return
        options.testtype = test_type_list

        parser = ParsePartsConfig(options.productform)
        options.partname_list = parser.get_part_list(
            options.subsystem,
            options.testpart)
        options.coverage_outpath = self.get_coverage_outpath(options)

        self._log_input_parameters(options)

        if not para.check_run_parameter(options):
            LOG.error("Input parameter is incorrect.")
            return

        # Record describing this invocation for the command history.
        cmd_record = {
            "time": str(datetime.datetime.now()),
            "raw_cmd": options.current_raw_cmd,
            "result": "unknown",
            "command": command,
            "options": options
        }
        # History-related invocations themselves are not recorded.
        raw_cmd = options.current_raw_cmd
        need_record_history = not (
            "-hl" in raw_cmd or "-rh" in raw_cmd or "--retry" in raw_cmd)

        # Print the command history and stop.
        if options.historylist:
            self._print_history()
            return

        # Re-run one command taken from the history.
        if options.runhistory > 0:
            # An index above the history capacity or beyond the number of
            # stored records is rejected.
            if options.runhistory > self._MAX_HISTORY or \
                    options.runhistory > len(self.history_cmd_list):
                print("input history command[%d] out of range:"
                      % options.runhistory)
                return
            cmd_record = self.history_cmd_list[options.runhistory - 1]
            print("run history command:", cmd_record["raw_cmd"])
            need_record_history = False
            command = cmd_record["command"]
            options = cmd_record["options"]

        if options.retry:
            self._retry_failed_cases()
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return

        if "partdeps" == options.partdeps:
            self.get_part_deps_list(options.productform, options.testpart)
            options.testcases_path = self.get_external_deps_out_path(
                options.productform)
            LOG.info("partdeps = %s" % options.partdeps)

        if "acts" in options.testtype or "hats" in options.testtype \
                or "hits" in options.testtype:
            test_dict = self.get_xts_test_dict(options)
            options.testcases_path = self.get_xts_tests_out_path(
                options.productform, options.testtype)
            options.resource_path = self.get_xts_tests_out_path(
                options.productform, options.testtype)
        else:
            test_dict = self.get_test_dict(options)

        if not self._check_test_dictionary(test_dict):
            LOG.error("The test file list is empty.")
            return

        if self._coverage_enabled(options):
            self._remove_old_coverage_reports()

        if "distributedtest" in options.testtype and \
                len(options.testtype) == 1:
            if not self._run_distributed_test(test_dict, options):
                return
        else:
            self._run_with_scheduler(command, options, test_dict)

        if need_record_history:
            self._record_history(cmd_record)

        if self._coverage_enabled(options):
            self._collect_coverage(options, current_raw_cmd)

    @staticmethod
    def _coverage_enabled(options):
        """Tell whether the coverage steps apply: flag set and not Windows."""
        return options.coverage and platform.system() != "Windows"

    @staticmethod
    def _prepare_coverage(options, current_raw_cmd):
        """Run the pre-test coverage scripts (push coverage so, init gcov).

        NOTE(review): the helper scripts are started through a shell
        command line built from the options. The string form is kept
        because the shell changes how the ``str(list)`` arguments reach
        the scripts; confirm what they expect before switching to an
        argument list.
        """
        if not options.pullgcda:
            push_cov_path = os.path.join(
                sys.framework_root_dir,
                "localCoverage/push_coverage_so/push_coverage.py")
            if os.path.exists(push_cov_path):
                has_part = str(options.testpart) != "[]"
                has_subsystem = str(options.subsystem) != "[]"
                if not has_part and not has_subsystem:
                    LOG.info("No subsystem or part input. Not push coverage so.")
                elif has_part and has_subsystem:
                    LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
                elif has_part:
                    subprocess.run("python3 {} {} {}".format(
                        push_cov_path, "testpart", str(options.testpart)),
                        shell=True)
                else:
                    subprocess.run("python3 {} {} {}".format(
                        push_cov_path, "subsystem", str(options.subsystem)),
                        shell=True)
            else:
                print(f"{push_cov_path} not exists.")

        init_gcov_path = os.path.join(
            sys.framework_root_dir,
            "localCoverage/resident_service/init_gcov.py")
        if os.path.exists(init_gcov_path):
            subprocess.run("python3 %s command_str=%s" % (
                init_gcov_path, current_raw_cmd), shell=True)
        else:
            print(f"{init_gcov_path} not exists.")

    @staticmethod
    def _log_input_parameters(options):
        """Log the effective value of every input parameter."""
        LOG.info("")
        LOG.info("------------------------------------")
        LOG.info("Input parameter:")
        LOG.info("productform = %s" % options.productform)
        LOG.info("testtype = %s" % str(options.testtype))
        LOG.info("subsystem = %s" % str(options.subsystem))
        LOG.info("testpart = %s" % str(options.testpart))
        LOG.info("testmodule = %s" % options.testmodule)
        LOG.info("testsuit = %s" % options.testsuit)
        LOG.info("testcase = %s" % options.testcase)
        LOG.info("testlevel = %s" % options.testlevel)
        LOG.info("testargs = %s" % options.testargs)
        LOG.info("repeat = %s" % options.repeat)
        LOG.info("retry = %s" % options.retry)
        LOG.info("historylist = %s" % options.historylist)
        LOG.info("runhistory = %s" % options.runhistory)
        LOG.info("partname_list = %s" % str(options.partname_list))
        LOG.info("partdeps = %s" % options.partdeps)
        LOG.info("------------------------------------")
        LOG.info("")

    def _print_history(self):
        """Print every recorded command with its 1-based index."""
        print("The latest command history is: %d" % len(self.history_cmd_list))
        for number, record in enumerate(self.history_cmd_list, start=1):
            print("%d. [%s] - [%s]::[%s]" % (
                number, record["time"], record["raw_cmd"], record["result"]))

    def _retry_failed_cases(self):
        """Re-run the failed cases of the most recent recorded command.

        The failed cases are read from ``latest/summary_report.xml`` under
        ``Variables.temp_dir`` and stored as ``options.testargs`` of the
        recorded command before it is executed again.
        """
        if len(self.history_cmd_list) <= 0:
            LOG.info("No history command exsit")
            return
        history_cmd = self.history_cmd_list[-1]
        command = history_cmd["command"]
        options = history_cmd["options"]

        from xdevice import Variables
        latest_report_path = os.path.join(
            Variables.temp_dir, "latest/summary_report.xml")
        # Without a previous report there is nothing to retry; parsing a
        # missing file would raise.
        if not os.path.exists(latest_report_path):
            LOG.error("%s is not exist." % latest_report_path)
            return
        root = ElementTree.parse(latest_report_path).getroot()

        fail_list = []
        for child in root:
            print(child.tag, ":", child.attrib)
            for grand in child:
                print(grand.tag, ":", grand.attrib)
                # One <failure> child is enough to mark the case failed.
                if any(sub_child.tag == 'failure' for sub_child in grand):
                    fail_list.append(
                        grand.attrib["classname"] + "#" + grand.attrib["name"])

        options.testargs = {"class": fail_list}
        print("retry option:", options)
        if not fail_list:
            LOG.info("No testcase to retry")
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return
        scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
                               plugin_id=SchedulerType.SCHEDULER)[0]
        scheduler.exec_command(command, options)

    @staticmethod
    def _remove_old_coverage_reports():
        """Delete the ``reports/coverage`` directory of a previous run."""
        coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
        if os.path.exists(coverage_path):
            # Best effort, like the former "rm -rf", but without a shell so
            # paths containing spaces or metacharacters are handled.
            shutil.rmtree(coverage_path, ignore_errors=True)

    @staticmethod
    def _run_distributed_test(test_dict, options):
        """Run the distributed test cases listed under ``test_dict["CXX"]``.

        Returns:
            True when the cases were executed and the reports were made,
            False when the environment check failed or no case was found.
        """
        from core.command.distribute_utils import get_test_case
        from core.command.distribute_utils \
            import check_ditributetest_environment
        from core.command.distribute_utils import make_device_info_file
        from core.command.distribute_utils import make_reports

        local_time = time.localtime()
        create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
        start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)

        if not check_ditributetest_environment():
            return False

        output_test = get_test_case(test_dict["CXX"])
        if not output_test:
            return False

        result_rootpath = os.path.join(sys.framework_root_dir,
                                       "reports",
                                       create_time)

        log_path = os.path.join(result_rootpath, "log")
        tmp_path = os.path.join(result_rootpath, "temp")
        os.makedirs(log_path, exist_ok=True)
        os.makedirs(tmp_path, exist_ok=True)

        Scheduler.start_task_log(log_path)
        make_device_info_file(tmp_path)

        for case in output_test:
            agent_target_name = case["agent_target_name"]
            major_target_name = case["major_target_name"]
            manager = DbinderTest(result_rootpath, case["suits_dir"])
            manager.setUp()
            manager.test_distribute(major_target_name, agent_target_name, options)
            manager.tearDown()

        make_reports(result_rootpath, start_time)
        Scheduler.stop_task_logcat()
        return True

    def _run_with_scheduler(self, command, options, test_dict):
        """Hand the command to the scheduler plugin."""
        options.testdict = test_dict
        options.target_outpath = self.get_target_out_path(
            options.productform)

        scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
                               plugin_id=SchedulerType.SCHEDULER)[0]
        if scheduler is None:
            LOG.error("Can not find the scheduler plugin.")
            return

        options.testcases_path = self.get_tests_out_path(options.productform)
        options.resource_path = os.path.abspath(os.path.join(
            sys.framework_root_dir, "..", "resource"))
        if is_lite_product(options.productform,
                           sys.source_code_root_path):
            if options.productform.find("wifiiot") != -1:
                scheduler.update_test_type_in_source(
                    ".bin", DeviceTestType.ctest_lite)
                scheduler.update_ext_type_in_source(
                    "BIN", DeviceTestType.ctest_lite)
            else:
                print("productform is not wifiiot")
        scheduler.exec_command(command, options)

    @staticmethod
    def _read_summary_result(report_path):
        """Return the attribute text of the summary ``<testsuites>`` line.

        Returns:
            The first line containing the summary tag, with the newline,
            the tag prefix and every ``>`` removed; None when there is no
            such line or the file cannot be opened or read.
        """
        try:
            with open(report_path) as report_file:
                for report_line in report_file:
                    if "testsuites name=\"summary_report\"" in report_line:
                        result = report_line.replace("\n", "")
                        result = result.replace(
                            "<testsuites name=\"summary_report\" ", "")
                        return result.replace(">", "")
        except OSError as error:
            # A missing report must not abort the steps that follow the
            # test run; the record keeps its "unknown" result.
            LOG.error("Read %s failed: %s" % (report_path, error))
        return None

    def _record_history(self, cmd_record):
        """Fill in the run result and append the record to the history."""
        # Read the result of the run from the latest summary report.
        from xdevice import Variables
        latest_report_path = os.path.join(
            Variables.temp_dir, "latest/summary_report.xml")
        result = self._read_summary_result(latest_report_path)
        if result is not None:
            cmd_record["result"] = result
        # Drop the oldest record once the history is full.
        if len(self.history_cmd_list) >= self._MAX_HISTORY:
            del self.history_cmd_list[0]
        self.history_cmd_list.append(cmd_record)

    @staticmethod
    def _collect_coverage(options, current_raw_cmd):
        """Run the post-test coverage scripts (pull gcda, coverage tools).

        NOTE(review): started through a shell command line, see
        ``_prepare_coverage``.
        """
        pull_service_gcov_path = os.path.join(
            sys.framework_root_dir,
            "localCoverage/resident_service/pull_service_gcda.py")
        if os.path.exists(pull_service_gcov_path):
            subprocess.run("python3 %s command_str=%s" % (
                pull_service_gcov_path, current_raw_cmd), shell=True)
        else:
            print(f"{pull_service_gcov_path} not exists.")

        if not options.pullgcda:
            cov_main_file_path = os.path.join(
                sys.framework_root_dir, "localCoverage/coverage_tools.py")
            testpart = ",".join(map(str, options.partname_list))
            if os.path.exists(cov_main_file_path):
                subprocess.run("python3 %s testpart=%s" % (
                    cov_main_file_path, testpart), shell=True)
            else:
                print(f"{cov_main_file_path} not exists.")

    ##############################################################
    ##############################################################

    @classmethod
    def get_target_out_path(cls, product_form):
        """Return the absolute directory holding the target packages.

        The user-configured test case directory is used when it is set;
        otherwise ``<build output>/packages/<product_form>``.
        """
        target_out_path = UserConfigManager().get_test_cases_dir()
        if target_out_path == "":
            target_out_path = os.path.join(
                get_build_output_path(product_form),
                "packages",
                product_form)
        return os.path.abspath(target_out_path)

    @classmethod
    def _build_test_cases(cls, options):
        """Build the test cases when the user configuration asks for it.

        Returns:
            True when no build is needed (coverage run, build flag off or
            empty source root); otherwise the result of
            ``BuildManager.build_testcases``.
        """
        if options.coverage:
            LOG.info("Coverage testing, no need to compile testcases")
            return True

        is_build_testcase = UserConfigManager().get_user_config_flag(
            "build", "testcase")
        project_root_path = sys.source_code_root_path
        if not (is_build_testcase and project_root_path != ""):
            return True

        from core.build.build_manager import BuildManager
        return BuildManager().build_testcases(project_root_path, options)

    @classmethod
    def _check_test_dictionary(cls, test_dictionary):
        """Return True when at least one entry has a non-empty file list."""
        return any(len(file_list) > 0
                   for file_list in test_dictionary.values())

    @classmethod
    def get_tests_out_path(cls, product_form):
        """Return the directory containing the built test cases.

        The user-configured test case directory is used when it is set;
        otherwise ``<build output>/tests``, made absolute for products
        reported by ``scan_support_product``.
        """
        testcase_path = UserConfigManager().get_test_cases_dir()
        if testcase_path == "":
            all_product_list = scan_support_product()
            testcase_path = os.path.join(
                get_build_output_path(product_form), "tests")
            if product_form in all_product_list:
                testcase_path = os.path.abspath(testcase_path)
        LOG.info("testcase_path=%s" % testcase_path)
        return testcase_path

    @classmethod
    def get_xts_tests_out_path(cls, product_form, testtype):
        """Return the directory containing the built XTS test cases.

        The user-configured test case directory is used when it is set;
        otherwise ``<build output>/suites/<testtype[0]>/testcases``.
        """
        xts_testcase_path = UserConfigManager().get_test_cases_dir()
        if xts_testcase_path == "":
            xts_testcase_path = os.path.abspath(os.path.join(
                get_build_output_path(product_form),
                "suites",
                testtype[0],
                "testcases"))
        LOG.info("xts_testcase_path=%s" % xts_testcase_path)
        return xts_testcase_path

    @classmethod
    def get_external_deps_out_path(cls, product_form):
        """Return the absolute path of ``part_deps_info.json``."""
        external_deps_path = os.path.abspath(os.path.join(
            get_build_output_path(product_form),
            "part_deps_info",
            "part_deps_info.json"))
        LOG.info("external_deps_path=%s" % external_deps_path)
        return external_deps_path

    @classmethod
    def get_coverage_outpath(cls, options):
        """Return the coverage output path, or "" for a non-coverage run.

        For a coverage run the build output path is tried first, then the
        ``outpath`` entry of the ``coverage`` user configuration; an error
        is logged when both are empty.
        """
        if not options.coverage:
            return ""

        coverage_out_path = get_build_output_path(options.productform)
        if coverage_out_path == "":
            coverage_out_path = UserConfigManager().get_user_config(
                "coverage").get("outpath", "")
        if coverage_out_path == "":
            LOG.error("Coverage test: coverage_outpath is empty.")
        return coverage_out_path

    @classmethod
    def get_part_deps_list(cls, productform, testpart):
        """Return the part dependency files reported for ``testpart``."""
        # Path of the pre-processed build result describing the
        # dependencies between parts.
        external_deps_path = cls.get_external_deps_out_path(productform)
        return TestCaseManager().get_part_deps_files(
            external_deps_path, testpart)

    def get_xts_test_dict(self, options):
        """Return the XTS test files, or {} when their directory is missing."""
        # Directory of the built XTS test cases.
        xts_test_case_path = self.get_xts_tests_out_path(
            options.productform, options.testtype)
        if not os.path.exists(xts_test_case_path):
            LOG.error("%s is not exist." % xts_test_case_path)
            return {}
        return TestCaseManager().get_xts_test_files(
            xts_test_case_path, options)

    def get_test_dict(self, options):
        """Return the test files, or {} when their directory is missing."""
        # Directory of the built test cases.
        test_case_path = self.get_tests_out_path(options.productform)
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return {}
        return TestCaseManager().get_test_files(test_case_path, options)