#!/usr/bin/env python3 # coding=utf-8 # # Copyright (c) 2023 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import json import stat import subprocess import time from subprocess import Popen, PIPE, STDOUT, TimeoutExpired def logger(content, level): """ æ—¥å¿—æ‰“å° :param content:日志内容 :param level: 日志ç‰çº§ :return: """ create_time = "{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) print("[{}] [{}] [{}]".format(create_time, level, content)) def json_parse(json_file): """ json文件解æžä¸ºjson对象 :param json_file:json文件 :return:json对象 """ if os.path.exists(json_file): with open(json_file, "r") as jf: return json.load(jf) logger("{} not exist.".format(json_file), "ERROR") return {} def get_product_name(root_path): """ 从ohos_config.jsonä¸èŽ·å–编译产物路径 :param root_path: ohos_config.json所在的目录 :return: 编译产é‡ç”Ÿæˆçš„路径 """ ohos_config = os.path.join(root_path, "out", "ohos_config.json") json_obj = json_parse(ohos_config) if json_obj: product_name = json_obj["out_path"].split("out")[1].strip("/") return product_name logger("{} not exist.".format(ohos_config), "ERROR") return "" def get_target_cpu(root_path): """ 从ohos_config.jsonä¸èŽ·å–编译cpu :param root_path: ohos_config.json所在的目录 :return: 编译产é‡ç”Ÿæˆçš„路径 """ ohos_config = os.path.join(root_path, "out", "ohos_config.json") json_obj = json_parse(ohos_config) if json_obj: target_cpu = json_obj["target_cpu"] return target_cpu logger("{} not exist.".format(ohos_config), "ERROR") return "" def shell_command(command_list: list): """ 命令行执行命令 :param command_list:命令傿•°åˆ—表 :return: """ process = Popen(command_list, stdout=PIPE, stderr=STDOUT) try: outs, errs = process.communicate(timeout=900) except TimeoutExpired: process.kill() outs, errs = process.communicate() logger(outs.decode("utf-8").strip(), "INFO") return errs, process.returncode def hdc_command(device_ip, device_port, device_sn, command): """ hdcå¯¹è¿œç¨‹æ˜ å°„çš„è®¾å¤‡æ‰§è¡Œå‘½ä»¤ :param device_ip:è¿œç¨‹æ˜ å°„çš„ip :param device_port:hdcç«¯å£ :param device_sn:设备snå· :param command: :return: """ connect_cmd = "hdc -s {}:{} -t {} ".format(device_ip, device_port, device_sn) cmd = connect_cmd + command cmd_list = cmd.split(" ") logger(cmd_list, "INFO") _, exitcode = shell_command(cmd_list) return exitcode def coverage_command(command): """ coverage_command :param command: :return: """ proc = subprocess.Popen(command, shell=True) try: proc.communicate() except subprocess.TimeoutExpired: proc.kill() proc.terminate() def tree_find_file_endswith(path, suffix, file_list=None): """ 获å–目录下所有以指定å—符串结尾的文件 :param path: 需è¦é历的目录 :param suffix: åŽç¼€ :param file_list: :return: """ for f in os.listdir(path): full_path = os.path.join(path, f) if os.path.isfile(full_path) and full_path.endswith(suffix): file_list.append(full_path) if os.path.isdir(full_path): tree_find_file_endswith(full_path, suffix, file_list) return file_list class FoundationServer: """ foundation拆分的进程和其对应的so之间的对应关系 """ lib_dict = { "ams": ["libabilityms.z.so", "libdataobsms.z.so", "libupms.z.so", "libappms.z.so"], "bms": ["libbms.z.so"], "call": ["libtel_call_manager.z.so"], "dms": ["libdistributed_ability_manager_svr.z.so"], "fms": ["libfms.z.so"], "notification": ["libcesfwk_services.z.so", "libans.z.so"], "power": ["libbatteryservice.z.so", "libdisplaymgrservice.z.so", "libpowermgrservice.z.so", "libthermalservice.z.so", "libbatterystats_service.z.so"], "state": ["libtel_state_registry.z.so"], "wms": ["libwms.z.so"], "theme": ["libscreenlock_server.z.so"] } def is_elffile(filepath: str) -> bool: """ åˆ¤æ–æ–‡ä»¶æ˜¯å¦äºŒè¿›åˆ¶æ–‡ä»¶ :param filepath: :return: bool """ if not os.path.exists(filepath): logger("{} not exists.".format(filepath), "ERROR") return False try: file_states = os.stat(filepath) file_mode = file_states[stat.ST_MODE] if not stat.S_ISREG(file_mode): return False with open(filepath, "rb") as f: header = (bytearray(f.read(4)[1:4])).decode(encoding="utf-8") if header in ["ELF"]: return True except UnicodeDecodeError as e: logger(e, "ERROR") return False