1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import platform 19import random 20import shutil 21import subprocess 22from pydoc import classname 23import time 24import os 25import sys 26import datetime 27import xml.etree.ElementTree as ElementTree 28 29from core.constants import SchedulerType 30from xdevice import Plugin 31from xdevice import get_plugin 32from xdevice import platform_logger 33from xdevice import Scheduler 34from xdevice import DeviceTestType 35from core.utils import get_build_output_path 36from core.utils import scan_support_product 37from core.utils import is_lite_product 38from core.common import is_open_source_product 39from core.command.parameter import Parameter 40from core.command.distribute_execute import DbinderTest 41from core.testcase.testcase_manager import TestCaseManager 42from core.config.config_manager import UserConfigManager 43from core.config.parse_parts_config import ParsePartsConfig 44from core.config.resource_manager import ResourceManager 45 46LOG = platform_logger("Run") 47 48 49class Run(object): 50 51 history_cmd_list = [] 52 53 @classmethod 54 def get_history(self): 55 return self.history_cmd_list 56 57 @classmethod 58 def get_target_out_path(cls, product_form): 59 target_out_path = UserConfigManager().get_test_cases_dir() 60 if target_out_path == "": 61 target_out_path = os.path.join( 62 get_build_output_path(product_form), 63 "packages", 64 product_form) 65 target_out_path = os.path.abspath(target_out_path) 66 return target_out_path 67 68 @classmethod 69 def _build_test_cases(cls, options): 70 if options.coverage: 71 LOG.info("Coverage testing, no need to compile testcases") 72 return True 73 74 is_build_testcase = UserConfigManager().get_user_config_flag( 75 "build", "testcase") 76 project_root_path = sys.source_code_root_path 77 if is_build_testcase and project_root_path != "": 78 from core.build.build_manager import BuildManager 79 build_manager = BuildManager() 80 return build_manager.build_testcases(project_root_path, options) 81 else: 82 return True 83 84 @classmethod 85 def _check_test_dictionary(cls, test_dictionary): 86 is_valid_status = False 87 key_list = sorted(test_dictionary.keys()) 88 for key in key_list: 89 file_list = test_dictionary[key] 90 if len(file_list) > 0: 91 is_valid_status = True 92 break 93 return is_valid_status 94 95 @classmethod 96 def get_tests_out_path(cls, product_form): 97 testcase_path = UserConfigManager().get_test_cases_dir() 98 if testcase_path == "": 99 all_product_list = scan_support_product() 100 if product_form in all_product_list: 101 if is_open_source_product(product_form): 102 testcase_path = os.path.abspath(os.path.join( 103 get_build_output_path(product_form), 104 "tests")) 105 else: 106 testcase_path = os.path.abspath(os.path.join( 107 get_build_output_path(product_form), 108 "tests")) 109 else: 110 testcase_path = os.path.join( 111 get_build_output_path(product_form), "tests") 112 LOG.info("testcase_path=%s" % testcase_path) 113 return testcase_path 114 115 @classmethod 116 def get_xts_tests_out_path(cls, product_form, testtype): 117 xts_testcase_path = UserConfigManager().get_test_cases_dir() 118 if xts_testcase_path == "": 119 xts_testcase_path = os.path.abspath(os.path.join( 120 get_build_output_path(product_form), 121 "suites", 122 testtype[0], 123 "testcases")) 124 LOG.info("xts_testcase_path=%s" % xts_testcase_path) 125 return xts_testcase_path 126 127 @classmethod 128 def get_external_deps_out_path(cls, product_form): 129 external_deps_path = os.path.abspath(os.path.join( 130 get_build_output_path(product_form), 131 "part_deps_info", 132 "part_deps_info.json")) 133 LOG.info("external_deps_path=%s" % external_deps_path) 134 return external_deps_path 135 136 @classmethod 137 def get_coverage_outpath(cls, options): 138 coverage_out_path = "" 139 if options.coverage: 140 coverage_out_path = get_build_output_path(options.productform) 141 if coverage_out_path == "": 142 coverage_out_path = UserConfigManager().get_user_config( 143 "coverage").get("outpath", "") 144 if coverage_out_path == "": 145 LOG.error("Coverage test: coverage_outpath is empty.") 146 return coverage_out_path 147 148 @classmethod 149 def get_part_deps_list(cls, productform, testpart): 150 #获取预处理部件间依赖的编译结果路径 151 external_deps_path = cls.get_external_deps_out_path(productform) 152 external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart) 153 return external_deps_path_list 154 155 def process_command_run(self, command, options): 156 current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" ")))) 157 if options.coverage and platform.system() != "Windows": 158 if not options.pullgcda: 159 push_cov_path = os.path.join(sys.framework_root_dir, "local_coverage/push_coverage_so/push_coverage.py") 160 if os.path.exists(push_cov_path): 161 if str(options.testpart) == "[]" and str(options.subsystem) == "[]": 162 LOG.info("No subsystem or part input. Not push coverage so.") 163 elif str(options.testpart) != "[]" and str(options.subsystem) != "[]": 164 LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.") 165 else: 166 if str(options.testpart) != "[]": 167 param = str(options.testpart) 168 subprocess.run("python3 {} {} {}".format( 169 push_cov_path, "testpart", param), shell=True) 170 else: 171 param = str(options.subsystem) 172 subprocess.run("python3 {} {} {}".format( 173 push_cov_path, "subsystem", param), shell=True) 174 else: 175 print(f"{push_cov_path} not exists.") 176 177 init_gcov_path = os.path.join(sys.framework_root_dir, "local_coverage/resident_service/init_gcov.py") 178 if os.path.exists(init_gcov_path): 179 subprocess.run("python3 %s command_str=%s" % ( 180 init_gcov_path, current_raw_cmd), shell=True) 181 else: 182 print(f"{init_gcov_path} not exists.") 183 184 para = Parameter() 185 test_type_list = para.get_testtype_list(options.testtype) 186 if len(test_type_list) == 0: 187 LOG.error("The testtype parameter is incorrect.") 188 return 189 options.testtype = test_type_list 190 191 parser = ParsePartsConfig(options.productform) 192 partname_list = parser.get_part_list( 193 options.subsystem, 194 options.testpart) 195 options.partname_list = partname_list 196 options.coverage_outpath = self.get_coverage_outpath(options) 197 198 LOG.info("") 199 LOG.info("------------------------------------") 200 LOG.info("Input parameter:") 201 LOG.info("productform = %s" % options.productform) 202 LOG.info("testtype = %s" % str(options.testtype)) 203 LOG.info("subsystem = %s" % str(options.subsystem)) 204 LOG.info("testpart = %s" % str(options.testpart)) 205 LOG.info("testmodule = %s" % options.testmodule) 206 LOG.info("testsuit = %s" % options.testsuit) 207 LOG.info("testcase = %s" % options.testcase) 208 LOG.info("testlevel = %s" % options.testlevel) 209 LOG.info("testargs = %s" % options.testargs) 210 LOG.info("repeat = %s" % options.repeat) 211 LOG.info("retry = %s" % options.retry) 212 LOG.info("historylist = %s" % options.historylist) 213 LOG.info("runhistory = %s" % options.runhistory) 214 LOG.info("partname_list = %s" % str(options.partname_list)) 215 LOG.info("partdeps = %s" % options.partdeps) 216 LOG.info("------------------------------------") 217 LOG.info("") 218 219 if not para.check_run_parameter(options): 220 LOG.error("Input parameter is incorrect.") 221 return 222 223 current_time = datetime.datetime.now() 224 #记录命令运行历史 225 need_record_history = False 226 cmd_record = { 227 "time" : str(current_time), 228 "raw_cmd" : options.current_raw_cmd, 229 "result" : "unknown", 230 "command": command, 231 "options": options 232 } 233 if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \ 234 or "--retry" in options.current_raw_cmd): 235 need_record_history = True 236 237 #打印历史记录 238 if options.historylist: 239 print("The latest command history is: %d" % len(self.history_cmd_list)) 240 for index, cmd_record in enumerate(self.history_cmd_list): 241 print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"], 242 cmd_record["raw_cmd"], cmd_record["result"])) 243 return 244 #重新运行历史里的一条命令 245 if options.runhistory > 0: 246 #如果记录大于10则认为非法 247 if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list): 248 print("input history command[%d] out of range:", options.runhistory) 249 return 250 cmd_record = self.history_cmd_list[options.runhistory - 1] 251 print("run history command:", cmd_record["raw_cmd"]) 252 need_record_history = False 253 command = cmd_record["command"] 254 options = cmd_record["options"] 255 256 if options.retry: 257 if len(self.history_cmd_list) <= 0: 258 LOG.info("No history command exsit") 259 return 260 history_cmd = self.history_cmd_list[-1] 261 command = history_cmd["command"] 262 options = history_cmd["options"] 263 from xdevice import Variables 264 latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") 265 tree = ElementTree.parse(latest_report_path) 266 root = tree.getroot() 267 has_failed_case = 0 268 test_targets = {} 269 fail_list = [] 270 for child in root: 271 print(child.tag, ":", child.attrib) 272 for grand in child: 273 print(grand.tag, ":", grand.attrib) 274 for sub_child in grand: 275 if sub_child.tag == 'failure': 276 fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"] 277 fail_list.append(fail_case) 278 has_failed_case += 1 279 break 280 test_targets["class"] = fail_list 281 setattr(options, "testargs", test_targets) 282 print("retry option:", options) 283 if has_failed_case > 0: 284 if not self._build_test_cases(options): 285 LOG.error("Build test cases failed.") 286 return 287 scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, 288 plugin_id=SchedulerType.SCHEDULER)[0] 289 scheduler.exec_command(command, options) 290 else: 291 LOG.info("No testcase to retry") 292 return 293 294 if not self._build_test_cases(options): 295 LOG.error("Build test cases failed.") 296 return 297 298 if "partdeps" == options.partdeps: 299 self.get_part_deps_list(options.productform, options.testpart) 300 options.testcases_path = self.get_external_deps_out_path(options.productform) 301 LOG.info("partdeps = %s" % options.partdeps) 302 303 if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype: 304 test_dict = self.get_xts_test_dict(options) 305 options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype) 306 options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype) 307 else: 308 test_dict = self.get_test_dict(options) 309 310 if not self._check_test_dictionary(test_dict): 311 LOG.error("The test file list is empty.") 312 return 313 if options.coverage and platform.system() != "Windows": 314 coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage") 315 if os.path.exists(coverage_path): 316 coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True) 317 coverage_process.communicate() 318 319 if ("distributedtest" in options.testtype and 320 len(options.testtype) == 1): 321 from core.command.distribute_utils import get_test_case 322 from core.command.distribute_utils \ 323 import check_ditributetest_environment 324 from core.command.distribute_utils import make_device_info_file 325 from core.command.distribute_utils import make_reports 326 327 local_time = time.localtime() 328 create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time) 329 start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time) 330 331 if not check_ditributetest_environment(): 332 return 333 334 output_test = get_test_case(test_dict.get("CXX", None)) 335 if not output_test: 336 return 337 338 result_rootpath = os.path.join(sys.framework_root_dir, 339 "reports", 340 create_time) 341 342 log_path = os.path.join(result_rootpath, "log") 343 tmp_path = os.path.join(result_rootpath, "temp") 344 os.makedirs(log_path, exist_ok=True) 345 os.makedirs(tmp_path, exist_ok=True) 346 347 Scheduler.start_task_log(log_path) 348 make_device_info_file(tmp_path) 349 350 for case in output_test: 351 agent_target_name = case["agent_target_name"] 352 major_target_name = case["major_target_name"] 353 manager = DbinderTest(result_rootpath, case["suits_dir"]) 354 manager.setUp() 355 manager.test_distribute(major_target_name, agent_target_name, options) 356 manager.tearDown() 357 358 make_reports(result_rootpath, start_time) 359 Scheduler.stop_task_logcat() 360 else: 361 options.testdict = test_dict 362 options.target_outpath = self.get_target_out_path( 363 options.productform) 364 365 scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, 366 plugin_id=SchedulerType.SCHEDULER)[0] 367 if scheduler is None: 368 LOG.error("Can not find the scheduler plugin.") 369 else: 370 options.testcases_path = self.get_tests_out_path(options.productform) 371 options.resource_path = os.path.abspath(os.path.join( 372 sys.framework_root_dir, "..", "resource")) 373 if is_lite_product(options.productform, 374 sys.source_code_root_path): 375 if options.productform.find("wifiiot") != -1: 376 scheduler.update_test_type_in_source(".bin", 377 DeviceTestType.ctest_lite) 378 scheduler.update_ext_type_in_source("BIN", 379 DeviceTestType.ctest_lite) 380 else: 381 print("productform is not wifiiot") 382 scheduler.exec_command(command, options) 383 if need_record_history: 384 #读文件获取运行结果 385 from xdevice import Variables 386 latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") 387 with open(latest_report_path) as report_file: 388 for report_line in report_file: 389 if "testsuites name=\"summary_report\"" in report_line: 390 result = report_line.replace("\n", "") 391 result = result.replace("<testsuites name=\"summary_report\" ", "") 392 result = result.replace(">", "") 393 cmd_record["result"] = result 394 break 395 if len(self.history_cmd_list) >= 10: 396 del self.history_cmd_list[0] 397 self.history_cmd_list.append(cmd_record) 398 399 if "fuzztest" == options.testtype[0] and options.coverage is False: 400 report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0] 401 latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus") 402 if os.path.exists(latest_corpus_path): 403 shutil.rmtree(latest_corpus_path) 404 shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path) 405 406 if options.coverage and platform.system() != "Windows": 407 pull_service_gcov_path = os.path.join( 408 sys.framework_root_dir, "local_coverage/resident_service/pull_service_gcda.py") 409 if os.path.exists(pull_service_gcov_path): 410 subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True) 411 else: 412 print(f"{pull_service_gcov_path} not exists.") 413 414 if not options.pullgcda: 415 cov_main_file_path = os.path.join(sys.framework_root_dir, "local_coverage/coverage_tools.py") 416 testpart = ",".join(list(map(str, options.partname_list))) 417 if os.path.exists(cov_main_file_path): 418 subprocess.run("python3 %s testpart=%s" % ( 419 cov_main_file_path, testpart), shell=True) 420 else: 421 print(f"{cov_main_file_path} not exists.") 422 return 423 424 def get_xts_test_dict(self, options): 425 # 获取XTS测试用例编译结果路径 426 xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype) 427 if not os.path.exists(xts_test_case_path): 428 LOG.error("%s is not exist." % xts_test_case_path) 429 return {} 430 xts_test_dict = TestCaseManager().get_xts_test_files(xts_test_case_path, options) 431 return xts_test_dict 432 433 def get_test_dict(self, options): 434 # 获取测试用例编译结果路径 435 test_case_path = self.get_tests_out_path(options.productform) 436 if not os.path.exists(test_case_path): 437 LOG.error("%s is not exist." % test_case_path) 438 return {} 439 440 test_dict = TestCaseManager().get_test_files(test_case_path, options) 441 return test_dict 442