#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is for rom analysis of lite/small devices.

"""
1. First collect the target information from the BUILD.gn files.
2. Then look up each compiled product in the information from step 1 to find
   the component it belongs to.

For templates starting with "ohos", the component is determined mainly by the
component and subsystem_name fields; the install_dir field is also taken into
account.
For native gn templates, the component is determined by the fields in
bundle.json.

Products that still cannot be matched are handled by fuzzy matching: for a
product named libxxx, search for "xxx" in all BUILD.gn files and apply a
threshold to filter the results.
"""
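
# Illustrative sketch of the data shapes this script works with (field names are
# taken from the code below; the concrete values are hypothetical examples):
#   gn_info (gn_info_file):      {"ohos_shared_library": {"libfoo.z.so": {
#                                     "subsystem_name": "...", "component_name": "...", ...}}, ...}
#   product_dict (_find_files):  {"so": ["<abs path>/libfoo.z.so", ...], "bin": [...], "etc": [...]}
#   rom_size_dict (_put):        {"size": <total>, "<subsystem>": {"size": ..., "count": ...,
#                                     "<component>": {"size": ..., "count": ..., "filelist": [...]}}}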

import sys
import argparse
import json
import logging
import os
from typing import Dict, List, Tuple, Text
import copy
import preprocess
from time import time
from concurrent.futures import ThreadPoolExecutor, Future
from threading import RLock
import collections

from config import result_dict, collector_config, configs, \
    project_path, sub_com_dict, product_name, recollect_gn, baseline, unit_adapt, output_file
from pkgs.basic_tool import BasicTool, unit_adaptive
from pkgs.gn_common_tool import GnCommonTool
from pkgs.simple_excel_writer import SimpleExcelWriter
from pkgs.rom_ram_baseline_collector import RomRamBaselineCollector
from misc import gn_lineno_collect


class RomAnalysisTool:
    @classmethod
    def analysis(cls, product_info: str, product_dict: Dict[str, List[str]], output_file_name: str):
        """analyze the rom size of a lite/small product

        Args:
            product_info (str): product name configured in the yaml
            product_dict (Dict[str, List[str]]): result dict of compiled product files,
                e.g. {"bin": [...], "so": [...], ...}
        """
        logging.info("start analyzing...")
        rom_ram_baseline: Dict[str, Dict] = RomRamBaselineCollector.collect(
            project_path)
        with os.fdopen(os.open("rom_ram_baseline.json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w',
                       encoding='utf-8') as f:
            json.dump(rom_ram_baseline, f, indent=4)
        gn_info_file = configs["gn_info_file"]  # file in which the collected gn info is saved
        with open(gn_info_file, 'r', encoding='utf-8') as f:
            gn_info = json.load(f)
        # query order of the gn templates to be matched for each product type
        query_order: Dict[str, List[str]] = configs[product_info]["query_order"]
        query_order["etc"] = configs["target_type"]  # for "etc", search all templates
        rom_size_dict: Dict = dict()
        if "manual_config" in configs[product_info].keys():
            cls._match_manual_configured(
                configs[product_info]["manual_config"], product_dict, configs[product_info]["product_dir"]["root"],
                rom_size_dict)
        cls._subsystem_component_for_all_product_file(
            product_dict, query_order, gn_info, gn_info_file, rom_ram_baseline, rom_size_dict)
        if unit_adapt:
            cls._result_unit_adaptive(rom_size_dict)
        with os.fdopen(os.open(output_file_name + ".json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w',
                       encoding='utf-8') as f:
            json.dump(rom_size_dict, f, indent=4)
        cls._save_as_xls(rom_size_dict, product_info, baseline)
        logging.info("success")

    @classmethod
    def collect_gn_info(cls):
        logging.info("start scanning BUILD.gn")
        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
            future_list: List[Future] = list()
            for c in collector_config:
                future_list.append(pool.submit(c))
            for f in future_list:
                f.result()
        gn_info_file = configs["gn_info_file"]
        with os.fdopen(os.open(gn_info_file, os.O_WRONLY | os.O_CREAT, mode=0o640), 'w', encoding='utf-8') as f:
            json.dump(result_dict, f, indent=4)

    @classmethod
    def collect_product_info(cls, product_name: str):
        logging.info("start scanning compile products")
        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
        with os.fdopen(os.open(configs[product_name]["product_infofile"], os.O_WRONLY | os.O_CREAT, mode=0o640),
                       'w', encoding='utf-8') as f:
            json.dump(product_dict, f, indent=4)
        return product_dict

    @classmethod
    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
        """
        :top_dir: top-level directory, never changes
        :rela_path: empty at the top level
        :sub_path: usually of the form a/b/c
        :dir_list: full paths of sub-directories relative to the original top directory
        example:
            /
            |-a
            |-b
            |-c
            |-|-d
            |-|-e
            |-|-f
            |-|-|-g
            |-|-|-h
            top_dir: /
            rela_path: ""
            sub_path: c/e
            dir_list: [c]
            => [c/d, c/f], assuming 'a' and 'b' have already been removed from dir_list
        """
        if (not sub_path) or (os.sep not in sub_path):
            return
        # add the remaining directories to dir_list
        t, sub_sub_path = sub_path.split(os.sep, 1)  # e.g. "c/e" is split into "c" and "e"
        t = os.path.join(rela_path, t)
        if t in dir_list:
            dir_list.remove(t)
        sub_sub_dir_list = os.listdir(os.path.join(top_dir, t))
        for ssdl in sub_sub_dir_list:
            if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl):
                dir_list.append(os.path.join(t, ssdl))
        if not sub_sub_dir_list:
            return
        cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list)

    @classmethod
    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
        if product_name not in configs:
            logging.error(
                f"product_name '{product_name}' not found in the config.yaml")
            exit(1)
        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
        product_path_dict: Dict[str, str] = dict()  # maps each product type to its directory
        root_dir = product_dir.get("root")
        root_dir = os.path.join(project_path, root_dir)
        relative_dir: Dict[str, str] = product_dir.get("relative")
        if not relative_dir:
            logging.warning(
                f"'relative_dir' of {product_name} not found in the config.yaml")
            exit(1)
        # everything other than so/a/hap/bin is grouped under "etc"
        for k, v in relative_dir.items():
            product_path_dict[k] = os.path.join(root_dir, v)
        # collect the compiled product files
        # product_dict format: {"so": ["a.so", "b.so"]}
        product_dict: Dict[str, List[str]] = dict()  # stores the names of the compiled products
        for k, v in product_path_dict.items():
            if not os.path.exists(v):
                logging.warning(f"dir '{v}' not exist")
            product_dict[k] = BasicTool.find_files_with_pattern(v)  # v is an absolute path
        if product_dir.get("rest"):
            # all remaining directories not configured under "relative" are grouped under "etc"
            rest_dir_list: List[str] = os.listdir(root_dir)
            for v in relative_dir.values():
                if v in rest_dir_list:
                    rest_dir_list.remove(v)
            for v in relative_dir.values():
                if os.sep in v:
                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
            if "etc" not in product_dict.keys():
                product_dict["etc"] = list()
            for r in rest_dir_list:
                product_dict["etc"].extend(
                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
        return product_dict

    @classmethod
    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict, com_size_baseline: str = str()):
        size = unit.get("size")
        if not rom_size_dict.get("size"):  # total size
            rom_size_dict["size"] = 0
        if not rom_size_dict.get(sub):  # subsystem size
            rom_size_dict[sub] = dict()
            rom_size_dict[sub]["size"] = 0
            rom_size_dict[sub]["count"] = 0

        if not rom_size_dict.get(sub).get(com):  # component
            rom_size_dict.get(sub)[com] = dict()
            rom_size_dict[sub][com]["filelist"] = list()
            rom_size_dict[sub][com]["size"] = 0
            rom_size_dict[sub][com]["count"] = 0

        if (sub != "NOTFOUND" and sub != "UNDEFINED" and com != "NOTFOUND" and com != "UNDEFINED") \
                and (not rom_size_dict.get(sub).get(com).get("baseline")) and baseline:
            rom_size_dict[sub][com]["baseline"] = com_size_baseline

        rom_size_dict[sub][com]["filelist"].append(unit)
        rom_size_dict[sub][com]["size"] += size
        rom_size_dict[sub][com]["count"] += 1
        rom_size_dict[sub]["size"] += size
        rom_size_dict[sub]["count"] += 1
        rom_size_dict["size"] += size

    @classmethod
    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]:
        """
        TODO: should iterate gn_info first for matching.
        For now, grep the source tree directly and use the BUILD.gn with the most
        hits to locate subsystem_name and component_name.
        """
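        # Illustrative walk-through of the heuristic (file name is hypothetical):
        #   "libfoo_bar.z.so" -> strip the "lib" prefix and ".z.so" suffix -> "foo_bar"
        #   -> grep "foo_bar" in all BUILD.gn files (skipping the configured black_list)
        #   -> take the BUILD.gn with the most hits and map its path prefix to a
        #      subsystem/component through sub_com_dict.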
        logging.info(f"fuzzy match: {file_name}")
        _, base_name = os.path.split(file_name)
        if base_name.startswith("lib"):
            base_name = base_name[3:]
        if base_name.endswith(".a"):
            base_name = base_name[:base_name.index(".a")]
        elif base_name.endswith(".z.so"):
            base_name = base_name[:base_name.index(".z.so")]
        elif base_name.endswith(".so"):
            base_name = base_name[:base_name.index(".so")]
        exclude_dir = configs["black_list"]
        tbl = [x for x in exclude_dir if os.sep in x]

        def handler(content: Text) -> List[str]:
            t = list(filter(lambda y: len(y) > 0, list(
                map(lambda x: x.strip(), content.split("\n")))))
            for item in tbl:
                p = os.path.join(project_path, item)
                t = list(filter(lambda x: p not in x, t))
            return t

        grep_result: List[str] = BasicTool.grep_ern(
            base_name,
            project_path,
            include="BUILD.gn",
            exclude=tuple(exclude_dir),
            post_handler=handler)
        if filter_path_keyword:
            # drop grep results whose path contains any of the filter keywords
            tmp = list()
            for gr in grep_result:
                if any(item in gr for item in filter_path_keyword):
                    continue
                tmp.append(gr)
            grep_result = tmp
        if not grep_result:
            logging.info("fuzzy match failed.")
            return str(), str(), str()
        gn_dict: Dict[str, int] = collections.defaultdict(int)
        for g in grep_result:
            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
            gn_dict[gn] += 1
        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
        for k, v in sub_com_dict.items():
            if gn_file.startswith(k):
                s = v.get("subsystem")
                c = v.get("component")
                logging.info(
                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
                return gn_file, s, c
        logging.info("fuzzy match failed.")
        return str(), str(), str()

    @classmethod
    def _get_one_line(cls, baseline_info, subsystem_name, component_name, component_baseline, file_name, file_size):
        if baseline_info:
            return [subsystem_name, component_name,
                    component_baseline, file_name, file_size]
        else:
            return [subsystem_name, component_name,
                    file_name, file_size]
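
    # Sketch of the sheet layout produced by _save_as_xls (the baseline column only
    # appears when baseline is enabled); the subsystem and component cells are
    # merged over the rows of the files they contain:
    #   subsystem_name | component_name | [baseline] | output_file | size(Byte)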
    @classmethod
    def _save_as_xls(cls, result_dict_info: Dict, product_name_info: str, baseline_info: bool) -> None:
        logging.info("saving as xls...")
        header = ["subsystem_name", "component_name", "output_file", "size(Byte)"]
        if baseline_info:
            header = ["subsystem_name", "component_name", "baseline", "output_file", "size(Byte)"]
        tmp_dict = dict()
        for key in result_dict_info.keys():
            tmp_dict[key] = result_dict_info[key]
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        (subsystem_start_row, subsystem_end_row, subsystem_col, component_start_row, component_end_row,
         component_col, baseline_col) = (1, 0, 0, 1, 0, 1, 2)
        if "size" in tmp_dict.keys():
            del tmp_dict["size"]
        for subsystem_name in tmp_dict.keys():
            subsystem_dict = tmp_dict.get(subsystem_name)
            subsystem_size = subsystem_dict.get("size")
            subsystem_file_count = subsystem_dict.get("count")
            del subsystem_dict["count"]
            del subsystem_dict["size"]
            subsystem_end_row += subsystem_file_count

            for component_name in subsystem_dict.keys():
                component_dict: Dict[str, int] = subsystem_dict.get(
                    component_name)
                component_size = component_dict.get("size")
                component_file_count = component_dict.get("count")
                component_baseline = component_dict.get("baseline")
                if component_baseline:
                    del component_dict["baseline"]
                del component_dict["count"]
                del component_dict["size"]
                component_end_row += component_file_count

                for fileinfo in component_dict.get("filelist"):
                    file_name = fileinfo.get("file_name")
                    file_size = fileinfo.get("size")
                    line = cls._get_one_line(baseline_info, subsystem_name, component_name, component_baseline,
                                             file_name, file_size)
                    excel_writer.append_line(line)
                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
                                         component_name)
                if baseline_info:
                    excel_writer.write_merge(component_start_row, baseline_col, component_end_row, baseline_col,
                                             component_baseline)
                component_start_row = component_end_row + 1
            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        output_name: str = configs[product_name_info]["output_name"]
        output_name = output_name.replace(".json", ".xls")
        excel_writer.save(output_name)
        logging.info("save as xls success.")

    @classmethod
    def _result_unit_adaptive(cls, output_result_dict: Dict[str, Dict]) -> None:
        total_size = unit_adaptive(output_result_dict["size"])
        del output_result_dict["size"]
        for subsystem_name, subsystem_info in output_result_dict.items():
            sub_size = unit_adaptive(subsystem_info["size"])
            count = subsystem_info["count"]
            del subsystem_info["size"]
            del subsystem_info["count"]
            for component_name, component_info in subsystem_info.items():
                component_info["size"] = unit_adaptive(component_info["size"])
            subsystem_info["size"] = sub_size
            subsystem_info["count"] = count
        output_result_dict["size"] = total_size

    @classmethod
    def _match_manual_configured(cls, manual_config_info: Dict[str, Dict], compiled_files: Dict[str, List],
                                 compiled_root_path: str, output_result_dict: Dict[str, Dict]) -> None:
        for file_path, file_info in manual_config_info.items():
            full_path = os.path.join(
                project_path, compiled_root_path, file_path)
            if not os.path.isfile(full_path):
                logging.warning(f"config error: {file_path} is not a file.")
                continue
            file_info["size"] = os.path.getsize(full_path)
            file_info["file_name"] = full_path
            cls._put(file_info["subsystem"],
                     file_info["component"], file_info, output_result_dict)
            for _, v in compiled_files.items():
                if full_path not in v:
                    continue
                index = v.index(full_path)
                del v[index]
                break

    @classmethod
    def _iterate_all_template_type(cls, type_list: List[str], gn_info: Dict, gn_info_file: str, base_name: str,
                                   rom_ram_baseline: Dict, rom_size_dict: Dict, f: str, size: int):
        find_flag = False
        component_rom_baseline = None
        for tn in type_list:  # tn example: ohos_shared_library
            if find_flag:  # already matched by an earlier template; no need to keep searching
                break
            output_dict: Dict[str, Dict] = gn_info.get(
                tn)  # all possible build outputs of this template
            if not output_dict:
                logging.warning(
                    f"'{tn}' not found in the {gn_info_file}")
                continue
            d = output_dict.get(base_name)
            if not d:
                continue
            d["size"] = size
            d["file_name"] = f.replace(project_path, "")
            if rom_ram_baseline.get(d["subsystem_name"]) and rom_ram_baseline.get(d["subsystem_name"]).get(
                    d["component_name"]):
                component_rom_baseline = rom_ram_baseline.get(
                    d["subsystem_name"]).get(d["component_name"]).get("rom")
            cls._put(d["subsystem_name"],
                     d["component_name"], d, rom_size_dict, component_rom_baseline)
            find_flag = True
        if not find_flag:  # none of the templates in the given order matched; fall back to fuzzy matching
            psesudo_gn, sub, com = cls._fuzzy_match(f)
            if sub and com:
                if rom_ram_baseline.get(sub) and rom_ram_baseline.get(sub).get(com):
                    # use the same "rom" key as the template-matched branch above
                    component_rom_baseline = rom_ram_baseline.get(
                        sub).get(com).get("rom")
                cls._put(sub, com, {
                    "subsystem_name": sub,
                    "component_name": com,
                    "psesudo_gn_path": psesudo_gn,
                    "description": "fuzzy match",
                    "file_name": f.replace(project_path, ""),
                    "size": size,
                }, rom_size_dict, component_rom_baseline)
                find_flag = True
        if not find_flag:  # still unmatched after fuzzy matching; attribute to NOTFOUND
            cls._put("NOTFOUND", "NOTFOUND", {
                "file_name": f.replace(project_path, ""),
                "size": size,
            }, rom_size_dict)
    @classmethod
    def _subsystem_component_for_all_product_file(cls, product_dict: Dict[str, List[str]],
                                                  query_order: Dict[str, List[str]],
                                                  gn_info: Dict, gn_info_file: str, rom_ram_baseline: Dict,
                                                  rom_size_dict: Dict):
        for t, l in product_dict.items():
            for f in l:  # iterate over all product files
                if os.path.isdir(f):
                    continue
                type_list = query_order.get(t)
                _, base_name = os.path.split(f)
                size = os.path.getsize(f)
                if not type_list:
                    logging.warning(
                        f"'{t}' not found in query_order of the config.yaml")
                    break
                cls._iterate_all_template_type(
                    type_list, gn_info, gn_info_file, base_name, rom_ram_baseline, rom_size_dict, f, size)


def main():
    if recollect_gn:
        RomAnalysisTool.collect_gn_info()
    product_dict: Dict[str, List[str]] = RomAnalysisTool.collect_product_info(product_name)
    RomAnalysisTool.analysis(product_name, product_dict, output_file)


if __name__ == "__main__":
    main()