#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import copy
import json
import sys
from json import JSONDecodeError
from core.utils import get_build_output_path
from core.common import is_open_source_product

from core.utils import get_file_list_by_postfix
from core.config.config_manager import FilterConfigManager
from xdevice import platform_logger

LOG = platform_logger("TestcaseManager")

# Template of the result dictionary: one list of suite file paths per test
# file type. Always deep-copied before use so the template is never mutated.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
}
# Files with these suffixes are never treated as test suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]

# Maps a file suffix to the TESTFILE_TYPE_DATA_DIC key it is collected under
# by TestCaseManager.get_all_test_file. Files without a suffix count as "CXX".
_SUFFIX_TO_TEST_TYPE = {
    ".dex": "DEX",
    ".hap": "JST",
    ".py": "PYT",
    "": "CXX",
    ".bin": "BIN",
}

_HAP_SUFFIX = ".hap"


def _strip_root(path, root):
    """Return *path* without its leading *root*; unchanged if not prefixed.

    Only the leading occurrence is removed, so a root string that happens to
    reappear deeper inside the path is left intact.
    """
    if path.startswith(root):
        return path[len(root):]
    return path


class TestCaseManager(object):
    """Collect test suite files from a test case output tree, by file type.

    Every public collector returns a dictionary shaped like
    TESTFILE_TYPE_DATA_DIC whose values are lists of file paths.
    """

    def get_test_files(self, test_case_path, options):
        """Collect suite files for every test type in ``options.testtype``.

        :param test_case_path: root directory of the built test cases.
        :param options: object providing at least ``testtype``; it is passed
            on unchanged to the lower-level collectors.
        :return: dictionary shaped like TESTFILE_TYPE_DATA_DIC. All lists are
            empty when the path does not exist or no test type is given.
        """
        LOG.info("test case path: " + test_case_path)
        LOG.info("test type list: " + str(options.testtype))
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not options.testtype:
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        return self.get_test_file_data(
            test_case_path, options.testtype, options)

    def get_test_file_data(self, test_case_path, test_type_list, options):
        """Merge the suite files found for each entry of *test_type_list*.

        :return: dictionary shaped like TESTFILE_TYPE_DATA_DIC holding the
            concatenation, in *test_type_list* order, of the per-type results.
        """
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        for test_type in test_type_list:
            temp_dic = self.get_test_file_data_by_test_type(
                test_case_path, test_type, options)
            for key, collected in suit_file_dic.items():
                collected.extend(temp_dic.get(key, []))
        return suit_file_dic

    def get_test_file_data_by_test_type(self, test_case_path,
                                        test_type, options):
        """Collect suite files below ``<test_case_path>/<test_type>``.

        :return: dictionary shaped like TESTFILE_TYPE_DATA_DIC; all lists are
            empty (and an error is logged) when the directory is missing.
        """
        test_case_out_path = os.path.join(test_case_path, test_type)
        if not os.path.exists(test_case_out_path):
            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        LOG.info("The test case directory: %s" % test_case_out_path)
        return self.get_all_test_file(test_case_out_path, options)

    def get_all_test_file(self, test_case_out_path, options):
        """Scan *test_case_out_path* and classify suite files by suffix.

        A file is skipped when its directory is in the ``subsystem_name``
        filter list, it lies below a ``resource`` directory, its name is in
        the ``testfile_name`` filter list, its suffix is listed in
        FILTER_SUFFIX_NAME_LIST, or get_valid_suite_file rejects it. ``.py``
        files must additionally pass check_python_test_file.

        :param test_case_out_path: directory whose sub-directories are
            scanned.
        :param options: object providing ``productform`` plus the attributes
            read by get_valid_suite_file.
        :return: dictionary shaped like TESTFILE_TYPE_DATA_DIC.
        """
        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        filter_part_list = FilterConfigManager().get_filtering_list(
            "subsystem_name", options.productform)
        filter_list_test_file = FilterConfigManager().get_filtering_list(
            "testfile_name", options.productform)
        resource_marker = os.sep + "resource" + os.sep

        # Walk every folder directly below the output directory; each folder
        # corresponds to one subsystem.
        for part_name in os.listdir(test_case_out_path):
            part_case_dir = os.path.join(test_case_out_path, part_name)
            if not os.path.isdir(part_case_dir):
                continue
            # Subsystems listed under <subsystem_name> in the filter
            # configuration are skipped.
            if part_name in filter_part_list:
                continue

            # All file paths found below the subsystem directory.
            for suite_file in get_file_list_by_postfix(part_case_dir):
                # Files that live below a "resource" directory are skipped.
                if resource_marker in _strip_root(suite_file,
                                                  test_case_out_path):
                    continue

                file_name = os.path.basename(suite_file)
                # Files listed under <testfile_name> in the filter
                # configuration are skipped.
                if file_name in filter_list_test_file:
                    continue

                _, suffix_name = os.path.splitext(file_name)
                if suffix_name in FILTER_SUFFIX_NAME_LIST:
                    continue

                if not self.get_valid_suite_file(test_case_out_path,
                                                 suite_file,
                                                 options):
                    continue

                test_type = _SUFFIX_TO_TEST_TYPE.get(suffix_name)
                if test_type is None:
                    # Unknown suffix: not a suite file.
                    continue
                if test_type == "PYT" and \
                        not self.check_python_test_file(suite_file):
                    continue
                suite_file_dictionary[test_type].append(suite_file)

        return suite_file_dictionary

    def get_acts_test_files(self, acts_test_case_path, options):
        """Collect the ACTS ``.hap`` suites below *acts_test_case_path*.

        Each accepted HAP is stored under "OHJST" or "JST" according to the
        driver type reported by get_hap_test_driver; HAPs with any other
        driver type are dropped.

        :param acts_test_case_path: ACTS test case output directory.
        :param options: object providing ``testpart`` (list) and ``testsuit``
            (";"-separated string, "" when unset).
        :return: dictionary shaped like TESTFILE_TYPE_DATA_DIC.
        """
        LOG.info("acts test case path: " + acts_test_case_path)
        acts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(acts_test_case_path):
            LOG.error("acts %s is not exist." % acts_test_case_path)
            return acts_suit_file_dic

        # The suite filter does not depend on the file, so split it once.
        wanted_suites = None
        if options.testsuit != "":
            wanted_suites = options.testsuit.split(";")

        # All file paths found below the ACTS output directory.
        for acts_suite_file in get_file_list_by_postfix(acts_test_case_path):
            file_name = os.path.basename(acts_suite_file)
            prefix_name, suffix_name = os.path.splitext(file_name)
            if suffix_name != _HAP_SUFFIX:
                continue
            # When -tp is given, only HAPs whose "part" in the moduleInfo
            # file equals the requested part are queued for execution.
            if options.testpart and options.testpart[0] != \
                    self.get_part_name_test_file(acts_suite_file):
                continue
            # When -ts is given, only HAPs whose name matches one of the
            # requested suites exactly are queued for execution.
            if wanted_suites is not None and \
                    prefix_name not in wanted_suites:
                continue
            if not self.check_hap_test_file(acts_suite_file):
                continue

            # Resolve the driver once: every call re-reads the JSON file and
            # may log an error.
            driver_type = self.get_hap_test_driver(acts_suite_file)
            if driver_type == "OHJSUnitTest":
                acts_suit_file_dic["OHJST"].append(acts_suite_file)
            elif driver_type == "JSUnitTest":
                acts_suit_file_dic["JST"].append(acts_suite_file)
        return acts_suit_file_dic

    @classmethod
    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
        """Tell whether *suite_file* is selected by the filters in *options*.

        Rules, in order:

        * the file must be located below *test_case_out_path*;
        * if ``options.testsuit`` is set, the file's base name (without
          suffix) must start with it, or it must start with the base name;
        * otherwise the path relative to *test_case_out_path* must start with
          one of ``options.partname_list`` (when non-empty) and, when
          ``options.testmodule`` is set, be followed by that module
          directory. With an empty part list the module must be the second
          path component and the file must sit below it.

        :return: True when the file is selected, False otherwise.
        """
        partlist = options.partname_list
        testmodule = options.testmodule
        testsuit = options.testsuit

        if not suite_file.startswith(test_case_out_path):
            return False

        if testsuit != "":
            short_name, _ = os.path.splitext(os.path.basename(suite_file))
            return short_name.startswith(testsuit) or \
                testsuit.startswith(short_name)

        # Strip only the leading root; the prefix check above guarantees it.
        suitfile_subpath = suite_file[len(test_case_out_path):]
        suitfile_subpath = suitfile_subpath.strip(os.sep)

        if not partlist:
            if testmodule == "":
                return True
            path_parts = suitfile_subpath.split(os.sep)
            return len(path_parts) > 2 and testmodule == path_parts[1]

        module_tail = testmodule + os.sep if testmodule != "" else ""
        return any(
            suitfile_subpath.startswith(partname + os.sep + module_tail)
            for partname in partlist)

    @classmethod
    def check_python_test_file(cls, suite_file):
        """Return True when *suite_file* is a ``test_*.py`` file."""
        return suite_file.endswith(".py") and \
            os.path.basename(suite_file).startswith("test_")

    @classmethod
    def check_hap_test_file(cls, hap_file_path):
        """Return True when the HAP's ``.json`` file names a test file.

        The HAP is accepted when the ``.json`` file next to it parses to an
        object whose ``kits`` list is non-empty and whose first kit has a
        ``test-file-name`` key. Any other situation (not a ``.hap`` path,
        missing or malformed JSON, unexpected structure) yields False.
        """
        try:
            data_dic = cls._load_hap_sidecar(hap_file_path, ".json")
        except JSONDecodeError:
            return False
        if not data_dic or not isinstance(data_dic, dict):
            return False
        kits_list = data_dic.get("kits")
        if not kits_list or not isinstance(kits_list, list):
            return False
        first_kit = kits_list[0]
        return isinstance(first_kit, dict) and "test-file-name" in first_kit

    @classmethod
    def get_hap_test_driver(cls, hap_file_path):
        """Return the ``driver.type`` value from the HAP's ``.json`` file.

        :return: the driver type; False when the JSON is unavailable or
            empty; None when no driver is configured (an error is logged if
            the ``driver`` key is absent).
        :raises JSONDecodeError: when the ``.json`` file is malformed.
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not data_dic:
            return False
        if "driver" not in data_dic:
            LOG.error("%s has not set driver." % hap_file_path)
            return None
        driver_dict = data_dic.get("driver")
        if driver_dict:
            return driver_dict.get("type")
        return None

    @classmethod
    def get_hap_json(cls, hap_file_path):
        """Load the ``.json`` file that sits next to *hap_file_path*.

        :return: the parsed JSON, or None when the path is not a ``.hap``
            file or the ``.json`` file does not exist.
        :raises JSONDecodeError: when the file is malformed.
        """
        return cls._load_hap_sidecar(hap_file_path, ".json")

    @classmethod
    def get_hap_part_json(cls, hap_file_path):
        """Load the ``.moduleInfo`` file that sits next to *hap_file_path*.

        :return: the parsed JSON, or None when the path is not a ``.hap``
            file or the ``.moduleInfo`` file does not exist.
        :raises JSONDecodeError: when the file is malformed.
        """
        return cls._load_hap_sidecar(hap_file_path, ".moduleInfo")

    @classmethod
    def get_part_name_test_file(cls, hap_file_path):
        """Return the ``part`` value from the HAP's ``.moduleInfo`` file.

        :return: the part name; False when the moduleInfo is unavailable or
            empty; None when it has no ``part`` key.
        :raises JSONDecodeError: when the ``.moduleInfo`` file is malformed.
        """
        data_dic = cls.get_hap_part_json(hap_file_path)
        if not data_dic:
            return False
        return data_dic.get("part")

    @classmethod
    def _load_hap_sidecar(cls, hap_file_path, sidecar_suffix):
        """Parse the JSON file named like the HAP but with *sidecar_suffix*.

        Only the trailing ".hap" is swapped, so a ".hap" substring elsewhere
        in the path (for instance in a directory name) is left untouched.

        :return: the parsed JSON, or None when *hap_file_path* does not end
            with ".hap" or the sidecar file does not exist.
        :raises JSONDecodeError: when the sidecar file is malformed.
        """
        if not hap_file_path.endswith(_HAP_SUFFIX):
            return None
        sidecar_path = hap_file_path[:-len(_HAP_SUFFIX)] + sidecar_suffix
        if not os.path.exists(sidecar_path):
            return None
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(sidecar_path, "r", encoding="utf-8") as sidecar_file:
            return json.load(sidecar_file)