#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import re
import json
from enum import Enum

# Directory five levels above the directory containing this file.
HOME = os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))


class ChangeFileEntity:
    """Added, modified and deleted file paths recorded under one base path.

    Every path handed to the ``add*Paths`` methods is joined onto ``path``
    before being stored, and each list is kept sorted.
    """

    def __init__(self, name, path):
        self.name = name
        self.path = path
        self.add = []
        self.modified = []
        self.delete = []
        # Flag read/written through get/set_already_match_utils; this class
        # itself never interprets it.
        self._already_match_utils = False

    def addAddPaths(self, add_list):
        """Record the entries of ``add_list`` (relative to ``path``) as added."""
        self.add += [os.path.join(self.path, item) for item in add_list]
        self.add.sort()

    def addModifiedPaths(self, modified_list):
        """Record the entries of ``modified_list`` (relative to ``path``) as modified."""
        self.modified += [os.path.join(self.path, item) for item in modified_list]
        self.modified.sort()

    def addRenamePathsto(self, rename_list):
        """Record renames: index 0 of each entry is deleted, index 1 is added."""
        for rename in rename_list:
            self.add.append(os.path.join(self.path, rename[1]))
            self.delete.append(os.path.join(self.path, rename[0]))
        self.add.sort()
        self.delete.sort()

    def addDeletePaths(self, delete_list):
        """Record the entries of ``delete_list`` (relative to ``path``) as deleted."""
        self.delete += [os.path.join(self.path, item) for item in delete_list]
        self.delete.sort()

    def isEmpty(self):
        """Return True when no added, modified or deleted path is recorded."""
        return not (self.add or self.modified or self.delete)

    def get_already_match_utils(self):
        """Return the stored ``already_match_utils`` flag."""
        return self._already_match_utils

    def set_already_match_utils(self, already_match_utils):
        """Store the ``already_match_utils`` flag."""
        self._already_match_utils = already_match_utils

    def __str__(self):
        add_str = '\n '.join(self.add) if self.add else 'None'
        modified_str = '\n '.join(self.modified) if self.modified else 'None'
        delete_str = '\n '.join(self.delete) if self.delete else 'None'

        return (f"ChangeFileEntity(\n"
                f" name: {self.name},\n"
                f" path: {self.path},\n"
                f" add: [\n {add_str}\n ],\n"
                f" modified: [\n {modified_str}\n ],\n"
                f" delete: [\n {delete_str}\n ]\n"
                f")")


class MatchConfig:
    """Lazily loaded, class-level cache of the JSON configuration files.

    Each ``get_*`` accessor triggers the matching loader while its cached
    value is still empty, then returns the cached value.
    """

    config_path = os.path.join(HOME, "test/xts/tools/config")
    # NOTE: "MACTH" is a typo for "MATCH"; the name is kept because it is a
    # public class attribute.
    MACTH_CONFIG_PATH = os.path.join(config_path, "ci_match_config.json")
    exception_path = {}
    all_com_path = {}
    skip_judge_build_path = {}
    temple_list = []
    acts_All_template_ex_list = []
    xts_path_list = []
    interface_path_list = []

    INTERFACE_BUNDLE_NAME_PATH = os.path.join(config_path, "ci_api_part_name.json")
    interface_js_data = {}
    interface_c_data = {}
    driver_interface = {}

    WHITE_LIST_PATH = os.path.join(config_path, "ci_target_white_list.json")
    white_list_repo = {}

    # Configuration of test suites that are excluded from compilation,
    # cached per root target name.
    uncompile_suite = {}

    @classmethod
    def initialization(cls):
        """Load ``ci_match_config.json`` into the class attributes (once).

        A missing file is reported with a warning first; the ``open`` that
        follows still raises ``FileNotFoundError``.
        """
        if cls.exception_path == {}:
            print("MatchConfig initialization begin...")
            if not os.path.exists(cls.MACTH_CONFIG_PATH):
                print("warning: Reading the configuration file is abnormal because {} not exist".format(
                    cls.MACTH_CONFIG_PATH))
            # JSON is read as UTF-8 instead of the platform locale encoding.
            with open(cls.MACTH_CONFIG_PATH, 'r', encoding='utf-8') as file:
                rules_data = json.load(file)
                cls.exception_path = rules_data['exception_path']
                cls.all_com_path = rules_data['all_com_path']
                cls.skip_judge_build_path = rules_data['skip_judge_build_path']
                cls.temple_list = rules_data['temple_list']
                cls.acts_All_template_ex_list = rules_data['acts_All_template_ex']
                cls.xts_path_list = rules_data['xts_path_list']
                cls.interface_path_list = rules_data['interface_path_list']
            print("MatchConfig initialization end.")

    @classmethod
    def interface_initialization(cls):
        """Load ``ci_api_part_name.json`` into the interface attributes (once).

        A missing file is reported with a warning first; the ``open`` that
        follows still raises ``FileNotFoundError``.
        """
        if cls.interface_js_data == {}:
            print("INTERFACE_BUNDLE_NAME initialization begin...")
            if not os.path.exists(cls.INTERFACE_BUNDLE_NAME_PATH):
                print("warning: Reading the configuration file is abnormal because {} not exist".format(
                    cls.INTERFACE_BUNDLE_NAME_PATH))
            with open(cls.INTERFACE_BUNDLE_NAME_PATH, 'r', encoding='utf-8') as file:
                interface_data = json.load(file)
                cls.interface_js_data = interface_data['sdk-js']
                cls.interface_c_data = interface_data['sdk_c']
                cls.driver_interface = interface_data['driver_interface']

            print("INTERFACE_BUNDLE_NAME initialization end.")

    @classmethod
    def get_interface_json_js_data(cls):
        """Return the ``sdk-js`` section, loading the interface file if needed."""
        if cls.interface_js_data == {}:
            cls.interface_initialization()
        return cls.interface_js_data

    @classmethod
    def get_interface_json_c_data(cls):
        """Return the ``sdk_c`` section, loading the interface file if needed."""
        if cls.interface_c_data == {}:
            cls.interface_initialization()
        return cls.interface_c_data

    @classmethod
    def get_interface_json_driver_interface_data(cls):
        """Return the ``driver_interface`` section, loading the file if needed."""
        if cls.driver_interface == {}:
            cls.interface_initialization()
        return cls.driver_interface

    @classmethod
    def get_interface_path_list(cls):
        """Return ``interface_path_list``, loading the match config if needed."""
        if cls.interface_path_list == []:
            cls.initialization()
        return cls.interface_path_list

    @classmethod
    def get_exception_path(cls):
        """Return the ``exception_path`` rules, loading the match config if needed."""
        if cls.exception_path == {}:
            cls.initialization()
        return cls.exception_path

    @classmethod
    def get_all_com_path(cls):
        """Return the ``all_com_path`` rules, loading the match config if needed."""
        if cls.all_com_path == {}:
            cls.initialization()
        return cls.all_com_path

    @classmethod
    def get_skip_judge_build_path(cls):
        """Return the ``skip_judge_build_path`` rules, loading the config if needed."""
        if cls.skip_judge_build_path == {}:
            cls.initialization()
        return cls.skip_judge_build_path

    @classmethod
    def get_temple_list(cls):
        """Return ``temple_list``, loading the match config if needed."""
        if cls.temple_list == []:
            cls.initialization()
        return cls.temple_list

    @classmethod
    def get_acts_All_template_ex_list(cls):
        """Return the ``acts_All_template_ex`` list, loading the config if needed."""
        if cls.acts_All_template_ex_list == []:
            cls.initialization()
        return cls.acts_All_template_ex_list

    @classmethod
    def get_xts_path_list(cls):
        """Return ``xts_path_list``, loading the match config if needed."""
        if cls.xts_path_list == []:
            cls.initialization()
        return cls.xts_path_list

    @classmethod
    def initialization_white_list(cls):
        """Load ``ci_target_white_list.json`` and index ``repo_list`` by ``path``.

        A missing file is reported with a warning first; the ``open`` that
        follows still raises ``FileNotFoundError``.
        """
        if cls.white_list_repo == {}:
            print("WhiteList initialization begin...")
            if not os.path.exists(cls.WHITE_LIST_PATH):
                print("warning: Reading the configuration file is abnormal because {} not exist".format(
                    cls.WHITE_LIST_PATH))
            with open(cls.WHITE_LIST_PATH, 'r', encoding='utf-8') as file:
                white_file = json.load(file)
                white_repos = white_file["repo_list"]
                for white_repo in white_repos:
                    cls.white_list_repo[white_repo["path"]] = white_repo
            print("WhiteList initialization end.")

    @classmethod
    def get_white_list_repo(cls):
        """Return the white-list mapping, loading it if needed."""
        if cls.white_list_repo == {}:
            cls.initialization_white_list()
        return cls.white_list_repo

    @classmethod
    def get_uncompile_suite_list(cls, xts_root_dir, device_type):
        """Return the suites configured as not compiled for ``device_type``.

        The data comes from ``ci_uncompile_suite.json`` under
        ``HOME/test/xts/<basename of xts_root_dir>`` and is cached per root
        target name.

        Returns:
            ``[]`` when the file does not exist; for a JSON object, the value
            stored under ``device_type`` or ``[]`` when that key is absent;
            otherwise the loaded JSON value itself.
        """
        xts_suite = PathUtils.get_root_target(xts_root_dir)
        if xts_suite not in cls.uncompile_suite:
            xts_name = os.path.basename(xts_root_dir)
            uncompile_path = os.path.join(HOME, "test", "xts", xts_name, "ci_uncompile_suite.json")
            if not os.path.exists(uncompile_path):
                print("Get uncompile testsuite failed because {} not exist".format(uncompile_path))
                return []
            with open(uncompile_path, 'r', encoding='utf-8') as file:
                cls.uncompile_suite[xts_suite] = json.load(file)
        suite_config = cls.uncompile_suite[xts_suite]
        # Test for a dict first: a plain list that happens to contain
        # device_type must not be indexed with that string.
        if isinstance(suite_config, dict):
            return suite_config.get(device_type, [])
        return suite_config


class XTSTargetUtils:
    """Helpers that map paths and BUILD.gn files to build targets."""

    @staticmethod
    def get_current_Build(xts_root_dir, current_dir):
        """Find the nearest BUILD.gn at or above ``current_dir``.

        Walks upward while the directory is still inside ``xts_root_dir``,
        ignoring directories that match the ``skip_judge_build_path`` rules.

        Returns:
            The path of the first BUILD.gn found. If the walk leaves
            ``xts_root_dir`` without finding one, the directory reached is
            returned instead (a directory, not a BUILD.gn path).
        """
        while PathUtils.is_parent_path(xts_root_dir, current_dir):
            # Directories matching the skip rules are never inspected.
            if not PathUtils.isMatchRules(current_dir, MatchConfig.get_skip_judge_build_path()):
                build_gn_path = os.path.join(current_dir, 'BUILD.gn')
                if os.path.exists(build_gn_path):
                    return build_gn_path
            # Nothing usable here: move one directory up.
            current_dir = os.path.dirname(current_dir)
        # The outermost directory of every xts repository has a BUILD.gn,
        # so this fall-through is not expected to be reached.
        return current_dir

    # Resolve targets from a path.
    @staticmethod
    def getTargetfromPath(xts_root_dir, path) -> list:
        """Return the build targets responsible for ``path``.

        For the xts root itself the root-level targets are returned;
        otherwise the targets declared in the nearest BUILD.gn.
        """
        if path == xts_root_dir:
            return PathUtils.get_all_build_target(xts_root_dir)
        build_file = XTSTargetUtils.get_current_Build(xts_root_dir, path)
        targets = XTSTargetUtils.getTargetFromBuild(build_file)
        # NOTE(review): getTargetFromBuild always returns a list, so this
        # branch looks unreachable; confirm whether "no targets" was meant.
        if targets is None:
            return XTSTargetUtils.getTargetfromPath(xts_root_dir, os.path.dirname(os.path.dirname(build_file)))
        return targets

    @staticmethod
    def getTargetFromBuild(build_File) -> list:
        """Extract target labels from a BUILD.gn file.

        A target is any ``template("name")`` call whose template is listed in
        ``MatchConfig.get_temple_list()``. When the file declares more than
        one target, those named in a ``deps`` list of the same file are
        dropped.

        Returns:
            ``"<dir relative to HOME>:<target>"`` labels.
        """
        pattern = re.compile(r'(\b(?:' + '|'.join(
            re.escape(word) for word in MatchConfig.get_temple_list()) + r')\b)\s*\(\s*"([^"]*)"\)')
        with open(build_File, 'r', encoding='utf-8') as file:
            content = file.read()
        targets = [match[1] for match in pattern.findall(content)]
        relative_path = os.path.relpath(os.path.dirname(build_File), HOME)
        if len(targets) > 1:
            deps = XTSTargetUtils.getDepsinBuild(content)
            # Only build the targets of this file that nothing in it depends on.
            targets = [item for item in targets if item not in deps]
        return [f"{relative_path}:{item}" for item in targets]

    @staticmethod
    def getDepsinBuild(build):
        """Return every entry of every ``deps = [...]`` list in ``build``.

        Entries are split on commas; surrounding whitespace, double quotes
        and leading colons are removed. A trailing comma yields an empty
        string entry.
        """
        # Matches the body of each deps array, across line breaks.
        pattern = re.compile(r'deps\s*=\s*\[\s*(?P<deps>.*?)\s*\]', re.DOTALL)
        all_deps = []
        for match in pattern.findall(build):
            all_deps.extend(
                dep.strip().strip('"').lstrip(':') for dep in match.split(','))
        return all_deps

    @staticmethod
    def getPathsByBundle(bundle, test_home) -> list:
        """Return directories under ``test_home`` whose BUILD.gn names a bundle.

        A directory qualifies when its BUILD.gn contains
        ``part_name = "<b>"`` for at least one ``b`` in ``bundle``.
        Directories matching the ``exception_path`` rules are not inspected.
        Each qualifying directory appears once, however many bundles match.
        """
        matching_files = []
        # Walk the root directory and all of its sub-directories.
        for root, _dirs, files in os.walk(test_home):
            if PathUtils.isMatchRules(root, MatchConfig.get_exception_path()):
                continue
            if 'BUILD.gn' not in files:
                continue
            file_path = os.path.join(root, 'BUILD.gn')
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # any() stops at the first matching bundle, so root is added once.
            if any(f'part_name = "{bundle_}"' in content for bundle_ in bundle):
                matching_files.append(root)
        return matching_files

    @staticmethod
    def del_uncompile_target(xts_root_dir, device_type, targets) -> list:
        """Drop the targets configured as not compiled for ``device_type``.

        Returns:
            The remaining targets, de-duplicated, in unspecified order.
        """
        uncompile_suite_list = MatchConfig.get_uncompile_suite_list(xts_root_dir, device_type)
        print("Config uncompile testsuite: {}".format(uncompile_suite_list))
        ci_target = {
            path_target for path_target in targets
            if path_target not in uncompile_suite_list
        }
        print("Accurte compile target: {}".format(ci_target))
        return list(ci_target)


class PathUtils:
    """Path helpers: containment checks, rule matching, path-list reduction."""

    # Simplify a list of paths.
    @staticmethod
    def removeSubandDumpPath(path_list: list) -> list:
        """Reduce ``path_list`` to a minimal covering set of directories.

        Paths already covered by a path in the result are dropped, and when
        every sub-directory of a directory is selected they are replaced by
        that directory (see ``addPathClean``).

        Note:
            ``path_list`` is sorted in place.
        """
        # Sorting puts parent directories before their children.
        path_list.sort()
        minimal_paths_set = set()
        # For each parent already seen: its sub-directories not yet selected.
        parent_dirs = {}

        for path in path_list:
            already_covered = any(
                PathUtils.is_parent_path(m_path, path) for m_path in minimal_paths_set)
            if not already_covered:
                PathUtils.addPathClean(path, minimal_paths_set, parent_dirs)

        return list(minimal_paths_set)

    @staticmethod
    def addPathClean(path, minimal_paths_set, parent_dirs):
        """Add ``path`` to the set, collapsing complete sibling sets upward.

        ``parent_dirs`` maps a parent directory to the list of its
        sub-directories that are not selected yet. Once that list becomes
        empty, all selected children of the parent are removed from
        ``minimal_paths_set`` and the parent itself is added recursively.

        Raises:
            ValueError: if ``path`` is not among the tracked sub-directories
                of its parent (``list.remove`` fails).
        """
        parent_path = os.path.dirname(path)
        if parent_path in parent_dirs:
            # Parent already tracked: update its list in place.
            subdirs = parent_dirs[parent_path]
        else:
            # First visit: record every sub-directory of the parent.
            subdirs = [os.path.join(parent_path, d) for d in os.listdir(parent_path) if
                       os.path.isdir(os.path.join(parent_path, d))]
            parent_dirs[parent_path] = subdirs
        subdirs.remove(path)
        minimal_paths_set.add(path)
        # All siblings selected: replace them by their direct parent.
        if len(subdirs) == 0:
            del parent_dirs[parent_path]
            for d in os.listdir(parent_path):
                p = os.path.join(parent_path, d)
                if os.path.isdir(p) and p in minimal_paths_set:
                    minimal_paths_set.remove(p)
            PathUtils.addPathClean(parent_path, minimal_paths_set, parent_dirs)

    @staticmethod
    def get_current_exist(root_path, path) -> str:
        """Return the nearest existing path at or above ``path``.

        The search stops once it leaves ``root_path``, in which case
        ``root_path`` is returned.
        """
        current_dir = path
        while PathUtils.is_parent_path(root_path, current_dir):
            if os.path.exists(current_dir):
                return current_dir
            current_dir = os.path.dirname(current_dir)
        # The root directory is assumed to exist.
        return root_path

    @staticmethod
    def is_parent_path(parent_path, child_path):
        """Return True when ``child_path`` is ``parent_path`` or lies below it.

        Raises:
            ValueError: propagated from ``os.path.commonpath`` (for example
                when absolute and relative paths are mixed).
        """
        common_path = os.path.commonpath([parent_path, child_path])
        if common_path == parent_path:
            return True
        # commonpath() returns a normalised path (no trailing separator), so
        # also compare against the parent normalised in the same way.
        return common_path == os.path.commonpath([parent_path])

    @staticmethod
    def get_all_build_target(xts_root_dir, full_flag=0):
        """Return the root-level build targets for ``xts_root_dir``.

        For a root ending in ``acts`` with ``full_flag == 0`` this is the
        configured ``acts_All_template_ex`` list; otherwise a single-element
        list holding the root target name.
        """
        if xts_root_dir.endswith("acts") and full_flag == 0:
            return MatchConfig.get_acts_All_template_ex_list()
        return [PathUtils.get_root_target(xts_root_dir)]

    @staticmethod
    def get_root_target(xts_root_dir):
        """Return ``xts_<basename of xts_root_dir>``."""
        xts_suite = os.path.basename(xts_root_dir)
        return f"xts_{xts_suite}"

    @staticmethod
    def isMatchRules(file, rules):
        """Return True when ``file`` matches any rule in ``rules``.

        ``rules["string_rules"]`` are substring tests and
        ``rules["re_rules"]`` are regular expressions applied with
        ``re.search``.
        """
        if any(rule in file for rule in rules["string_rules"]):
            return True
        return any(re.search(rule, file) for rule in rules["re_rules"])

    @staticmethod
    def isTargetContains(targetFiles, file) -> bool:
        """Return True when ``file`` equals or lies below any of ``targetFiles``."""
        return any(PathUtils.is_parent_path(f, file) for f in targetFiles)