1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (c) 2022 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# This file is for rom analyzation of lite/small devices. 17 18import sys 19import argparse 20import json 21import logging 22import os 23from typing import * 24import copy 25import preprocess 26from time import time 27from concurrent.futures import ThreadPoolExecutor, Future 28from threading import RLock 29import collections 30 31from config import result_dict, collector_config, configs, \ 32 project_path, sub_com_dict, product_name, recollect_gn, baseline, unit_adapt, output_file 33from pkgs.basic_tool import BasicTool, unit_adaptive 34from pkgs.gn_common_tool import GnCommonTool 35from pkgs.simple_excel_writer import SimpleExcelWriter 36from pkgs.rom_ram_baseline_collector import RomRamBaselineCollector 37from misc import gn_lineno_collect 38 39 40""" 411. 先收集BUILD.gn中的target信息 422. 然后根据编译产物到1中进行搜索,匹配其所属的部件 43 44对于ohos开头的template,主要根据其component字段和subsystem_name字段来归数其部件;同时也要考虑install_dir字段 45对于gn原生的template,主要根据bundle.json中的字段来归属其部件 46 47对于找不到的,可以模糊匹配,如,有产物libxxx,则可以在所有的BUILD.gn中搜索xxx,并设置一个阀值予以过滤 48""" 49 50 51class RomAnalysisTool: 52 @classmethod 53 def collect_gn_info(cls): 54 logging.info("start scanning BUILD.gn") 55 with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool: 56 future_list: List[Future] = list() 57 for c in collector_config: 58 future_list.append(pool.submit(c)) 59 for f in future_list: 60 f.result() 61 gn_info_file = configs["gn_info_file"] 62 with open(gn_info_file, 'w', encoding='utf-8') as f: 63 json.dump(result_dict, f, indent=4) 64 65 @classmethod 66 def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None: 67 """ 68 :top_dir 顶层目录,不会变化 69 :rela_path 最顶层的值为空 70 :sub_path 一般是a/b/c这种形式 71 :dir_list 相对于原始top目录的子目录的全路径 72 example: 73 / 74 |-a 75 |-b 76 |-c 77 |-|-d 78 |-|-e 79 |-|-f 80 |-|-|-g 81 |-|-|-h 82 top_dir: / 83 rela_path: "" 84 sub_path: c/e 85 dir_list: [c] 86 => [c/d, c/f], assume 'a' and 'b' has been removed from dir_list 87 """ 88 if (not sub_path) or (os.sep not in sub_path): 89 return 90 # 将其他目录添加到dir_list 91 t, sub_sub_path = sub_path.split(os.sep, 1) # 如果是c/e,分割成c,e 92 t = os.path.join(rela_path, t) 93 if t in dir_list: 94 dir_list.remove(t) 95 sub_sub_dir_list = os.listdir(os.path.join(top_dir, t)) 96 for ssdl in sub_sub_dir_list: 97 if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl): 98 dir_list.append(os.path.join(t, ssdl)) 99 if not sub_sub_dir_list: 100 return 101 cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list) 102 103 @classmethod 104 def _find_files(cls, product_name: str) -> Dict[str, List[str]]: 105 product_dir: Dict[str, Dict] = configs[product_name]["product_dir"] 106 if not product_name: 107 logging.error( 108 f"product_name '{product_name}' not found in the config.yaml") 109 exit(1) 110 product_path_dit: Dict[str, str] = dict() # 存储编译产物的类型及目录 111 root_dir = product_dir.get("root") 112 root_dir = os.path.join(project_path, root_dir) 113 relative_dir: Dict[str, str] = product_dir.get("relative") 114 if not relative_dir: 115 logging.warning( 116 f"'relative_dir' of {product_name} not found in the config.yaml") 117 exit(1) 118 # 除了so a hap bin外的全部归到etc里面 119 for k, v in relative_dir.items(): 120 product_path_dit[k] = os.path.join(root_dir, v) 121 # 查找编译产物信息 122 # product_dict格式: {"so": ["a.so", "b.so"]} 123 product_dict: Dict[str, List[str]] = dict() # 存储编译产物的名称 124 for k, v in product_path_dit.items(): 125 if not os.path.exists(v): 126 logging.warning(f"dir '{v}' not exist") 127 product_dict[k] = BasicTool.find_files_with_pattern(v) # v是全路径 128 if product_dir.get("rest"): 129 rest_dir_list: List[str] = os.listdir( 130 root_dir) # 除了配置在relative下之外的所有剩余目录,全部归到etc下 131 for v in relative_dir.values(): 132 if v in rest_dir_list: 133 rest_dir_list.remove(v) 134 for v in relative_dir.values(): 135 if os.sep in v: 136 cls._add_rest_dir(root_dir, str(), v, rest_dir_list) 137 if "etc" not in product_dict.keys(): 138 product_dict["etc"] = list() 139 for r in rest_dir_list: 140 product_dict["etc"].extend( 141 BasicTool.find_files_with_pattern(os.path.join(root_dir, r))) 142 return product_dict 143 144 @classmethod 145 def collect_product_info(cls, product_name: str): 146 logging.info("start scanning compile products") 147 product_dict: Dict[str, List[str]] = cls._find_files(product_name) 148 with open(configs[product_name]["product_infofile"], 'w', encoding='utf-8') as f: 149 json.dump(product_dict, f, indent=4) 150 return product_dict 151 152 @classmethod 153 def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict, com_size_baseline: str = str()): 154 size = unit.get("size") 155 if not rom_size_dict.get("size"): # 总大小 156 rom_size_dict["size"] = 0 157 if not rom_size_dict.get(sub): # 子系统大小 158 rom_size_dict[sub]: Dict[str, Dict] = dict() 159 rom_size_dict[sub]["size"] = 0 160 rom_size_dict[sub]["count"] = 0 161 162 if not rom_size_dict.get(sub).get(com): # 部件 163 rom_size_dict.get(sub)[com] = dict() 164 rom_size_dict[sub][com]["filelist"] = list() 165 rom_size_dict[sub][com]["size"] = 0 166 rom_size_dict[sub][com]["count"] = 0 167 168 if (sub != "NOTFOUND" and sub != "UNDEFINED" and com != "NOTFOUND" and com != "UNDEFINED") \ 169 and (not rom_size_dict.get(sub).get(com).get("baseline")) and baseline: 170 rom_size_dict[sub][com]["baseline"] = com_size_baseline 171 172 rom_size_dict[sub][com]["filelist"].append(unit) 173 rom_size_dict[sub][com]["size"] += size 174 rom_size_dict[sub][com]["count"] += 1 175 rom_size_dict[sub]["size"] += size 176 rom_size_dict[sub]["count"] += 1 177 rom_size_dict["size"] += size 178 179 @classmethod 180 def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]: 181 """ 182 TODO 应当先遍历gn_info进行匹配 183 直接grep,利用出现次数最多的BUILD.gn去定位subsystem_name和component_name""" 184 logging.info(f"fuzzy match: {file_name}") 185 _, base_name = os.path.split(file_name) 186 if base_name.startswith("lib"): 187 base_name = base_name[3:] 188 if base_name.endswith(".a"): 189 base_name = base_name[:base_name.index(".a")] 190 elif base_name.endswith(".z.so"): 191 base_name = base_name[:base_name.index(".z.so")] 192 elif base_name.endswith(".so"): 193 base_name = base_name[:base_name.index(".so")] 194 exclude_dir = configs["black_list"] 195 tbl = [x for x in exclude_dir if os.sep in x] 196 197 def handler(content: Text) -> List[str]: 198 t = list(filter(lambda y: len(y) > 0, list( 199 map(lambda x: x.strip(), content.split("\n"))))) 200 for item in tbl: 201 p = os.path.join(project_path, item) 202 t = list(filter(lambda x: p not in x, t)) 203 return t 204 grep_result: List[str] = BasicTool.grep_ern( 205 base_name, 206 project_path, 207 include="BUILD.gn", 208 exclude=tuple(exclude_dir), 209 post_handler=handler) 210 if filter_path_keyword: 211 tmp = list() 212 for gr in grep_result: 213 for item in filter_path_keyword: 214 if item in gr: 215 continue 216 tmp.append(gr) 217 grep_result = tmp 218 if not grep_result: 219 logging.info(f"fuzzy match failed.") 220 return str(), str(), str() 221 gn_dict: Dict[str, int] = collections.defaultdict(int) 222 for g in grep_result: 223 gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep) 224 gn_dict[gn] += 1 225 gn_file, _ = collections.Counter(gn_dict).most_common(1)[0] 226 for k, v in sub_com_dict.items(): 227 if gn_file.startswith(k): 228 s = v.get("subsystem") 229 c = v.get("component") 230 logging.info( 231 f"fuzzy match success: subsystem_name={s}, component_name={c}") 232 return gn_file, s, c 233 logging.info(f"fuzzy match failed.") 234 return str(), str(), str() 235 236 @classmethod 237 def _save_as_xls(cls, result_dict: Dict, product_name: str, baseline: bool) -> None: 238 logging.info("saving as xls...") 239 header = ["subsystem_name", "component_name", 240 "output_file", "size(Byte)"] 241 if baseline: 242 header = ["subsystem_name", "component_name", "baseline", 243 "output_file", "size(Byte)"] 244 tmp_dict = copy.deepcopy(result_dict) 245 excel_writer = SimpleExcelWriter("rom") 246 excel_writer.set_sheet_header(headers=header) 247 subsystem_start_row = 1 248 subsystem_end_row = 0 249 subsystem_col = 0 250 component_start_row = 1 251 component_end_row = 0 252 component_col = 1 253 baseline_col = 2 254 if "size" in tmp_dict.keys(): 255 del tmp_dict["size"] 256 for subsystem_name in tmp_dict.keys(): 257 subsystem_dict = tmp_dict.get(subsystem_name) 258 subsystem_size = subsystem_dict.get("size") 259 subsystem_file_count = subsystem_dict.get("count") 260 del subsystem_dict["count"] 261 del subsystem_dict["size"] 262 subsystem_end_row += subsystem_file_count 263 264 for component_name in subsystem_dict.keys(): 265 component_dict: Dict[str, int] = subsystem_dict.get( 266 component_name) 267 component_size = component_dict.get("size") 268 component_file_count = component_dict.get("count") 269 component_baseline = component_dict.get("baseline") 270 if component_baseline: 271 del component_dict["baseline"] 272 del component_dict["count"] 273 del component_dict["size"] 274 component_end_row += component_file_count 275 276 for fileinfo in component_dict.get("filelist"): 277 file_name = fileinfo.get("file_name") 278 file_size = fileinfo.get("size") 279 line = [subsystem_name, component_name, 280 file_name, file_size] 281 if baseline: 282 line = [subsystem_name, component_name, 283 component_baseline, file_name, file_size] 284 excel_writer.append_line(line) 285 excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col, 286 component_name) 287 if baseline: 288 excel_writer.write_merge(component_start_row, baseline_col, component_end_row, baseline_col, 289 component_baseline) 290 component_start_row = component_end_row + 1 291 excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col, 292 subsystem_name) 293 subsystem_start_row = subsystem_end_row + 1 294 output_name: str = configs[product_name]["output_name"] 295 output_name = output_name.replace(".json", ".xls") 296 excel_writer.save(output_name) 297 logging.info("save as xls success.") 298 299 @classmethod 300 def _result_unit_adaptive(cls, result_dict: Dict[str, Dict]) -> None: 301 total_size = unit_adaptive(result_dict["size"]) 302 del result_dict["size"] 303 for subsystem_name, subsystem_info in result_dict.items(): 304 sub_size = unit_adaptive(subsystem_info["size"]) 305 count = subsystem_info["count"] 306 del subsystem_info["size"] 307 del subsystem_info["count"] 308 for component_name, component_info in subsystem_info.items(): 309 component_info["size"] = unit_adaptive(component_info["size"]) 310 subsystem_info["size"] = sub_size 311 subsystem_info["count"] = count 312 result_dict["size"] = total_size 313 314 @classmethod 315 def _match_manual_configured(cls, manual_config_info: Dict[str, Dict], compiled_files: Dict[str, List], compiled_root_path: str, result_dict: Dict[str, Dict]) -> None: 316 for file_path, file_info in manual_config_info.items(): 317 full_path = os.path.join( 318 project_path, compiled_root_path, file_path) 319 if not os.path.isfile(full_path): 320 logging.warning(f"config error: {file_path} is not a file.") 321 continue 322 file_info["size"] = os.path.getsize(full_path) 323 file_info["file_name"] = full_path 324 cls._put(file_info["subsystem"], 325 file_info["component"], file_info, result_dict) 326 for _, v in compiled_files.items(): 327 if full_path not in v: 328 continue 329 index = v.index(full_path) 330 del v[index] 331 break 332 333 @classmethod 334 def _iterate_all_template_type(cls, type_list: List[str], gn_info: Dict, gn_info_file: str, base_name: str, rom_ram_baseline: Dict, rom_size_dict: Dict, f: str, size: int): 335 find_flag = False 336 component_rom_baseline = None 337 for tn in type_list: # tn example: ohos_shared_library 338 if find_flag: # 如果已经在前面的template中找到了,后面的就不必再查找 339 break 340 output_dict: Dict[str, Dict] = gn_info.get( 341 tn) # 这个模板对应的所有可能编译产物 342 if not output_dict: 343 logging.warning( 344 f"'{tn}' not found in the {gn_info_file}") 345 continue 346 d = output_dict.get(base_name) 347 if not d: 348 continue 349 d["size"] = size 350 d["file_name"] = f.replace(project_path, "") 351 if rom_ram_baseline.get(d["subsystem_name"]) and rom_ram_baseline.get(d["subsystem_name"]).get(d["component_name"]): 352 component_rom_baseline = rom_ram_baseline.get( 353 d["subsystem_name"]).get(d["component_name"]).get("rom") 354 cls._put(d["subsystem_name"], 355 d["component_name"], d, rom_size_dict, component_rom_baseline) 356 find_flag = True 357 if not find_flag: # 如果指定序列中的template都没有查找到,则模糊匹配 358 # fuzzy match 359 psesudo_gn, sub, com = cls._fuzzy_match(f) 360 if sub and com: 361 if rom_ram_baseline.get(sub) and rom_ram_baseline.get(sub).get(com): 362 component_rom_baseline = rom_ram_baseline.get( 363 sub).get(com).get("baseline") 364 cls._put(sub, com, { 365 "subsystem_name": sub, 366 "component_name": com, 367 "psesudo_gn_path": psesudo_gn, 368 "description": "fuzzy match", 369 "file_name": f.replace(project_path, ""), 370 "size": size, 371 }, rom_size_dict, component_rom_baseline) 372 find_flag = True 373 if not find_flag: # 模糊匹配都没有匹配到的,归属到NOTFOUND 374 cls._put("NOTFOUND", "NOTFOUND", { 375 "file_name": f.replace(project_path, ""), 376 "size": size, 377 }, rom_size_dict) 378 379 @classmethod 380 def _subsystem_component_for_all_product_file(cls, product_dict: Dict[str, List[str]], query_order: Dict[str, List[str]], 381 gn_info: Dict, gn_info_file: str, rom_ram_baseline: Dict, rom_size_dict: Dict): 382 for t, l in product_dict.items(): 383 for f in l: # 遍历所有文件 384 if os.path.isdir(f): 385 continue 386 type_list = query_order.get(t) 387 _, base_name = os.path.split(f) 388 size = os.path.getsize(f) 389 if not type_list: 390 logging.warning( 391 f"'{t}' not found in query_order of the config.yaml") 392 break 393 cls._iterate_all_template_type( 394 type_list, gn_info, gn_info_file, base_name, rom_ram_baseline, rom_size_dict, f, size) 395 396 @classmethod 397 def analysis(cls, product_name: str, product_dict: Dict[str, List[str]], output_file_name: str): 398 """analysis the rom of lite/small product 399 400 Args: 401 product_name (str): product name configured in the yaml 402 product_dict (Dict[str, List[str]]): result dict of compiled product file 403 format: 404 "bin":[...], 405 "so":[...] 406 ... 407 """ 408 logging.info("start analyzing...") 409 rom_ram_baseline: Dict[str, Dict] = RomRamBaselineCollector.collect( 410 project_path) 411 with open("rom_ram_baseline.json", 'w', encoding='utf-8') as f: 412 json.dump(rom_ram_baseline, f, indent=4) 413 gn_info_file = configs["gn_info_file"] # filename to save gn_info 414 with open(gn_info_file, 'r', encoding='utf-8') as f: 415 gn_info = json.load(f) 416 query_order: Dict[str, List[str] 417 ] = configs[product_name]["query_order"] # query order of the gn template to be matched 418 query_order["etc"] = configs["target_type"] # etc会查找所有的template 419 rom_size_dict: Dict = dict() 420 if "manual_config" in configs[product_name].keys(): 421 cls._match_manual_configured( 422 configs[product_name]["manual_config"], product_dict, configs[product_name]["product_dir"]["root"], rom_size_dict) 423 cls._subsystem_component_for_all_product_file( 424 product_dict, query_order, gn_info, gn_info_file, rom_ram_baseline, rom_size_dict) 425 if unit_adapt: 426 cls._result_unit_adaptive(rom_size_dict) 427 with open(output_file_name + ".json", 'w', encoding='utf-8') as f: 428 json.dump(rom_size_dict, f, indent=4) 429 cls._save_as_xls(rom_size_dict, product_name, baseline) 430 logging.info("success") 431 432 433def main(): 434 if recollect_gn: 435 RomAnalysisTool.collect_gn_info() 436 product_dict: Dict[str, List[str] 437 ] = RomAnalysisTool.collect_product_info(product_name) 438 RomAnalysisTool.analysis(product_name, product_dict, output_file) 439 440 441if __name__ == "__main__": 442 main() 443