#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import copy
import json
import sys
from json import JSONDecodeError
from core.utils import get_build_output_path
from core.common import is_open_source_product

from core.utils import get_file_list_by_postfix
from core.config.config_manager import FilterConfigManager
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType

LOG = platform_logger("TestcaseManager")

# Template of the result dictionary: one (initially empty) list of suite
# file paths per test-file category. Always deep-copied before use so the
# template itself is never mutated.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
    "LTPPosix": [],
    "OHRust": []
}
# Files with one of these extensions are never treated as test suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]


class TestCaseManager(object):
    """Collect test suite files from a test-case output tree.

    Every public ``get_*_test_files`` method returns a dictionary shaped
    like ``TESTFILE_TYPE_DATA_DIC``: category name -> list of file paths.
    """

    def get_test_files(self, test_case_path, options):
        """Return the suite files found under ``test_case_path``.

        Only the test types listed in ``options.testtype`` are scanned.
        An empty result dictionary is returned when the path does not exist
        or no test type was requested.
        """
        LOG.info("test case path: " + test_case_path)
        LOG.info("test type list: " + str(options.testtype))
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not options.testtype:
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        return self.get_test_file_data(
            test_case_path, options.testtype, options)

    def get_test_file_data(self, test_case_path, test_type_list, options):
        """Merge the per-test-type results for every type in the list."""
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        for test_type in test_type_list:
            temp_dic = self.get_test_file_data_by_test_type(
                test_case_path, test_type, options)
            for key in suit_file_dic:
                suit_file_dic[key] = suit_file_dic[key] + temp_dic.get(key)
        return suit_file_dic

    def get_test_file_data_by_test_type(self, test_case_path,
                                        test_type, options):
        """Return the suite files below ``<test_case_path>/<test_type>``.

        An empty result dictionary is returned (and an error logged) when
        that directory does not exist.
        """
        test_case_out_path = os.path.join(test_case_path, test_type)
        if not os.path.exists(test_case_out_path):
            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        LOG.info("The test case directory: %s" % test_case_out_path)
        return self.get_all_test_file(test_case_out_path, options)

    def get_all_test_file(self, test_case_out_path, options):
        """Walk ``test_case_out_path`` and classify every eligible file.

        Each sub-directory of ``test_case_out_path`` is treated as one
        subsystem. Files are skipped when their subsystem or file name is
        in the filter configuration, when they live below a ``resource``
        directory, when their extension is in ``FILTER_SUFFIX_NAME_LIST``
        or when ``get_valid_suite_file`` rejects them.
        """
        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        filter_part_list = FilterConfigManager().get_filtering_list(
            "subsystem_name", options.productform)
        filter_list_test_file = FilterConfigManager().get_filtering_list(
            "testfile_name", options.productform)
        resource_marker = os.sep + "resource" + os.sep

        # Every directory directly below the output path is one subsystem.
        for part_name in os.listdir(test_case_out_path):
            part_case_dir = os.path.join(test_case_out_path, part_name)
            if not os.path.isdir(part_case_dir):
                continue
            # Subsystems listed under "subsystem_name" in the filter
            # configuration are skipped entirely.
            if part_name in filter_part_list:
                continue

            # All file paths below the subsystem directory.
            for suite_file in get_file_list_by_postfix(part_case_dir):
                # Anything below a "resource" directory is test data.
                relative_path = suite_file.replace(test_case_out_path, "")
                if resource_marker in relative_path:
                    continue

                file_name = os.path.basename(suite_file)
                # File names listed under "testfile_name" in the filter
                # configuration are skipped.
                if file_name in filter_list_test_file:
                    continue

                _, suffix_name = os.path.splitext(file_name)
                if suffix_name in FILTER_SUFFIX_NAME_LIST:
                    continue

                if not self.get_valid_suite_file(test_case_out_path,
                                                 suite_file,
                                                 options):
                    continue

                self._add_suite_file(suite_file_dictionary, suite_file,
                                     options)

        return suite_file_dictionary

    def _add_suite_file(self, suite_file_dictionary, suite_file, options):
        """Append ``suite_file`` to the category list matching its type."""
        file_name = os.path.basename(suite_file)
        prefix_name, suffix_name = os.path.splitext(file_name)

        if suffix_name == ".dex":
            suite_file_dictionary.get("DEX").append(suite_file)
        elif suffix_name == ".hap":
            # Read the driver type once; each lookup opens and parses the
            # JSON file that sits next to the HAP.
            driver_type = self.get_hap_test_driver(suite_file)
            if driver_type == "OHJSUnitTest":
                # With -tp only HAPs whose moduleInfo "part" equals the
                # requested part are kept; with -ts only HAPs whose name
                # matches a requested suite exactly are kept.
                if not self.check_xts_config_match(options, prefix_name,
                                                   suite_file):
                    return
                if not self.check_hap_test_file(suite_file):
                    return
                suite_file_dictionary.get("OHJST").append(suite_file)
            elif driver_type == "JSUnitTest":
                suite_file_dictionary.get("JST").append(suite_file)
        elif suffix_name == ".py":
            if self.check_python_test_file(suite_file):
                suite_file_dictionary.get("PYT").append(suite_file)
        elif suffix_name == "":
            if file_name.startswith("rust_"):
                Scheduler.update_test_type_in_source(
                    "OHRust", DeviceTestType.oh_rust_test)
                suite_file_dictionary.get("OHRust").append(suite_file)
            else:
                suite_file_dictionary.get("CXX").append(suite_file)
        elif suffix_name == ".bin":
            suite_file_dictionary.get("BIN").append(suite_file)

    def get_part_deps_files(self, external_deps_path, testpart):
        """Return the dependency list stored for ``testpart[0]``.

        ``external_deps_path`` is a JSON file mapping a part name to a
        list. An empty list is returned when the file is missing, empty or
        has no entry for the part.
        """
        LOG.info("external_deps_path:" + external_deps_path)
        if not os.path.exists(external_deps_path):
            LOG.error("Part deps info %s is not exist." % external_deps_path)
            return []

        with open(external_deps_path, 'r') as json_file:
            data_dic = json.load(json_file)
        if not data_dic:
            LOG.error("data_dic is empty.")
            return []

        test_part = testpart[0]
        if test_part not in data_dic.keys():
            LOG.error("%s is not in part deps info json." % test_part)
            return []

        external_deps_part_list = data_dic.get(test_part)
        LOG.info("external_deps_part_list = %s" % external_deps_part_list)
        return external_deps_part_list

    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
        """Tell whether a suite file passes the -tp / -ts selection.

        * With -tp (``options.testpart``) the "part" stored in the file's
          moduleInfo must equal the first requested part.
        * With -ts (``options.testsuit``, ';'-separated) ``prefix_name``
          must equal one of the requested suite names exactly.
        """
        if options.testpart != [] and \
                options.testpart[0] != self.get_part_name_test_file(
                    xts_suite_file):
            return False
        if options.testsuit != "":
            return prefix_name in options.testsuit.split(";")
        return True

    def get_xts_test_files(self, xts_test_case_path, options):
        """Return the XTS suite files found under ``xts_test_case_path``."""
        LOG.info("xts test case path: " + xts_test_case_path)
        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(xts_test_case_path):
            LOG.error("xts %s is not exist." % xts_test_case_path)
            return xts_suit_file_dic

        # All file paths below the XTS test-case output directory.
        for xts_suite_file in get_file_list_by_postfix(xts_test_case_path):
            file_name = os.path.basename(xts_suite_file)
            prefix_name, suffix_name = os.path.splitext(file_name)
            if not self.check_xts_config_match(options, prefix_name,
                                               xts_suite_file):
                continue
            if suffix_name == "":
                if file_name == "HatsOpenPosixTest":
                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
                else:
                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
            elif suffix_name == ".hap":
                # One lookup instead of one per comparison.
                driver_type = self.get_hap_test_driver(xts_suite_file)
                if driver_type == "OHJSUnitTest":
                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
                elif driver_type == "JSUnitTest":
                    xts_suit_file_dic.get("JST").append(xts_suite_file)
        return xts_suit_file_dic

    @classmethod
    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
        """Tell whether ``suite_file`` is selected by the given options.

        The file must live below ``test_case_out_path``. When
        ``options.testsuit`` is set it alone decides; otherwise the file is
        matched against ``options.partname_list`` and
        ``options.testmodule`` using its path relative to the output
        directory (``<part>/<module>/...``).
        """
        partlist = options.partname_list
        testmodule = options.testmodule
        testsuit = options.testsuit

        if not suite_file.startswith(test_case_out_path):
            return False

        if testsuit != "":
            short_name, _ = os.path.splitext(os.path.basename(suite_file))
            # NOTE(review): this splits on ',' while check_xts_config_match
            # splits the same option on ';', and the second operand tests
            # the whole option string rather than the single entry. Both
            # are kept as in the original; confirm the intended behaviour.
            return any(
                short_name.startswith(test) or testsuit.startswith(short_name)
                for test in testsuit.split(','))

        suitfile_subpath = suite_file.replace(test_case_out_path, "")
        suitfile_subpath = suitfile_subpath.strip(os.sep)

        if len(partlist) == 0:
            if testmodule == "":
                return True
            # Without a part list the module is the second path component,
            # and the file must sit at least one level below it.
            temp_list = suitfile_subpath.split(os.sep)
            return len(temp_list) > 2 and testmodule == temp_list[1]

        for partname in partlist:
            wanted_prefix = partname + os.sep
            if testmodule != "":
                wanted_prefix = wanted_prefix + testmodule + os.sep
            if suitfile_subpath.startswith(wanted_prefix):
                return True
        return False

    @classmethod
    def check_python_test_file(cls, suite_file):
        """Return True for ``.py`` files whose name starts with ``test_``."""
        if not suite_file.endswith(".py"):
            return False
        return os.path.basename(suite_file).startswith("test_")

    @classmethod
    def check_hap_test_file(cls, hap_file_path):
        """Return True when the HAP's JSON declares a test file.

        The ``.json`` file next to the HAP must contain a "kits" list with
        at least one entry that has a "test-file-name" key. A missing,
        empty or undecodable JSON file yields False.
        """
        try:
            if not hap_file_path.endswith(".hap"):
                return False
            json_file_path = cls._replace_hap_suffix(hap_file_path, ".json")
            if not os.path.exists(json_file_path):
                return False
            with open(json_file_path, 'r') as json_file:
                data_dic = json.load(json_file)
            if not data_dic or "kits" not in data_dic:
                return False
            return any("test-file-name" in kits_dict
                       for kits_dict in data_dic.get("kits"))
        except JSONDecodeError:
            return False
        finally:
            # NOTE(review): printed for every checked file; kept because it
            # is existing runtime output.
            print(" check hap test file finally")

    @classmethod
    def get_hap_test_driver(cls, hap_file_path):
        """Return the "type" of the "driver" entry in the HAP's JSON.

        An empty string is returned when the JSON is missing or empty, or
        when it has no usable "driver" entry.
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not data_dic:
            return ""
        if "driver" not in data_dic.keys():
            return ""
        driver_dict = data_dic.get("driver")
        if not driver_dict:
            LOG.error("%s has not set driver." % hap_file_path)
            return ""
        return driver_dict.get("type")

    @classmethod
    def get_hap_json(cls, hap_file_path):
        """Load the ``.json`` file next to the HAP, or {} if unavailable."""
        return cls._load_hap_sidecar_json(hap_file_path, ".json")

    @classmethod
    def get_hap_part_json(cls, hap_file_path):
        """Load the ``.moduleInfo`` file next to the HAP, or {} if absent."""
        return cls._load_hap_sidecar_json(hap_file_path, ".moduleInfo")

    @classmethod
    def get_part_name_test_file(cls, hap_file_path):
        """Return the "part" value from the HAP's moduleInfo, or ""."""
        data_dic = cls.get_hap_part_json(hap_file_path)
        if not data_dic:
            return ""
        if "part" not in data_dic.keys():
            return ""
        return data_dic["part"]

    @staticmethod
    def _replace_hap_suffix(hap_file_path, new_suffix):
        """Swap the trailing ``.hap`` of the path for ``new_suffix``.

        Only the final suffix is replaced, so a ".hap" that appears in a
        directory name is left alone. The caller must have checked that
        the path ends with ".hap".
        """
        return hap_file_path[:-len(".hap")] + new_suffix

    @classmethod
    def _load_hap_sidecar_json(cls, hap_file_path, new_suffix):
        """Parse the JSON file that shares the HAP's name, or return {}."""
        if not hap_file_path.endswith(".hap"):
            return {}
        json_file_path = cls._replace_hap_suffix(hap_file_path, new_suffix)
        if not os.path.exists(json_file_path):
            return {}
        with open(json_file_path, 'r') as json_file:
            return json.load(json_file)