#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Runtime environment: python 3.10+, pytest
"""Shared helpers for the hdc test scripts.

The module holds the global test configuration (``GP``), wrappers that run
``hdc`` commands through ``subprocess`` and check their output, generators for
the local test resources, and small socket helpers used by the forwarding
tests.
"""

import functools
import hashlib
import importlib
import importlib.metadata
import json
import logging
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import time

import pytest

logger = logging.getLogger(__name__)

# Name of the persisted configuration file, resolved against the current
# working directory.
_CONFIG_FILE = ".hdctester.conf"

# Flags used when (re)generating resource files: truncate stale content and,
# where the platform defines it, force binary mode.
_WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
_GEN_CHUNK_SIZE = 1024 * 1024


class GP(object):
    """Global parameters of the tester; customize the defaults here.

    NOTE: ``dump`` serializes every class attribute that is not a dunder and
    not a ``classmethod``, so only JSON-serializable attributes and
    classmethods may be added to this class.
    """
    hdc_exe = "hdc"
    local_path = "resource"
    remote_path = "data/local/tmp"
    remote_dir_path = "data/local/tmp/it_send_dir"
    remote_ip = "auto"
    remote_port = 8710
    hdc_head = "hdc"
    device_name = ""
    targets = []
    tmode = "usb"
    changed_testcase = "n"
    testcase_path = "ts_windows.csv"
    loaded_testcase = 0
    hdcd_rom = "not checked"
    debug_app = "com.example.myapplication"

    @classmethod
    def init(cls):
        """Load the saved configuration, or ask for one interactively and save it."""
        if os.path.exists(_CONFIG_FILE):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit code."""
        cmd = f"{cls.hdc_exe} start"
        res = subprocess.call(cmd.split())
        return res

    @classmethod
    def list_targets(cls):
        """Fill ``cls.targets`` from ``<hdc_exe> list targets``.

        Returns True on success. When the command cannot be run, ``targets``
        is set to a single placeholder entry and False is returned.
        """
        try:
            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
        except (OSError, IndexError, subprocess.CalledProcessError):
            cls.targets = ["failed to auto detect device"]
            return False
        cls.targets = [t.decode() for t in targets]
        return True

    @classmethod
    def get_device(cls):
        """Select the device to test and build ``hdc_head`` for it.

        With several targets the user is asked to pick one by number.
        Returns False when no usable device is found.
        """
        cls.start_host()
        cls.list_targets()
        if not cls.targets:
            # Empty command output: nothing to index into.
            print("No device detected, please check your device connection")
            return False
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets, start=1):
                print(f"{i}. {t}")
            print("input the nums of the device above:")
            cls.device_name = cls.targets[int(input()) - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == "failed to auto detect device":
            print("No device detected, please check your device connection")
            return False
        elif cls.device_name.lower() == "[empty]":
            # Compared case-insensitively so that both "[empty]" and
            # "[Empty]" spellings of the marker are recognized.
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write all plain class attributes to the config file as JSON."""
        try:
            os.remove(_CONFIG_FILE)
        except OSError:
            # Best effort: the file usually does not exist yet.
            pass
        content = {
            key: value
            for key, value in cls.__dict__.items()
            if not key.startswith("__") and not isinstance(value, classmethod)
        }
        json_str = json.dumps(content)
        fd = os.open(_CONFIG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o755)
        try:
            os.write(fd, json_str.encode())
        finally:
            os.close(fd)
        return True

    @classmethod
    def load(cls):
        """Restore the parameters from the config file.

        Keys missing from the file keep their current value.

        Raises:
            ConfigFileNotFoundException: the config file does not exist.
        """
        if not os.path.exists(_CONFIG_FILE):
            raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]")
        with open(_CONFIG_FILE) as f:
            content = json.load(f)
        cls.hdc_exe = content.get("hdc_exe", cls.hdc_exe)
        cls.local_path = content.get("local_path", cls.local_path)
        cls.remote_path = content.get("remote_path", cls.remote_path)
        cls.remote_ip = content.get("remote_ip", cls.remote_ip)
        cls.remote_port = content.get("remote_port", cls.remote_port)
        cls.hdc_head = content.get("hdc_head", cls.hdc_head)
        cls.tmode = content.get("tmode", cls.tmode)
        cls.device_name = content.get("device_name", cls.device_name)
        cls.changed_testcase = content.get("changed_testcase", cls.changed_testcase)
        cls.testcase_path = content.get("testcase_path", cls.testcase_path)
        # dump() stores this value under "loaded_testcase"; "load_testcase"
        # is still accepted as a fallback key.
        cls.loaded_testcase = content.get(
            "loaded_testcase", content.get("load_testcase", cls.loaded_testcase))
        return True

    @classmethod
    def print_options(cls):
        """Print the current parameters as an aligned table."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" + "".join(
            f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows)
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <remote_ip>:<remote_port>``; True when it reports "Connect OK"."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the defaults, then connect over usb or tcp.

        An empty answer keeps the current value. Returns False only when the
        tcp connection attempt fails.
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            cls.remote_port = int(opt)
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            ret = cls.get_device()
            if ret:
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Interactively change the testcase settings; False when the answer is "n"."""
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = os.path.join(opt)
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder: custom testcase loading is not implemented; returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of the test scripts."""
        version = "v1.0.6a"
        return version


def pytest_run():
    """Run pytest in-process, print a summary and wait for the Enter key."""
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    pytest.main()
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    print(f"Test over, the script version is {GP.get_version()},"
          f" start at {start_time}, end at {end_time} \n"
          f"=======>{report_file} is saved. \n"
          )
    input("=======>press [Enter] key to Show logs.")


def rmdir(path):
    """Remove a file, symlink or directory tree; a missing path is ignored.

    Failures are reported on stdout and never raised.
    """
    try:
        if os.path.isfile(path) or os.path.islink(path):
            os.remove(path)
        elif os.path.exists(path):
            shutil.rmtree(path)
    except OSError:
        print(f"Error: {path} : cannot remove")


def get_local_path(path):
    """Return ``path`` joined onto the local resource directory."""
    return os.path.join(GP.local_path, path)


def get_remote_path(path):
    """Return ``path`` appended to the remote storage directory."""
    return f"{GP.remote_path}/{path}"


def get_local_md5(local):
    """Return the hex MD5 digest of the local file, read in 4 KiB blocks."""
    md5_hash = hashlib.md5()
    with open(local, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            md5_hash.update(byte_block)
    return md5_hash.hexdigest()


def _decode_output(raw):
    """Decode command output as UTF-8, falling back to GBK."""
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('gbk')


def check_shell_any_device(cmd, pattern=None, fetch=False):
    """Run ``cmd`` as given (no hdc head is prepended).

    Returns a ``(ok, output)`` tuple: with ``pattern`` ``ok`` tells whether
    the pattern occurs in the output; with ``fetch`` the output is returned
    with ``ok`` True; otherwise ``ok`` tells whether the command exited
    with 0 and the output is empty.
    """
    print(f"\nexecuting command: {cmd}")
    if pattern:  # check output valid
        print("pattern case")
        # Run the command once only; the decoding fallback must not repeat
        # its side effects.
        output = _decode_output(subprocess.check_output(cmd.split()))
        res = pattern in output
        print(f"--> output: {output}")
        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
        return res, output
    elif fetch:
        output = subprocess.check_output(cmd.split()).decode()
        print(f"--> output: {output}")
        return True, output
    else:  # check cmd run successfully
        print("other case")
        return subprocess.check_call(cmd.split()) == 0, ""


def check_shell(cmd, pattern=None, fetch=False, head=None):
    """Run ``<head> <cmd>`` (``head`` defaults to ``GP.hdc_head``).

    With ``pattern`` returns whether it occurs in the output; with ``fetch``
    returns the output; otherwise returns whether the exit code is 0.
    """
    if head is None:
        head = GP.hdc_head
    cmd = f"{head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    if pattern:  # check output valid
        output = subprocess.check_output(cmd.split()).decode()
        res = pattern in output
        print(f"--> output: {output}")
        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
        return res
    elif fetch:
        output = subprocess.check_output(cmd.split()).decode()
        print(f"--> output: {output}")
        return output
    else:  # check cmd run successfully
        return subprocess.check_call(cmd.split()) == 0


def get_shell_result(cmd, pattern=None, fetch=False):
    """Run ``<hdc_head> <cmd>`` and return its decoded output.

    ``pattern`` and ``fetch`` are accepted but not used.
    """
    cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    return subprocess.check_output(cmd.split()).decode()


def check_rate(cmd, expected_rate):
    """Return True when the "rate:<n>kB/s" value in the output exceeds ``expected_rate``.

    Returns False when no rate can be parsed from the output.
    """
    send_result = get_shell_result(cmd)
    try:
        rate = float(send_result.split("rate:")[1].split("kB/s")[0])
    except (IndexError, ValueError):
        print(f"cannot parse transfer rate from output: {send_result}")
        return False
    return rate > expected_rate


def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 of every file listed under ``remote`` with its local copy.

    The remote listing is produced by ``md5sum`` on the device; each remote
    name is mapped to a local path below ``GP.local_path`` (or below
    ``local`` when ``is_single_dir`` is set). Returns True when all digests
    match.
    """
    def _get_md5sum(remote, is_single_dir=False):
        if is_single_dir:
            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
        else:
            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}'
        return subprocess.check_output(cmd.split()).decode()

    def _calculate_md5(file_path):
        try:
            return get_local_md5(file_path)
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    # NOTE(review): is_single_dir is not forwarded here, so the "find"
    # variant is always used; kept as in the original - confirm intent.
    output = _get_md5sum(remote)
    print(output)
    all_matched = True
    for line in output.splitlines():
        if len(line) < 32:  # length of MD5
            continue
        fields = line.split()
        if len(fields) < 2:
            continue
        expected_md5, file_name = fields[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
        elif GP.remote_path in remote:
            file_name = file_name.split(GP.remote_dir_path)[1].replace("/", os.sep)
        else:
            file_name = file_name.split(remote)[1].replace("/", os.sep)
        # Build the full local path of the file.
        if is_single_dir:
            file_path = os.path.join(os.getcwd(), local) + file_name
        else:
            file_path = os.path.join(os.getcwd(), GP.local_path) + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False
    return all_matched


def _check_file(local, remote, head=None):
    """Check a transferred file.

    For remote paths under ``/proc`` only a non-empty local file is
    required; otherwise the local MD5 must appear in the device's
    ``md5sum`` output.
    """
    if head is None:
        head = GP.hdc_head
    if remote.startswith("/proc"):
        return os.path.getsize(local) > 0
    cmd = f"shell md5sum {remote}"
    local_md5 = get_local_md5(local)
    return check_shell(cmd, local_md5, head=head)


def _check_app_installed(bundle, is_shared=False):
    """Return whether ``bundle`` is listed by ``bm dump -a`` (or ``dump-shared``)."""
    dump = "dump-shared" if is_shared else "dump"
    cmd = f"shell bm {dump} -a"
    return check_shell(cmd, bundle)


def check_hdc_targets():
    """Return whether the configured device shows up in ``list targets``."""
    # check_shell() already prepends GP.hdc_head, so only the sub-command is
    # passed here.
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)


def check_file_send(local, remote):
    """Send a resource file to the device and verify it by MD5."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file send {local_path} {remote_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_file_recv(remote, local):
    """Receive a file from the device and verify it by MD5."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file recv {remote_path} {local_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_app_install(app, bundle, args=""):
    """Install ``app`` and check the outcome.

    A .hap with ``-s`` or a .hsp without arguments is expected to fail;
    every other combination must succeed and the bundle must be listed.
    """
    app = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {app}"
    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
        return check_shell(install_cmd, "failed to install bundle")
    else:
        return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)


def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and check that it is no longer listed."""
    uninstall_cmd = f"uninstall {args} {bundle}"
    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)


def check_app_install_multi(tables, args=""):
    """Install several packages with one command and check the outcome.

    ``tables`` maps package file names to bundle names.
    """
    apps = [os.path.join(GP.local_path, app) for app in tables]
    bundles = list(tables.values())
    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"
    has_hap = ".hap" in apps_str
    has_hsp = ".hsp" in apps_str
    expect_failure = (
        (args == "-s" and has_hap)
        or (has_hsp and has_hap)
        or (args == "" and not has_hap)
    )
    if expect_failure:
        if not check_shell(install_cmd, "failed to install bundle"):
            return False
    else:
        if not check_shell(install_cmd, "successfully"):
            return False
        for bundle in bundles:
            if not _check_app_installed(bundle, "s" in args):
                return False
    return True


def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle in ``tables``; stops at the first failure."""
    for bundle in tables.values():
        if not check_app_uninstall(bundle, args):
            return False
    return True


def check_app_not_exist(app, bundle, args=""):
    """Check that installing a missing package reports "Error opening file"."""
    app = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {app}"
    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
        return check_shell(install_cmd, "Error opening file")
    return False


def check_app_dir_not_exist(app, bundle, args=""):
    """Check that installing from a directory without packages is rejected."""
    app = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {app}"
    return check_shell(install_cmd, "Not any installation package was found")


def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args):
    """Run an hdc sub-command and verify its effect.

    * ``file send``/``file recv``: the transfer must finish and the
      transferred file or directory must match by MD5.
    * ``install``/``uninstall``: the command must succeed and the bundle
      (keyword ``bundle`` for install) must be listed / no longer listed.
    * anything else is delegated to ``check_shell``.
    """
    if head is None:
        head = GP.hdc_head
    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish", head=head):
            return False
        if cmd.startswith("file send"):
            local, remote = cmd.split()[-2:]
            if remote[-1] == '/' or remote[-1] == '\\':
                remote = f"{remote}{os.path.basename(local)}"
        else:
            remote, local = cmd.split()[-2:]
            if local[-1] == '/' or local[-1] == '\\':
                local = f"{local}{os.path.basename(remote)}"
        if "-b" in cmd:
            mnt_debug_path = "mnt/debug/100/debug_hap/"
            remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}"
        if os.path.isfile(local):
            return _check_file(local, remote, head=head)
        else:
            return check_dir(local, remote, is_single_dir=is_single_dir)
    elif cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)
    elif cmd.startswith("uninstall"):
        bundle = cmd.split()[-1]
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)
    else:
        return check_shell(cmd, pattern, head=head, **args)


def check_soft_local(local_source, local_soft, remote):
    """Send a local symlink and verify the remote copy against the link target."""
    cmd = f"file send {local_soft} {remote}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_source, remote)


def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a remote symlink, receive it and verify against the remote source."""
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    cmd = f"file recv {remote_soft} {local_recv}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_recv, get_remote_path(remote_source))


def switch_usb():
    """Switch the device to usb mode and point ``GP.hdc_head`` at the device name."""
    res = check_hdc_cmd("tmode usb")
    time.sleep(3)
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return res


def copy_file(src, dst):
    """Copy ``src`` to ``dst`` with metadata; errors are printed, not raised."""
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except IOError as e:
        print(f"Unable to copy file. {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    With ``GP.remote_ip == "auto"`` the address is taken from the device's
    ``ifconfig`` output. Returns True when the tcp check is skipped.
    """
    if not GP.remote_ip:  # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True
    if GP.remote_ip == "auto":
        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
        if not ipconf:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        GP.remote_ip = ipconf.split(":")[1].split()[0]
        print(f"fetch remote ip: {GP.remote_ip}")
    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
    if ret:
        time.sleep(3)
    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
    return res


def select_cmd():
    """Show the main menu until a choice between '1' and '5' is entered."""
    msg = "1) Proceed tester\n" \
        + "2) Customize tester\n" \
        + "3) Setup files for transfer\n" \
        + "4) Load custom testcase(default unused) \n" \
        + "5) Exit\n" \
        + ">> "
    while True:
        opt = input(msg).strip()
        if len(opt) == 1 and '1' <= opt <= '5':
            return opt


def _open_for_write(path):
    """Open ``path`` for binary writing, truncating it (created with mode 0o755)."""
    return os.fdopen(os.open(path, _WRITE_FLAGS, 0o755), "wb")


def _write_repeated(path, byte, size):
    """Write ``size`` copies of the one-byte string ``byte`` to ``path`` in chunks."""
    with _open_for_write(path) as f:
        remaining = size
        block = byte * min(max(size, 0), _GEN_CHUNK_SIZE)
        while remaining > 0:
            count = min(remaining, _GEN_CHUNK_SIZE)
            f.write(block[:count])
            remaining -= count


def gen_file(path, size):
    """Create ``path`` filled with ``size`` random bytes, replacing old content."""
    path = os.path.abspath(path)
    with _open_for_write(path) as f:
        remaining = size
        while remaining > 0:
            count = min(remaining, _GEN_CHUNK_SIZE)
            f.write(os.urandom(count))
            remaining -= count


def gen_version_file(path):
    """Write the script version to ``path``."""
    with open(path, "w") as f:
        f.write(GP.get_version())


def gen_zero_file(path, size):
    """Create ``path`` filled with ``size`` ASCII '0' characters."""
    _write_repeated(path, b'0', size)


def create_file_with_size(path, size):
    """Create ``path`` filled with ``size`` NUL bytes."""
    _write_repeated(path, b'\0', size)


def gen_soft_link():
    """Create the d1/d2 directories and the soft_small / soft_dir symlinks."""
    print("generating soft link ...")
    depth_1_path = os.path.join(GP.local_path, "d1")
    depth_2_path = os.path.join(GP.local_path, "d1", "d2")
    if not os.path.exists(depth_1_path):
        os.mkdir(depth_1_path)
        os.mkdir(depth_2_path)
    copy_file(os.path.join(GP.local_path, "small"), depth_2_path)
    try:
        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
    except FileExistsError:
        print("soft_small already exists")
    except OSError:
        # Message: creating soft_small failed, the symlink must be created
        # by a user with administrator privileges.
        print("生成soft_small失败,需要使用管理员权限用户执行软链接生成")
    try:
        os.symlink("d1", os.path.join(GP.local_path, "soft_dir"))
    except FileExistsError:
        print("soft_dir already exists")
    except OSError:
        # Message: creating soft_dir failed, administrator privileges needed.
        print("生成soft_dir失败,需要使用管理员权限用户执行软链接生成")


def gen_file_set():
    """Generate the full set of local resource files and directories."""
    print("generating empty file ...")
    gen_file(os.path.join(GP.local_path, "empty"), 0)

    print("generating small file ...")
    gen_file(os.path.join(GP.local_path, "small"), 102400)

    print("generating medium file ...")
    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)

    print("generating large file ...")
    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)

    print("generating text file ...")
    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)

    gen_soft_link()

    print("generating package dir ...")
    package_dir = os.path.join(GP.local_path, "package")
    if not os.path.exists(package_dir):
        os.mkdir(package_dir)
    for i in range(1, 6):
        gen_file(os.path.join(package_dir, f"fake.hap.{i}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    deepth = 4
    deep_path = os.path.join(GP.local_path, "deep_dir")
    if not os.path.exists(deep_path):
        os.mkdir(deep_path)
    for deep in range(deepth):
        deep_path = os.path.join(deep_path, f"deep_dir{deep}")
        if not os.path.exists(deep_path):
            os.mkdir(deep_path)
    gen_file(os.path.join(deep_path, "deep"), 102400)

    print("generating dir with file ...")
    dir_path = os.path.join(GP.local_path, "problem_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)
    gen_file(os.path.join(dir_path, "small2"), 102400)

    fuzz_count = 47  # 47 is the count that circulated the file transfer
    data_unit = 1024  # 1024 is the size that circulated the file transfer
    data_extra = 936  # 936 is the size that cased the extra file transfer
    for i in range(fuzz_count):
        file_size = i * data_unit + data_extra
        create_file_with_size(os.path.join(dir_path, f"file_{file_size}.dat"), file_size)

    print("generating empty dir ...")
    dir_path = os.path.join(GP.local_path, "empty_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)

    print("generating version file ...")
    gen_version_file(os.path.join(GP.local_path, "version"))


def gen_package_dir():
    """Recreate ``app_dir`` and copy AACommand07.hap into it when available."""
    print("generating app dir ...")
    dir_path = os.path.join(GP.local_path, "app_dir")
    if os.path.exists(dir_path):
        rmdir(dir_path)
    os.mkdir(dir_path)
    app = os.path.join(GP.local_path, "AACommand07.hap")
    dst_dir = os.path.join(GP.local_path, "app_dir")
    if not os.path.exists(app):
        print(f"Source file {app} does not exist.")
    else:
        copy_file(app, dst_dir)


def prepare_source():
    """Regenerate the resource set unless the version file matches this script."""
    version_file = os.path.join(GP.local_path, "version")
    if os.path.exists(version_file):
        with open(version_file, "r") as f:
            version = f.read()
        if version == GP.get_version():
            print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
            return
    print(f"in prepare {GP.local_path},wait for 2 mins.")
    if os.path.exists(GP.local_path):
        # Walk local_path and delete everything except .hap / .hsp files.
        for file in os.listdir(GP.local_path):
            if file.endswith(".hap") or file.endswith(".hsp"):
                continue
            file_path = os.path.join(GP.local_path, file)
            rmdir(file_path)
    else:
        os.mkdir(GP.local_path)
    gen_file_set()


def _gen_deep_test_dir(deep_path):
    """Create the nested deep_test_dir chain below ``deep_path`` with one file."""
    print("generating deep test dir ...")
    absolute_path = os.path.abspath(__file__)
    # Nesting depth is derived from the length of this script's path.
    deepth = (255 - 9 - len(absolute_path)) % 14
    os.mkdir(deep_path)
    for deep in range(deepth):
        deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
        os.mkdir(deep_path)
    gen_file(os.path.join(deep_path, "deep_test"), 102400)


def _gen_recv_test_dir(recv_dir):
    """Create the directory used as the target of receive tests."""
    print("generating recv test dir ...")
    os.mkdir(recv_dir)


def add_prepare_source():
    """Create deep_test_dir and recv_test_dir unconditionally.

    Raises ``FileExistsError`` (from ``os.mkdir``) when they already exist.
    """
    _gen_deep_test_dir(os.path.join(GP.local_path, "deep_test_dir"))
    _gen_recv_test_dir(os.path.join(GP.local_path, "recv_test_dir"))


def update_source():
    """Create deep_test_dir and recv_test_dir only when they are missing."""
    deep_path = os.path.join(GP.local_path, "deep_test_dir")
    if not os.path.exists(deep_path):
        _gen_deep_test_dir(deep_path)
    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
    if not os.path.exists(recv_dir):
        _gen_recv_test_dir(recv_dir)


def load_testcase():
    """Report whether custom testcases were loaded.

    NOTE(review): the condition tests the bound method ``GP.load_testcase``
    itself, which is always truthy, so this always reports success. Left
    unchanged because calling the method would flip the result for existing
    callers - confirm the intended check.
    """
    if not GP.load_testcase:
        print("load testcase failed")
        return False
    print("load testcase success")
    return True


def check_library_installation(library_name):
    """Return 0 when the distribution is installed, else print a hint and return 1."""
    try:
        importlib.metadata.version(library_name)
        return 0
    except importlib.metadata.PackageNotFoundError:
        print(f"\n\n{library_name} is not installed.\n\n")
        print("try to use command below:")
        print(f"pip install {library_name}")
        return 1


def check_subprocess_cmd(cmd, process_num, timeout):
    """Run ``cmd`` ``process_num`` times, one after another.

    Each run is killed when it does not finish within ``timeout`` seconds
    (5 seconds when ``timeout`` is None).
    """
    wait_seconds = 5 if timeout is None else timeout
    for _ in range(process_num):
        p = subprocess.Popen(cmd.split())
        try:
            p.wait(timeout=wait_seconds)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()  # reap the killed child


def create_file_commands(local, remote, mode, num):
    """Build ``num`` file transfer commands with ``_<i>`` suffixed targets."""
    if mode == "send":
        return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
    elif mode == "recv":
        return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    else:
        return []


def create_dir_commands(local, remote, mode, num):
    """Build ``num`` identical directory transfer commands."""
    if mode == "send":
        return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
    elif mode == "recv":
        return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]
    else:
        return []


def execute_commands(commands):
    """Start all commands at once and wait until each reports "FileTransfer finish".

    Returns False as soon as a finished process lacks that text.
    """
    processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands]
    while processes:
        # handle_process() removes finished entries, so iterate over a copy.
        for p in list(processes):
            if not handle_process(p, processes, assert_out="FileTransfer finish"):
                return False
        if processes:
            time.sleep(0.05)  # avoid spinning while the transfers run
    return True


def handle_process(p, processes, assert_out="FileTransfer finish"):
    """Collect the output of ``p`` if it has finished.

    A finished process is removed from ``processes``; False is returned when
    its stdout does not contain ``assert_out``. A still running process is
    left alone and True is returned.
    """
    if p.poll() is not None:
        stdout, stderr = p.communicate(timeout=512)  # timeout wait 512s
        if stderr:
            print(f"{stderr.decode()}")
        if stdout:
            print(f"{stdout.decode()}")
        if assert_out is not None and stdout.decode().find(assert_out) == -1:
            return False
        processes.remove(p)
    return True


def check_files(local, remote, mode, num):
    """Verify all ``num`` files of a multi-process transfer (no short-circuit)."""
    results = []
    for i in range(num):
        if mode == "send":
            results.append(bool(_check_file(local, f"{remote}_{i}")))
        elif mode == "recv":
            results.append(bool(_check_file(f"{local}_{i}", f"{remote}_{i}")))
    return all(results)


def check_dirs(local, remote, mode, num):
    """Verify the directory of a multi-process transfer ``num`` times."""
    results = []
    for _ in range(num):
        if mode == "send":
            end_of_file_name = os.path.basename(local)
            results.append(bool(check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True)))
        elif mode == "recv":
            end_of_file_name = os.path.basename(remote)
            # Use a separate name so the path does not grow on every pass.
            received_dir = os.path.join(local, end_of_file_name)
            results.append(bool(check_dir(f"{received_dir}", f"{remote}", is_single_dir=True)))
    return all(results)


def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` concurrent transfers of a file or directory and verify them."""
    if num < 1:
        return False
    if task_type == "file":
        commands = create_file_commands(local, remote, mode, num)
    elif task_type == "dir":
        commands = create_dir_commands(local, remote, mode, num)
    else:
        return False
    print(commands[0])
    if not execute_commands(commands):
        return False
    if task_type == "file":
        return check_files(local, remote, mode, num)
    elif task_type == "dir":
        return check_dirs(local, remote, mode, num)
    else:
        return False


def hdc_get_key(cmd):
    """Run ``<hdc_head> <cmd>`` and return its decoded output."""
    test_cmd = f"{GP.hdc_head} {cmd}"
    result = subprocess.check_output(test_cmd.split()).decode()
    return result


def check_hdc_version(cmd, version):
    """Return whether the version printed by ``cmd`` is at least ``version``.

    Both strings must contain "Ver: x.y.z". With ``version`` None there is
    nothing to compare and True is returned.
    """
    def _convert_version_to_hex(_version):
        # NOTE(review): the dotted parts are concatenated and read as one
        # base-36 number, so components of different widths do not compare
        # numerically - confirm this is acceptable.
        parts = _version.split("Ver: ")[1].split('.')
        hex_version = ''.join(parts)
        return int(hex_version, 36)

    if version is None:
        return True
    expected_version = _convert_version_to_hex(version)
    cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    # check output valid
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
          f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
          )
    return expected_version <= real_version


def check_cmd_time(cmd, pattern, duration, times):
    """Run ``cmd`` ``times`` times and check the average duration in ms.

    With ``pattern`` every run must contain it. ``duration`` defaults to
    180 ms when None.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False
    start_time = time.time() * 1000
    print(f"{cmd} start {start_time}")
    res = []
    for _ in range(times):
        start_in = time.time() * 1000
        if pattern is None:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        elif not check_shell(cmd, pattern, fetch=False):
            return False
        start_out = time.time() * 1000
        res.append(start_out - start_in)

    # Compute the maximum, minimum and median of the single-run durations.
    max_value = max(res)
    min_value = min(res)
    median_value = sorted(res)[len(res) // 2]

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    end_time = time.time() * 1000

    # times >= 1 is guaranteed above, so the division cannot fail.
    timecost = int(end_time - start_time) / times
    print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")
    if duration is None:
        duration = 150 * 1.2
    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
    return timecost < duration


def check_rom(baseline):
    """Check that hdcd plus libhdc.dylib.so stay within ``baseline`` KB.

    The sizes come from ``du`` on the device; the result is also stored in
    ``GP.hdcd_rom``. ``baseline`` defaults to 2200 when None.
    """
    def _try_get_size(message):
        try:
            size = int(message.split('\t')[0])
        except ValueError:
            size = -9999 * 1024  # error size
            print(f"try get size value error, from {message}")
        return size

    if baseline is None:
        baseline = 2200
    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
    cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd"
    result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode()
    hdcd_size = _try_get_size(result_hdcd)
    cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so"
    result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode()
    if "directory" in result_libhdc:
        # Not found under system/lib: try the lib64 location.
        cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so"
        result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode()
        if "directory" in result_libhdc64:
            libhdc_size = 0
        else:
            libhdc_size = _try_get_size(result_libhdc64)
    else:
        libhdc_size = _try_get_size(result_libhdc)
    all_size = hdcd_size + libhdc_size
    if all_size < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{all_size} KB"
    if all_size > baseline:
        print(f"rom size is {all_size}, overlimit baseline {baseline}")
        return False
    print(f"rom size is {all_size}, underlimit baseline {baseline}")
    return True


def run_command_with_timeout(command, timeout):
    """Run ``command`` and return ``(stdout, stderr)`` as text.

    On timeout or non-zero exit stdout is empty and stderr carries the
    reason.
    """
    try:
        result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, timeout=timeout)
        return result.stdout.decode(), result.stderr.decode()
    except subprocess.TimeoutExpired:
        return "", "Command timed out"
    except subprocess.CalledProcessError as e:
        return "", e.stderr.decode()


def _communicate_with_timeout(command, timeout):
    """Run ``command`` in text mode and return ``(stdout, stderr)``.

    When the timeout expires the process is terminated and killed and the
    output produced so far is collected.
    """
    # Start the child process.
    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    try:
        # Read the child's output.
        return process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.terminate()
        process.kill()
        return process.communicate(timeout=timeout)


def check_cmd_block(command, pattern, timeout=600):
    """Run a possibly blocking command and check ``pattern`` in its stdout."""
    output, _ = _communicate_with_timeout(command, timeout)
    print(f"--> output: {output}")
    return pattern in output


def check_version(version):
    """Decorator: skip the test unless host hdc and device hdcd are at least ``version``."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
                print("version does not match, ignore this case")
                pytest.skip("Version does not match, test skipped.")
            return func(*args, **kwargs)
        return wrapper
    return decorator


@pytest.fixture(scope='class', autouse=True)
def load_gp(request):
    """Autouse fixture: load the saved configuration into ``GP``."""
    GP.load()


class ConfigFileNotFoundException(Exception):
    """Raised when the configuration file cannot be found."""


def get_hdcd_pss():
    """Return the PSS total of hdcd parsed from ``hidumper --mem``, or 0."""
    mem_string = get_shell_result("shell hidumper --mem `pidof hdcd`")
    print(f"--> hdcd mem: \n{mem_string}")
    pss_string = get_shell_result("shell hidumper --mem `pidof hdcd` | grep Total")
    if "Total" in pss_string:
        pss_value = int(re.sub(r"\s+", " ", pss_string.split('\n')[1]).split(' ')[2])
    else:
        print(f"error: can't get pss value, message:{pss_string}")
        pss_value = 0
    return pss_value


def get_hdcd_fd_count():
    """Return the number of entries in hdcd's /proc fd directory, or 0."""
    sep = '/'
    fd_string = get_shell_result(f"shell ls {sep}proc/`pidof hdcd`/fd | wc -l")
    print(f"--> hdcd fd cound: {fd_string}")
    try:
        end_symbol = get_end_symbol()
        fd_value = int(fd_string.split(end_symbol)[0])
    except ValueError:
        fd_value = 0
    return fd_value


def get_end_symbol():
    """Return the line terminator of the host platform ("\\r\\n" on win32)."""
    return "\r\n" if sys.platform == 'win32' else '\n'


def get_server_pid_from_file():
    """Read the hdc server pid from ``.HDCServer.pid``; 0 when it is not a number.

    The file is looked up in the home directory on Harmony hosts and in the
    temp directory elsewhere.
    """
    is_ohos = "Harmony" in platform.system()
    if not is_ohos:
        tmp_dir_path = tempfile.gettempdir()
    else:
        tmp_dir_path = os.path.expanduser("~")
    pid_file = os.path.join(tmp_dir_path, ".HDCServer.pid")
    with open(pid_file, "r") as f:
        pid = f.read()
    try:
        pid = int(pid)
    except ValueError:
        pid = 0
    print(f"--> pid of hdcserver is {pid}")
    return pid


def check_unsupport_systems(systems):
    """Decorator: skip the call when ``platform.system()`` contains any of ``systems``."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            cur_sys = platform.system()
            for system in systems:
                if system in cur_sys:
                    print("system not support, ignore this case")
                    pytest.skip("System not support, test skipped.")
            return func(*args, **kwargs)
        return wrapper
    return decorator


@check_unsupport_systems(["Harmony"])
def get_cmd_block_output(command, timeout=600):
    """Run a possibly blocking command and return its stdout."""
    output, _ = _communicate_with_timeout(command, timeout)
    print(f"--> output: {output}")
    return output


def get_cmd_block_output_and_error(command, timeout=600):
    """Run a possibly blocking command and return ``(stdout, stderr)``."""
    print(f"cmd: {command}")
    output, error = _communicate_with_timeout(command, timeout)
    print(f"--> output: {output}")
    print(f"--> error: {error}")
    return output, error


def send_file(conn, file_name):
    """
    Socket helper used by the socket data transfer tests.
    The sending side sends a file to the receiving side:
    1. Sender send file size(bytes string) to receiver
    2. The sender waits for the receiver to send back the file size
    3. The sender cyclically sends file data to the receiver
    """
    logger.info(f"send_file enter:{file_name}")
    file_path = os.path.join(GP.local_path, file_name)
    logger.info(f"send_file file full name:{file_path}")
    file_size = os.path.getsize(file_path)
    logger.info(f"send_file file size:{file_size}")

    # send size
    conn.sendall(str(file_size).encode('utf-8'))

    # recv file size for check
    logger.info(f"send_file: start recv check size")
    size_raw = conn.recv(1024)
    logger.info(f"send_file: check size_raw {size_raw}")
    if len(size_raw) == 0:
        logger.error(f"send_file: recv check size len is 0, exit")
        return
    file_size_recv = int(size_raw.decode('utf-8'))
    if file_size_recv != file_size:
        logger.error(f"send_file: check size failed, file_size_recv:{file_size_recv} file size:{file_size}")
        return

    logger.info(f"send_file start send file data")
    index = 0
    with open(file_path, 'rb') as f:
        while True:
            one_block = f.read(4096)
            if not one_block:
                logger.info(f"send_file index:{index} read 0 block")
                break
            # sendall: send() may transmit only part of the block.
            conn.sendall(one_block)
            index = index + 1


def process_conn(conn, addr):
    """
    Socket helper used by the socket data transfer tests.
    Server client interaction description:
    1. client send "get [file_name]" to server
    2. server send file size(string) to client
    3. client send back size to server
    4. server send file data to client
    """
    conn.settimeout(5)  # timeout 5 second
    try:
        logger.info(f"server accept, addr:{str(addr)}")
        message = conn.recv(1024)
        message_str = message.decode('utf-8')
        logger.info(f"conn recv msg [{len(message_str)}] {message_str}")
        if len(message) == 0:
            conn.close()
            logger.info(f"conn msg len is 0, close {conn} addr:{addr}")
            return
        cmds = message_str.split()
        logger.info(f"conn cmds:{cmds}")
        cmd = cmds[0]
        if cmd == "get":  # ['get', 'xxxx']
            logger.info(f"conn cmd is get, file name:{cmds[1]}")
            file_name = cmds[1]
            send_file(conn, file_name)
        logger.info(f"conn normal close")
    except socket.timeout:
        logger.info(f"conn:{conn} comm timeout, addr:{addr}")
    except ConnectionResetError:
        logger.info(f"conn:{conn} ConnectionResetError, addr:{addr}")
    finally:
        # Close on every path, including unexpected exceptions.
        conn.close()


def server_loop(port_num):
    """
    Socket helper used by the socket data transfer tests.
    Server side: listens once, accepts one connection, and closes the
    listening socket after the data exchange is finished.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind(('localhost', port_num))
        server_socket.listen()
        logger.info(f"server start listen {port_num}")
        server_socket.settimeout(10)  # timeout 10 second

        try:
            conn, addr = server_socket.accept()
            process_conn(conn, addr)
        except socket.timeout:
            logger.error(f"server accept timeout, port:{port_num}")
    logger.info(f"server exit, port:{port_num}")


def recv_file_data(client_socket, file_path, file_size):
    """
    Socket helper used by the socket data transfer tests.
    Receives up to ``file_size`` bytes from the socket into ``file_path``.
    """
    logger.info(f"client: start recv file data, file size:{file_size}, file path:{file_path}")
    with open(file_path, 'wb') as f:
        recv_size = 0
        while recv_size < file_size:
            one_block = client_socket.recv(4096)
            if not one_block:
                logger.info(f"client: read block size 0, exit")
                break
            f.write(one_block)
            recv_size += len(one_block)
    logger.info(f"client: recv file data finished, recv size:{recv_size}")


def client_get_file(port_num, file_name, file_save_name):
    """
    Socket helper used by the socket data transfer tests.
    Client side: requests ``file_name`` from the server on ``port_num`` and
    stores it as ``file_save_name`` in the local resource directory.
    """
    logger.info(f"client: connect port:{port_num}")
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.settimeout(10)  # timeout 10 second
    try:
        client_socket.connect(('localhost', port_num))
    except socket.timeout:
        logger.info(f"client connect timeout, port:{port_num}")
        client_socket.close()
        return
    except OSError:
        # Other connect errors still propagate, but without leaking the socket.
        client_socket.close()
        raise

    try:
        cmd = f"get {file_name}"
        logger.info(f"client: send cmd:{cmd}")
        client_socket.sendall(cmd.encode('utf-8'))

        # recv file size
        size_raw = client_socket.recv(1024)
        logger.info(f"client: recv size_raw {size_raw}")
        if len(size_raw) == 0:
            logger.info(f"client: cmd:{cmd} recv size_raw len is 0, exit")
            return
        file_size = int(size_raw.decode('utf-8'))
        logger.info(f"client: file size {file_size}")

        file_path = os.path.join(GP.local_path, file_save_name)
        if os.path.exists(file_path):
            logger.info(f"client: file {file_path} exist, delete")
            try:
                os.remove(file_path)
            except OSError as error:
                logger.info(f"delete {file_path} failed: {error.strerror}")

        # send size msg to client for check
        logger.info(f"client: Send back file size:{size_raw}")
        client_socket.sendall(size_raw)
        recv_file_data(client_socket, file_path, file_size)
    except socket.timeout:
        logger.error(f"client communication timeout, port:{port_num}")
        return
    finally:
        logger.info("client socket close")
        client_socket.close()
    logger.info("client exit")