• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15import copy
16import re
17import os
18from common_utils import load_config
19from part_prebuilts_config import get_parts_tag_config
20
21
class ConfigParser:
    """Turn the loaded tool configuration into download / non-download operations.

    The loaded configuration is read through two keys, ``download_root`` and
    ``tool_list``. Every tool is filtered by tag and type, its platform specific
    entries are merged on top of the tool level fields, and the resulting
    operations are returned as two lists.
    """

    def __init__(self, config_file: str, global_args):
        """
        Load the configuration file and prepare the global settings.

        :param config_file: path handed to ``load_config``
        :param global_args: object providing ``host_cpu``, ``host_platform``,
            ``type`` and ``code_dir`` attributes
        """
        self.data = load_config(config_file)
        self.current_cpu = global_args.host_cpu
        self.current_os = global_args.host_platform
        self.input_tag = "all"
        self.input_type = global_args.type
        settings = dict(
            code_dir=global_args.code_dir,
            download_root=self.data["download_root"],
        )
        self.global_config = settings
        # Resolve variables that the global settings reference among themselves.
        VarParser.parse_vars(settings, [])
        # Store the download root as an absolute path with "~" expanded.
        expanded_root = os.path.expanduser(settings["download_root"])
        settings["download_root"] = os.path.abspath(expanded_root)

    def get_operate(self, part_names=None) -> tuple:
        """
        Collect the operations of every tool in ``tool_list``.

        :param part_names: optional part names; when given, the value returned
            by ``get_parts_tag_config`` (if truthy) replaces ``self.input_tag``
        :return: tuple ``(download operations, other operations)``
        """
        tools = self.data["tool_list"]
        if part_names:
            configured_tags = get_parts_tag_config(part_names)
            if configured_tags:
                self.input_tag = configured_tags

        downloads, others = [], []
        for tool in tools:
            tool_downloads, tool_others = self._get_tool_operate(tool)
            downloads.extend(tool_downloads)
            others.extend(tool_others)
        return downloads, others

    def _get_tool_operate(self, tool) -> tuple:
        """
        Build the operations of a single tool.

        :param tool: one entry of ``tool_list``
        :return: tuple ``(download operations, other operations)``; both are
            empty when the tool does not pass the tag/type filters
        """
        matched, base_config = self._is_tool_matched(tool)
        if not matched:
            return [], []

        platform_configs = Filter.filter_platform(self.current_os, self.current_cpu, tool.get("config"))
        lookup_order = [base_config, self.global_config]
        for platform_config in platform_configs:
            VarParser.parse_vars(platform_config, lookup_order)

        merged_configs = [
            self._unify_config(self.global_config, base_config, platform_config)
            for platform_config in platform_configs
        ]
        merged_configs = Filter(merged_configs).apply_filters(self.input_tag, self.input_type)

        handles = tool.get("handle", [])

        # Platform specific configs take over when any survive the filters;
        # otherwise the tool level config is used on its own.
        effective_configs = merged_configs if merged_configs else [base_config]
        download_ops, other_ops = self._generate_tool_operate(effective_configs, handles)

        # Drop every operation that still contains an unresolved variable.
        return VarParser.remove_undefined(download_ops), VarParser.remove_undefined(other_ops)

    def _is_tool_matched(self, tool):
        """
        Check whether the tool level fields pass the tag/type filters.

        :param tool: one entry of ``tool_list``
        :return: ``(True, unified tool config)`` on a match, ``(False, [])`` otherwise
        """
        excluded_keys = ("config", "handle")
        basic_config = {key: value for key, value in tool.items() if key not in excluded_keys}
        VarParser.parse_vars(basic_config, [self.global_config])
        unified = self._unify_config(self.global_config, basic_config)
        survivors = Filter([unified]).apply_filters(self.input_tag, self.input_type)
        if survivors:
            return True, unified
        return False, []

    def _generate_tool_operate(self, outer_configs: list, handles: list) -> tuple:
        """
        Create the operations for a list of unified configs.

        :param outer_configs: unified configs of one tool
        :param handles: handle definitions of the tool
        :return: tuple ``(download operations, other operations)``
        """
        if not outer_configs:
            return [], []

        # Every config carrying a remote_url gets an automatic download operation.
        downloads = [
            self._generate_download_config(config)
            for config in outer_configs
            if config.get("remote_url")
        ]

        # Nothing else to do when the tool declares no handles.
        if not handles:
            return downloads, []

        # Handles of type "download" join the download list, the rest are "other".
        others = []
        for operate in self._generate_handles(outer_configs, handles):
            target = downloads if operate["type"] == "download" else others
            target.append(operate)
        return downloads, others

    def _generate_handles(self, outer_configs: list, handles: list):
        """
        Instantiate the handles once per config.

        :param outer_configs: unified configs of one tool
        :param handles: handle definitions; they are deep-copied, never modified
        :return: list of handle copies with variables resolved against the
            config and ``tool_name`` / ``step_id`` filled in
        """
        generated = []
        for config in outer_configs:
            wanted_indexes = config.get("handle_index")
            # A truthy handle_index restricts the config to the listed handle positions.
            selected = [
                handle
                for position, handle in enumerate(handles)
                if not wanted_indexes or position in wanted_indexes
            ]
            for sequence, handle in enumerate(selected):
                id_parts = [
                    config.get("name"),
                    os.path.basename(config.get("remote_url", "")),
                    str(sequence),
                ]
                step_id = "_".join(id_parts)
                # Work on a copy so the handle definition stays untouched.
                instance = copy.deepcopy(handle)
                VarParser.parse_vars(instance, [config])
                instance["tool_name"] = config.get("name")
                instance["step_id"] = step_id
                generated.append(instance)
        return generated

    def _generate_download_config(self, config):
        """
        Build the download operation for one config.

        :param config: unified config; must contain ``remote_url``, ``unzip_dir``,
            ``unzip_filename`` and ``download_root``
        :return: dict describing the download operation
        :raises KeyError: when a required key is missing (the config is printed first)
        """
        try:
            download_config = {
                key: config[key]
                for key in ("remote_url", "unzip_dir", "unzip_filename")
            }
            # download_root is looked up even when download_dir is present.
            download_config["download_dir"] = config.get("download_dir", config["download_root"])
        except KeyError:
            print(f"error config: {config}")
            raise
        download_config["operate_type"] = "download"
        download_config["name"] = config.get("name")
        return download_config

    def _unify_config(self, *additional_configs) -> dict:
        """
        Merge the given configs into a new dict; later configs override earlier ones.

        :param additional_configs: configs in ascending priority
        :return: the merged dict
        """
        merged = {}
        for layer in additional_configs:
            merged.update(layer)
        return merged
156
157
class Filter:
    """Select configs by platform, cpu, tag and type.

    ``filter_platform`` / ``filter_cpu`` are class-level helpers that work on the
    raw platform mapping of a tool. An instance wraps a deep copy of a list of
    configs and narrows it down with the chainable ``filter_tag`` /
    ``filter_type`` methods.
    """

    def __init__(self, configs):
        """
        :param configs: list of config dicts, or None for an empty filter; the
            list is deep-copied so filtering never touches the caller's data
        """
        if configs is None:
            self.input_configs = []
            return
        self.input_configs = copy.deepcopy(configs)

    @classmethod
    def filter_platform(cls, current_os: str, current_cpu: str, config: dict) -> list:
        """
        Return the configs that match the current operating system (and cpu).

        :param current_os: name of the current operating system
        :param current_cpu: name of the current cpu
        :param config: mapping whose keys are comma separated OS names; a value is
            a list of configs, a single config dict (recognised by its
            ``remote_url`` key), or a mapping keyed by comma separated cpu names
        :return: flat list of matching config dicts (the original objects, not copies)
        """
        if not config:
            return []

        filtered = []
        for os_key, os_config in config.items():
            # One key may list several operating systems separated by commas.
            configured_os_list = [o.strip() for o in os_key.split(",")]
            if current_os not in configured_os_list:
                continue
            if isinstance(os_config, list):
                # No cpu level: the value already is a list of configs.
                filtered.extend(os_config)
            elif isinstance(os_config, dict) and "remote_url" in os_config:
                # No cpu level and a single config: keep the dict itself.
                # extend() would have added the dict's keys instead.
                filtered.append(os_config)
            else:
                # The value is keyed by cpu.
                filtered.extend(cls.filter_cpu(current_cpu, os_config))
        return filtered

    @classmethod
    def filter_cpu(cls, current_cpu: str, os_config: dict) -> list:
        """
        Return the configs of ``os_config`` whose cpu key lists ``current_cpu``.

        :param current_cpu: name of the current cpu
        :param os_config: mapping whose keys are comma separated cpu names and
            whose values are a single config or a list of configs
        :return: flat list of matching configs; empty when ``os_config`` is empty or None
        """
        if not os_config:
            return []

        filtered = []
        for cpu_key, cpu_config in os_config.items():
            configured_cpu_list = [c.strip() for c in cpu_key.split(",")]
            if current_cpu not in configured_cpu_list:
                continue
            # A cpu entry may hold one config or a list of configs.
            if isinstance(cpu_config, list):
                filtered.extend(cpu_config)
            else:
                filtered.append(cpu_config)
        return filtered

    def apply_filters(self, input_tag: str, input_type: str):
        """
        Run the tag filter followed by the type filter.

        :return: the list of configs that passed both filters
        """
        return self.filter_tag(input_tag).filter_type(input_type).result()

    def filter_tag(self, input_tag: str) -> 'Filter':
        """
        Keep the configs whose ``tag`` is accepted by ``input_tag``.

        :param input_tag: ``"all"`` keeps everything; otherwise a config is kept
            when its tag satisfies ``tag in input_tag`` (substring test for a
            string, membership test for a collection)
        :return: self, to allow chaining
        :raises KeyError: when a config has no ``tag`` key
        """
        kept = []
        for config in self.input_configs:
            tool_tag = config["tag"]
            if input_tag == "all" or tool_tag in input_tag:
                kept.append(config)
        self.input_configs = kept
        return self

    def filter_type(self, input_type: str) -> 'Filter':
        """
        Keep the configs without a ``type`` or whose type overlaps ``input_type``.

        :param input_type: comma separated type names requested by the caller
        :return: self, to allow chaining
        """
        # Parsed on first use only, so an input_type that cannot be split is
        # never touched when no config declares a type.
        requested_types = None
        kept = []
        for config in self.input_configs:
            configured = config.get("type")
            if not configured:
                # Configs without a type restriction always pass.
                kept.append(config)
                continue
            # The configured type is a comma separated string or an iterable of names.
            if isinstance(configured, str):
                configured_types = {t.strip() for t in configured.split(",")}
            else:
                configured_types = set(configured)
            if requested_types is None:
                requested_types = {t.strip() for t in input_type.split(",")}
            # Keep the config when the two sets share at least one type.
            if not requested_types.isdisjoint(configured_types):
                kept.append(config)
        self.input_configs = kept
        return self

    def result(self):
        """Return the configs that are still selected."""
        return self.input_configs
239
240
class VarParser:
    """Resolve ``${var_name}`` placeholders inside strings, lists and dicts."""

    # Non-greedy match of a single ${...} placeholder.
    var_pattern: re.Pattern = re.compile(r'\$\{.*?\}')

    @classmethod
    def remove_undefined(cls, configs: list) -> list:
        """
        Drop every config that still contains an unresolved placeholder.

        :param configs: list of configs
        :return: new list holding only the fully resolved configs
        """
        return [config for config in configs if not cls.has_undefined_var(config)]

    @classmethod
    def has_undefined_var(cls, data):
        """
        Tell whether ``data`` still contains a ``${...}`` placeholder.

        :param data: string, list or dict (nested structures are searched
            recursively); any other type is treated as placeholder free
        :return: True when a placeholder is found, False otherwise
        """
        try:
            if isinstance(data, str):
                return bool(cls.var_pattern.search(data))
            elif isinstance(data, list):
                return any(cls.has_undefined_var(item) for item in data)
            elif isinstance(data, dict):
                return any(cls.has_undefined_var(value) for value in data.values())
            else:
                return False
        except AttributeError:
            # Reached when var_pattern has been replaced by a non-regex object.
            print("var_pattern不是有效的正则表达式对象")
            return False

    @classmethod
    def parse_vars(cls, data: dict, dictionarys: list):
        """
        Resolve the placeholders of ``data`` in place.

        ``data`` is first resolved against itself, then against every dictionary
        of ``dictionarys`` in the given order.

        :param data: config whose placeholders are resolved
        :param dictionarys: list of lookup dictionaries
        """
        cls.replace_vars_in_data(data, data)
        for dic in dictionarys:
            cls.replace_vars_in_data(data, dic)

    @classmethod
    def replace_vars_in_data(cls, data: any, dictionary: dict) -> any:
        """
        Replace the placeholders of ``data`` with values from ``dictionary``.

        :param data: string, dict or list; dicts and lists are updated in place,
            other types are returned untouched
        :param dictionary: lookup dictionary
        :return: the resolved string, or ``data`` itself for every other type
        """
        if isinstance(data, str):
            return cls.replace_vars_in_string(data, dictionary)
        elif isinstance(data, dict):
            for k in list(data.keys()):
                original_value = data[k]
                new_value = cls.replace_vars_in_data(original_value, dictionary)
                # Containers are modified in place and come back as the same
                # object, so only replaced strings need to be written back.
                if new_value is not original_value:
                    data[k] = new_value
        elif isinstance(data, list):
            for i, original_value in enumerate(data):
                new_value = cls.replace_vars_in_data(original_value, dictionary)
                if new_value is not original_value:
                    data[i] = new_value
        else:
            return data
        return data

    @classmethod
    def replace_vars_in_string(cls, s: str, dictionary: dict) -> str:
        """
        Replace the placeholders of ``s`` with values from ``dictionary``.

        Substitution passes are repeated until the string stops changing, so
        values that contain placeholders themselves are resolved as well.

        :param s: string that may contain ``${var_name}`` placeholders
        :param dictionary: lookup dictionary; unknown names are left as they are
        :return: the resolved string
        :raises ValueError: when a name resolved in an earlier pass shows up
            again in a later pass (guard against circular references)
        """
        # Names resolved in completed passes; meeting one again means a cycle.
        resolved_in_earlier_passes = set()

        while True:
            # Names resolved in the running pass are collected separately so the
            # same placeholder may appear several times in one string.
            resolved_this_pass = set()
            try:
                replaced = cls.var_pattern.sub(
                    lambda matched_var: cls._replace_var_with_dict_value(
                        matched_var, dictionary, resolved_in_earlier_passes, resolved_this_pass),
                    s)
            except ValueError as e:
                print(f"replace var in string {s} failed")
                raise e
            if replaced == s:
                break
            resolved_in_earlier_passes |= resolved_this_pass
            s = replaced
        return s

    @classmethod
    def _replace_var_with_dict_value(cls, matched_var, dictionary, replaced_var_names, current_pass_names=None):
        """
        Return the replacement text for one matched placeholder.

        :param matched_var: regex match of a ``${var_name}`` placeholder
        :param dictionary: lookup dictionary
        :param replaced_var_names: names that must not be replaced again
        :param current_pass_names: optional set receiving the names replaced by
            this call; when omitted they are added to ``replaced_var_names``
        :return: the dictionary value, or the placeholder text when the name is unknown
        :raises ValueError: when the name is already in ``replaced_var_names``
        """
        var_name = matched_var.group()[2:-1]
        if var_name in replaced_var_names:
            raise ValueError(f"Variable \"{var_name}\" is being replaced again.")
        if dictionary.get(var_name):
            recorder = replaced_var_names if current_pass_names is None else current_pass_names
            recorder.add(var_name)
        # Use the value when the name is known, keep the placeholder otherwise.
        return dictionary.get(var_name, matched_var.group())