#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
import os
from common_utils import load_config
from part_prebuilts_config import get_parts_tag_config


class ConfigParser:
    """Turn the loaded tool configuration into download / other operations.

    The configuration is read with ``load_config``; it must provide the keys
    ``download_root`` and ``tool_list``. Each tool entry may carry a
    platform-keyed ``config`` mapping and a ``handle`` list.
    """

    def __init__(self, config_file: str, global_args):
        """Load the configuration and build the global variable dictionary.

        :param config_file: path handed to ``load_config``
        :param global_args: object exposing ``host_cpu``, ``host_platform``,
            ``type`` and ``code_dir``
        """
        self.data = load_config(config_file)
        self.current_cpu = global_args.host_cpu
        self.current_os = global_args.host_platform
        self.input_tag = "all"
        self.input_type = global_args.type
        self.global_config = {
            "code_dir": global_args.code_dir,
            "download_root": self.data["download_root"]
        }
        # Resolve variables inside the global config itself (e.g. a
        # download_root that refers to ${code_dir}).
        VarParser.parse_vars(self.global_config, [])
        download_root = self.global_config["download_root"]
        self.global_config["download_root"] = os.path.abspath(os.path.expanduser(download_root))

    def get_operate(self, part_names=None) -> tuple:
        """Collect the operations of every tool in ``tool_list``.

        :param part_names: optional part names; when given and
            ``get_parts_tag_config`` returns a truthy value, that value
            replaces ``self.input_tag`` for this and later calls
        :return: ``(download_operations, other_operations)``
        """
        download_op = []
        other_op = []
        tool_list = self.data["tool_list"]
        parts_configured_tags = get_parts_tag_config(part_names) if part_names else None
        if parts_configured_tags:
            self.input_tag = parts_configured_tags
        for tool in tool_list:
            _download, _other = self._get_tool_operate(tool)
            download_op.extend(_download)
            other_op.extend(_other)
        return download_op, other_op

    def _get_tool_operate(self, tool) -> tuple:
        """Build the operations of a single tool.

        :param tool: one entry of ``tool_list``
        :return: ``(download_operations, other_operations)``; both empty when
            the tool is filtered out by tag/type
        """
        tool_matched, unified_tool_basic_config = self._is_tool_matched(tool)
        if not tool_matched:
            return [], []

        matched_platform_configs = Filter.filter_platform(self.current_os, self.current_cpu, tool.get("config"))
        # NOTE: the matched configs are the dicts stored in ``tool`` itself,
        # so variable resolution updates the loaded data in place.
        for config in matched_platform_configs:
            VarParser.parse_vars(config, [unified_tool_basic_config, self.global_config])
        unified_platform_configs = [
            self._unify_config(self.global_config, unified_tool_basic_config, conf)
            for conf in matched_platform_configs
        ]
        unified_platform_configs = Filter(unified_platform_configs).apply_filters(self.input_tag, self.input_type)

        handle = tool.get("handle", [])

        if unified_platform_configs:
            # Platform configs matched: use only those.
            download_operate, other_operate = self._generate_tool_operate(unified_platform_configs, handle)
        else:
            # No platform config: fall back to the tool-level config.
            download_operate, other_operate = self._generate_tool_operate([unified_tool_basic_config], handle)

        # Drop every operation that still contains an unresolved variable.
        return VarParser.remove_undefined(download_operate), VarParser.remove_undefined(other_operate)

    def _is_tool_matched(self, tool):
        """Check whether the tool passes the tag/type filters.

        :param tool: one entry of ``tool_list``
        :return: ``(True, unified_tool_config)`` when it matches, otherwise
            ``(False, [])``
        """
        tool_basic_config = {key: tool[key] for key in tool if key not in {"config", "handle"}}
        VarParser.parse_vars(tool_basic_config, [self.global_config])
        unified_tool_basic_config = self._unify_config(self.global_config, tool_basic_config)
        if not Filter([unified_tool_basic_config]).apply_filters(self.input_tag, self.input_type):
            return False, []
        return True, unified_tool_basic_config

    def _generate_tool_operate(self, outer_configs: list, handles: list) -> tuple:
        """Generate download and other operations for the given configs.

        :param outer_configs: unified configs of one tool
        :param handles: the tool's ``handle`` list (may be empty)
        :return: ``(download_operations, other_operations)``
        """
        if not outer_configs:
            return [], []

        # Every config with a remote_url gets an auto-generated download.
        download_operate = [
            self._generate_download_config(config)
            for config in outer_configs
            if config.get("remote_url")
        ]

        # No handles: nothing else to generate.
        if not handles:
            return download_operate, []

        # Split the generated handles into downloads and everything else.
        other_operate = []
        for operate in self._generate_handles(outer_configs, handles):
            if operate["type"] == "download":
                download_operate.append(operate)
            else:
                other_operate.append(operate)

        return download_operate, other_operate

    def _generate_handles(self, outer_configs: list, handles: list):
        """Generate the concrete operation list for every config.

        :param outer_configs: list of unified configs
        :param handles: list of handle templates; when a config defines a
            truthy ``handle_index``, only handles at those indexes are used
        :return: list of handle copies with variables resolved and
            ``tool_name`` / ``step_id`` attached
        """
        operate_list = []
        for config in outer_configs:
            special_handle = config.get("handle_index")
            count = 0
            for index, handle in enumerate(handles):
                if special_handle and index not in special_handle:
                    continue
                step_id = "_".join([config.get("name"), os.path.basename(config.get("remote_url", "")), str(count)])
                count += 1
                # The original handle must stay untouched; work on a copy.
                new_handle = copy.deepcopy(handle)
                # Resolve the variables used inside the handle.
                VarParser.parse_vars(new_handle, [config])
                # Attach the identifiers of the operation.
                new_handle["tool_name"] = config.get("name")
                new_handle["step_id"] = step_id
                operate_list.append(new_handle)
        return operate_list

    def _generate_download_config(self, config):
        """Build the download operation for one config.

        :param config: unified config; must contain ``remote_url``,
            ``unzip_dir`` and ``unzip_filename`` (and ``download_root``)
        :raises KeyError: when a required key is missing (the offending
            config is printed first)
        """
        try:
            return {
                "remote_url": config["remote_url"],
                "unzip_dir": config["unzip_dir"],
                "unzip_filename": config["unzip_filename"],
                "download_dir": config.get("download_dir", config["download_root"]),
                "operate_type": "download",
                "name": config.get("name"),
            }
        except KeyError:
            print(f"error config: {config}")
            raise

    def _unify_config(self, *additional_configs) -> dict:
        """Merge the given dicts into a new one; later dicts win on conflicts."""
        unified_config = dict()
        for config in additional_configs:
            unified_config.update(config)
        return unified_config


class Filter:
    """Chainable filters over a list of config dicts."""

    def __init__(self, configs):
        """Keep a deep copy of ``configs`` (``None`` becomes an empty list)."""
        if configs is None:
            self.input_configs = []
            return
        self.input_configs = copy.deepcopy(configs)

    @classmethod
    def filter_platform(cls, current_os: str, current_cpu: str, config: dict) -> list:
        """Return the configs matching the current operating system and CPU.

        :param current_os: OS name to look for
        :param current_cpu: CPU name to look for
        :param config: mapping whose keys are comma-separated OS names
        :return: list of matching config dicts (the same objects as stored
            in ``config``, not copies)
        """
        if not config:
            return []

        filtered = []

        for os_key, os_config in config.items():
            # The key may list several OS names separated by commas.
            configured_os_list = [o.strip() for o in os_key.split(",")]
            if current_os not in configured_os_list:
                continue
            # No CPU level: the value is directly a list of configs.
            if isinstance(os_config, list):
                filtered.extend(os_config)
                continue
            # No CPU level and a single config item: keep the dict itself.
            # (list.extend on a dict would add its keys, not the dict.)
            if isinstance(os_config, dict) and "remote_url" in os_config:
                filtered.append(os_config)
                continue
            # CPU level present.
            filtered.extend(cls.filter_cpu(current_cpu, os_config))
        return filtered

    @classmethod
    def filter_cpu(cls, current_cpu: str, os_config: dict) -> list:
        """Return the configs of ``os_config`` whose key lists ``current_cpu``.

        :param current_cpu: CPU name to look for
        :param os_config: mapping whose keys are comma-separated CPU names
        """
        filtered = []
        for cpu_str in os_config:
            configured_cpu_list = [c.strip() for c in cpu_str.split(",")]
            if current_cpu in configured_cpu_list:
                cpu_config = os_config[cpu_str]
                # A CPU entry may hold one config or a list of configs.
                if not isinstance(cpu_config, list):
                    cpu_config = [cpu_config]
                filtered.extend(cpu_config)
        return filtered

    def apply_filters(self, input_tag: str, input_type: str):
        """Apply the tag filter, then the type filter, and return the result."""
        return self.filter_tag(input_tag).filter_type(input_type).result()

    def filter_tag(self, input_tag: str) -> 'Filter':
        """Keep the configs whose ``tag`` is selected by ``input_tag``.

        ``"all"`` selects everything; otherwise a config is kept when its tag
        is ``in`` ``input_tag``. Every config must define ``tag``.

        NOTE(review): ``input_tag`` may also be whatever
        ``get_parts_tag_config`` returns; if that is a plain string the
        ``in`` test is a substring match - confirm against that helper.
        """
        filtered = []
        for config in self.input_configs:
            tool_tag = config["tag"]
            if input_tag == "all" or tool_tag in input_tag:
                filtered.append(config)
        self.input_configs = filtered
        return self

    def filter_type(self, input_type: str) -> 'Filter':
        """Keep the configs whose ``type`` overlaps with ``input_type``.

        Configs without a (truthy) ``type`` are always kept.

        :param input_type: comma-separated type names
        """
        filtered = []
        # Parsed lazily so it is computed at most once, and only when some
        # config actually declares a type.
        input_types = None
        for config in self.input_configs:
            _type = config.get("type")
            if not _type:
                filtered.append(config)
                continue
            # Configured types as a set.
            if isinstance(_type, str):
                configured_types = {t.strip() for t in _type.split(",")}
            else:
                configured_types = set(_type)
            # Requested types as a set.
            if input_types is None:
                input_types = {t.strip() for t in input_type.split(",")}

            # Keep the config when both sets intersect.
            if not input_types.isdisjoint(configured_types):
                filtered.append(config)
        self.input_configs = filtered
        return self

    def result(self):
        """Return the configs that survived the applied filters."""
        return self.input_configs


class VarParser:
    """Resolve ``${var_name}`` placeholders inside config structures."""

    # Matches one ${...} placeholder (non-greedy).
    var_pattern: re.Pattern = re.compile(r'\$\{.*?\}')

    @classmethod
    def remove_undefined(cls, configs: list) -> list:
        """Return the configs that contain no unresolved placeholder."""
        return [config for config in configs if not cls.has_undefined_var(config)]

    @classmethod
    def has_undefined_var(cls, data):
        """Tell whether ``data`` still contains a ``${...}`` placeholder.

        Strings are searched directly; lists and dict values are searched
        recursively; any other type counts as resolved.
        """
        try:
            if isinstance(data, str):
                return cls.var_pattern.search(data) is not None
            if isinstance(data, list):
                return any(cls.has_undefined_var(item) for item in data)
            if isinstance(data, dict):
                return any(cls.has_undefined_var(value) for value in data.values())
            return False
        except AttributeError:
            print("var_pattern不是有效的正则表达式对象")
            return False

    @classmethod
    def parse_vars(cls, data: dict, dictionarys: list):
        """Resolve the variables in ``data`` in place.

        ``data`` is first resolved against itself, then against every
        dictionary of ``dictionarys`` in order.

        :param data: config whose variables should be resolved
        :param dictionarys: list of lookup dictionaries
        """
        cls.replace_vars_in_data(data, data)
        for dic in dictionarys:
            cls.replace_vars_in_data(data, dic)

    @classmethod
    def replace_vars_in_data(cls, data: object, dictionary: dict) -> object:
        """Replace the ``${var_name}`` placeholders in ``data``.

        Strings are returned as a new value; dicts and lists are updated in
        place (recursively) and returned; other types are returned as is.
        """
        if isinstance(data, str):
            return cls.replace_vars_in_string(data, dictionary)
        if isinstance(data, dict):
            for key in list(data.keys()):
                original_value = data[key]
                new_value = cls.replace_vars_in_data(original_value, dictionary)
                # Only differs for strings; containers are updated in place.
                if new_value is not original_value:
                    data[key] = new_value
        elif isinstance(data, list):
            for index, original_value in enumerate(data):
                new_value = cls.replace_vars_in_data(original_value, dictionary)
                if new_value is not original_value:
                    data[index] = new_value
        return data

    @classmethod
    def replace_vars_in_string(cls, s: str, dictionary: dict) -> str:
        """Replace the ``${var_name}`` placeholders in ``s``.

        Substitution is repeated until the string stops changing, so values
        that contain placeholders themselves are resolved as well. Unknown
        variables are left untouched.

        :raises ValueError: when a variable substituted in an earlier pass
            shows up again in a later pass (treated as a circular
            reference). Note this also rejects a non-circular value that
            re-introduces an already substituted variable.
        """
        # Names substituted in *earlier* passes; guards against cycles.
        replaced_var_names = set()

        while True:
            # Names substituted during the current pass. Kept apart so that
            # a variable used several times in one string is not mistaken
            # for a cycle.
            pass_var_names = set()
            try:
                replaced = cls.var_pattern.sub(
                    lambda matched_var: cls._replace_var_with_dict_value(
                        matched_var, dictionary, replaced_var_names, pass_var_names),
                    s)
            except ValueError:
                print(f"replace var in string {s} failed")
                raise
            if replaced == s:
                break
            replaced_var_names |= pass_var_names
            s = replaced
        return s

    @classmethod
    def _replace_var_with_dict_value(cls, matched_var, dictionary, replaced_var_names, current_pass_names=None):
        """Return the replacement text for one matched placeholder.

        :param matched_var: regex match of a ``${var_name}`` placeholder
        :param dictionary: lookup dictionary
        :param replaced_var_names: names that must not be substituted again
        :param current_pass_names: set receiving the names substituted now;
            when omitted they are recorded in ``replaced_var_names``
        :raises ValueError: when the variable is in ``replaced_var_names``
        """
        placeholder = matched_var.group()
        var_name = placeholder[2:-1]
        if var_name in replaced_var_names:
            raise ValueError(f"Variable \"{var_name}\" is being replaced again.")
        value = dictionary.get(var_name)
        # Missing (or None) means undefined: keep the placeholder as is.
        if value is None:
            return placeholder
        if value:
            recorded = replaced_var_names if current_pass_names is None else current_pass_names
            recorded.add(var_name)
        return value