1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import copy 21import json 22import sys 23from json import JSONDecodeError 24from core.utils import get_build_output_path 25from core.common import is_open_source_product 26 27from core.utils import get_file_list_by_postfix 28from core.config.config_manager import FilterConfigManager 29from xdevice import platform_logger 30from xdevice import Scheduler 31from xdevice import DeviceTestType 32 33LOG = platform_logger("TestcaseManager") 34 35TESTFILE_TYPE_DATA_DIC = { 36 "DEX": [], 37 "HAP": [], 38 "PYT": [], 39 "CXX": [], 40 "BIN": [], 41 "OHJST": [], 42 "JST": [], 43 "LTPPosix": [], 44 "OHRust": [] 45} 46FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"] 47 48 49class TestCaseManager(object): 50 @classmethod 51 def get_valid_suite_file(cls, test_case_out_path, suite_file, options): 52 partlist = options.partname_list 53 testmodule = options.testmodule 54 testsuit = options.testsuit 55 56 if not suite_file.startswith(test_case_out_path): 57 return False 58 59 if testsuit != "": 60 short_name, _ = os.path.splitext(os.path.basename(suite_file)) 61 testsuit_list = testsuit.split(',') 62 for test in testsuit_list: 63 if short_name.startswith(test) or \ 64 testsuit.startswith(short_name): 65 return True 66 return False 67 68 is_valid_status = False 69 suitfile_subpath = suite_file.replace(test_case_out_path, "") 70 suitfile_subpath = suitfile_subpath.strip(os.sep) 71 if len(partlist) == 0: 72 if testmodule != "": 73 temp_list = suitfile_subpath.split(os.sep) 74 if len(temp_list) > 2 and testmodule == temp_list[1]: 75 is_valid_status = True 76 else: 77 is_valid_status = True 78 else: 79 for partname in partlist: 80 if testmodule != "": 81 if suitfile_subpath.startswith( 82 partname + os.sep + testmodule + os.sep): 83 is_valid_status = True 84 break 85 else: 86 if suitfile_subpath.startswith(partname + os.sep): 87 is_valid_status = True 88 break 89 return is_valid_status 90 91 @classmethod 92 def check_python_test_file(cls, suite_file): 93 if suite_file.endswith(".py"): 94 filename = os.path.basename(suite_file) 95 if filename.startswith("test_"): 96 return True 97 return False 98 99 @classmethod 100 def check_hap_test_file(cls, hap_file_path): 101 try: 102 if hap_file_path.endswith(".hap"): 103 json_file_path = hap_file_path.replace(".hap", ".json") 104 if os.path.exists(json_file_path): 105 with open(json_file_path, 'r') as json_file: 106 data_dic = json.load(json_file) 107 if not data_dic: 108 return False 109 else: 110 if "kits" in data_dic.keys(): 111 kits_list = data_dic.get("kits") 112 if len(kits_list) > 0: 113 for kits_dict in kits_list: 114 if "test-file-name" not in kits_dict.keys(): 115 continue 116 else: 117 return True 118 else: 119 return False 120 return False 121 except JSONDecodeError: 122 return False 123 finally: 124 print(" check hap test file finally") 125 126 @classmethod 127 def get_hap_test_driver(cls, hap_file_path): 128 data_dic = cls.get_hap_json(hap_file_path) 129 if not data_dic: 130 return "" 131 else: 132 if "driver" in data_dic.keys(): 133 driver_dict = data_dic.get("driver") 134 if bool(driver_dict): 135 driver_type = driver_dict.get("type") 136 return driver_type 137 else: 138 LOG.error("%s has not set driver." % hap_file_path) 139 return "" 140 else: 141 return "" 142 143 @classmethod 144 def get_hap_json(cls, hap_file_path): 145 if hap_file_path.endswith(".hap"): 146 json_file_path = hap_file_path.replace(".hap", ".json") 147 if os.path.exists(json_file_path): 148 with open(json_file_path, 'r') as json_file: 149 data_dic = json.load(json_file) 150 return data_dic 151 else: 152 return {} 153 else: 154 return {} 155 156 @classmethod 157 def get_hap_part_json(cls, hap_file_path): 158 if hap_file_path.endswith(".hap"): 159 json_file_path = hap_file_path.replace(".hap", ".moduleInfo") 160 if os.path.exists(json_file_path): 161 with open(json_file_path, 'r') as json_file: 162 data_dic = json.load(json_file) 163 return data_dic 164 else: 165 return {} 166 else: 167 return {} 168 169 @classmethod 170 def get_part_name_test_file(cls, hap_file_path): 171 data_dic = cls.get_hap_part_json(hap_file_path) 172 if not data_dic: 173 return "" 174 else: 175 if "part" in data_dic.keys(): 176 part_name = data_dic["part"] 177 return part_name 178 else: 179 return "" 180 181 def get_test_files(self, test_case_path, options): 182 LOG.info("test case path: " + test_case_path) 183 LOG.info("test type list: " + str(options.testtype)) 184 suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC) 185 if os.path.exists(test_case_path): 186 if len(options.testtype) != 0: 187 test_type_list = options.testtype 188 suit_file_dic = self.get_test_file_data( 189 test_case_path, 190 test_type_list, 191 options) 192 else: 193 LOG.error("%s is not exist." % test_case_path) 194 return suit_file_dic 195 196 def get_test_file_data(self, test_case_path, test_type_list, options): 197 suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC) 198 for test_type in test_type_list: 199 temp_dic = self.get_test_file_data_by_test_type( 200 test_case_path, 201 test_type, 202 options) 203 for key, value in suit_file_dic.items(): 204 suit_file_dic[key] = value + temp_dic.get(key) 205 return suit_file_dic 206 207 def get_test_file_data_by_test_type(self, test_case_path, 208 test_type, options): 209 suit_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC) 210 test_case_out_path = os.path.join(test_case_path, test_type) 211 if os.path.exists(test_case_out_path): 212 LOG.info("The test case directory: %s" % test_case_out_path) 213 return self.get_all_test_file(test_case_out_path, options) 214 else: 215 LOG.error("Test case dir does not exist. %s" % test_case_out_path) 216 return suit_file_dictionary 217 218 def get_all_test_file(self, test_case_out_path, options): 219 suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC) 220 filter_part_list = FilterConfigManager().get_filtering_list( 221 "subsystem_name", options.productform) 222 filter_list_test_file = FilterConfigManager().get_filtering_list( 223 "testfile_name", options.productform) 224 # 遍历测试用例输出目录下面的所有文件夹,每个文件夹对应一个子系统 225 command_list = options.current_raw_cmd.split(" ") 226 for part_name in os.listdir(test_case_out_path): 227 if "-ss" in command_list or "-tp" in command_list: 228 if part_name not in options.partname_list: 229 continue 230 part_case_dir = os.path.join(test_case_out_path, part_name) 231 if not os.path.isdir(part_case_dir): 232 continue 233 # 如果子系统在fiter_config.xml配置文件的<subsystem_name>下面配置过,则过滤 234 if part_name in filter_part_list: 235 continue 236 237 # 获取子系统目录下面的所有文件路径列表 238 suite_file_list = get_file_list_by_postfix(part_case_dir) 239 for suite_file in suite_file_list: 240 # 如果文件在resource目录下面,需要过滤 241 if -1 != suite_file.replace(test_case_out_path, "").find( 242 os.sep + "resource" + os.sep): 243 continue 244 245 file_name = os.path.basename(suite_file) 246 # 如果文件在fiter_config.xml配置文件的<testfile_name>下面配置过,则过滤 247 if file_name in filter_list_test_file: 248 continue 249 250 prefix_name, suffix_name = os.path.splitext(file_name) 251 if suffix_name in FILTER_SUFFIX_NAME_LIST: 252 continue 253 254 if not self.get_valid_suite_file(test_case_out_path, 255 suite_file, 256 options): 257 continue 258 259 if suffix_name == ".dex": 260 suite_file_dictionary.get("DEX").append(suite_file) 261 elif suffix_name == ".hap": 262 if self.get_hap_test_driver(suite_file) == "OHJSUnitTest": 263 # 如果stage测试指定了-tp,只有部件名与moduleInfo中part一致的HAP包才会加入最终执行的队列 264 if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file( 265 suite_file): 266 continue 267 # 如果stage测试指定了-ts,只有完全匹配的HAP包才会加入最终执行的队列 268 if options.testsuit != "": 269 testsuit_list = options.testsuit.split(";") 270 is_match = False 271 for suite_item in testsuit_list: 272 if suite_item == prefix_name: 273 is_match = True 274 break 275 if not is_match: 276 continue 277 if not self.check_hap_test_file(suite_file): 278 continue 279 suite_file_dictionary.get("OHJST").append(suite_file) 280 if self.get_hap_test_driver(suite_file) == "JSUnitTest": 281 suite_file_dictionary.get("JST").append(suite_file) 282 elif suffix_name == ".py": 283 if not self.check_python_test_file(suite_file): 284 continue 285 suite_file_dictionary.get("PYT").append(suite_file) 286 elif suffix_name == "": 287 if file_name.startswith("rust_"): 288 Scheduler.update_test_type_in_source("OHRust", DeviceTestType.oh_rust_test) 289 suite_file_dictionary.get("OHRust").append(suite_file) 290 else: 291 suite_file_dictionary.get("CXX").append(suite_file) 292 elif suffix_name == ".bin": 293 suite_file_dictionary.get("BIN").append(suite_file) 294 295 return suite_file_dictionary 296 297 def get_part_deps_files(self, external_deps_path, testpart): 298 LOG.info("external_deps_path:" + external_deps_path) 299 if os.path.exists(external_deps_path): 300 with open(external_deps_path, 'r') as json_file: 301 data_dic = json.load(json_file) 302 if not data_dic: 303 LOG.error("data_dic is empty.") 304 return [] 305 test_part = testpart[0] 306 if test_part in data_dic.keys(): 307 external_deps_part_list = data_dic.get(test_part) 308 LOG.info("external_deps_part_list = %s" % external_deps_part_list) 309 return external_deps_part_list 310 else: 311 LOG.error("%s is not in part deps info json." % test_part) 312 else: 313 LOG.error("Part deps info %s is not exist." % external_deps_path) 314 return [] 315 316 def check_xts_config_match(self, options, prefix_name, xts_suite_file): 317 # 如果xts测试指定了-tp,只有部件名与moduleInfo中part一致的文件才会加入最终执行的队列 318 if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(xts_suite_file): 319 return False 320 # 如果xts测试指定了-ts,只有完全匹配的文件才会加入最终执行的队列 321 if options.testsuit != "": 322 testsuit_list = options.testsuit.split(";") 323 for suite_item in testsuit_list: 324 if suite_item == prefix_name: 325 return True 326 return False 327 return True 328 329 def get_xts_test_files(self, xts_test_case_path, options): 330 LOG.info("xts test case path: " + xts_test_case_path) 331 xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC) 332 if not os.path.exists(xts_test_case_path): 333 LOG.error("xts %s is not exist." % xts_test_case_path) 334 return xts_suit_file_dic 335 # 获取XTS测试用例输出目录下面的所有文件路径列表 336 xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path) 337 for xts_suite_file in xts_suite_file_list: 338 file_name = os.path.basename(xts_suite_file) 339 prefix_name, suffix_name = os.path.splitext(file_name) 340 if not self.check_xts_config_match(options, prefix_name, xts_suite_file): 341 continue 342 if suffix_name == "": 343 if file_name == "HatsOpenPosixTest": 344 xts_suit_file_dic.get("LTPPosix").append(xts_suite_file) 345 else: 346 xts_suit_file_dic.get("CXX").append(xts_suite_file) 347 elif suffix_name == ".hap": 348 if self.get_hap_test_driver(xts_suite_file) == "OHJSUnitTest": 349 xts_suit_file_dic.get("OHJST").append(xts_suite_file) 350 if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest": 351 xts_suit_file_dic.get("JST").append(xts_suite_file) 352 return xts_suit_file_dic 353