#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import copy
import json
import sys
from json import JSONDecodeError
from core.utils import get_build_output_path
from core.common import is_open_source_product

from core.utils import get_file_list_by_postfix
from core.config.config_manager import FilterConfigManager
from xdevice import platform_logger
from xdevice import DeviceTestType
from xdevice import Binder

LOG = platform_logger("TestcaseManager")

# Template of the result dictionary: one (initially empty) list of suite file
# paths per test-file category. Always deep-copy it before filling it in.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
    "LTPPosix": [],
    "OHRust": []
}
# File suffixes that are never treated as test suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]

_HAP_SUFFIX = ".hap"


def _split_suite_names(testsuit):
    """Split the value of the test-suite option into individual suite names.

    Both "," and ";" are accepted as separators so that every filter in this
    module interprets the option the same way. Surrounding whitespace is
    stripped and empty names are dropped.
    """
    candidates = testsuit.replace(";", ",").split(",")
    return [name.strip() for name in candidates if name.strip()]


def _strip_root(path, root):
    """Return ``path`` without its leading ``root``.

    Only the leading occurrence is removed. ``path`` is returned unchanged
    when it does not start with ``root``.
    """
    if path.startswith(root):
        return path[len(root):]
    return path


def _load_sidecar_json(hap_file_path, sidecar_suffix):
    """Load the JSON file stored next to a HAP package.

    The sidecar path is the HAP path with only its trailing ".hap" replaced by
    ``sidecar_suffix``; ".hap" occurring elsewhere in the path is left alone.

    Returns:
        The parsed JSON content, or ``{}`` when ``hap_file_path`` is not a
        ".hap" file or the sidecar file does not exist.

    Raises:
        json.JSONDecodeError: if the sidecar file is not valid JSON.
        UnicodeDecodeError: if the sidecar file is not valid UTF-8.
    """
    if not hap_file_path.endswith(_HAP_SUFFIX):
        return {}
    sidecar_path = hap_file_path[:-len(_HAP_SUFFIX)] + sidecar_suffix
    if not os.path.exists(sidecar_path):
        return {}
    # JSON is read as UTF-8 explicitly so the result does not depend on the
    # locale encoding of the host.
    with open(sidecar_path, 'r', encoding="utf-8") as json_file:
        return json.load(json_file)


class TestCaseManager(object):
    """Collects test suite files from a build output tree, grouped by type."""

    @classmethod
    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
        """Tell whether ``suite_file`` is selected by the command options.

        Args:
            test_case_out_path: root directory the suite file must live under.
            suite_file: path of the candidate suite file.
            options: object providing ``partname_list``, ``testmodule`` and
                ``testsuit``.

        Returns:
            True when the file is selected, False otherwise.
        """
        partlist = options.partname_list
        testmodule = options.testmodule
        testsuit = options.testsuit

        if not suite_file.startswith(test_case_out_path):
            return False

        # An explicit suite selection takes precedence over part/module.
        if testsuit != "":
            short_name, _ = os.path.splitext(os.path.basename(suite_file))
            return short_name in _split_suite_names(testsuit)

        if not partlist:
            # Without parts a module cannot be resolved; with neither part
            # nor module given, every file is valid.
            return testmodule == ""

        # Path relative to the root: "<part>/<module>/...".
        suitfile_subpath = suite_file[len(test_case_out_path):].strip(os.sep)
        if testmodule != "":
            prefixes = tuple(
                partname + os.sep + module + os.sep
                for partname in partlist
                for module in testmodule.split(","))
        else:
            prefixes = tuple(partname + os.sep for partname in partlist)
        return suitfile_subpath.startswith(prefixes)

    @classmethod
    def check_python_test_file(cls, suite_file):
        """Return True for ".py" files whose base name starts with "test_"."""
        if not suite_file.endswith(".py"):
            return False
        return os.path.basename(suite_file).startswith("test_")

    @classmethod
    def check_hap_test_file(cls, hap_file_path):
        """Tell whether the JSON next to a HAP declares a test file.

        Returns:
            True when the ".json" file next to ``hap_file_path`` has a "kits"
            list in which at least one entry contains "test-file-name".
            False in every other case, including a missing or unreadable
            JSON file.
        """
        try:
            data_dic = cls.get_hap_json(hap_file_path)
            if not isinstance(data_dic, dict):
                return False
            # A missing or null "kits" entry is treated as an empty list.
            kits_list = data_dic.get("kits") or []
            return any(
                isinstance(kits_dict, dict) and "test-file-name" in kits_dict
                for kits_dict in kits_list)
        except (JSONDecodeError, UnicodeDecodeError):
            return False
        finally:
            print(" check hap test file finally")

    @classmethod
    def get_hap_test_driver(cls, hap_file_path):
        """Return the driver type declared in the JSON next to a HAP.

        Returns:
            The value of ``driver.type`` (None when "type" is absent), or ""
            when there is no JSON content or no usable "driver" entry.

        Raises:
            json.JSONDecodeError: if the JSON file is malformed.
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not data_dic or "driver" not in data_dic:
            return ""
        driver_dict = data_dic.get("driver")
        if not driver_dict:
            LOG.error("%s has not set driver." % hap_file_path)
            return ""
        return driver_dict.get("type")

    @classmethod
    def get_hap_json(cls, hap_file_path):
        """Load "<name>.json" stored next to "<name>.hap".

        Returns ``{}`` when the path is not a ".hap" file or the JSON file
        does not exist.

        Raises:
            json.JSONDecodeError: if the JSON file is malformed.
        """
        return _load_sidecar_json(hap_file_path, ".json")

    @classmethod
    def get_hap_part_json(cls, hap_file_path):
        """Load "<name>.moduleInfo" stored next to "<name>.hap".

        Returns ``{}`` when the path is not a ".hap" file or the moduleInfo
        file does not exist.

        Raises:
            json.JSONDecodeError: if the moduleInfo file is malformed.
        """
        return _load_sidecar_json(hap_file_path, ".moduleInfo")

    @classmethod
    def get_part_name_test_file(cls, hap_file_path):
        """Return the "part" value from the HAP's moduleInfo, or ""."""
        data_dic = cls.get_hap_part_json(hap_file_path)
        if not data_dic:
            return ""
        if "part" in data_dic:
            return data_dic["part"]
        return ""

    def get_test_files(self, test_case_path, options):
        """Collect suite files for every test type in ``options.testtype``.

        Returns:
            A dictionary shaped like ``TESTFILE_TYPE_DATA_DIC``. All lists
            are empty when ``test_case_path`` does not exist or no test type
            was requested.
        """
        LOG.info("test case path: " + test_case_path)
        LOG.info("test type list: " + str(options.testtype))
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return suit_file_dic
        if len(options.testtype) != 0:
            suit_file_dic = self.get_test_file_data(
                test_case_path,
                options.testtype,
                options)
        return suit_file_dic

    def get_test_file_data(self, test_case_path, test_type_list, options):
        """Merge the suite files found for each test type in the list."""
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        for test_type in test_type_list:
            temp_dic = self.get_test_file_data_by_test_type(
                test_case_path,
                test_type,
                options)
            for key in suit_file_dic:
                suit_file_dic[key] = suit_file_dic[key] + temp_dic.get(key, [])
        return suit_file_dic

    def get_test_file_data_by_test_type(self, test_case_path,
                                        test_type, options):
        """Collect suite files below ``<test_case_path>/<test_type>``."""
        test_case_out_path = os.path.join(test_case_path, test_type)
        if os.path.exists(test_case_out_path):
            LOG.info("The test case directory: %s" % test_case_out_path)
            return self.get_all_test_file(test_case_out_path, options)
        LOG.error("Test case dir does not exist. %s" % test_case_out_path)
        return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)

    def get_all_test_file(self, test_case_out_path, options):
        """Walk one test-type directory and classify its suite files.

        Args:
            test_case_out_path: directory whose sub-directories are scanned.
            options: object providing ``productform``, ``current_raw_cmd``,
                ``partname_list``, ``testpart``, ``testsuit`` and
                ``testmodule``.

        Returns:
            A dictionary shaped like ``TESTFILE_TYPE_DATA_DIC`` holding the
            selected suite file paths.
        """
        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        filter_part_list = FilterConfigManager().get_filtering_list(
            "subsystem_name", options.productform)
        filter_list_test_file = FilterConfigManager().get_filtering_list(
            "testfile_name", options.productform)
        command_list = options.current_raw_cmd.split(" ")
        # With -ss or -tp on the command line, only the directories named in
        # options.partname_list are scanned.
        restrict_to_parts = "-ss" in command_list or "-tp" in command_list
        resource_marker = os.sep + "resource" + os.sep

        # Iterate over every folder below the test-case output directory;
        # each folder corresponds to one subsystem.
        for part_name in os.listdir(test_case_out_path):
            if restrict_to_parts and part_name not in options.partname_list:
                continue
            part_case_dir = os.path.join(test_case_out_path, part_name)
            if not os.path.isdir(part_case_dir):
                continue
            # Skip subsystems listed under <subsystem_name> in the filter
            # configuration.
            if part_name in filter_part_list:
                continue

            # All file paths found below the subsystem directory.
            for suite_file in get_file_list_by_postfix(part_case_dir):
                # Files located in a "resource" directory are not suites.
                relative_path = _strip_root(suite_file, test_case_out_path)
                if resource_marker in relative_path:
                    continue

                file_name = os.path.basename(suite_file)
                # Skip files listed under <testfile_name> in the filter
                # configuration.
                if file_name in filter_list_test_file:
                    continue

                prefix_name, suffix_name = os.path.splitext(file_name)
                if suffix_name in FILTER_SUFFIX_NAME_LIST:
                    continue

                if not self.get_valid_suite_file(test_case_out_path,
                                                 suite_file,
                                                 options):
                    continue

                if suffix_name == ".dex":
                    suite_file_dictionary.get("DEX").append(suite_file)
                elif suffix_name == ".hap":
                    # Read the driver type once per HAP.
                    driver_type = self.get_hap_test_driver(suite_file)
                    if driver_type == "OHJSUnitTest":
                        # With -tp, only HAPs whose moduleInfo "part" equals
                        # the requested part are queued; with -ts, only HAPs
                        # whose name exactly matches a requested suite.
                        if not self.check_xts_config_match(
                                options, prefix_name, suite_file):
                            continue
                        if not self.check_hap_test_file(suite_file):
                            continue
                        suite_file_dictionary.get("OHJST").append(suite_file)
                    elif driver_type == "JSUnitTest":
                        suite_file_dictionary.get("JST").append(suite_file)
                elif suffix_name == ".py":
                    if not self.check_python_test_file(suite_file):
                        continue
                    suite_file_dictionary.get("PYT").append(suite_file)
                elif suffix_name == "":
                    if file_name.startswith("rust_"):
                        Binder.get_tdd_config().update_test_type_in_source(
                            "OHRust", DeviceTestType.oh_rust_test)
                        suite_file_dictionary.get("OHRust").append(suite_file)
                    else:
                        suite_file_dictionary.get("CXX").append(suite_file)
                elif suffix_name == ".bin":
                    suite_file_dictionary.get("BIN").append(suite_file)

        return suite_file_dictionary

    def get_part_deps_files(self, external_deps_path, testpart):
        """Return the external dependencies recorded for ``testpart[0]``.

        Args:
            external_deps_path: path of the part-dependency JSON file.
            testpart: list whose first element is the part name to look up.

        Returns:
            The value stored for the part in the JSON file, or ``[]`` when
            the file is missing or empty, ``testpart`` is empty, or the part
            is not listed.
        """
        LOG.info("external_deps_path:" + external_deps_path)
        if not os.path.exists(external_deps_path):
            LOG.error("Part deps info %s is not exist." % external_deps_path)
            return []

        with open(external_deps_path, 'r', encoding="utf-8") as json_file:
            data_dic = json.load(json_file)
        if not data_dic:
            LOG.error("data_dic is empty.")
            return []
        if not testpart:
            # Nothing to look up; avoids an IndexError on testpart[0].
            LOG.error("testpart is empty.")
            return []

        test_part = testpart[0]
        if test_part in data_dic:
            external_deps_part_list = data_dic.get(test_part)
            LOG.info("external_deps_part_list = %s" % external_deps_part_list)
            return external_deps_part_list
        LOG.error("%s is not in part deps info json." % test_part)
        return []

    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
        """Apply the -tp / -ts selection to one suite file.

        Args:
            options: object providing ``testpart`` and ``testsuit``.
            prefix_name: suite file name without its extension.
            xts_suite_file: path of the suite file.

        Returns:
            False when a part was requested and differs from the "part" in
            the file's moduleInfo, or when suites were requested and
            ``prefix_name`` is not one of them. True otherwise.
        """
        # With -tp, only files whose moduleInfo "part" equals the requested
        # part are queued.
        if options.testpart and \
                options.testpart[0] != self.get_part_name_test_file(
                    xts_suite_file):
            return False
        # With -ts, only files whose name exactly matches a requested suite
        # are queued.
        if options.testsuit != "":
            return prefix_name in _split_suite_names(options.testsuit)
        return True

    def get_xts_test_files(self, xts_test_case_path, options):
        """Collect and classify the XTS suite files below a directory.

        Returns:
            A dictionary shaped like ``TESTFILE_TYPE_DATA_DIC``. All lists
            are empty when ``xts_test_case_path`` does not exist.
        """
        LOG.info("xts test case path: " + xts_test_case_path)
        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(xts_test_case_path):
            LOG.error("xts %s is not exist." % xts_test_case_path)
            return xts_suit_file_dic
        # All file paths found below the XTS test-case output directory.
        for xts_suite_file in get_file_list_by_postfix(xts_test_case_path):
            file_name = os.path.basename(xts_suite_file)
            prefix_name, suffix_name = os.path.splitext(file_name)
            if not self.check_xts_config_match(options, prefix_name,
                                               xts_suite_file):
                continue
            if suffix_name == "":
                if file_name == "HatsOpenPosixTest":
                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
                else:
                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
            elif suffix_name == ".hap":
                # Read the driver type once per HAP.
                driver_type = self.get_hap_test_driver(xts_suite_file)
                if driver_type == "OHJSUnitTest":
                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
                elif driver_type == "JSUnitTest":
                    xts_suit_file_dic.get("JST").append(xts_suite_file)
        return xts_suit_file_dic