#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import sys
import time
import traceback
import zipfile

import json5

import performance_config


class PerformanceBuild():
    def __init__(self, config_input, mail_obj):
        self.config = None
        self.first_line_in_avg_excel = ""
        self.time_avg_dic = {}
        self.all_time_dic = {}
        self.size_avg_dic = {}
        self.all_size_dic = {}
        self.mail_helper = None
        self.mail_msg = ''
        self.mail_helper = mail_obj
        self.config = config_input
        self.prj_name = ''
        self.timeout = 1800
        self.error_log_str = ''

    def start(self):
        self.init()
        self.start_test()
        self.write_mail_msg()
        os.chdir(self.config.project_path)

    @staticmethod
    def append_into_dic(key, value, dic):
        if key not in dic:
            dic[key] = []
        dic[key].append(value)

    def init(self):
        if self.config.ide == performance_config.IdeType.DevEco:
            os.environ['path'] = self.config.node_js_path + ";" + os.environ['path']
        os.chdir(self.config.project_path)
        os.environ['path'] = os.path.join(self.config.jbr_path, "bin") + ";" + os.environ['path']
        os.environ['JAVA_HOME'] = self.config.jbr_path
        self.config.cmd_prefix = os.path.join(self.config.project_path, self.config.cmd_prefix)
        self.config.debug_package_path = os.path.join(self.config.project_path, self.config.debug_package_path)
        self.config.release_package_path = os.path.join(self.config.project_path, self.config.release_package_path)
        self.config.incremental_code_path = os.path.join(self.config.project_path, self.config.incremental_code_path)
        self.config.json5_path = os.path.join(self.config.project_path, self.config.json5_path)
        if self.config.developing_test_data_path:
            self.config.build_times = 3
        else:
            subprocess.Popen((self.config.cmd_prefix + " --stop-daemon").split(" "),
                             stderr=sys.stderr,
                             stdout=sys.stdout).communicate(timeout=self.timeout)

    @staticmethod
    def add_code(code_path, start_pos, end_pos, code_str, lines):
        with open(code_path, 'r+', encoding='UTF-8') as modified_file:
            content = modified_file.read()
            add_str_end_pos = content.find(end_pos)
            if add_str_end_pos == -1:
                print(f'Can not find code : {end_pos} in {code_path}, please check config')
                return
            add_str_start_pos = content.find(start_pos)
            if add_str_start_pos == -1:
                if lines == 0:
                    return
                add_str_start_pos = add_str_end_pos
            content_add = ""
            for i in range(lines, 0, -1):
                if "%d" in code_str:
                    content_add = content_add + code_str % i
                else:
                    content_add = content_add + code_str
            content = content[:add_str_start_pos] + content_add + content[add_str_end_pos:]
            modified_file.seek(0)
            modified_file.write(content)
            modified_file.truncate()

    def add_incremental_code(self, lines):
        PerformanceBuild.add_code(self.config.incremental_code_path,
                                  self.config.incremental_code_start_pos,
                                  self.config.incremental_code_end_pos,
                                  self.config.incremental_code_str,
                                  lines)

    def revert_incremental_code(self):
        self.add_incremental_code(0)
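    # Behavior sketch for add_code / add_incremental_code; the concrete strings
    # below are assumptions for illustration, not values from the real config.
    # Assuming incremental_code_end_pos = "// end-of-file anchor",
    # incremental_code_start_pos = "let num" and
    # incremental_code_str = "let num%d = 1;\n":
    #   add_incremental_code(3) writes "let num3 = 1;\nlet num2 = 1;\nlet num1 = 1;\n"
    #   in front of the anchor (or replaces a previously generated block that
    #   starts with "let num"), which simulates an incremental source change;
    #   revert_incremental_code() calls add_code with lines == 0, so the region
    #   from the first "let num" up to the anchor is emptied and the file is
    #   restored. If the end anchor is missing, add_code only prints a warning
    #   and leaves the file untouched.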
    def reset(self):
        self.first_line_in_avg_excel = ""
        self.time_avg_dic = {}
        self.all_time_dic = {}
        self.size_avg_dic = {}
        self.all_size_dic = {}
        self.error_log_str = ''
        self.revert_incremental_code()

    def clean_project(self):
        if not self.config.developing_test_data_path:
            print(self.config.cmd_prefix + " clean")
            subprocess.Popen((self.config.cmd_prefix + " clean").split(" "),
                             stderr=sys.stderr,
                             stdout=sys.stdout).communicate(timeout=self.timeout)

    def get_bytecode_size(self, is_debug):
        if self.config.developing_test_data_path:
            # test data for size
            PerformanceBuild.append_into_dic("ets/mudules.abc rawSize", 44444, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules.abc Compress_size", 33333, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules2.abc rawSize", 44444, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules2.abc Compress_size", 33333, self.all_size_dic)
            return
        package_path = self.config.debug_package_path if is_debug else self.config.release_package_path
        package = zipfile.ZipFile(package_path)
        extension_name = ".abc" if self.config.ide == performance_config.IdeType.DevEco else ".dex"
        for info in package.infolist():
            if info.filename.endswith(extension_name):
                name_str1 = info.filename + " rawSize"
                name_str2 = info.filename + " compress_size"
                PerformanceBuild.append_into_dic(name_str1, info.file_size, self.all_size_dic)
                PerformanceBuild.append_into_dic(name_str2, info.compress_size, self.all_size_dic)

    def collect_build_data(self, is_debug, report_path):
        event_obj = None
        with open(report_path, 'r+', encoding='UTF-8') as report:
            event_obj = json5.load(report)['events']
        if not event_obj:
            raise Exception('Open report json failed')
        found_error = False
        for node in event_obj:
            if node['head']['type'] == "log" and node['additional']['logType'] == 'error':
                self.error_log_str = self.error_log_str + node['head']['name']
                found_error = True
            if found_error:
                continue
            build_time = 0
            task_name = node['head']['name']
            if node['head']['type'] == "mark":
                if node['additional']['markType'] == 'history':
                    build_time = (node['body']['endTime'] - node['body']['startTime']) / 1000000000
                    task_name = "total build cost"
                else:
                    continue
            elif node['head']['type'] == "continual":
                build_time = node['additional']['totalTime'] / 1000000000
            else:
                continue
            PerformanceBuild.append_into_dic(task_name, build_time, self.all_time_dic)
        if found_error:
            raise Exception('Build Failed')
        self.get_bytecode_size(is_debug)

    def start_build(self, is_debug):
        if self.config.developing_test_data_path:
            # test data
            self.collect_build_data(is_debug,
                                    os.path.join(os.path.dirname(__file__),
                                                 self.config.developing_test_data_path))
            return True
        reports_before = []
        report_dir = '.hvigor/report'
        if os.path.exists(report_dir):
            reports_before = os.listdir(report_dir)
        cmd_suffix = self.config.cmd_debug_suffix if is_debug else self.config.cmd_release_suffix
        print(self.config.cmd_prefix + cmd_suffix)
        subprocess.Popen((self.config.cmd_prefix + cmd_suffix).split(" "),
                         stderr=sys.stderr,
                         stdout=sys.stdout).communicate(timeout=self.timeout)
        report_path = (set(os.listdir(report_dir)) - set(reports_before)).pop()
        self.collect_build_data(is_debug, os.path.join(report_dir, report_path))
        return True

    def get_millisecond(self, time_string):
        if self.config.ide != performance_config.IdeType.DevEco and not self.config.developing_test_data_path:
            return int(time_string)
        else:
            cost_time = 0
            res = time_string.split(" min ")
            target_str = ""
            if len(res) > 1:
                cost_time = int(res[0]) * 60000
                target_str = res[1]
            else:
                target_str = res[0]
            res = target_str.split(" s ")
            if len(res) > 1:
                cost_time = cost_time + int(res[0]) * 1000
                target_str = res[1]
            else:
                target_str = res[0]
            res = target_str.split(" ms")
            if len(res) > 1:
                cost_time = cost_time + int(res[0])
            return cost_time
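    # Worked example for get_millisecond (the input format is inferred from the
    # parsing above, not taken from real DevEco output): for a DevEco build, a
    # duration string such as "1 min 2 s 30 ms" is converted piece by piece to
    # 1 * 60000 + 2 * 1000 + 30 = 62030 milliseconds, while shorter strings like
    # "2 s 30 ms" or "30 ms" fall through the same splits and yield 2030 and 30.
    # For other IDEs (when no test data is configured), the string is already an
    # integer millisecond count and is returned via int(time_string).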
    def cal_incremental_avg_time(self):
        self.first_line_in_avg_excel = self.first_line_in_avg_excel + "\n"
        for key in self.all_time_dic:
            task_count = len(self.all_time_dic[key])
            has_task = True
            if task_count != 2 * self.config.build_times:
                if task_count == self.config.build_times:
                    has_task = False
                else:
                    continue
            # average of first build
            sum_build_time = 0
            for i in range(0, self.config.build_times):
                index = i * 2
                if not has_task:
                    self.all_time_dic[key].insert(index + 1, 0)
                sum_build_time = sum_build_time + self.all_time_dic[key][index]
            cost = round(sum_build_time / self.config.build_times, 2)
            PerformanceBuild.append_into_dic(key, cost, self.time_avg_dic)
            # average of incremental build
            sum_build_time = 0
            for i in range(1, len(self.all_time_dic[key]), 2):
                sum_build_time = sum_build_time + self.all_time_dic[key][i]
            cost = round(sum_build_time / self.config.build_times, 2)
            PerformanceBuild.append_into_dic(key, cost, self.time_avg_dic)

    def cal_incremental_avg_size(self):
        total_raw_size = []
        total_compressed_size = []
        for i in range(0, self.config.build_times * 2):
            total_raw_size.append(0)
            total_compressed_size.append(0)
            for key in self.all_size_dic:
                if "raw" in key:
                    total_raw_size[i] += self.all_size_dic[key][i]
                else:
                    total_compressed_size[i] += self.all_size_dic[key][i]
        self.all_size_dic["total_raw_size"] = total_raw_size
        self.all_size_dic["total_compressed_size"] = total_compressed_size
        for key in self.all_size_dic:
            # sizes should be the same, just check
            full_first_size = self.all_size_dic[key][0]
            for i in range(0, len(self.all_size_dic[key]), 2):
                if full_first_size != self.all_size_dic[key][i]:
                    full_first_size = -1
                    break
            PerformanceBuild.append_into_dic(key, full_first_size, self.size_avg_dic)
            incremental_first_size = self.all_size_dic[key][1]
            for i in range(1, len(self.all_size_dic[key]), 2):
                if incremental_first_size != self.all_size_dic[key][i]:
                    incremental_first_size = -1
                    break
            PerformanceBuild.append_into_dic(key, incremental_first_size, self.size_avg_dic)

    def cal_incremental_avg(self):
        self.cal_incremental_avg_time()
        self.cal_incremental_avg_size()

    @staticmethod
    def add_row(context):
        return rf'