1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# 5# Copyright (c) 2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import os 19import re 20from collections import defaultdict 21 22from util.io_util import IoUtil 23from exceptions.ohos_exception import OHOSException 24from resources.config import Config 25from containers.status import throw_exception 26 27from helper.noInstance import NoInstance 28 29 30class ProductUtil(metaclass=NoInstance): 31 32 @staticmethod 33 def get_products(): 34 config = Config() 35 # ext products configuration 36 _ext_scan_path = os.path.join(config.root_path, 37 'out/products_ext/vendor') 38 if os.path.exists(_ext_scan_path): 39 for company in os.listdir(_ext_scan_path): 40 company_path = os.path.join(_ext_scan_path, company) 41 if not os.path.isdir(company_path): 42 continue 43 44 for product in os.listdir(company_path): 45 p_config_path = os.path.join(company_path, product) 46 config_path = os.path.join(p_config_path, 'config.json') 47 48 if os.path.isfile(config_path): 49 info = IoUtil.read_json_file(config_path) 50 product_name = info.get('product_name') 51 if info.get('product_path'): 52 product_path = os.path.join( 53 config.root_path, info.get('product_path')) 54 else: 55 product_path = p_config_path 56 if product_name is not None: 57 subsystem_config_overlay_path = os.path.join(product_path, 58 'subsystem_config_overlay.json') 59 if os.path.isfile(subsystem_config_overlay_path): 60 yield { 61 'company': company, 62 "name": product_name, 63 'product_config_path': p_config_path, 64 'product_path': product_path, 65 'version': info.get('version', '3.0'), 66 'os_level': info.get('type', "mini"), 67 'build_out_path': info.get('build_out_path'), 68 'subsystem_config_json': 69 info.get('subsystem_config_json'), 70 'subsystem_config_overlay_json': 71 subsystem_config_overlay_path, 72 'config': config_path, 73 'component_type': info.get('component_type', '') 74 } 75 else: 76 yield { 77 'company': company, 78 "name": product_name, 79 'product_config_path': p_config_path, 80 'product_path': product_path, 81 'version': info.get('version', '3.0'), 82 'os_level': info.get('type', "mini"), 83 'build_out_path': info.get('build_out_path'), 84 'subsystem_config_json': 85 info.get('subsystem_config_json'), 86 'config': config_path, 87 'component_type': info.get('component_type', '') 88 } 89 if config.vendor_path != '': 90 for company in os.listdir(config.vendor_path): 91 company_path = os.path.join(config.vendor_path, company) 92 if not os.path.isdir(company_path): 93 continue 94 95 for product in os.listdir(company_path): 96 product_path = os.path.join(company_path, product) 97 config_path = os.path.join(product_path, 'config.json') 98 99 if os.path.isfile(config_path): 100 info = IoUtil.read_json_file(config_path) 101 product_name = info.get('product_name') 102 if product_name is not None: 103 yield { 104 'company': company, 105 "name": product_name, 106 'product_config_path': product_path, 107 'product_path': product_path, 108 'version': info.get('version', '3.0'), 109 'os_level': info.get('type', "mini"), 110 'config': config_path, 111 'component_type': info.get('component_type', '') 112 } 113 bip_path = config.built_in_product_path 114 for item in os.listdir(bip_path): 115 if item[0] in ".": 116 continue 117 else: 118 product_name = item[0:-len('.json') 119 ] if item.endswith('.json') else item 120 config_path = os.path.join(bip_path, item) 121 info = IoUtil.read_json_file(config_path) 122 yield { 123 'company': 'built-in', 124 "name": product_name, 125 'product_config_path': bip_path, 126 'product_path': bip_path, 127 'version': info.get('version', '2.0'), 128 'os_level': info.get('type', 'standard'), 129 'config': config_path, 130 'component_type': info.get('component_type', '') 131 } 132 133 bipl_path = config.built_in_product_path_for_llvm 134 if os.path.isdir(bipl_path): 135 for item in os.listdir(bipl_path): 136 if item[0] in ".": 137 continue 138 else: 139 product_name = item[0:-len('.json') 140 ] if item.endswith('.json') else item 141 config_path = os.path.join(bipl_path, item) 142 info = IoUtil.read_json_file(config_path) 143 yield { 144 'company': 'built-in', 145 "name": product_name, 146 'product_config_path': bipl_path, 147 'product_path': bipl_path, 148 'version': info.get('version', '2.0'), 149 'os_level': info.get('type', 'standard'), 150 'config': config_path, 151 'component_type': info.get('component_type', '') 152 } 153 154 @staticmethod 155 @throw_exception 156 def get_device_info(product_json: str): 157 info = IoUtil.read_json_file(product_json) 158 config = Config() 159 version = info.get('version', '3.0') 160 161 if version == '3.0': 162 device_company = info.get('device_company') 163 board = info.get('board') 164 _board_path = info.get('board_path') 165 if _board_path and os.path.exists( 166 os.path.join(config.root_path, _board_path)): 167 board_path = os.path.join(config.root_path, _board_path) 168 else: 169 board_path = os.path.join(config.root_path, 'device', 170 device_company, board) 171 # board and soc decoupling feature will add boards 172 # directory path here. 173 if not os.path.exists(board_path): 174 board_path = os.path.join(config.root_path, 'device', 175 'board', device_company, board) 176 board_config_path = None 177 if info.get('board_config_path'): 178 board_config_path = os.path.join(config.root_path, 179 info.get('board_config_path')) 180 181 return { 182 'board': info.get('board'), 183 'kernel': info.get('kernel_type'), 184 'kernel_version': info.get('kernel_version'), 185 'company': info.get('device_company'), 186 'board_path': board_path, 187 'board_config_path': board_config_path, 188 'target_cpu': info.get('target_cpu'), 189 'target_os': info.get('target_os'), 190 'support_cpu': info.get('support_cpu'), 191 } 192 else: 193 raise OHOSException(f'wrong version number in {product_json}') 194 195 @staticmethod 196 @throw_exception 197 def get_all_components(product_json: str): 198 if not os.path.isfile(product_json): 199 raise OHOSException(f'features {product_json} not found') 200 201 config = Config() 202 # Get all inherit files 203 files = [os.path.join(config.root_path, file) for file in IoUtil.read_json_file( 204 product_json).get('inherit', [])] 205 # Add the product config file to last with highest priority 206 files.append(product_json) 207 208 # Read all parts in order 209 all_parts = {} 210 for _file in files: 211 if not os.path.isfile(_file): 212 continue 213 _info = IoUtil.read_json_file(_file) 214 parts = _info.get('parts') 215 if parts: 216 all_parts.update(parts) 217 else: 218 # v3 config files 219 all_parts.update(ProductUtil.get_vendor_parts_list(_info)) 220 221 return all_parts 222 223 @staticmethod 224 @throw_exception 225 def get_features(product_json: str): 226 if not os.path.isfile(product_json): 227 raise OHOSException(f'features {product_json} not found') 228 229 config = Config() 230 # Get all inherit files 231 files = [os.path.join(config.root_path, file) for file in IoUtil.read_json_file( 232 product_json).get('inherit', [])] 233 # Add the product config file to last with highest priority 234 files.append(product_json) 235 236 # Read all parts in order 237 all_parts = {} 238 for _file in files: 239 if not os.path.isfile(_file): 240 continue 241 _info = IoUtil.read_json_file(_file) 242 parts = _info.get('parts') 243 if parts: 244 all_parts.update(parts) 245 else: 246 # v3 config files 247 all_parts.update(ProductUtil.get_vendor_parts_list(_info)) 248 249 # Get all features 250 features_list = [] 251 for part, value in all_parts.items(): 252 if "features" not in value: 253 continue 254 for key, val in value["features"].items(): 255 _item = '' 256 if isinstance(val, bool): 257 _item = f'{key}={str(val).lower()}' 258 elif isinstance(val, int): 259 _item = f'{key}={val}' 260 elif isinstance(val, str): 261 _item = f'{key}="{val}"' 262 else: 263 raise Exception( 264 "part feature '{key}:{val}' type not support.") 265 features_list.append(_item) 266 return features_list 267 268 @staticmethod 269 @throw_exception 270 def get_features_dict(product_json: str): 271 all_parts = ProductUtil.get_all_components(product_json) 272 features_dict = {} 273 for part, value in all_parts.items(): 274 if "features" not in value: 275 continue 276 for key, val in value["features"].items(): 277 if type(val) in [bool, int, str]: 278 features_dict[key] = val 279 else: 280 raise Exception( 281 "part feature '{key}:{val}' type not support.") 282 return features_dict 283 284 @staticmethod 285 @throw_exception 286 def get_components(product_json: str, subsystems: str): 287 if not os.path.isfile(product_json): 288 raise OHOSException(f'{product_json} not found') 289 290 components_dict = defaultdict(list) 291 product_data = IoUtil.read_json_file(product_json) 292 for subsystem in product_data.get('subsystems', []): 293 sname = subsystem.get('subsystem', '') 294 if not len(subsystems) or sname in subsystems: 295 components_dict[sname] += [ 296 comp['component'] 297 for comp in subsystem.get('components', []) 298 ] 299 300 return components_dict, product_data.get('board', ''),\ 301 product_data.get('kernel_type', '') 302 303 @staticmethod 304 @throw_exception 305 def get_product_info(product_name: str, company=None): 306 for product_info in ProductUtil.get_products(): 307 cur_company = product_info['company'] 308 cur_product = product_info['name'] 309 if company: 310 if cur_company == company and cur_product == product_name: 311 return product_info 312 else: 313 if cur_product == product_name: 314 return product_info 315 316 raise OHOSException(f'product {product_name}@{company} not found') 317 318 @staticmethod 319 @throw_exception 320 def get_compiler(config_path: str): 321 config = os.path.join(config_path, 'config.gni') 322 if not os.path.isfile(config): 323 return '' 324 compiler_pattern = r'board_toolchain_type ?= ?"(\w+)"' 325 with open(config, 'rt', encoding='utf-8') as config_file: 326 data = config_file.read() 327 compiler_list = re.findall(compiler_pattern, data) 328 if not len(compiler_list): 329 raise OHOSException(f'board_toolchain_type is None in {config}') 330 331 return compiler_list[0] 332 333 @staticmethod 334 def get_vendor_parts_list(config: dict): 335 return _transform(config).get('parts') 336 337 @staticmethod 338 def has_component(product_name: str) -> bool: 339 pass 340 341 342def _transform(config: dict): 343 subsystems = config.get('subsystems') 344 if subsystems: 345 config.pop('subsystems') 346 parts = _from_ss_to_parts(subsystems) 347 config['parts'] = parts 348 return config 349 350 351def _from_ss_to_parts(subsystems: dict): 352 parts = dict() 353 for subsystem in subsystems: 354 ss_name = subsystem.get('subsystem') 355 components = subsystem.get('components') 356 if components: 357 for com in components: 358 com_name = com.get('component') 359 features = com.get('features') 360 syscap = com.get('syscap') 361 exclusions = com.get('exclusions') 362 if features: 363 pairs = get_features(features) 364 parts['{}:{}'.format(ss_name, com_name)] = pairs 365 else: 366 parts['{}:{}'.format(ss_name, com_name)] = dict() 367 if syscap: 368 pairs = get_syscap(syscap) 369 parts.get('{}:{}'.format(ss_name, com_name)).update(pairs) 370 if exclusions: 371 pairs = get_exclusion_modules(exclusions) 372 parts.get('{}:{}'.format(ss_name, com_name)).update(pairs) 373 # Copy other key-values 374 for key, val in com.items(): 375 if key in ['component', 'features', 'syscap', 'exclusions']: 376 continue 377 parts.get('{}:{}'.format(ss_name, com_name)).update(key=val) 378 return parts 379 380 381def get_features(features: dict): 382 feats = {} 383 for feat in features: 384 if not feat: 385 continue 386 match = feat.index("=") 387 if match <= 0: 388 print("Warning: invalid feature [{}]".format(feat)) 389 continue 390 key = feat[:match].strip() 391 val = feat[match + 1:].strip().strip('"') 392 if val == 'true': 393 feats[key] = True 394 elif val == 'false': 395 feats[key] = False 396 elif re.match(r'[0-9]+', val): 397 feats[key] = int(val) 398 else: 399 feats[key] = val.replace('\"', '"') 400 401 pairs = dict() 402 pairs['features'] = feats 403 return pairs 404 405 406def get_syscap(syscap: dict): 407 feats = {} 408 for feat in syscap: 409 if not feat: 410 continue 411 if '=' not in feat: 412 raise Exception("Error: invalid syscap [{}]".format(feat)) 413 match = feat.index("=") 414 key = feat[:match].strip() 415 val = feat[match + 1:].strip().strip('"') 416 if val == 'true': 417 feats[key] = True 418 elif val == 'false': 419 feats[key] = False 420 elif re.match(r'[0-9]+', val): 421 feats[key] = int(val) 422 else: 423 feats[key] = val.replace('\"', '"') 424 425 pairs = dict() 426 pairs['syscap'] = feats 427 return pairs 428 429 430def get_exclusion_modules(exclusions: str): 431 pairs = dict() 432 pairs['exclusions'] = exclusions 433 return pairs 434