#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is for rom analysis of lite/small devices.

import sys
import argparse
import json
import logging
import os
from typing import *
import copy
import preprocess
from time import time
from concurrent.futures import ThreadPoolExecutor, Future
from threading import RLock
import collections

from config import result_dict, collector_config, configs, \
    project_path, sub_com_dict, product_name, recollect_gn
from pkgs.basic_tool import BasicTool
from pkgs.gn_common_tool import GnCommonTool
from pkgs.simple_excel_writer import SimpleExcelWriter
from misc import gn_lineno_collect


"""
1. First collect the target information from the BUILD.gn files.
2. Then look up each build artifact in the information collected in step 1 to
   determine which component it belongs to.

For templates starting with "ohos", the component is determined mainly from the
component and subsystem_name fields; the install_dir field is also taken into account.
For native gn templates, the component is determined from the fields in bundle.json.

For artifacts that cannot be matched this way, fuzzy matching is used: for an
artifact libxxx, every BUILD.gn is grepped for xxx, and a threshold is applied
to filter the results.
"""
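
# Illustrative sketch only (not the authoritative schema): the entries below are
# just the config.yaml keys that this script actually reads through `configs`;
# all values are made-up examples.
#
#   gn_info_file: gn_info.json              # where collect_gn_info() dumps the BUILD.gn target info
#   black_list: [out, ...]                  # paths excluded from fuzzy matching
#   target_type: [ohos_shared_library, ...] # every known template name
#   <product_name>:
#     product_infofile: product_info.json   # where collect_product_info() dumps the artifact list
#     output_name: rom_result.json          # final analysis result (the .xls uses the same stem)
#     product_dir:
#       root: <packaged image dir, relative to the project root>
#       relative: {so: <lib dir>, bin: <bin dir>, hap: <app dir>, etc: <etc dir>}
#       rest: True                          # classify all remaining directories as etc
#     query_order:
#       so: [ohos_shared_library, ...]      # templates to try for each artifact type, in order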


class RomAnalysisTool:
    @classmethod
    def collect_gn_info(cls):
        logging.info("start scanning BUILD.gn")
        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
            future_list: List[Future] = list()
            for c in collector_config:
                future_list.append(pool.submit(c))
            for f in future_list:
                f.result()
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'w', encoding='utf-8') as f:
            json.dump(result_dict, f, indent=4)

    @classmethod
    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
        """
        :top_dir: the top-level directory; it never changes
        :rela_path: empty string at the top level
        :sub_path: usually of the form a/b/c
        :dir_list: full paths of the sub-directories relative to the original top directory
        example:
        /
        |-a
        |-b
        |-c
        |-|-d
        |-|-e
        |-|-f
        |-|-|-g
        |-|-|-h
        top_dir: /
        rela_path: ""
        sub_path: c/e
        dir_list: [c]
        => [c/d, c/f], assuming 'a' and 'b' have already been removed from dir_list
        """
        if (not sub_path) or (os.sep not in sub_path):
            return
        # add the sibling directories to dir_list
        t, sub_sub_path = sub_path.split(os.sep, 1)  # e.g. c/e is split into c and e
        t = os.path.join(rela_path, t)
        if t in dir_list:
            dir_list.remove(t)
        sub_sub_dir_list = os.listdir(os.path.join(top_dir, t))
        for ssdl in sub_sub_dir_list:
            if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl):
                dir_list.append(os.path.join(t, ssdl))
        if not sub_sub_dir_list:
            return
        cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list)

    @classmethod
    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
        if (not product_name) or (product_name not in configs):
            logging.error(
                f"product_name '{product_name}' not found in the config.yaml")
            sys.exit(1)
        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
        product_path_dict: Dict[str, str] = dict()  # type and directory of each build artifact
        root_dir = product_dir.get("root")
        root_dir = os.path.join(project_path, root_dir)
        relative_dir: Dict[str, str] = product_dir.get("relative")
        if not relative_dir:
            logging.warning(
                f"'relative' of {product_name} not found in the config.yaml")
            sys.exit(1)
        # everything other than so/a/hap/bin is classified as etc
        for k, v in relative_dir.items():
            product_path_dict[k] = os.path.join(root_dir, v)
        # collect the build artifacts
        # product_dict format: {"so": ["a.so", "b.so"]}
        product_dict: Dict[str, List[str]] = dict()  # names of the build artifacts
        for k, v in product_path_dict.items():
            if not os.path.exists(v):
                logging.warning(f"dir '{v}' does not exist")
            product_dict[k] = BasicTool.find_files_with_pattern(v)  # v is a full path
        if product_dir.get("rest"):
            # all remaining directories not configured under 'relative' are classified as etc
            rest_dir_list: List[str] = os.listdir(root_dir)
            for v in relative_dir.values():
                if v in rest_dir_list:
                    rest_dir_list.remove(v)
            for v in relative_dir.values():
                if os.sep in v:
                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
            if "etc" not in product_dict.keys():
                product_dict["etc"] = list()
            for r in rest_dir_list:
                product_dict["etc"].extend(
                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
        return product_dict

    @classmethod
    def collect_product_info(cls, product_name: str):
        logging.info("start scanning compile products")
        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
        with open(configs[product_name]["product_infofile"], 'w', encoding='utf-8') as f:
            json.dump(product_dict, f, indent=4)
        return product_dict

    @classmethod
    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict):
        size = unit.get("size")
        if not rom_size_dict.get("size"):  # total size
            rom_size_dict["size"] = 0
        if not rom_size_dict.get(sub):  # subsystem size
            rom_size_dict[sub] = dict()
            rom_size_dict[sub]["size"] = 0
            rom_size_dict[sub]["count"] = 0

        if not rom_size_dict.get(sub).get(com):  # component
            rom_size_dict[sub][com] = dict()
            rom_size_dict[sub][com]["filelist"] = list()
            rom_size_dict[sub][com]["size"] = 0
            rom_size_dict[sub][com]["count"] = 0

        rom_size_dict[sub][com]["filelist"].append(unit)
        rom_size_dict[sub][com]["size"] += size
        rom_size_dict[sub][com]["count"] += 1
        rom_size_dict[sub]["size"] += size
        rom_size_dict[sub]["count"] += 1
        rom_size_dict["size"] += size

    @classmethod
    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str, ...] = tuple()) -> Tuple[str, str, str]:
        """
        Grep the source tree directly and use the BUILD.gn file with the most
        hits to locate the subsystem_name and component_name."""
        logging.info(f"fuzzy match: {file_name}")
        _, base_name = os.path.split(file_name)
        if base_name.startswith("lib"):
            base_name = base_name[3:]
        if base_name.endswith(".a"):
            base_name = base_name[:base_name.index(".a")]
        elif base_name.endswith(".z.so"):
            base_name = base_name[:base_name.index(".z.so")]
        elif base_name.endswith(".so"):
            base_name = base_name[:base_name.index(".so")]
        exclude_dir = configs["black_list"]
        tbl = [x for x in exclude_dir if os.sep in x]

        def handler(content: Text) -> List[str]:
            t = list(filter(lambda y: len(y) > 0, list(
                map(lambda x: x.strip(), content.split("\n")))))
            for item in tbl:
                p = os.path.join(project_path, item)
                t = list(filter(lambda x: p not in x, t))
            return t

        grep_result: List[str] = BasicTool.grep_ern(
            base_name,
            project_path,
            include="BUILD.gn",
            exclude=tuple(exclude_dir),
            post_handler=handler)
        if filter_path_keyword:
            # drop grep results whose path contains any of the filter keywords
            tmp = list()
            for gr in grep_result:
                if any(item in gr for item in filter_path_keyword):
                    continue
                tmp.append(gr)
            grep_result = tmp
        if not grep_result:
            logging.info("fuzzy match failed.")
            return str(), str(), str()
        gn_dict: Dict[str, int] = collections.defaultdict(int)
        for g in grep_result:
            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
            gn_dict[gn] += 1
        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
        for k, v in sub_com_dict.items():
            if gn_file.startswith(k):
                s = v.get("subsystem")
                c = v.get("component")
                logging.info(
                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
                return gn_file, s, c
        logging.info("fuzzy match failed.")
        return str(), str(), str()

    @classmethod
    def _save_as_xls(cls, result_dict: Dict, product_name: str) -> None:
        logging.info("saving as xls...")
        header = ["subsystem_name", "component_name",
                  "output_file", "size(Byte)"]
        tmp_dict = copy.deepcopy(result_dict)
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        subsystem_start_row = 1
        subsystem_end_row = 0
        subsystem_col = 0
        component_start_row = 1
        component_end_row = 0
        component_col = 1
        del tmp_dict["size"]
        for subsystem_name in tmp_dict.keys():
            subsystem_dict = tmp_dict.get(subsystem_name)
            subsystem_size = subsystem_dict.get("size")
            subsystem_file_count = subsystem_dict.get("count")
            del subsystem_dict["count"]
            del subsystem_dict["size"]
            subsystem_end_row += subsystem_file_count

            for component_name in subsystem_dict.keys():
                component_dict: Dict[str, int] = subsystem_dict.get(
                    component_name)
                component_size = component_dict.get("size")
                component_file_count = component_dict.get("count")
                del component_dict["count"]
                del component_dict["size"]
                component_end_row += component_file_count

                for fileinfo in component_dict.get("filelist"):
                    file_name = fileinfo.get("file_name")
                    file_size = fileinfo.get("size")
                    excel_writer.append_line(
                        [subsystem_name, component_name, file_name, file_size])
                excel_writer.write_merge(component_start_row, component_col,
                                         component_end_row, component_col,
                                         component_name)
                component_start_row = component_end_row + 1
            excel_writer.write_merge(subsystem_start_row, subsystem_col,
                                     subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        output_name: str = configs[product_name]["output_name"]
        output_name = output_name.replace(".json", ".xls")
        excel_writer.save(output_name)
        logging.info("save as xls success.")
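
    # The sketches below are illustrative only; field values are made up.
    #
    # Input: the gn_info json written by collect_gn_info() is assumed (based on
    # how analysis() consumes it, not on the collector implementation) to map
    # template names to artifacts:
    #
    #   {"ohos_shared_library": {"libfoo.z.so": {"subsystem_name": "...",
    #                                            "component_name": "...", ...}}, ...}
    #
    # Output: the rom_size_dict accumulated by _put() and dumped to output_name:
    #
    #   {
    #       "size": <total size of all files>,
    #       "<subsystem_name>": {
    #           "size": ..., "count": ...,
    #           "<component_name>": {"filelist": [<unit>, ...], "size": ..., "count": ...}
    #       },
    #       "NOTFOUND": {...}    # files that neither the gn info nor fuzzy matching could attribute
    #   }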

    @classmethod
    def analysis(cls, product_name: str, product_dict: Dict[str, List[str]]):
        logging.info("start analyzing...")
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'r', encoding='utf-8') as f:
            gn_info = json.load(f)
        query_order: Dict[str, List[str]] = configs[product_name]["query_order"]
        query_order["etc"] = configs["target_type"]  # 'etc' is looked up in every template
        rom_size_dict: Dict = dict()
        for t, l in product_dict.items():
            for f in l:  # iterate over all files
                if os.path.isdir(f):
                    continue
                find_flag = False
                type_list = query_order.get(t)
                _, base_name = os.path.split(f)
                size = os.path.getsize(f)
                if not type_list:
                    logging.warning(
                        f"'{t}' not found in query_order of the config.yaml")
                    break
                for tn in type_list:  # tn example: ohos_shared_library
                    if find_flag:  # already matched by an earlier template; stop searching
                        break
                    output_dict: Dict[str, Dict] = gn_info.get(
                        tn)  # all possible build artifacts of this template
                    if not output_dict:
                        logging.warning(
                            f"'{tn}' not found in the {gn_info_file}")
                        continue
                    d = output_dict.get(base_name)
                    if not d:
                        continue
                    d["size"] = size
                    d["file_name"] = f.replace(project_path, "")
                    cls._put(d["subsystem_name"],
                             d["component_name"], d, rom_size_dict)
                    find_flag = True
                if not find_flag:  # no template in the query order matched; fall back to fuzzy matching
                    psesudo_gn, sub, com = cls._fuzzy_match(f)
                    if sub and com:
                        cls._put(sub, com, {
                            "subsystem_name": sub,
                            "component_name": com,
                            "psesudo_gn_path": psesudo_gn,
                            "description": "fuzzy match",
                            "file_name": f.replace(project_path, ""),
                            "size": size,
                        }, rom_size_dict)
                        find_flag = True
                if not find_flag:  # still unmatched after fuzzy matching: attribute to NOTFOUND
                    cls._put("NOTFOUND", "NOTFOUND", {
                        "file_name": f.replace(project_path, ""),
                        "size": size,
                    }, rom_size_dict)
        with open(configs[product_name]["output_name"], 'w', encoding='utf-8') as f:
            json.dump(rom_size_dict, f, indent=4)
        cls._save_as_xls(rom_size_dict, product_name)
        logging.info("success")


def main():
    if recollect_gn:
        RomAnalysisTool.collect_gn_info()
    product_dict: Dict[str, List[str]] = RomAnalysisTool.collect_product_info(
        product_name)
    RomAnalysisTool.analysis(product_name, product_dict)


if __name__ == "__main__":
    main()
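
# For reference (derived only from the code above; actual file names come from
# config.yaml): a full run of main() writes three artifacts:
#   * configs["gn_info_file"]                    - BUILD.gn target info from collect_gn_info()
#   * configs[product_name]["product_infofile"]  - artifact list from collect_product_info()
#   * configs[product_name]["output_name"]       - rom analysis result (json), plus an .xls
#                                                  with the same stem from _save_as_xls()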