#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2025 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 运行环境: python 3.10+, pytest import hashlib import json import os import re import shutil import subprocess import sys import time import functools import pytest import importlib class GP(object): """ Global Parameters customize here !!! """ hdc_exe = "hdc" local_path = "resource" remote_path = "data/local/tmp" remote_dir_path = "data/local/tmp/it_send_dir" remote_ip = "auto" remote_port = 8710 hdc_head = "hdc" device_name = "" targets = [] tmode = "usb" changed_testcase = "n" testcase_path = "ts_windows.csv" loaded_testcase = 0 hdcd_rom = "not checked" debug_app = "com.example.myapplication" @classmethod def init(cls): if os.path.exists(".hdctester.conf"): cls.load() cls.start_host() cls.list_targets() else: cls.set_options() cls.print_options() cls.start_host() cls.dump() return @classmethod def start_host(cls): cmd = f"{cls.hdc_exe} start" res = subprocess.call(cmd.split()) return res @classmethod def list_targets(cls): try: targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() except (OSError, IndexError): targets = [b"failed to auto detect device"] cls.targets = [targets[0].decode()] return False cls.targets = [t.decode() for t in targets] return True @classmethod def get_device(cls): cls.start_host() cls.list_targets() if len(cls.targets) > 1: print("Multiple device detected, please select one:") for i, t in enumerate(cls.targets): print(f"{i+1}. {t}") print("input the nums of the device above:") cls.device_name = cls.targets[int(input()) - 1] else: cls.device_name = cls.targets[0] if cls.device_name == "failed to auto detect device": print("No device detected, please check your device connection") return False elif cls.device_name == "[empty]": print("No hdc device detected.") return False cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" return True @classmethod def dump(cls): try: os.remove(".hdctester.conf") except OSError: pass content = filter( lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) json_str = json.dumps(dict(content)) fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, json_str.encode()) os.close(fd) return True @classmethod def load(cls): if not os.path.exists(".hdctester.conf"): raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]") with open(".hdctester.conf") as f: content = json.load(f) cls.hdc_exe = content.get("hdc_exe") cls.local_path = content.get("local_path") cls.remote_path = content.get("remote_path") cls.remote_ip = content.get("remote_ip") cls.hdc_head = content.get("hdc_head") cls.tmode = content.get("tmode") cls.device_name = content.get("device_name") cls.changed_testcase = content.get("changed_testcase") cls.testcase_path = content.get("testcase_path") cls.loaded_testcase = content.get("load_testcase") return True @classmethod def print_options(cls): info = "HDC Tester Default Options: \n\n" \ + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" print(info) @classmethod def tconn_tcp(cls): res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() if "Connect OK" in res: return True else: return False @classmethod def set_options(cls): if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): cls.hdc_exe = opt if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): cls.local_path = opt if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): cls.remote_path = opt if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): cls.remote_ip = opt if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): cls.remote_port = int(opt) if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): cls.device_name = opt if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): cls.tmode = opt if cls.tmode == "usb": ret = cls.get_device() if ret: print("USB device detected.") elif cls.tconn_tcp(): cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" else: print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") return False return True @classmethod def change_testcase(cls): if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): cls.changed_testcase = opt if opt == "n": return False if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): cls.testcase_path = os.path.join(opt) cls.print_options() return True @classmethod def load_testcase(cls): print("this fuction will coming soon.") return False @classmethod def get_version(cls): version = f"v1.0.5a" return version def pytest_run(): start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) pytest.main() end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time())) report_dir = os.path.join(os.getcwd(), "reports") report_file = os.path.join(report_dir, f"{report_time}report.html") print(f"Test over, the script version is {GP.get_version()}," f" start at {start_time}, end at {end_time} \n" f"=======>{report_file} is saved. \n" ) input("=======>press [Enter] key to Show logs.") def rmdir(path): try: if sys.platform == "win32": if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path) else: subprocess.call(f"rm -rf {path}".split()) except OSError: print(f"Error: {path} : cannot remove") pass def get_local_path(path): return os.path.join(GP.local_path, path) def get_remote_path(path): return f"{GP.remote_path}/{path}" def get_local_md5(local): md5_hash = hashlib.md5() with open(local, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): md5_hash.update(byte_block) return md5_hash.hexdigest() def check_shell_any_device(cmd, pattern=None, fetch=False): print(f"\nexecuting command: {cmd}") if pattern: # check output valid print("pattern case") output = subprocess.check_output(cmd.split()).decode() res = pattern in output print(f"--> output: {output}") print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") return res, output elif fetch: output = subprocess.check_output(cmd.split()).decode() print(f"--> output: {output}") return True, output else: # check cmd run successfully print("other case") return subprocess.check_call(cmd.split()) == 0, "" def check_shell(cmd, pattern=None, fetch=False, head=None): if head is None: head = GP.hdc_head cmd = f"{head} {cmd}" print(f"\nexecuting command: {cmd}") if pattern: # check output valid output = subprocess.check_output(cmd.split()).decode() res = pattern in output print(f"--> output: {output}") print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") return res elif fetch: output = subprocess.check_output(cmd.split()).decode() print(f"--> output: {output}") return output else: # check cmd run successfully return subprocess.check_call(cmd.split()) == 0 def get_shell_result(cmd, pattern=None, fetch=False): cmd = f"{GP.hdc_head} {cmd}" print(f"\nexecuting command: {cmd}") return subprocess.check_output(cmd.split()).decode() def check_rate(cmd, expected_rate): send_result = get_shell_result(cmd) rate = float(send_result.split("rate:")[1].split("kB/s")[0]) return rate > expected_rate def check_dir(local, remote, is_single_dir=False): def _get_md5sum(remote, is_single_dir=False): if is_single_dir: cmd = f"{GP.hdc_head} shell md5sum {remote}/*" else: cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}' result = subprocess.check_output(cmd.split()).decode() return result def _calculate_md5(file_path): md5 = hashlib.md5() try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): md5.update(chunk) return md5.hexdigest() except PermissionError: return "PermissionError" except FileNotFoundError: return "FileNotFoundError" print("remote:" + remote) output = _get_md5sum(remote) print(output) result = 1 for line in output.splitlines(): if len(line) < 32: # length of MD5 continue expected_md5, file_name = line.split()[:2] if is_single_dir: file_name = file_name.replace(f"{remote}", "") elif GP.remote_path in remote: file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\") else: file_name = file_name.split(remote)[1].replace("/", "\\") file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径 if is_single_dir: file_path = os.path.join(os.getcwd(), local) + file_name print(file_path) actual_md5 = _calculate_md5(file_path) print(f"Expected: {expected_md5}") print(f"Actual: {actual_md5}") print(f"MD5 matched {file_name}") if actual_md5 != expected_md5: print(f"[Fail]MD5 mismatch for {file_name}") result *= 0 return (result == 1) def _check_file(local, remote, head=None): if head is None: head = GP.hdc_head if remote.startswith("/proc"): local_size = os.path.getsize(local) if local_size > 0: return True else: return False else: cmd = f"shell md5sum {remote}" local_md5 = get_local_md5(local) return check_shell(cmd, local_md5, head=head) def _check_app_installed(bundle, is_shared=False): dump = "dump-shared" if is_shared else "dump" cmd = f"shell bm {dump} -a" return check_shell(cmd, bundle) def check_hdc_targets(): cmd = f"{GP.hdc_head} list targets" print(GP.device_name) return check_shell(cmd, GP.device_name) def check_file_send(local, remote): local_path = os.path.join(GP.local_path, local) remote_path = f"{GP.remote_path}/{remote}" cmd = f"file send {local_path} {remote_path}" return check_shell(cmd) and _check_file(local_path, remote_path) def check_file_recv(remote, local): local_path = os.path.join(GP.local_path, local) remote_path = f"{GP.remote_path}/{remote}" cmd = f"file recv {remote_path} {local_path}" return check_shell(cmd) and _check_file(local_path, remote_path) def check_app_install(app, bundle, args=""): app = os.path.join(GP.local_path, app) install_cmd = f"install {args} {app}" if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): return check_shell(install_cmd, "failed to install bundle") else: return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) def check_app_uninstall(bundle, args=""): uninstall_cmd = f"uninstall {args} {bundle}" return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) def check_app_install_multi(tables, args=""): apps = [] bundles = [] for app, bundle in tables.items() : app = os.path.join(GP.local_path, app) apps.append(app) bundles.append(bundle) apps_str = " ".join(apps) install_cmd = f"install {args} {apps_str}" if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str)) or (args == "" and 0 == apps_str.count(".hap"))): if not check_shell(install_cmd, "failed to install bundle"): return False else: if not check_shell(install_cmd, "successfully"): return False for bundle in bundles: if not _check_app_installed(bundle, "s" in args): return False return True def check_app_uninstall_multi(tables, args=""): for app, bundle in tables.items() : if not check_app_uninstall(bundle, args): return False return True def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args): if head is None: head = GP.hdc_head if cmd.startswith("file"): if not check_shell(cmd, "FileTransfer finish", head=head): return False if cmd.startswith("file send"): local, remote = cmd.split()[-2:] if remote[-1] == '/' or remote[-1] == '\\': remote = f"{remote}{os.path.basename(local)}" else: remote, local = cmd.split()[-2:] if local[-1] == '/' or local[-1] == '\\': local = f"{local}{os.path.basename(remote)}" if "-b" in cmd: mnt_debug_path = "mnt/debug/100/debug_hap/" remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}" if os.path.isfile(local): return _check_file(local, remote, head=head) else: return check_dir(local, remote, is_single_dir=is_single_dir) elif cmd.startswith("install"): bundle = args.get("bundle", "invalid") opt = " ".join(cmd.split()[1:-1]) return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) elif cmd.startswith("uninstall"): bundle = cmd.split()[-1] opt = " ".join(cmd.split()[1:-1]) return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) else: return check_shell(cmd, pattern, head=head, **args) def check_soft_local(local_source, local_soft, remote): cmd = f"file send {local_soft} {remote}" if not check_shell(cmd, "FileTransfer finish"): return False return _check_file(local_source, remote) def check_soft_remote(remote_source, remote_soft, local_recv): check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}") cmd = f"file recv {remote_soft} {local_recv}" if not check_shell(cmd, "FileTransfer finish"): return False return _check_file(local_recv, get_remote_path(remote_source)) def switch_usb(): res = check_hdc_cmd("tmode usb") time.sleep(3) if res: GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" return res def copy_file(src, dst): try: shutil.copy2(src, dst) print(f"File copied successfully from {src} to {dst}") except IOError as e: print(f"Unable to copy file. {e}") except Exception as e: print(f"Unexpected error: {e}") def switch_tcp(): if not GP.remote_ip: # skip tcp check print("!!! remote_ip is none, skip tcp check !!!") return True if GP.remote_ip == "auto": ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) if not ipconf: print("!!! device ip not found, skip tcp check !!!") return True GP.remote_ip = ipconf.split(":")[1].split()[0] print(f"fetch remote ip: {GP.remote_ip}") ret = check_hdc_cmd(f"tmode port {GP.remote_port}") if ret: time.sleep(3) res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") if res: GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" return res def select_cmd(): msg = "1) Proceed tester\n" \ + "2) Customize tester\n" \ + "3) Setup files for transfer\n" \ + "4) Load custom testcase(default unused) \n" \ + "5) Exit\n" \ + ">> " while True: opt = input(msg).strip() if len(opt) == 1 and '1' <= opt <= '5': return opt def gen_file(path, size): index = 0 path = os.path.abspath(path) fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) while index < size: os.write(fd, os.urandom(1024)) index += 1024 os.close(fd) def gen_version_file(path): with open(path, "w") as f: f.write(GP.get_version()) def gen_zero_file(path, size): fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, b'0' * size) os.close(fd) def create_file_with_size(path, size): fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, b'\0' * size) os.close(fd) def gen_soft_link(): print("generating soft link ...") try: os.symlink("small", os.path.join(GP.local_path, "soft_small")) except FileExistsError: print("soft_small already exists") except OSError: print("生成soft_small失败,需要使用管理员权限用户执行软链接生成") def gen_file_set(): print("generating empty file ...") gen_file(os.path.join(GP.local_path, "empty"), 0) print("generating small file ...") gen_file(os.path.join(GP.local_path, "small"), 102400) print("generating medium file ...") gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2) print("generating large file ...") gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) print("generating text file ...") gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2) gen_soft_link() print("generating package dir ...") if not os.path.exists(os.path.join(GP.local_path, "package")): os.mkdir(os.path.join(GP.local_path, "package")) for i in range(1, 6): gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2) print("generating deep dir ...") deepth = 4 deep_path = os.path.join(GP.local_path, "deep_dir") if not os.path.exists(deep_path): os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_dir{deep}") if not os.path.exists(deep_path): os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep"), 102400) print("generating dir with file ...") dir_path = os.path.join(GP.local_path, "problem_dir") rmdir(dir_path) os.mkdir(dir_path) gen_file(os.path.join(dir_path, "small2"), 102400) fuzz_count = 47 # 47 is the count that circulated the file transfer data_unit = 1024 # 1024 is the size that circulated the file transfer data_extra = 936 # 936 is the size that cased the extra file transfer for i in range(fuzz_count): create_file_with_size( os.path.join(dir_path, f"file_{i * data_unit+data_extra}.dat"), i * data_unit + data_extra) print("generating empty dir ...") dir_path = os.path.join(GP.local_path, "empty_dir") rmdir(dir_path) os.mkdir(dir_path) print("generating version file ...") gen_version_file(os.path.join(GP.local_path, "version")) def gen_package_dir(): print("generating app dir ...") dir_path = os.path.join(GP.local_path, "app_dir") if os.path.exists(dir_path): rmdir(dir_path) os.mkdir(dir_path) app = os.path.join(GP.local_path, "AACommand07.hap") dst_dir = os.path.join(GP.local_path, "app_dir") if not os.path.exists(app): print(f"Source file {app} does not exist.") else: copy_file(app, dst_dir) def prepare_source(): version_file = os.path.join(GP.local_path, "version") if os.path.exists(version_file): with open(version_file, "r") as f: version = f.read() if version == GP.get_version(): print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.") return print(f"in prepare {GP.local_path},wait for 2 mins.") current_path = os.getcwd() if os.path.exists(GP.local_path): #打开local_path遍历其中的文件,删除hap hsp以外的所有文件 for file in os.listdir(GP.local_path): if file.endswith(".hap") or file.endswith(".hsp"): continue file_path = os.path.join(GP.local_path, file) rmdir(file_path) else: os.mkdir(GP.local_path) gen_file_set() def add_prepare_source(): deep_path = os.path.join(GP.local_path, "deep_test_dir") print("generating deep test dir ...") absolute_path = os.path.abspath(__file__) deepth = (255 - 9 - len(absolute_path)) % 14 os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep_test"), 102400) recv_dir = os.path.join(GP.local_path, "recv_test_dir") print("generating recv test dir ...") os.mkdir(recv_dir) def update_source(): deep_path = os.path.join(GP.local_path, "deep_test_dir") if not os.path.exists(deep_path): print("generating deep test dir ...") absolute_path = os.path.abspath(__file__) deepth = (255 - 9 - len(absolute_path)) % 14 os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep_test"), 102400) recv_dir = os.path.join(GP.local_path, "recv_test_dir") if not os.path.exists(recv_dir): print("generating recv test dir ...") os.mkdir(recv_dir) def load_testcase(): if not GP.load_testcase: print("load testcase failed") return False print("load testcase success") return True def check_library_installation(library_name): try: importlib.metadata.version(library_name) return 0 except importlib.metadata.PackageNotFoundError: print(f"\n\n{library_name} is not installed.\n\n") print(f"try to use command below:") print(f"pip install {library_name}") return 1 def check_subprocess_cmd(cmd, process_num, timeout): for i in range(process_num): p = subprocess.Popen(cmd.split()) try: p.wait(timeout=5) except subprocess.TimeoutExpired: p.kill() def create_file_commands(local, remote, mode, num): if mode == "send": return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)] elif mode == "recv": return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)] else: return [] def create_dir_commands(local, remote, mode, num): if mode == "send": return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)] elif mode == "recv": return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)] else: return [] def execute_commands(commands): processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands] while processes: for p in processes: if not handle_process(p, processes, assert_out="FileTransfer finish"): return False return True def handle_process(p, processes, assert_out="FileTransfer finish"): if p.poll() is not None: stdout, stderr = p.communicate(timeout=512) # timeout wait 512s if stderr: print(f"{stderr.decode()}") if stdout: print(f"{stdout.decode()}") if assert_out is not None and stdout.decode().find(assert_out) == -1: return False processes.remove(p) return True def check_files(local, remote, mode, num): res = 1 for i in range(num): if mode == "send": if _check_file(local, f"{remote}_{i}"): res *= 1 else: res *= 0 elif mode == "recv": if _check_file(f"{local}_{i}", f"{remote}_{i}"): res *= 1 else: res *= 0 return res == 1 def check_dirs(local, remote, mode, num): res = 1 for _ in range(num): if mode == "send": end_of_file_name = os.path.basename(local) if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True): res *= 1 else: res *= 0 elif mode == "recv": end_of_file_name = os.path.basename(remote) local = os.path.join(local, end_of_file_name) if check_dir(f"{local}", f"{remote}", is_single_dir=True): res *= 1 else: res *= 0 return res == 1 def make_multiprocess_file(local, remote, mode, num, task_type): if num < 1: return False if task_type == "file": commands = create_file_commands(local, remote, mode, num) elif task_type == "dir": commands = create_dir_commands(local, remote, mode, num) else: return False print(commands[0]) if not execute_commands(commands): return False if task_type == "file": return check_files(local, remote, mode, num) elif task_type == "dir": return check_dirs(local, remote, mode, num) else: return False def hdc_get_key(cmd): test_cmd = f"{GP.hdc_head} {cmd}" result = subprocess.check_output(test_cmd.split()).decode() return result def check_hdc_version(cmd, version): def _convert_version_to_hex(_version): parts = _version.split("Ver: ")[1].split('.') hex_version = ''.join(parts) return int(hex_version, 16) expected_version = _convert_version_to_hex(version) cmd = f"{GP.hdc_head} -v" print(f"\nexecuting command: {cmd}") if version is not None: # check output valid output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "") real_version = _convert_version_to_hex(output) print(f"--> output: {output}") print(f"--> your local [{version}] is" f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]" ) return expected_version <= real_version def check_cmd_time(cmd, pattern, duration, times): if times < 1: print("times should be bigger than 0.") return False if pattern is None: fetchable = True else: fetchable = False start_time = time.time() * 1000 print(f"{cmd} start {start_time}") res = [] for i in range(times): start_in = time.time() * 1000 if pattern is None: subprocess.check_output(f"{GP.hdc_head} {cmd}".split()) elif not check_shell(cmd, pattern, fetch=fetchable): return False start_out = time.time() * 1000 res.append(start_out - start_in) # 计算最大值、最小值和中位数 max_value = max(res) min_value = min(res) median_value = sorted(res)[len(res) // 2] print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}") print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}") print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}") end_time = time.time() * 1000 try: timecost = int(end_time - start_time) / times print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}") except ZeroDivisionError: print(f"除数为0") if duration is None: duration = 150 * 1.2 # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status return timecost < duration def check_rom(baseline): def _try_get_size(message): try: size = int(message.split('\t')[0]) except ValueError: size = -9999 * 1024 # error size print(f"try get size value error, from {message}") return size if baseline is None: baseline = 2200 # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd" result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode() hdcd_size = _try_get_size(result_hdcd) cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so" result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode() if "directory" in result_libhdc: cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so" result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode() if "directory" in result_libhdc64: libhdc_size = 0 else: libhdc_size = _try_get_size(result_libhdc64) else: libhdc_size = _try_get_size(result_libhdc) all_size = hdcd_size + libhdc_size GP.hdcd_rom = all_size if all_size < 0: GP.hdcd_rom = "error" return False else: GP.hdcd_rom = f"{all_size} KB" if all_size > baseline: print(f"rom size is {all_size}, overlimit baseline {baseline}") return False else: print(f"rom size is {all_size}, underlimit baseline {baseline}") return True def run_command_with_timeout(command, timeout): try: result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) return result.stdout.decode(), result.stderr.decode() except subprocess.TimeoutExpired: return "", "Command timed out" except subprocess.CalledProcessError as e: return "", e.stderr.decode() def check_cmd_block(command, pattern, timeout=600): # 启动子进程 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # 用于存储子进程的输出 output = "" try: # 读取子进程的输出 output, _ = process.communicate(timeout=timeout) except subprocess.TimeoutExpired: process.terminate() process.kill() output, _ = process.communicate(timeout=timeout) print(f"--> output: {output}") if pattern in output: return True else: return False def check_version(version): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version): print("version does not match, ignore this case") pytest.skip("Version does not match, test skipped.") return func(*args, **kwargs) return wrapper return decorator @pytest.fixture(scope='class', autouse=True) def load_gp(request): GP.load() class ConfigFileNotFoundException(Exception): """配置文件未找到异常""" pass