#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pydoc import classname
import time
import os
import sys
import datetime
import xml.etree.ElementTree as ElementTree

from core.constants import SchedulerType
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType
from core.utils import get_build_output_path
from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.command.distribute_execute import DbinderTest
from core.testcase.testcase_manager import TestCaseManager
from core.config.config_manager import UserConfigManager
from core.config.parse_parts_config import ParsePartsConfig
from core.config.resource_manager import ResourceManager

LOG = platform_logger("Run")


class Run(object):
    """Handler of the "run" command.

    Validates the options, optionally builds the test cases, dispatches the
    run to the scheduler plugin (or to the distributed-test runner) and keeps
    a bounded history of executed commands.
    """

    # Shared by every instance on purpose: the history is kept on the class.
    history_cmd_list = []

    # Maximum number of history records kept / addressable via runhistory.
    _MAX_HISTORY = 10

    @classmethod
    def get_history(cls):
        """Return the (mutable) list of recorded command dictionaries."""
        return cls.history_cmd_list

    def process_command_run(self, command, options):
        """Execute one "run" command.

        Args:
            command: the command object forwarded to the scheduler.
            options: parsed option namespace; several attributes
                (testtype, partname_list, coverage_outpath, testdict, ...)
                are written onto it while the command is processed.

        Returns:
            None in every case; failures are reported through LOG/print.
        """
        para = Parameter()
        test_type_list = para.get_testtype_list(options.testtype)
        if not test_type_list:
            LOG.error("The testtype parameter is incorrect.")
            return
        options.testtype = test_type_list

        parser = ParsePartsConfig(options.productform)
        options.partname_list = parser.get_part_list(
            options.subsystem,
            options.testpart)
        options.coverage_outpath = self.get_coverage_outpath(options)

        self._log_input_parameters(options)

        if not para.check_run_parameter(options):
            LOG.error("Input parameter is incorrect.")
            return

        # Record describing this invocation for the command history.
        cmd_record = {
            "time": str(datetime.datetime.now()),
            "raw_cmd": options.current_raw_cmd,
            "result": "unknown",
            "command": command,
            "options": options
        }
        # History-related commands themselves are never recorded.
        raw_cmd = options.current_raw_cmd
        need_record_history = not ("-hl" in raw_cmd or "-rh" in raw_cmd
                                   or "--retry" in raw_cmd)

        # Print the command history and stop.
        if options.historylist:
            self._print_history()
            return

        # Re-run one command taken from the history.
        if options.runhistory > 0:
            # An index beyond the history capacity or length is illegal.
            if (options.runhistory > self._MAX_HISTORY or
                    options.runhistory > len(self.history_cmd_list)):
                print("input history command[%d] out of range"
                      % options.runhistory)
                return
            cmd_record = self.history_cmd_list[options.runhistory - 1]
            print("run history command:", cmd_record["raw_cmd"])
            need_record_history = False
            command = cmd_record["command"]
            options = cmd_record["options"]

        if options.retry:
            self._retry_failed_cases()
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return

        if "actstest" in options.testtype:
            test_dict = self.get_acts_test_dict(options)
            options.testcases_path = self.get_acts_tests_out_path(
                options.productform)
            options.resource_path = self.get_acts_tests_out_path(
                options.productform)
        else:
            test_dict = self.get_test_dict(options)

        if not self._check_test_dictionary(test_dict):
            LOG.error("The test file list is empty.")
            return

        if ("distributedtest" in options.testtype and
                len(options.testtype) == 1):
            if not self._run_distributed_test(test_dict, options):
                return
        else:
            options.testdict = test_dict
            options.target_outpath = self.get_target_out_path(
                options.productform)
            self._run_with_scheduler(command, options)

        if need_record_history:
            self._record_history(cmd_record)
        print("-------------run end: ", self.history_cmd_list)
        return

    ##############################################################
    ##############################################################

    @classmethod
    def _log_input_parameters(cls, options):
        """Log the option values that drive this run."""
        names = ("productform", "testtype", "subsystem", "testpart",
                 "testmodule", "testsuit", "testcase", "testlevel",
                 "testargs", "repeat", "retry", "historylist",
                 "runhistory", "partname_list")
        LOG.info("")
        LOG.info("------------------------------------")
        LOG.info("Input parameter:")
        for name in names:
            # Formatting through a tuple keeps tuple-valued options safe.
            LOG.info("%s = %s" % (name, getattr(options, name)))
        LOG.info("------------------------------------")
        LOG.info("")

    @classmethod
    def _latest_report_path(cls):
        """Return the path of the latest summary report.

        The path is built from sys.framework_root_dir, an attribute this
        module expects to have been set on sys elsewhere.
        """
        return os.path.join(
            sys.framework_root_dir, "reports/latest/summary_report.xml")

    @classmethod
    def _get_scheduler(cls):
        """Return the first scheduler plugin, or None when none is found."""
        plugins = get_plugin(plugin_type=Plugin.SCHEDULER,
                             plugin_id=SchedulerType.SCHEDULER)
        # Guard the empty case: indexing [0] directly would raise IndexError.
        return plugins[0] if plugins else None

    def _print_history(self):
        """Print every recorded command with its time and result."""
        print("The latest command history is: %d"
              % len(self.history_cmd_list))
        for number, record in enumerate(self.history_cmd_list, start=1):
            print("%d. [%s] - [%s]::[%s]" % (number, record["time"],
                  record["raw_cmd"], record["result"]))

    def _retry_failed_cases(self):
        """Re-run the failed cases of the most recent recorded command.

        Failed cases are collected from the latest summary report and
        stored as ``{"class": [...]}`` in the ``testargs`` attribute of the
        recorded options before the command is executed again.
        """
        if not self.history_cmd_list:
            LOG.info("No history command exsit")
            return
        history_cmd = self.history_cmd_list[-1]
        command = history_cmd["command"]
        options = history_cmd["options"]

        latest_report_path = self._latest_report_path()
        try:
            root = ElementTree.parse(latest_report_path).getroot()
        except (OSError, ElementTree.ParseError) as error:
            # Without a readable report there is nothing to retry.
            LOG.error("Can not read the report %s: %s"
                      % (latest_report_path, error))
            return

        fail_list = []
        for child in root:
            print(child.tag, ":", child.attrib)
            for grand in child:
                print(grand.tag, ":", grand.attrib)
                if grand.attrib.get("result") == 'false':
                    fail_list.append(
                        grand.attrib["classname"] + "#" + grand.attrib["name"])

        setattr(options, "testargs", {"class": fail_list})
        print("retry option:", options)
        if not fail_list:
            LOG.info("No testcase to retry")
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return
        scheduler = self._get_scheduler()
        if scheduler is None:
            LOG.error("Can not find the scheduler plugin.")
            return
        scheduler.exec_command(command, options)

    def _run_distributed_test(self, test_dict, options):
        """Run the distributed test cases listed under test_dict["CXX"].

        Returns:
            True when the run completed, False when it was aborted because
            the environment check failed or no test case was produced.
        """
        from core.command.distribute_utils import get_test_case
        from core.command.distribute_utils \
            import check_ditributetest_environment
        from core.command.distribute_utils import make_device_info_file
        from core.command.distribute_utils import make_reports

        local_time = time.localtime()
        create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
        start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)

        if not check_ditributetest_environment():
            return False

        output_test = get_test_case(test_dict["CXX"])
        if not output_test:
            return False

        result_rootpath = os.path.join(sys.framework_root_dir,
                                       "reports",
                                       create_time)

        log_path = os.path.join(result_rootpath, "log")
        tmp_path = os.path.join(result_rootpath, "temp")
        os.makedirs(log_path, exist_ok=True)
        os.makedirs(tmp_path, exist_ok=True)

        Scheduler.start_task_log(log_path)
        make_device_info_file(tmp_path)

        for case in output_test:
            agent_target_name = case["agent_target_name"]
            major_target_name = case["major_target_name"]
            manager = DbinderTest(result_rootpath, case["suits_dir"])
            manager.setUp()
            manager.test_distribute(major_target_name, agent_target_name,
                                    options)
            manager.tearDown()

        make_reports(result_rootpath, start_time)
        Scheduler.stop_task_logcat()
        return True

    def _run_with_scheduler(self, command, options):
        """Execute the command through the scheduler plugin."""
        scheduler = self._get_scheduler()
        if scheduler is None:
            LOG.error("Can not find the scheduler plugin.")
            return

        if is_lite_product(options.productform,
                           sys.source_code_root_path):
            options.testcases_path = options.target_outpath
            options.resource_path = os.path.abspath(os.path.join(
                sys.framework_root_dir, "..", "resource"))
            if options.productform.find("wifiiot") != -1:
                scheduler.update_test_type_in_source(
                    ".bin", DeviceTestType.ctest_lite)
                scheduler.update_ext_type_in_source(
                    "BIN", DeviceTestType.ctest_lite)
            else:
                print("productform is not wifiiot")
        scheduler.exec_command(command, options)

    def _record_history(self, cmd_record):
        """Store cmd_record in the history with the result of the run.

        The result text is taken from the summary line of the latest
        report; when the report cannot be read the record keeps the result
        it already carries.
        """
        latest_report_path = self._latest_report_path()
        try:
            # Read the report file to obtain the result of the run.
            with open(latest_report_path) as report_file:
                for report_line in report_file:
                    if 'testsuites name="summary_report"' in report_line:
                        result = report_line.replace("\n", "")
                        result = result.replace(
                            "<testsuites name=\"summary_report\" ", "")
                        result = result.replace(">", "")
                        cmd_record["result"] = result
                        break
        except OSError as error:
            # The command did run; still record it, result stays as it was.
            LOG.error("Can not read the report %s: %s"
                      % (latest_report_path, error))

        # Drop the oldest record once the history is full.
        if len(self.history_cmd_list) >= self._MAX_HISTORY:
            del self.history_cmd_list[0]
        self.history_cmd_list.append(cmd_record)

    @classmethod
    def get_target_out_path(cls, product_form):
        """Return the absolute output path of the test targets.

        Uses the user-configured test cases directory when it is not
        empty, otherwise <build output>/packages/<product_form>.
        """
        target_out_path = UserConfigManager().get_test_cases_dir()
        if target_out_path == "":
            target_out_path = os.path.join(
                get_build_output_path(product_form),
                "packages",
                product_form)
        return os.path.abspath(target_out_path)

    @classmethod
    def _build_test_cases(cls, options):
        """Build the test cases when building is enabled.

        Returns:
            True when no build is needed (coverage run, build flag off or
            empty source root); otherwise the value returned by
            BuildManager.build_testcases.
        """
        if options.coverage:
            LOG.info("Coverage testing, no need to compile testcases")
            return True

        is_build_testcase = UserConfigManager().get_user_config_flag(
            "build", "testcase")
        project_root_path = sys.source_code_root_path
        if is_build_testcase and project_root_path != "":
            from core.build.build_manager import BuildManager
            build_manager = BuildManager()
            return build_manager.build_testcases(project_root_path, options)
        return True

    @classmethod
    def _check_test_dictionary(cls, test_dictionary):
        """Return True when at least one entry holds a non-empty file list.

        A missing dictionary (None, as returned by get_test_dict for a
        non-existent path) or an empty one yields False.
        """
        if not test_dictionary:
            return False
        return any(len(file_list) > 0
                   for file_list in test_dictionary.values())

    @classmethod
    def get_tests_out_path(cls, product_form):
        """Return the directory that holds the built test cases.

        The user-configured directory wins when it is not empty; otherwise
        the path is derived from the build output path of product_form.
        """
        testcase_path = UserConfigManager().get_test_cases_dir()
        if testcase_path == "":
            all_product_list = scan_support_product()
            if product_form in all_product_list:
                if is_open_source_product(product_form):
                    testcase_path = os.path.abspath(os.path.join(
                        get_build_output_path(product_form),
                        "packages",
                        "phone",
                        "tests"))
                else:
                    testcase_path = os.path.abspath(os.path.join(
                        get_build_output_path(product_form),
                        "packages",
                        product_form,
                        "tests"))
            else:
                testcase_path = os.path.join(
                    get_build_output_path(product_form), "tests")
        LOG.info("testcase_path=%s" % testcase_path)
        return testcase_path

    @classmethod
    def get_acts_tests_out_path(cls, product_form):
        """Return <build output>/suites/acts/testcases as absolute path."""
        acts_testcase_path = os.path.abspath(os.path.join(
            get_build_output_path(product_form),
            "suites",
            "acts",
            "testcases"))
        LOG.info("acts_testcase_path=%s" % acts_testcase_path)
        return acts_testcase_path

    @classmethod
    def get_coverage_outpath(cls, options):
        """Return the coverage output path, or "" when coverage is off.

        With coverage enabled the build output path is used, falling back
        to the "outpath" entry of the user "coverage" configuration; an
        error is logged when both are empty.
        """
        coverage_out_path = ""
        if options.coverage:
            coverage_out_path = get_build_output_path(options.productform)
            if coverage_out_path == "":
                coverage_out_path = UserConfigManager().get_user_config(
                    "coverage").get("outpath", "")
            if coverage_out_path == "":
                LOG.error("Coverage test: coverage_outpath is empty.")
        return coverage_out_path

    def get_acts_test_dict(self, options):
        """Return the ACTS test files found in the ACTS output path."""
        # Locate the build output path of the test cases.
        acts_test_case_path = self.get_acts_tests_out_path(
            options.productform)
        acts_test_dict = TestCaseManager().get_acts_test_files(
            acts_test_case_path, options)
        return acts_test_dict

    def get_test_dict(self, options):
        """Return the test files found in the test output path.

        Returns:
            The value of TestCaseManager.get_test_files, or None (after
            logging an error) when the output path does not exist.
        """
        # Locate the build output path of the test cases.
        test_case_path = self.get_tests_out_path(options.productform)
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return None

        test_dict = TestCaseManager().get_test_files(test_case_path, options)
        return test_dict