#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is for ROM analysis of lite/small devices.

import sys
import argparse
import json
import logging
import os
from typing import *
import copy
import preprocess
from time import time
from concurrent.futures import ThreadPoolExecutor, Future
from threading import RLock
import collections

from config import result_dict, collector_config, configs, \
    project_path, sub_com_dict, product_name, recollect_gn, baseline, unit_adapt
from pkgs.basic_tool import BasicTool, unit_adaptive
from pkgs.gn_common_tool import GnCommonTool
from pkgs.simple_excel_writer import SimpleExcelWriter
from pkgs.rom_ram_baseline_collector import RomRamBaselineCollector
from misc import gn_lineno_collect


"""
1. First collect the target information from the BUILD.gn files.
2. Then look every compiled product up in the result of step 1 and match the
   component it belongs to.

For templates whose name starts with "ohos", the owning component is decided
mainly by the `component` and `subsystem_name` fields; the `install_dir` field
has to be considered as well.
For native gn templates, the owning component is decided mainly by the fields
in bundle.json.

Products that cannot be found may be fuzzy matched: e.g. for a product libxxx,
search every BUILD.gn for xxx and filter the hits with a threshold.
"""


class RomAnalysisTool:
    """Attribute the size of every compiled product file to a subsystem/component.

    All methods are classmethods; the class only serves as a namespace. The
    settings (``configs``, ``project_path``, ``baseline`` ...) come from the
    project-local ``config`` module.
    """

    @classmethod
    def collect_gn_info(cls):
        """Run every collector of ``collector_config`` and dump ``result_dict``.

        Each collector is submitted to a thread pool as a zero-argument
        callable. Once all of them have finished, ``result_dict`` is written
        as JSON to ``configs["gn_info_file"]``.
        """
        logging.info("start scanning BUILD.gn")
        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
            future_list: List[Future] = [
                pool.submit(c) for c in collector_config]
            for future in future_list:
                # result() re-raises any exception raised inside the collector
                future.result()
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'w', encoding='utf-8') as f:
            json.dump(result_dict, f, indent=4)

    @classmethod
    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
        """Expand ``dir_list`` so that it excludes ``sub_path`` but keeps its siblings.

        :top_dir top-level directory, never changes during the recursion
        :rela_path path walked so far, relative to top_dir; empty at the top level
        :sub_path remaining path to exclude, usually of the form a/b/c
        :dir_list sub-directories expressed relative to the original top_dir;
                  modified in place
        example:
        /
        |-a
        |-b
        |-c
        |-|-d
        |-|-e
        |-|-f
        |-|-|-g
        |-|-|-h
        top_dir: /
        rela_path: ""
        sub_path: c/e
        dir_list: [c]
        => [c/d, c/f], assume 'a' and 'b' has been removed from dir_list
        """
        if (not sub_path) or (os.sep not in sub_path):
            return
        # e.g. "c/e" is split into "c" and "e"
        head, sub_sub_path = sub_path.split(os.sep, 1)
        head = os.path.join(rela_path, head)
        if head in dir_list:
            dir_list.remove(head)
        excluded = os.path.join(rela_path, sub_path)
        sub_sub_dir_list = os.listdir(os.path.join(top_dir, head))
        # add every entry of `head` except the excluded path itself
        for entry in sub_sub_dir_list:
            candidate = os.path.join(head, entry)
            if candidate != excluded:
                dir_list.append(candidate)
        if not sub_sub_dir_list:
            return
        cls._add_rest_dir(top_dir, head, sub_sub_path, dir_list)

    @classmethod
    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
        """Collect the compiled product files of ``product_name`` grouped by type.

        Reads ``configs[product_name]["product_dir"]`` (keys ``root``,
        ``relative`` and optional ``rest``). Exits the process with status 1
        when the product or its ``relative`` mapping is not configured.

        Returns:
            A dict of the form ``{"so": ["a.so", "b.so"], ...}``.
        """
        # validate before indexing, otherwise a KeyError hides the message
        if not product_name or product_name not in configs:
            logging.error(
                f"product_name '{product_name}' not found in the config.yaml")
            sys.exit(1)
        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
        root_dir = product_dir.get("root")
        root_dir = os.path.join(project_path, root_dir)
        relative_dir: Dict[str, str] = product_dir.get("relative")
        if not relative_dir:
            logging.error(
                f"'relative_dir' of {product_name} not found in the config.yaml")
            sys.exit(1)
        # type of compiled product -> directory holding it
        # everything other than so / a / hap / bin ends up in "etc"
        product_path_dit: Dict[str, str] = {
            k: os.path.join(root_dir, v) for k, v in relative_dir.items()}
        # names of the compiled products, e.g. {"so": ["a.so", "b.so"]}
        product_dict: Dict[str, List[str]] = dict()
        for k, v in product_path_dit.items():  # v is a full path
            if not os.path.exists(v):
                logging.warning(f"dir '{v}' not exist")
                # nothing to scan; keep the key so later stages still see it
                product_dict[k] = list()
                continue
            product_dict[k] = BasicTool.find_files_with_pattern(v)
        if product_dir.get("rest"):
            # every directory not configured under "relative" goes to "etc"
            rest_dir_list: List[str] = os.listdir(root_dir)
            for v in relative_dir.values():
                if v in rest_dir_list:
                    rest_dir_list.remove(v)
            for v in relative_dir.values():
                if os.sep in v:
                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
            etc_files = product_dict.setdefault("etc", list())
            for r in rest_dir_list:
                etc_files.extend(
                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
        return product_dict

    @classmethod
    def collect_product_info(cls, product_name: str):
        """Find the product files and dump them to the product info file.

        The JSON is written to ``configs[product_name]["product_infofile"]``.

        Returns:
            The dict produced by ``_find_files``.
        """
        logging.info("start scanning compile products")
        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
        with open(configs[product_name]["product_infofile"], 'w', encoding='utf-8') as f:
            json.dump(product_dict, f, indent=4)
        return product_dict

    @classmethod
    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict, com_size_baseline: str = str()):
        """Record ``unit`` under ``rom_size_dict[sub][com]`` and update the totals.

        Args:
            sub: subsystem name.
            com: component name.
            unit: file record; its "size" entry is added to the totals.
            rom_size_dict: result dict, modified in place. Layout:
                {"size": total, sub: {"size", "count",
                com: {"filelist", "size", "count"[, "baseline"]}}}.
            com_size_baseline: baseline stored for the component when the
                global ``baseline`` switch is on and sub/com are real names.
        """
        size = unit.get("size")
        if not rom_size_dict.get("size"):  # total size
            rom_size_dict["size"] = 0
        if not rom_size_dict.get(sub):  # subsystem size
            rom_size_dict[sub] = {"size": 0, "count": 0}
        sub_info = rom_size_dict[sub]
        if not sub_info.get(com):  # component
            sub_info[com] = {"filelist": list(), "size": 0, "count": 0}
        com_info = sub_info[com]

        placeholder = ("NOTFOUND", "UNDEFINED")
        if sub not in placeholder and com not in placeholder \
                and (not com_info.get("baseline")) and baseline:
            com_info["baseline"] = com_size_baseline

        com_info["filelist"].append(unit)
        com_info["size"] += size
        com_info["count"] += 1
        sub_info["size"] += size
        sub_info["count"] += 1
        rom_size_dict["size"] += size

    @classmethod
    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]:
        """Guess the owner of ``file_name`` by grepping the BUILD.gn files.

        TODO: gn_info should be searched first.
        The bare product name (without "lib" prefix and ".a"/".z.so"/".so"
        suffix) is grepped in every BUILD.gn below ``project_path``; the
        BUILD.gn with the most hits is used to look up subsystem and component
        in ``sub_com_dict``.

        Args:
            file_name: path of the compiled product.
            filter_path_keyword: grep hits containing any of these keywords
                are discarded.

        Returns:
            ``(gn_file, subsystem, component)``, or three empty strings when
            nothing matched.
        """
        logging.info(f"fuzzy match: {file_name}")
        _, base_name = os.path.split(file_name)
        if base_name.startswith("lib"):
            base_name = base_name[3:]
        # ".z.so" has to be tried before ".so"; slice by length so that an
        # earlier occurrence of the suffix text is not mistaken for the suffix
        for suffix in (".a", ".z.so", ".so"):
            if base_name.endswith(suffix):
                base_name = base_name[:-len(suffix)]
                break
        exclude_dir = configs["black_list"]
        # black list entries that are paths rather than plain directory names
        tbl = [x for x in exclude_dir if os.sep in x]

        def handler(content: Text) -> List[str]:
            """Split grep output into non-empty lines and drop black-listed paths."""
            lines = [line.strip() for line in content.split("\n")]
            lines = [line for line in lines if line]
            for item in tbl:
                p = os.path.join(project_path, item)
                lines = [line for line in lines if p not in line]
            return lines

        grep_result: List[str] = BasicTool.grep_ern(
            base_name,
            project_path,
            include="BUILD.gn",
            exclude=tuple(exclude_dir),
            post_handler=handler)
        if filter_path_keyword:
            # keep a hit only when it contains none of the keywords
            grep_result = [
                gr for gr in grep_result
                if not any(item in gr for item in filter_path_keyword)]
        if not grep_result:
            logging.info("fuzzy match failed.")
            return str(), str(), str()
        # count the hits per BUILD.gn (path relative to project_path)
        gn_counter = collections.Counter(
            g.split(':')[0].replace(project_path, "").lstrip(os.sep)
            for g in grep_result)
        gn_file, _ = gn_counter.most_common(1)[0]
        for k, v in sub_com_dict.items():
            if gn_file.startswith(k):
                s = v.get("subsystem")
                c = v.get("component")
                logging.info(
                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
                return gn_file, s, c
        logging.info("fuzzy match failed.")
        return str(), str(), str()

    @classmethod
    def _save_as_xls(cls, result_dict: Dict, product_name: str, baseline: bool) -> None:
        """Write ``result_dict`` to an xls workbook, one row per file.

        Subsystem, component and (when ``baseline`` is true) baseline cells
        are merged over the rows of their files. The workbook is saved next to
        ``configs[product_name]["output_name"]`` with the extension ".xls".
        ``result_dict`` itself is not modified; a deep copy is consumed.
        """
        logging.info("saving as xls...")
        header = ["subsystem_name", "component_name",
                  "output_file", "size(Byte)"]
        if baseline:
            header = ["subsystem_name", "component_name", "baseline",
                      "output_file", "size(Byte)"]
        tmp_dict = copy.deepcopy(result_dict)
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        # row 0 holds the header, data starts at row 1
        subsystem_start_row = 1
        subsystem_end_row = 0
        subsystem_col = 0
        component_start_row = 1
        component_end_row = 0
        component_col = 1
        baseline_col = 2
        # the total is not a subsystem; tolerate an empty result
        tmp_dict.pop("size", None)
        for subsystem_name, subsystem_dict in tmp_dict.items():
            subsystem_file_count = subsystem_dict.pop("count")
            subsystem_dict.pop("size")
            subsystem_end_row += subsystem_file_count

            # only component entries are left in subsystem_dict now
            for component_name, component_dict in subsystem_dict.items():
                component_file_count = component_dict.pop("count")
                component_dict.pop("size")
                component_baseline = component_dict.get("baseline")
                if component_baseline:
                    del component_dict["baseline"]
                component_end_row += component_file_count

                for fileinfo in component_dict.get("filelist"):
                    file_name = fileinfo.get("file_name")
                    file_size = fileinfo.get("size")
                    line = [subsystem_name, component_name,
                            file_name, file_size]
                    if baseline:
                        line = [subsystem_name, component_name,
                                component_baseline, file_name, file_size]
                    excel_writer.append_line(line)
                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
                                         component_name)
                if baseline:
                    excel_writer.write_merge(component_start_row, baseline_col, component_end_row, baseline_col,
                                             component_baseline)
                component_start_row = component_end_row + 1
            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        output_name: str = configs[product_name]["output_name"]
        # swap only the extension; str.replace would also rewrite ".json"
        # inside directory names and leave other names untouched
        output_name = os.path.splitext(output_name)[0] + ".xls"
        excel_writer.save(output_name)
        logging.info("save as xls success.")

    @classmethod
    def _result_unit_adaptive(cls, result_dict: Dict[str, Dict]) -> None:
        """Replace total, subsystem and component sizes by ``unit_adaptive`` values.

        Works in place. The "size"/"count" bookkeeping keys are removed while
        the nested entries are visited and re-added afterwards, so they end up
        last in their dict. An empty result dict is left untouched.
        """
        if "size" not in result_dict:
            # nothing was recorded, so there is nothing to convert
            return
        total_size = unit_adaptive(result_dict.pop("size"))
        for subsystem_info in result_dict.values():
            sub_size = unit_adaptive(subsystem_info.pop("size"))
            count = subsystem_info.pop("count")
            # only component entries are left at this point
            for component_info in subsystem_info.values():
                component_info["size"] = unit_adaptive(component_info["size"])
            subsystem_info["size"] = sub_size
            subsystem_info["count"] = count
        result_dict["size"] = total_size

    @classmethod
    def _match_manual_configured(cls, manual_config_info: Dict[str, Dict], compiled_files: Dict[str, List], compiled_root_path: str, result_dict: Dict[str, Dict]) -> None:
        """Record the manually configured files and drop them from ``compiled_files``.

        Args:
            manual_config_info: file path (relative to the compiled root) ->
                record carrying at least "subsystem" and "component".
            compiled_files: product files grouped by type; the first
                occurrence of a handled file is removed in place.
            compiled_root_path: compiled root, relative to ``project_path``.
            result_dict: result dict filled through ``_put``.
        """
        for file_path, file_info in manual_config_info.items():
            full_path = os.path.join(
                project_path, compiled_root_path, file_path)
            if not os.path.isfile(full_path):
                logging.warning(f"config error: {file_path} is not a file.")
                continue
            file_info["size"] = os.path.getsize(full_path)
            file_info["file_name"] = full_path
            cls._put(file_info["subsystem"],
                     file_info["component"], file_info, result_dict)
            # the file is accounted for; keep it out of the automatic matching
            for files in compiled_files.values():
                if full_path in files:
                    files.remove(full_path)
                    break

    @classmethod
    def _iterate_all_template_type(cls, type_list: List[str], gn_info: Dict, gn_info_file: str, base_name: str, rom_ram_baseline: Dict, rom_size_dict: Dict, f: str, size: int):
        """Attribute one product file to a subsystem/component and record it.

        Lookup order: exact match of ``base_name`` in the templates listed in
        ``type_list``, then fuzzy match, then the "NOTFOUND" bucket.

        Args:
            type_list: gn template names to try, e.g. ohos_shared_library.
            gn_info: template name -> {output name -> target record}.
            gn_info_file: name of the gn_info file, used in log messages only.
            base_name: file name of the product without directory.
            rom_ram_baseline: subsystem -> component -> baseline record.
            rom_size_dict: result dict filled through ``_put``.
            f: path of the product file.
            size: size of the product file.
        """
        find_flag = False
        component_rom_baseline = None
        for tn in type_list:  # tn example: ohos_shared_library
            # every product this template may generate
            output_dict: Dict[str, Dict] = gn_info.get(tn)
            if not output_dict:
                logging.warning(
                    f"'{tn}' not found in the {gn_info_file}")
                continue
            d = output_dict.get(base_name)
            if not d:
                continue
            # copy: the gn_info record is shared by every product with this
            # base name, mutating it would rewrite entries already recorded
            d = dict(d)
            d["size"] = size
            d["file_name"] = f.replace(project_path, "")
            sub_baseline = rom_ram_baseline.get(d["subsystem_name"])
            if sub_baseline and sub_baseline.get(d["component_name"]):
                component_rom_baseline = sub_baseline.get(
                    d["component_name"]).get("rom")
            cls._put(d["subsystem_name"],
                     d["component_name"], d, rom_size_dict, component_rom_baseline)
            find_flag = True
            # found in this template, the remaining ones need not be searched
            break
        if not find_flag:  # none of the templates matched: fuzzy match
            psesudo_gn, sub, com = cls._fuzzy_match(f)
            if sub and com:
                if rom_ram_baseline.get(sub) and rom_ram_baseline.get(sub).get(com):
                    # NOTE(review): this branch used to read the key
                    # "baseline" while the exact-match branch reads "rom";
                    # unified on "rom" - confirm against the collector output
                    component_rom_baseline = rom_ram_baseline.get(
                        sub).get(com).get("rom")
                cls._put(sub, com, {
                    "subsystem_name": sub,
                    "component_name": com,
                    "psesudo_gn_path": psesudo_gn,
                    "description": "fuzzy match",
                    "file_name": f.replace(project_path, ""),
                    "size": size,
                }, rom_size_dict, component_rom_baseline)
                find_flag = True
        if not find_flag:  # fuzzy match failed as well: goes to NOTFOUND
            cls._put("NOTFOUND", "NOTFOUND", {
                "file_name": f.replace(project_path, ""),
                "size": size,
            }, rom_size_dict)

    @classmethod
    def _subsystem_component_for_all_product_file(cls, product_dict: Dict[str, List[str]], query_order: Dict[str, List[str]],
                                                  gn_info: Dict, gn_info_file: str, rom_ram_baseline: Dict, rom_size_dict: Dict):
        """Attribute every file of ``product_dict`` to a subsystem/component.

        Product types without an entry in ``query_order`` are skipped with a
        warning; directories inside the file lists are ignored.
        """
        for product_type, files in product_dict.items():
            # the template list only depends on the product type
            type_list = query_order.get(product_type)
            if not type_list:
                logging.warning(
                    f"'{product_type}' not found in query_order of the config.yaml")
                continue
            for f in files:  # iterate over all files
                if os.path.isdir(f):
                    continue
                _, base_name = os.path.split(f)
                size = os.path.getsize(f)
                cls._iterate_all_template_type(
                    type_list, gn_info, gn_info_file, base_name, rom_ram_baseline, rom_size_dict, f, size)

    @classmethod
    def analysis(cls, product_name: str, product_dict: Dict[str, List[str]]):
        """analysis the rom of lite/small product

        Writes rom_ram_baseline.json, the JSON result
        (``configs[product_name]["output_name"]``) and the xls workbook.

        Args:
            product_name (str): product name configured in the yaml
            product_dict (Dict[str, List[str]]): result dict of compiled product file
                format:
                    "bin":[...],
                    "so":[...]
                    ...
        """
        logging.info("start analyzing...")
        rom_ram_baseline: Dict[str, Dict] = RomRamBaselineCollector.collect(
            project_path)
        with open("rom_ram_baseline.json", 'w', encoding='utf-8') as f:
            json.dump(rom_ram_baseline, f, indent=4)
        gn_info_file = configs["gn_info_file"]  # filename to save gn_info
        with open(gn_info_file, 'r', encoding='utf-8') as f:
            gn_info = json.load(f)
        # query order of the gn template to be matched
        query_order: Dict[str, List[str]] = configs[product_name]["query_order"]
        # "etc" is searched in every template
        query_order["etc"] = configs["target_type"]
        rom_size_dict: Dict = dict()
        if "manual_config" in configs[product_name]:
            cls._match_manual_configured(
                configs[product_name]["manual_config"], product_dict,
                configs[product_name]["product_dir"]["root"], rom_size_dict)
        cls._subsystem_component_for_all_product_file(
            product_dict, query_order, gn_info, gn_info_file, rom_ram_baseline, rom_size_dict)
        if unit_adapt:
            cls._result_unit_adaptive(rom_size_dict)
        with open(configs[product_name]["output_name"], 'w', encoding='utf-8') as f:
            json.dump(rom_size_dict, f, indent=4)
        cls._save_as_xls(rom_size_dict, product_name, baseline)
        logging.info("success")


def main():
    """Optionally re-collect the gn info, then collect and analyse the products."""
    if recollect_gn:
        RomAnalysisTool.collect_gn_info()
    product_dict: Dict[str, List[str]] = RomAnalysisTool.collect_product_info(
        product_name)
    RomAnalysisTool.analysis(product_name, product_dict)


if __name__ == "__main__":
    main()