• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# This file is for rom analyzation of lite/small devices.
17
18"""
191. 先收集BUILD.gn中的target信息
202. 然后根据编译产物到1中进行搜索,匹配其所属的部件
21
22对于ohos开头的template,主要根据其component字段和subsystem_name字段来归数其部件;同时也要考虑install_dir字段
23对于gn原生的template,主要根据bundle.json中的字段来归属其部件
24
25对于找不到的,可以模糊匹配,如,有产物libxxx,则可以在所有的BUILD.gn中搜索xxx,并设置一个阀值予以过滤
26"""
27
28import sys
29import argparse
30import json
31import logging
32import os
33from typing import Dict, List, Tuple, Text
34import copy
35import preprocess
36from time import time
37from concurrent.futures import ThreadPoolExecutor, Future
38from threading import RLock
39import collections
40
41from config import result_dict, collector_config, configs, \
42    project_path, sub_com_dict, product_name, recollect_gn, baseline, unit_adapt, output_file
43from pkgs.basic_tool import BasicTool, unit_adaptive
44from pkgs.gn_common_tool import GnCommonTool
45from pkgs.simple_excel_writer import SimpleExcelWriter
46from pkgs.rom_ram_baseline_collector import RomRamBaselineCollector
47from misc import gn_lineno_collect
48
49
50class RomAnalysisTool:
51    @classmethod
52    def analysis(cls, product_info: str, product_dict: Dict[str, List[str]], output_file_name: str):
53        """analysis the rom of lite/small product
54
55        Args:
56            product_info (str): product name configured in the yaml
57            product_dict (Dict[str, List[str]]): result dict of compiled product file
58                format:
59                    "bin":[...],
60                    "so":[...]
61                    ...
62        """
63        logging.info("start analyzing...")
64        rom_ram_baseline: Dict[str, Dict] = RomRamBaselineCollector.collect(
65            project_path)
66        with os.fdopen(os.open("rom_ram_baseline.json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w',
67                       encoding='utf-8') as f:
68            json.dump(rom_ram_baseline, f, indent=4)
69        gn_info_file = configs["gn_info_file"]  # filename to save gn_info
70        with open(gn_info_file, 'r', encoding='utf-8') as f:
71            gn_info = json.load(f)
72        query_order: Dict[str, List[str]
73                          ] = configs[product_info]["query_order"]  # query order of the gn template to be matched
74        query_order["etc"] = configs["target_type"]  # etc会查找所有的template
75        rom_size_dict: Dict = dict()
76        if "manual_config" in configs[product_info].keys():
77            cls._match_manual_configured(
78                configs[product_info]["manual_config"], product_dict, configs[product_info]["product_dir"]["root"],
79                rom_size_dict)
80        cls._subsystem_component_for_all_product_file(
81            product_dict, query_order, gn_info, gn_info_file, rom_ram_baseline, rom_size_dict)
82        if unit_adapt:
83            cls._result_unit_adaptive(rom_size_dict)
84        with os.fdopen(os.open(output_file_name + ".json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w',
85                       encoding='utf-8') as f:
86            json.dump(rom_size_dict, f, indent=4)
87        cls._save_as_xls(rom_size_dict, product_info, baseline)
88        logging.info("success")
89
90    @classmethod
91    def collect_gn_info(cls):
92        logging.info("start scanning BUILD.gn")
93        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
94            future_list: List[Future] = list()
95            for c in collector_config:
96                future_list.append(pool.submit(c))
97            for f in future_list:
98                f.result()
99        gn_info_file = configs["gn_info_file"]
100        with os.fdopen(os.open(gn_info_file, os.O_WRONLY | os.O_CREAT, mode=0o640), 'w', encoding='utf-8') as f:
101            json.dump(result_dict, f, indent=4)
102
103    @classmethod
104    def collect_product_info(cls, product_name: str):
105        logging.info("start scanning compile products")
106        product_dict: Dict[str, List[str]] = cls._find_files(product_name)
107        with os.fdopen(os.open(configs[product_name]["product_infofile"], os.O_WRONLY | os.O_CREAT, mode=0o640), 'w', encoding='utf-8') as f:
108            json.dump(product_dict, f, indent=4)
109        return product_dict
110
111    @classmethod
112    def _add_rest_dir(cls, top_dir: str, rela_path: str, sub_path: str, dir_list: List[str]) -> None:
113        """
114        :top_dir 顶层目录,不会变化
115        :rela_path 最顶层的值为空
116        :sub_path 一般是a/b/c这种形式
117        :dir_list 相对于原始top目录的子目录的全路径
118        example:
119        /
120        |-a
121        |-b
122        |-c
123        |-|-d
124        |-|-e
125        |-|-f
126        |-|-|-g
127        |-|-|-h
128        top_dir: /
129        rela_path: ""
130        sub_path: c/e
131        dir_list: [c]
132        => [c/d, c/f], assume 'a' and 'b' has been removed from dir_list
133        """
134        if (not sub_path) or (os.sep not in sub_path):
135            return
136        # 将其他目录添加到dir_list
137        t, sub_sub_path = sub_path.split(os.sep, 1)  # 如果是c/e,分割成c,e
138        t = os.path.join(rela_path, t)
139        if t in dir_list:
140            dir_list.remove(t)
141        sub_sub_dir_list = os.listdir(os.path.join(top_dir, t))
142        for ssdl in sub_sub_dir_list:
143            if os.path.join(rela_path, sub_path) != os.path.join(t, ssdl):
144                dir_list.append(os.path.join(t, ssdl))
145        if not sub_sub_dir_list:
146            return
147        cls._add_rest_dir(top_dir, t, sub_sub_path, dir_list)
148
149    @classmethod
150    def _find_files(cls, product_name: str) -> Dict[str, List[str]]:
151        product_dir: Dict[str, Dict] = configs[product_name]["product_dir"]
152        if not product_name:
153            logging.error(
154                f"product_name '{product_name}' not found in the config.yaml")
155            exit(1)
156        product_path_dit: Dict[str, str] = dict()  # 存储编译产物的类型及目录
157        root_dir = product_dir.get("root")
158        root_dir = os.path.join(project_path, root_dir)
159        relative_dir: Dict[str, str] = product_dir.get("relative")
160        if not relative_dir:
161            logging.warning(
162                f"'relative_dir' of {product_name} not found in the config.yaml")
163            exit(1)
164        # 除了so a hap bin外的全部归到etc里面
165        for k, v in relative_dir.items():
166            product_path_dit[k] = os.path.join(root_dir, v)
167        # 查找编译产物信息
168        # product_dict格式: {"so": ["a.so", "b.so"]}
169        product_dict: Dict[str, List[str]] = dict()  # 存储编译产物的名称
170        for k, v in product_path_dit.items():
171            if not os.path.exists(v):
172                logging.warning(f"dir '{v}' not exist")
173            product_dict[k] = BasicTool.find_files_with_pattern(v)  # v是全路径
174        if product_dir.get("rest"):
175            rest_dir_list: List[str] = os.listdir(
176                root_dir)  # 除了配置在relative下之外的所有剩余目录,全部归到etc下
177            for v in relative_dir.values():
178                if v in rest_dir_list:
179                    rest_dir_list.remove(v)
180            for v in relative_dir.values():
181                if os.sep in v:
182                    cls._add_rest_dir(root_dir, str(), v, rest_dir_list)
183            if "etc" not in product_dict.keys():
184                product_dict["etc"] = list()
185            for r in rest_dir_list:
186                product_dict["etc"].extend(
187                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
188        return product_dict
189
190    @classmethod
191    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict, com_size_baseline: str = str()):
192        size = unit.get("size")
193        if not rom_size_dict.get("size"):  # 总大小
194            rom_size_dict["size"] = 0
195        if not rom_size_dict.get(sub):  # 子系统大小
196            rom_size_dict[sub]: Dict[str, Dict] = dict()
197            rom_size_dict[sub]["size"] = 0
198            rom_size_dict[sub]["count"] = 0
199
200        if not rom_size_dict.get(sub).get(com):  # 部件
201            rom_size_dict.get(sub)[com] = dict()
202            rom_size_dict[sub][com]["filelist"] = list()
203            rom_size_dict[sub][com]["size"] = 0
204            rom_size_dict[sub][com]["count"] = 0
205
206        if (sub != "NOTFOUND" and sub != "UNDEFINED" and com != "NOTFOUND" and com != "UNDEFINED") \
207                and (not rom_size_dict.get(sub).get(com).get("baseline")) and baseline:
208            rom_size_dict[sub][com]["baseline"] = com_size_baseline
209
210        rom_size_dict[sub][com]["filelist"].append(unit)
211        rom_size_dict[sub][com]["size"] += size
212        rom_size_dict[sub][com]["count"] += 1
213        rom_size_dict[sub]["size"] += size
214        rom_size_dict[sub]["count"] += 1
215        rom_size_dict["size"] += size
216
217    @classmethod
218    def _fuzzy_match(cls, file_name: str, filter_path_keyword: Tuple[str] = tuple()) -> Tuple[str, str, str]:
219        """
220        TODO 应当先遍历gn_info进行匹配
221        直接grep,利用出现次数最多的BUILD.gn去定位subsystem_name和component_name"""
222        logging.info(f"fuzzy match: {file_name}")
223        _, base_name = os.path.split(file_name)
224        if base_name.startswith("lib"):
225            base_name = base_name[3:]
226        if base_name.endswith(".a"):
227            base_name = base_name[:base_name.index(".a")]
228        elif base_name.endswith(".z.so"):
229            base_name = base_name[:base_name.index(".z.so")]
230        elif base_name.endswith(".so"):
231            base_name = base_name[:base_name.index(".so")]
232        exclude_dir = configs["black_list"]
233        tbl = [x for x in exclude_dir if os.sep in x]
234
235        def handler(content: Text) -> List[str]:
236            t = list(filter(lambda y: len(y) > 0, list(
237                map(lambda x: x.strip(), content.split("\n")))))
238            for item in tbl:
239                p = os.path.join(project_path, item)
240                t = list(filter(lambda x: p not in x, t))
241            return t
242
243        grep_result: List[str] = BasicTool.grep_ern(
244            base_name,
245            project_path,
246            include="BUILD.gn",
247            exclude=tuple(exclude_dir),
248            post_handler=handler)
249        if filter_path_keyword:
250            tmp = list()
251            for gr in grep_result:
252                for item in filter_path_keyword:
253                    if item in gr:
254                        continue
255                    tmp.append(gr)
256            grep_result = tmp
257        if not grep_result:
258            logging.info(f"fuzzy match failed.")
259            return str(), str(), str()
260        gn_dict: Dict[str, int] = collections.defaultdict(int)
261        for g in grep_result:
262            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
263            gn_dict[gn] += 1
264        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
265        for k, v in sub_com_dict.items():
266            if gn_file.startswith(k):
267                s = v.get("subsystem")
268                c = v.get("component")
269                logging.info(
270                    f"fuzzy match success: subsystem_name={s}, component_name={c}")
271                return gn_file, s, c
272        logging.info(f"fuzzy match failed.")
273        return str(), str(), str()
274
275    @classmethod
276    def _get_one_line(cls, baseline_info, subsystem_name, component_name, component_baseline, file_name, file_size):
277        if baseline_info:
278            return [subsystem_name, component_name,
279                    component_baseline, file_name, file_size]
280        else:
281            return [subsystem_name, component_name,
282                    file_name, file_size]
283
284    @classmethod
285    def _save_as_xls(cls, result_dict_info: Dict, product_name_info: str, baseline_info: bool) -> None:
286        logging.info("saving as xls...")
287        header = ["subsystem_name", "component_name", "output_file", "size(Byte)"]
288        if baseline_info:
289            header = ["subsystem_name", "component_name", "baseline", "output_file", "size(Byte)"]
290        tmp_dict = dict()
291        for key in result_dict_info.keys():
292            tmp_dict[key] = result_dict_info[key]
293        excel_writer = SimpleExcelWriter("rom")
294        excel_writer.set_sheet_header(headers=header)
295        (subsystem_start_row, subsystem_end_row, subsystem_col, component_start_row, component_end_row,
296         component_col, baseline_col) = (1, 0, 0, 1, 0, 1, 2)
297        if "size" in tmp_dict.keys():
298            del tmp_dict["size"]
299        for subsystem_name in tmp_dict.keys():
300            subsystem_dict = tmp_dict.get(subsystem_name)
301            subsystem_size = subsystem_dict.get("size")
302            subsystem_file_count = subsystem_dict.get("count")
303            del subsystem_dict["count"]
304            del subsystem_dict["size"]
305            subsystem_end_row += subsystem_file_count
306
307            for component_name in subsystem_dict.keys():
308                component_dict: Dict[str, int] = subsystem_dict.get(
309                    component_name)
310                component_size = component_dict.get("size")
311                component_file_count = component_dict.get("count")
312                component_baseline = component_dict.get("baseline")
313                if component_baseline:
314                    del component_dict["baseline"]
315                del component_dict["count"]
316                del component_dict["size"]
317                component_end_row += component_file_count
318
319                for fileinfo in component_dict.get("filelist"):
320                    file_name = fileinfo.get("file_name")
321                    file_size = fileinfo.get("size")
322                    line = cls._get_one_line(baseline_info, subsystem_name, component_name, component_baseline,
323                                             file_name, file_size)
324                    excel_writer.append_line(line)
325                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
326                                         component_name)
327                if baseline_info:
328                    excel_writer.write_merge(component_start_row, baseline_col, component_end_row, baseline_col,
329                                             component_baseline)
330                component_start_row = component_end_row + 1
331            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
332                                     subsystem_name)
333            subsystem_start_row = subsystem_end_row + 1
334        output_name: str = configs[product_name_info]["output_name"]
335        output_name = output_name.replace(".json", ".xls")
336        excel_writer.save(output_name)
337        logging.info("save as xls success.")
338
339    @classmethod
340    def _result_unit_adaptive(cls, output_result_dict: Dict[str, Dict]) -> None:
341        total_size = unit_adaptive(output_result_dict["size"])
342        del output_result_dict["size"]
343        for subsystem_name, subsystem_info in output_result_dict.items():
344            sub_size = unit_adaptive(subsystem_info["size"])
345            count = subsystem_info["count"]
346            del subsystem_info["size"]
347            del subsystem_info["count"]
348            for component_name, component_info in subsystem_info.items():
349                component_info["size"] = unit_adaptive(component_info["size"])
350            subsystem_info["size"] = sub_size
351            subsystem_info["count"] = count
352        output_result_dict["size"] = total_size
353
354    @classmethod
355    def _match_manual_configured(cls, manual_config_info: Dict[str, Dict], compiled_files: Dict[str, List],
356                                 compiled_root_path: str, output_result_dict: Dict[str, Dict]) -> None:
357        for file_path, file_info in manual_config_info.items():
358            full_path = os.path.join(
359                project_path, compiled_root_path, file_path)
360            if not os.path.isfile(full_path):
361                logging.warning(f"config error: {file_path} is not a file.")
362                continue
363            file_info["size"] = os.path.getsize(full_path)
364            file_info["file_name"] = full_path
365            cls._put(file_info["subsystem"],
366                     file_info["component"], file_info, output_result_dict)
367            for _, v in compiled_files.items():
368                if full_path not in v:
369                    continue
370                index = v.index(full_path)
371                del v[index]
372                break
373
374    @classmethod
375    def _iterate_all_template_type(cls, type_list: List[str], gn_info: Dict, gn_info_file: str, base_name: str,
376                                   rom_ram_baseline: Dict, rom_size_dict: Dict, f: str, size: int):
377        find_flag = False
378        component_rom_baseline = None
379        for tn in type_list:  # tn example: ohos_shared_library
380            if find_flag:  # 如果已经在前面的template中找到了,后面的就不必再查找
381                break
382            output_dict: Dict[str, Dict] = gn_info.get(
383                tn)  # 这个模板对应的所有可能编译产物
384            if not output_dict:
385                logging.warning(
386                    f"'{tn}' not found in the {gn_info_file}")
387                continue
388            d = output_dict.get(base_name)
389            if not d:
390                continue
391            d["size"] = size
392            d["file_name"] = f.replace(project_path, "")
393            if rom_ram_baseline.get(d["subsystem_name"]) and rom_ram_baseline.get(d["subsystem_name"]).get(
394                    d["component_name"]):
395                component_rom_baseline = rom_ram_baseline.get(
396                    d["subsystem_name"]).get(d["component_name"]).get("rom")
397            cls._put(d["subsystem_name"],
398                     d["component_name"], d, rom_size_dict, component_rom_baseline)
399            find_flag = True
400        if not find_flag:  # 如果指定序列中的template都没有查找到,则模糊匹配
401            # fuzzy match
402            psesudo_gn, sub, com = cls._fuzzy_match(f)
403            if sub and com:
404                if rom_ram_baseline.get(sub) and rom_ram_baseline.get(sub).get(com):
405                    component_rom_baseline = rom_ram_baseline.get(
406                        sub).get(com).get("baseline")
407                cls._put(sub, com, {
408                    "subsystem_name": sub,
409                    "component_name": com,
410                    "psesudo_gn_path": psesudo_gn,
411                    "description": "fuzzy match",
412                    "file_name": f.replace(project_path, ""),
413                    "size": size,
414                }, rom_size_dict, component_rom_baseline)
415                find_flag = True
416        if not find_flag:  # 模糊匹配都没有匹配到的,归属到NOTFOUND
417            cls._put("NOTFOUND", "NOTFOUND", {
418                "file_name": f.replace(project_path, ""),
419                "size": size,
420            }, rom_size_dict)
421
422    @classmethod
423    def _subsystem_component_for_all_product_file(cls, product_dict: Dict[str, List[str]],
424                                                  query_order: Dict[str, List[str]],
425                                                  gn_info: Dict, gn_info_file: str, rom_ram_baseline: Dict,
426                                                  rom_size_dict: Dict):
427        for t, l in product_dict.items():
428            for f in l:  # 遍历所有文件
429                if os.path.isdir(f):
430                    continue
431                type_list = query_order.get(t)
432                _, base_name = os.path.split(f)
433                size = os.path.getsize(f)
434                if not type_list:
435                    logging.warning(
436                        f"'{t}' not found in query_order of the config.yaml")
437                    break
438                cls._iterate_all_template_type(
439                    type_list, gn_info, gn_info_file, base_name, rom_ram_baseline, rom_size_dict, f, size)
440
441
442def main():
443    if recollect_gn:
444        RomAnalysisTool.collect_gn_info()
445    product_dict: Dict[str, List[str]
446    ] = RomAnalysisTool.collect_product_info(product_name)
447    RomAnalysisTool.analysis(product_name, product_dict, output_file)
448
449
450if __name__ == "__main__":
451    main()
452