#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collect test-suite files from a test-case output tree, grouped by type."""

import os
import copy
import json
import sys
from json import JSONDecodeError
from core.utils import get_build_output_path
from core.common import is_open_source_product

from core.utils import get_file_list_by_postfix
from core.config.config_manager import FilterConfigManager
from xdevice import platform_logger
from xdevice import DeviceTestType
from xdevice import Binder

LOG = platform_logger("TestcaseManager")

# Template of the result dictionary: one list of suite-file paths per
# test-file type. Always deep-copied before use so the lists are never shared.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
    "LTPPosix": [],
    "OHRust": []
}
# File suffixes that are never treated as test suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]


class TestCaseManager(object):
    """Find test-suite files on disk and sort them into per-type lists."""

    @classmethod
    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
        """Tell whether ``suite_file`` is selected by the given options.

        Args:
            test_case_out_path: Root directory the suite file must live under.
            suite_file: Path of the candidate suite file.
            options: Object providing ``partname_list``, ``testmodule`` and
                ``testsuit``.

        Returns:
            True when the file passes the suite / part / module selection,
            False otherwise (including when it is outside the root).
        """
        partlist = options.partname_list
        testmodule = options.testmodule
        testsuit = options.testsuit

        if not suite_file.startswith(test_case_out_path):
            return False

        if testsuit != "":
            short_name, _ = os.path.splitext(os.path.basename(suite_file))
            # NOTE(review): this compares the WHOLE comma-joined option with
            # the file name, not the individual item; it looks like it was
            # meant to be per-item. Behavior is kept as-is - confirm intent.
            # NOTE(review): items are split on "," here, while the exact-match
            # check used for HAP/XTS files splits the same option on ";".
            if testsuit.startswith(short_name):
                return True
            return any(short_name.startswith(test)
                       for test in testsuit.split(","))

        # Path relative to the root. Only the leading prefix is removed; the
        # startswith() check above guarantees that it is present.
        suitfile_subpath = suite_file[len(test_case_out_path):].strip(os.sep)

        if not partlist:
            if testmodule == "":
                return True
            # Without a part list the module is the second path component.
            temp_list = suitfile_subpath.split(os.sep)
            return len(temp_list) > 2 and testmodule == temp_list[1]

        if testmodule != "":
            accepted_prefixes = tuple(
                partname + os.sep + module + os.sep
                for partname in partlist
                for module in testmodule.split(","))
        else:
            accepted_prefixes = tuple(
                partname + os.sep for partname in partlist)
        return suitfile_subpath.startswith(accepted_prefixes)

    @classmethod
    def check_python_test_file(cls, suite_file):
        """Return True for ``.py`` files whose base name starts with ``test_``."""
        if not suite_file.endswith(".py"):
            return False
        return os.path.basename(suite_file).startswith("test_")

    @classmethod
    def check_hap_test_file(cls, hap_file_path):
        """Tell whether the HAP's sidecar JSON declares a test file.

        Args:
            hap_file_path: Path of a ``.hap`` file.

        Returns:
            True when the ``.json`` file next to the HAP has a ``kits`` list
            in which at least one entry contains ``test-file-name``; False in
            every other case (not a HAP, missing or invalid JSON, no match).
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not isinstance(data_dic, dict):
            return False
        kits_list = data_dic.get("kits")
        if not isinstance(kits_list, list):
            return False
        return any(
            isinstance(kits_dict, dict) and "test-file-name" in kits_dict
            for kits_dict in kits_list)

    @classmethod
    def get_hap_test_driver(cls, hap_file_path):
        """Return the ``driver.type`` value from the HAP's sidecar JSON.

        Returns:
            The value stored under ``driver`` -> ``type`` (``None`` when the
            driver entry has no ``type``), or ``""`` when the JSON is missing,
            empty, invalid, or has no usable ``driver`` entry.
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not isinstance(data_dic, dict) or "driver" not in data_dic:
            return ""
        driver_dict = data_dic.get("driver")
        if not driver_dict:
            LOG.error("%s has not set driver." % hap_file_path)
            return ""
        return driver_dict.get("type")

    @classmethod
    def get_hap_json(cls, hap_file_path):
        """Load the ``.json`` file that sits next to a ``.hap`` file.

        Returns:
            The parsed JSON content, or ``{}`` when the path is not a HAP,
            the JSON file does not exist, or it cannot be decoded.
        """
        return cls._load_hap_sidecar(hap_file_path, ".json")

    @classmethod
    def get_hap_part_json(cls, hap_file_path):
        """Load the ``.moduleInfo`` file that sits next to a ``.hap`` file.

        Returns:
            The parsed JSON content, or ``{}`` when the path is not a HAP,
            the file does not exist, or it cannot be decoded.
        """
        return cls._load_hap_sidecar(hap_file_path, ".moduleInfo")

    @classmethod
    def _load_hap_sidecar(cls, hap_file_path, sidecar_suffix):
        """Parse the JSON file named like the HAP but with another suffix."""
        hap_suffix = ".hap"
        if not hap_file_path.endswith(hap_suffix):
            return {}
        # Swap only the trailing suffix. str.replace() would also rewrite a
        # ".hap" that occurs earlier in the path (e.g. in a directory name).
        sidecar_path = hap_file_path[:-len(hap_suffix)] + sidecar_suffix
        if not os.path.exists(sidecar_path):
            return {}
        try:
            with open(sidecar_path, 'r') as json_file:
                return json.load(json_file)
        except JSONDecodeError as error:
            # One malformed sidecar must not abort the scan of the whole tree.
            LOG.error("%s is not a valid json file: %s"
                      % (sidecar_path, error))
            return {}

    @classmethod
    def get_part_name_test_file(cls, hap_file_path):
        """Return the ``part`` value of the HAP's ``.moduleInfo``, or ``""``."""
        data_dic = cls.get_hap_part_json(hap_file_path)
        if not isinstance(data_dic, dict):
            return ""
        return data_dic.get("part", "")

    def get_test_files(self, test_case_path, options):
        """Collect suite files for every test type listed in the options.

        Args:
            test_case_path: Root directory holding one sub-directory per
                test type.
            options: Object providing ``testtype`` plus the attributes read
                by :meth:`get_all_test_file`.

        Returns:
            A dictionary with the keys of ``TESTFILE_TYPE_DATA_DIC`` mapping
            to lists of suite-file paths (all empty when the root is missing
            or no test type is given).
        """
        LOG.info("test case path: " + test_case_path)
        LOG.info("test type list: " + str(options.testtype))
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if len(options.testtype) == 0:
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        return self.get_test_file_data(
            test_case_path, options.testtype, options)

    def get_test_file_data(self, test_case_path, test_type_list, options):
        """Merge the per-type results of all requested test types."""
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        for test_type in test_type_list:
            temp_dic = self.get_test_file_data_by_test_type(
                test_case_path, test_type, options)
            for key in suit_file_dic:
                suit_file_dic[key] = suit_file_dic[key] + temp_dic.get(key, [])
        return suit_file_dic

    def get_test_file_data_by_test_type(self, test_case_path,
                                        test_type, options):
        """Collect the suite files found under ``test_case_path/test_type``."""
        test_case_out_path = os.path.join(test_case_path, test_type)
        if not os.path.exists(test_case_out_path):
            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        LOG.info("The test case directory: %s" % test_case_out_path)
        return self.get_all_test_file(test_case_out_path, options)

    def get_all_test_file(self, test_case_out_path, options):
        """Walk the output directory and classify every accepted suite file.

        Args:
            test_case_out_path: Directory whose immediate sub-directories are
                scanned for suite files.
            options: Object providing ``productform``, ``current_raw_cmd``,
                ``partname_list``, ``testpart``, ``testsuit`` and the
                attributes read by :meth:`get_valid_suite_file`.

        Returns:
            A dictionary with the keys of ``TESTFILE_TYPE_DATA_DIC`` mapping
            to lists of suite-file paths.
        """
        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        filter_part_list = FilterConfigManager().get_filtering_list(
            "subsystem_name", options.productform)
        filter_list_test_file = FilterConfigManager().get_filtering_list(
            "testfile_name", options.productform)

        command_list = options.current_raw_cmd.split(" ")
        # With -ss or -tp on the command line only the listed directories
        # are scanned.
        only_listed_parts = "-ss" in command_list or "-tp" in command_list
        resource_marker = os.sep + "resource" + os.sep

        # Every directory directly below the output path is scanned in turn.
        for part_name in os.listdir(test_case_out_path):
            if only_listed_parts and part_name not in options.partname_list:
                continue
            part_case_dir = os.path.join(test_case_out_path, part_name)
            if not os.path.isdir(part_case_dir):
                continue
            # Skip directories listed under "subsystem_name" in the filter
            # configuration.
            if part_name in filter_part_list:
                continue

            for suite_file in get_file_list_by_postfix(part_case_dir):
                # Files below a "resource" directory are not test suites.
                if resource_marker in suite_file.replace(
                        test_case_out_path, ""):
                    continue

                file_name = os.path.basename(suite_file)
                # Skip files listed under "testfile_name" in the filter
                # configuration.
                if file_name in filter_list_test_file:
                    continue

                prefix_name, suffix_name = os.path.splitext(file_name)
                if suffix_name in FILTER_SUFFIX_NAME_LIST:
                    continue

                if not self.get_valid_suite_file(test_case_out_path,
                                                 suite_file,
                                                 options):
                    continue

                if suffix_name == ".dex":
                    suite_file_dictionary.get("DEX").append(suite_file)
                elif suffix_name == ".hap":
                    # Read the driver type once; each lookup parses the
                    # sidecar JSON file.
                    driver_type = self.get_hap_test_driver(suite_file)
                    if driver_type == "OHJSUnitTest":
                        # With -tp only HAPs whose moduleInfo "part" equals
                        # the requested part are kept; with -ts only HAPs
                        # whose name matches exactly are kept.
                        if not self._matches_part_and_suite(
                                options, prefix_name, suite_file):
                            continue
                        if not self.check_hap_test_file(suite_file):
                            continue
                        suite_file_dictionary.get("OHJST").append(suite_file)
                    elif driver_type == "JSUnitTest":
                        suite_file_dictionary.get("JST").append(suite_file)
                elif suffix_name == ".py":
                    if not self.check_python_test_file(suite_file):
                        continue
                    suite_file_dictionary.get("PYT").append(suite_file)
                elif suffix_name == "":
                    if file_name.startswith("rust_"):
                        Binder.get_tdd_config().update_test_type_in_source(
                            "OHRust", DeviceTestType.oh_rust_test)
                        suite_file_dictionary.get("OHRust").append(suite_file)
                    else:
                        suite_file_dictionary.get("CXX").append(suite_file)
                elif suffix_name == ".bin":
                    suite_file_dictionary.get("BIN").append(suite_file)

        return suite_file_dictionary

    def get_part_deps_files(self, external_deps_path, testpart):
        """Look up the external dependencies recorded for ``testpart[0]``.

        Args:
            external_deps_path: Path of the JSON file with the part deps.
            testpart: Sequence whose first element is the part name.

        Returns:
            The value stored for the part in the JSON file, or ``[]`` when
            the file is missing or empty, ``testpart`` is empty, or the part
            is not listed.
        """
        LOG.info("external_deps_path:" + external_deps_path)
        if not os.path.exists(external_deps_path):
            LOG.error("Part deps info %s is not exist." % external_deps_path)
            return []

        with open(external_deps_path, 'r') as json_file:
            data_dic = json.load(json_file)
        if not data_dic:
            LOG.error("data_dic is empty.")
            return []
        if not testpart:
            # Avoid an IndexError when no part name was supplied.
            LOG.error("testpart is empty.")
            return []

        test_part = testpart[0]
        if test_part not in data_dic:
            LOG.error("%s is not in part deps info json." % test_part)
            return []
        external_deps_part_list = data_dic.get(test_part)
        LOG.info("external_deps_part_list = %s" % external_deps_part_list)
        return external_deps_part_list

    def _matches_part_and_suite(self, options, prefix_name, suite_file):
        """Apply the -tp (part) and -ts (suite name) selection to one file."""
        # With -tp, the "part" recorded in the file's moduleInfo must equal
        # the requested part.
        if options.testpart and \
                options.testpart[0] != self.get_part_name_test_file(
                    suite_file):
            return False
        # With -ts, the file name (without suffix) must equal one of the
        # ";"-separated names exactly.
        if options.testsuit != "":
            return prefix_name in options.testsuit.split(";")
        return True

    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
        """Tell whether an XTS suite file passes the -tp / -ts selection.

        Args:
            options: Object providing ``testpart`` and ``testsuit``.
            prefix_name: File name of the suite without its suffix.
            xts_suite_file: Path of the suite file.

        Returns:
            False when a part was requested and differs from the file's
            moduleInfo ``part``, or when suite names were requested and none
            equals ``prefix_name``; True otherwise.
        """
        return self._matches_part_and_suite(
            options, prefix_name, xts_suite_file)

    def get_xts_test_files(self, xts_test_case_path, options):
        """Collect and classify the suite files below an XTS output path.

        Args:
            xts_test_case_path: Directory that is scanned for suite files.
            options: Object providing ``testpart`` and ``testsuit``.

        Returns:
            A dictionary with the keys of ``TESTFILE_TYPE_DATA_DIC`` mapping
            to lists of suite-file paths (all empty when the path is missing).
        """
        LOG.info("xts test case path: " + xts_test_case_path)
        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(xts_test_case_path):
            LOG.error("xts %s is not exist." % xts_test_case_path)
            return xts_suit_file_dic

        # Every file path returned for the XTS output directory is examined.
        for xts_suite_file in get_file_list_by_postfix(xts_test_case_path):
            file_name = os.path.basename(xts_suite_file)
            prefix_name, suffix_name = os.path.splitext(file_name)
            if not self.check_xts_config_match(options, prefix_name,
                                               xts_suite_file):
                continue
            if suffix_name == "":
                if file_name == "HatsOpenPosixTest":
                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
                else:
                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
            elif suffix_name == ".hap":
                # Read the driver type once; each lookup parses the sidecar
                # JSON file.
                driver_type = self.get_hap_test_driver(xts_suite_file)
                if driver_type == "OHJSUnitTest":
                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
                elif driver_type == "JSUnitTest":
                    xts_suit_file_dic.get("JST").append(xts_suite_file)
        return xts_suit_file_dic