#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2025 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ 运行环境: python 3.10+ 测试方法: 方法1、使用pytest框架执行 请见测试用例根目录test\scripts下readme.md中的执行稳定性用例小节 方法2、直接执行 进入test\scripts目录,执行python .\testModule\stability_utils.py 测试范围 hdc shell hdc file send/recv hdc fport 文件传输 多session场景测试(hdc tmode + hdc tconn) 注意: 1、当前仅支持一台设备的压力测试 2、如果异常退出,请执行 hdc -t [usb_connect_key] tmode port close,关闭网络接口通道 执行 hdc list targets查看是否还有网络连接,如果有请执行hdc tconn 127.0.0.1:port -remove断开 执行hdc fport ls查看是否还有端口转发的规则,如果有请执行hdc fport rm tcp:xxxxx tcp:9999删除转发 """ import subprocess import os import logging import logging.config import time from datetime import datetime from enum import Enum import threading import uuid import hashlib import queue from utils import server_loop from utils import client_get_file from utils import get_local_md5 STABILITY_TEST_VERSION = "v1.0.0" TIME_REPORT_FORMAT = '%Y-%m-%d %H:%M:%S' TIME_FILE_FORMAT = '%Y%m%d_%H%M%S' TEST_RESULT = False WORK_DIR = os.getcwd() # 消息队列中使用到的key名称 # 报告公共信息key名称 REPORT_PUBLIC_INFO_NAME = "public_info" # 报告公共信息中的错误信息key名称 TASK_ERROR_KEY_NAME = "task_error" # 资源文件相对路径 RESOURCE_RELATIVE_PATH = "resource" # 报告文件相对路径 REPORTS_RELATIVE_PATH = "reports" # 缓存查询到的设备sn DEVICE_LIST = [] # 记录当前启动的session类信息 SESSION_LIST = [] # 释放资源命令列表 RELEASE_TCONN_CMD_LIST = [] RELEASE_FPORT_CMD_LIST = [] RESOURCE_PATH = os.path.join(WORK_DIR, RESOURCE_RELATIVE_PATH) EXIT_EVENT = threading.Event() TEST_THREAD_MSG_QUEUE = queue.Queue() def get_time_str(fmt=TIME_REPORT_FORMAT, need_ms=False): now_time = datetime.now() if need_ms is False: return now_time.strftime(fmt) else: return f"{now_time.strftime(fmt)}.{now_time.microsecond // 1000:03d}" TEST_FILE_TIME_STR = get_time_str(TIME_FILE_FORMAT) LOG_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_{TEST_FILE_TIME_STR}.log") REPORT_FILE_NAME = os.path.join(REPORTS_RELATIVE_PATH, f"hdc_stability_test_report_{TEST_FILE_TIME_STR}.html") logger = None # 配置日志记录 logging_config_info = { 'version': 1, 'formatters': { 'format1': { 'format': '[%(asctime)s %(name)s %(funcName)s %(lineno)d %(levelname)s][%(process)d][%(thread)d]' '[%(threadName)s]%(message)s' }, }, 'handlers': { 'console_handle': { 'level': 'DEBUG', 'formatter': 'format1', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', }, 'file_handle': { 'level': 'DEBUG', 'formatter': 'format1', 'class': 'logging.FileHandler', 'filename': LOG_FILE_NAME, }, }, 'loggers': { '': { 'handlers': ['console_handle', 'file_handle'], 'level': 'DEBUG', }, }, } class TestType(Enum): SHELL = 1 FILE_SEND = 2 FILE_RECV = 3 FPORT_TRANS_FILE = 4 # 当前测试任务并行运行的数量 TASK_COUNT_DEFAULT = 5 # 循环测试次数 LOOP_COUNT_DEFAULT = 20 # 每个循环结束的等待时间,单位秒 LOOP_DELAY_S_DEFAULT = 0.01 # 进行多session测试时,启动多个端口转发到设备侧的tmode监听的端口,电脑端开启的端口列表 # 参考端口配置:12345, 12346, 12347, 12348, 12349,需要几个session,可以复制几个放入下面的MUTIL_SESSION_TEST_PC_PORT_LIST中 MUTIL_SESSION_TEST_PC_PORT_LIST = [] # 多session测试, 通过tmode命令,设备端切换tcp模式,监听的端口号 DEVICE_LISTEN_PORT = 9999 # usb session 测试项配置 """ 配置项包括如下: { "test_type": TestType.SHELL, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "shell ls", "expected_results": "data", }, { "test_type": TestType.FILE_SEND, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file send", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, { "test_type": TestType.FILE_RECV, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file recv", "original_file": "medium", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, { "test_type": TestType.FPORT_TRANS_FILE, "port_info": [ { "client_connect_port": 22345, "daemon_transmit_port": 11081, "server_listen_port": 18000, }, { "client_connect_port": 22346, "daemon_transmit_port": 11082, "server_listen_port": 18001, }, ], "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "original_file": "medium", }, """ USB_SESSION_TEST_CONFIG = [ { "test_type": TestType.SHELL, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "shell ls", "expected_results": "data", }, { "test_type": TestType.FILE_SEND, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file send", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, { "test_type": TestType.FILE_RECV, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file recv", "original_file": "medium", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, ] # tcp session 测试项配置 """ 配置项包括如下: { "test_type": TestType.SHELL, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "shell ls", "expected_results": "data", }, { "test_type": TestType.FILE_SEND, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file send", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, { "test_type": TestType.FILE_RECV, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file recv", "original_file": "medium", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, """ TCP_SESSION_TEST_CONFIG = [ { "test_type": TestType.SHELL, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "shell ls", "expected_results": "data", }, { "test_type": TestType.FILE_SEND, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file send", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, { "test_type": TestType.FILE_RECV, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": LOOP_DELAY_S_DEFAULT, "task_count": TASK_COUNT_DEFAULT, "cmd": "file recv", "original_file": "medium", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", }, ] HTML_HEAD = """ HDC稳定性测试报告 """ def init_logger(): logging.config.dictConfig(logging_config_info) def put_msg(queue_obj, info): """ 给队列中填入测试报告信息,用于汇总报告和生成结果 """ queue_obj.put(info) def run_cmd_block(command, timeout=600): logger.info(f"cmd: {command}") # 启动子进程 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # 用于存储子进程的输出 output = "" error = "" try: # 读取子进程的输出 output, error = process.communicate(timeout=timeout) except subprocess.TimeoutExpired: logger.info(f"run cmd:{command} timeout") process.terminate() process.kill() output, error = process.communicate(timeout=timeout) return output, error def run_cmd_and_check_output(command, want_str, timeout=600): """ 阻塞的方式运行命令,然后判断标准输出的返回值中是否有预期的字符串 存在预期的字符串返回 True 不存在预期的字符串返回 False """ output, error = run_cmd_block(command, timeout) if want_str in output: return True, (output, error) else: logger.error(f"can not get expect str:{want_str} cmd:{command} output:{output} error:{error}") return False, (output, error) def process_shell_test(connect_key, config, thread_name): """ config格式 { "test_type": TestType.SHELL, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": 0.01, "task_count": TASK_COUNT_DEFAULT, "cmd": "shell ls", "expected_results": "data", } """ logger.info(f"start process_shell_test thread {thread_name}") test_count = config["loop_count"] delay_s = config["loop_delay_s"] cmd = f"hdc -t {connect_key} {config['cmd']}" expected_results = config["expected_results"] report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count} start_time = time.perf_counter() for count in range(test_count): logger.info(f"{thread_name} {count}") is_ok, info = run_cmd_and_check_output(cmd, expected_results) if is_ok: logger.info(f"{thread_name} {count} success result:{info}") else: logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results}," f"result:{info}") error_time = get_time_str(need_ms=True) msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \ f" can not get expect result:{expected_results}, result:{info}" report["error_msg"] = f"[{error_time}] result:{info}" TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}}) EXIT_EVENT.set() logger.info(f"{thread_name} {count} set exit event") report["finished_test_count"] = count break if EXIT_EVENT.is_set(): logger.info(f"{thread_name} {count} exit event is set") report["error_msg"] = "event is set, normal exit" report["finished_test_count"] = count + 1 break time.sleep(delay_s) if "finished_test_count" not in report: report["finished_test_count"] = test_count if report["finished_test_count"] < test_count: report["passed"] = False else: report["passed"] = True elapsed_time = time.perf_counter() - start_time report["cost_time"] = elapsed_time TEST_THREAD_MSG_QUEUE.put({thread_name: report}) logger.info(f"stop process_shell_test thread {thread_name}") def process_file_send_test(connect_key, config, thread_name): """ config格式 { "test_type": TestType.FILE_SEND, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": 0.01, "task_count": TASK_COUNT_DEFAULT, "cmd": "file send", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", } """ logger.info(f"start process_file_send_test thread {thread_name}") test_count = config["loop_count"] delay_s = config["loop_delay_s"] expected_results = config["expected_results"] local_file = os.path.join(RESOURCE_PATH, config['local']) remote_file = f"{config['remote']}_{uuid.uuid4()}" cmd = f"hdc -t {connect_key} {config['cmd']} {local_file} {remote_file}" report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count} start_time = time.perf_counter() for count in range(test_count): logger.info(f"{thread_name} {count}") is_ok, info = run_cmd_and_check_output(cmd, expected_results) if is_ok: logger.info(f"{thread_name} {count} success result:{info}") else: logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results}," f"result:{info}") error_time = get_time_str(need_ms=True) msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \ f" can not get expect result:{expected_results}, result:{info}" report["error_msg"] = f"[{error_time}] result:{info}" TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}}) EXIT_EVENT.set() logger.info(f"{thread_name} {count} set exit event") report["finished_test_count"] = count break run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}") if EXIT_EVENT.is_set(): logger.info(f"{thread_name} {count} exit event is set") report["error_msg"] = "event is set, normal exit" report["finished_test_count"] = count + 1 break time.sleep(delay_s) if "finished_test_count" not in report: report["finished_test_count"] = test_count if report["finished_test_count"] < test_count: report["passed"] = False else: report["passed"] = True elapsed_time = time.perf_counter() - start_time report["cost_time"] = elapsed_time TEST_THREAD_MSG_QUEUE.put({thread_name: report}) logger.info(f"stop process_file_send_test thread {thread_name}") def process_file_recv_test(connect_key, config, thread_name): """ config格式 { "test_type": TestType.FILE_RECV, "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": 0.01, "task_count": TASK_COUNT_DEFAULT, "cmd": "file recv", "original_file": "medium", "local": "medium", "remote": "/data/medium", "expected_results": "FileTransfer finish", } """ logger.info(f"start process_file_recv_test thread {thread_name}") test_count = config["loop_count"] delay_s = config["loop_delay_s"] expected_results = config["expected_results"] original_file = os.path.join(RESOURCE_PATH, config['original_file']) name_id = uuid.uuid4() local_file = os.path.join(RESOURCE_PATH, f"{config['local']}_{name_id}") remote_file = f"{config['remote']}_{name_id}" cmd = f"hdc -t {connect_key} {config['cmd']} {remote_file} {local_file}" report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count} start_time = time.perf_counter() run_cmd_block(f"hdc -t {connect_key} file send {original_file} {remote_file}") for count in range(test_count): logger.info(f"{thread_name} {count}") is_ok, info = run_cmd_and_check_output(cmd, expected_results) if is_ok: logger.info(f"{thread_name} {count} success result:{info}") else: logger.error(f"{thread_name} loop_count:{count+1} run:{cmd} can not get expect result:{expected_results}," f"result:{info}") error_time = get_time_str(need_ms=True) msg = f"[{error_time}] {thread_name} loop_count:{count+1} run:{cmd}" \ f" can not get expect result:{expected_results}, result:{info}" report["error_msg"] = f"[{error_time}] result:{info}" TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}}) EXIT_EVENT.set() logger.info(f"{thread_name} {count} set exit event") report["finished_test_count"] = count break os.remove(local_file) logger.debug(f"{connect_key} remove {local_file} finished") if EXIT_EVENT.is_set(): logger.info(f"{thread_name} {count} exit event is set") report["error_msg"] = "event is set, normal exit" report["finished_test_count"] = count + 1 break time.sleep(delay_s) if "finished_test_count" not in report: report["finished_test_count"] = test_count if report["finished_test_count"] < test_count: report["passed"] = False else: report["passed"] = True run_cmd_block(f"hdc -t {connect_key} shell rm {remote_file}") elapsed_time = time.perf_counter() - start_time report["cost_time"] = elapsed_time TEST_THREAD_MSG_QUEUE.put({thread_name: report}) logger.info(f"stop process_file_recv_test thread {thread_name}") def create_fport_trans_test_env(info): # 构建传输通道 thread_name = info.get("thread_name") connect_key = info.get("connect_key") fport_arg = info.get("fport_arg") result, _ = run_cmd_block(f"hdc -t {connect_key} fport {fport_arg}") logger.info(f"{thread_name} fport result:{result}") rport_arg = info.get("rport_arg") result, _ = run_cmd_block(f"hdc -t {connect_key} rport {rport_arg}") logger.info(f"{thread_name} rport result:{result}") def do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name): """ 进行一次数据传输测试 """ logger.info(f"do_fport_trans_file_test_once start, file_save_name:{file_save_name}") server_thread = threading.Thread(target=server_loop, args=(server_listen_port,)) server_thread.start() client_get_file(client_connect_port, file_name, file_save_name) server_thread.join() ori_file_md5 = get_local_md5(os.path.join(RESOURCE_PATH, file_name)) new_file = os.path.join(RESOURCE_PATH, file_save_name) new_file_md5 = 0 if os.path.exists(new_file): new_file_md5 = get_local_md5(new_file) os.remove(new_file) logger.info(f"ori_file_md5:{ori_file_md5}, new_file_md5:{new_file_md5}") if not ori_file_md5 == new_file_md5: logger.error(f"check file md5 failed, file_save_name:{file_save_name}, ori_file_md5:{ori_file_md5}," f"new_file_md5:{new_file_md5}") return False logger.info(f"do_fport_trans_file_test_once exit, file_save_name:{file_save_name}") return True def process_fport_trans_file_test(connect_key, config, thread_name, task_index): """ task_index对应当前测试的port_info,从0开始计数。 config格式 { "test_type": TestType.FPORT_TRANS_FILE, "port_info": [ { "client_connect_port": 22345, "daemon_transmit_port": 11081, "server_listen_port": 18000, }, { "client_connect_port": 22346, "daemon_transmit_port": 11082, "server_listen_port": 18001, }, ], "loop_count": LOOP_COUNT_DEFAULT, "loop_delay_s": 0.01, "original_file": "medium", } """ logger.info(f"start process_fport_trans_file_test thread {thread_name}") test_count = config["loop_count"] delay_s = config["loop_delay_s"] port_info = config["port_info"] client_connect_port = port_info[task_index]["client_connect_port"] daemon_transmit_port = port_info[task_index]["daemon_transmit_port"] server_listen_port = port_info[task_index]["server_listen_port"] fport_arg = f"tcp:{client_connect_port} tcp:{daemon_transmit_port}" rport_arg = f"tcp:{daemon_transmit_port} tcp:{server_listen_port}" report = {"test_type": config["test_type"], "test_name": thread_name, "loop_count": test_count} start_time = time.perf_counter() # 构建传输通道 create_fport_trans_test_env({"thread_name": thread_name, "connect_key": connect_key, "fport_arg": fport_arg, "rport_arg": rport_arg}) # transmit file start file_name = config["original_file"] file_save_name = f"{file_name}_recv_fport_{uuid.uuid4()}" for count in range(test_count): logger.info(f"{thread_name} {count}") if do_fport_trans_file_test_once(client_connect_port, server_listen_port, file_name, file_save_name) is False: error_time = get_time_str(need_ms=True) msg = f"[{error_time}] {thread_name} loop_count:{count+1}" \ f" check file md5 failed" report["error_msg"] = f"[{error_time}] check file md5 failed" TEST_THREAD_MSG_QUEUE.put({TASK_ERROR_KEY_NAME: {"name": thread_name, "error_msg": msg}}) EXIT_EVENT.set() logger.info(f"{thread_name} {count} set exit event") report["finished_test_count"] = count break if EXIT_EVENT.is_set(): logger.info(f"{thread_name} {count} exit event is set") report["error_msg"] = "event is set, normal exit" report["finished_test_count"] = count + 1 break time.sleep(delay_s) # 关闭fport通道 run_cmd_block(f"hdc -t {connect_key} fport rm {fport_arg}") run_cmd_block(f"hdc -t {connect_key} fport rm {rport_arg}") if "finished_test_count" not in report: report["finished_test_count"] = test_count if report["finished_test_count"] < test_count: report["passed"] = False else: report["passed"] = True elapsed_time = time.perf_counter() - start_time report["cost_time"] = elapsed_time TEST_THREAD_MSG_QUEUE.put({thread_name: report}) logger.info(f"stop process_fport_trans_file_test thread {thread_name}") def get_test_result(fail_num): """ 返回错误结果及错误结果显示类型信息 """ global TEST_RESULT if fail_num > 0: test_result = "失败" TEST_RESULT = False test_result_class = "fail" else: test_result = "通过" TEST_RESULT = True test_result_class = "pass" return test_result, test_result_class def get_error_msg_html(public_info): """ 获取html格式的错误信息 """ error_msg_list = public_info.get(TASK_ERROR_KEY_NAME) error_msg_html = '' if error_msg_list is not None: error_msg = '\n'.join(error_msg_list) error_msg_html = f"""
错误信息 {error_msg}
""" return error_msg_html def gen_report_public_info(public_info): """ 生成html格式的报告公共信息部分 """ start_time = public_info.get('start_time') stop_time = public_info.get('stop_time') pass_num = public_info.get('pass_num') fail_num = public_info.get('fail_num') test_task_num = pass_num + fail_num test_result, test_result_class = get_test_result(fail_num) # 错误信息 error_msg_html = get_error_msg_html(public_info) public_html_temp = f"""
用例版本号 {STABILITY_TEST_VERSION}
开始时间 {start_time}
结束时间 {stop_time}
测试任务数 {test_task_num}
失败任务数 {fail_num}
成功任务数 {pass_num}
测试结果 {test_result}
{error_msg_html} """ return public_html_temp def gen_report_detail_info(pass_list, fail_list): """ 生成html格式的报告详细信息部分 pass_list fail_list 格式:[("测试名称", {"key": value, "key": value, "key": value, "key": value ...}), ... ] """ # 生成表格每一行记录 detail_rows_list = [] for one_test_key, one_test_value in fail_list + pass_list: if one_test_value.get("passed") is True: result_class = "pass" result_text = "通过" else: result_class = "fail" result_text = "失败" finished_test_count = one_test_value.get("finished_test_count") loop_count = one_test_value.get("loop_count") completion_rate = 100.0 * finished_test_count / loop_count cost_time = one_test_value.get("cost_time") if finished_test_count > 0: cost_time_per_test = f"{cost_time / finished_test_count:.3f}" else: cost_time_per_test = "NA" one_row = f""" {one_test_key} {result_text} {completion_rate:.1f} {finished_test_count} {loop_count} {cost_time_per_test} {cost_time:.3f} {one_test_value.get("error_msg", "NA")} """ detail_rows_list.append(one_row) detail_rows = ''.join(detail_rows_list) # 合成表格 detail_table_temp = f""" {detail_rows}
任务名称 测试结果 完成率 已完成次数 总次数 平均每轮耗时(秒) 总耗时(秒) 错误信息
""" return detail_table_temp def gen_report_info(public_info, detail_info): """ 生成html格式的报告 """ html_temp = f""" {HTML_HEAD}

HDC稳定性测试报告

概要

{public_info}

详情

{detail_info}
""" return html_temp def save_report(report_info, save_file_name): """ 生成测试报告 """ # 字典中获取报告公共信息 public_info = report_info.get(REPORT_PUBLIC_INFO_NAME) # 字典中获取详细测试项目的信息 detail_test_info = {key: value for key, value in report_info.items() if key != REPORT_PUBLIC_INFO_NAME} # 分类测试结果 pass_list = [] fail_list = [] for one_test_key, one_test_value in detail_test_info.items(): if one_test_value.get("passed") is True: pass_list.append((one_test_key, one_test_value)) else: fail_list.append((one_test_key, one_test_value)) # 排序成功和失败的任务列表 pass_list = sorted(pass_list) fail_list = sorted(fail_list) public_info["pass_num"] = len(pass_list) public_info["fail_num"] = len(fail_list) # 生成报告公共信息 public_info_html = gen_report_public_info(public_info) # 生成报告详细信息 detail_info_html = gen_report_detail_info(pass_list, fail_list) # 合成报告 report_info_html = gen_report_info(public_info_html, detail_info_html) # 写入文件 with open(save_file_name, "w", encoding="utf-8") as f: f.write(report_info_html) def add_error_msg_to_public_info(err_msg, report_info): """ 将一条错误信息追加到public_info里面的task_error字段中 err_msg的结构为{"name": "", "error_msg": ""} """ msg = err_msg.get("error_msg") if REPORT_PUBLIC_INFO_NAME in report_info: public_info = report_info.get(REPORT_PUBLIC_INFO_NAME) if TASK_ERROR_KEY_NAME in public_info: public_info[TASK_ERROR_KEY_NAME].append(msg) else: public_info[TASK_ERROR_KEY_NAME] = [msg, ] else: report_info[REPORT_PUBLIC_INFO_NAME] = {TASK_ERROR_KEY_NAME: [msg, ]} def add_to_report_info(one_info, report_info): """ 将一条信息添加到报告信息集合中, 不存在则新增,存在则添加或者覆盖以前的值 one_info格式为 {key: {key1: value ...}, key: {key1: value ...}...} 当前的key包括如下几类: 1、任务名称:[sn]_filerecv_0 或者 [ip:port]_filerecv_0 2、公共信息:REPORT_PUBLIC_INFO_NAME = "public_info" 3、测试任务报告的错误信息:TASK_ERROR_KEY_NAME = "task_error" 最终追加到public_info里面的task_error字段中 """ for report_section_add, section_dict_add in one_info.items(): if report_section_add == TASK_ERROR_KEY_NAME: # 追加error msg到public_info里面的task_error字段中 add_error_msg_to_public_info(section_dict_add, report_info) continue if report_section_add in report_info: # 报告数据中已经存在待添加的字段,则获取报告数据中的该段落,给段落中增加或者覆盖已有字段 report_section_dict = report_info.get(report_section_add) report_section_dict.update(section_dict_add) else: report_info[report_section_add] = section_dict_add def process_msg(msg_queue, thread_name): """ 消息中心,用于接收所有测试线程的消息,保存到如下结构的字典report_info中,用于生成报告: { "public_info": {"key": value, "key": value, "key": value, "key": value ...} "thread_name1": {"key": value, "key": value, "key": value, "key": value ...} "thread_name2": {"key": value, "key": value, "key": value, "key": value ...} } public_info为报告开头的公共信息 thread_namex为各个测试项目的测试信息 通过msg_queue,传递过来的消息,格式为 {key: {key1: value ...}, key: {key1: value ...}...}的格式, key表示上面report_info的key,不存在则新增,存在则添加或者覆盖以前的值 """ logger.info(f"start process_msg thread {thread_name}") report_info = {} while True: one_info = msg_queue.get() if one_info is None: # 报告接收完毕,退出 logger.info(f"{thread_name} get None from queue") break add_to_report_info(one_info, report_info) logger.info(f"start save report to {REPORT_FILE_NAME}, report len:{len(report_info)}") save_report(report_info, REPORT_FILE_NAME) logger.info(f"stop process_msg thread {thread_name}") class Session(object): """ session class 一个session连接对应一个session class 包含了所有在当前连接下面的测试任务 """ # session类型,usb or tcp session_type = "" # 设备连接标识 connect_key = "" test_config = [] thread_list = [] def __init__(self, connect_key): self.connect_key = connect_key def set_test_config(self, config): self.test_config = config def start_test(self): if len(self.test_config) == 0: logger.error(f"start test test_config is empty, do not test, type:{self.session_type} {self.connect_key}") return True for one_config in self.test_config: if self.start_one_test(one_config) is False: logger.error(f"start one test failed, type:{self.session_type} {self.connect_key} config:{one_config}") return False return True def start_one_test(self, config): if config["test_type"] == TestType.SHELL: if self.start_shell_test(config) is False: logger.error(f"start_shell_test failed, type:{self.session_type} {self.connect_key} config:{config}") return False if config["test_type"] == TestType.FILE_SEND: if self.start_file_send_test(config) is False: logger.error(f"start_file_send_test failed, type:{self.session_type} \ {self.connect_key} config:{config}") return False if config["test_type"] == TestType.FILE_RECV: if self.start_file_recv_test(config) is False: logger.error(f"start_file_recv_test failed, type:{self.session_type} \ {self.connect_key} config:{config}") return False if config["test_type"] == TestType.FPORT_TRANS_FILE: if self.start_fport_trans_file_test(config) is False: logger.error(f"start_fport_trans_file_test failed, type:{self.session_type} \ {self.connect_key} config:{config}") return False return True def start_shell_test(self, config): task_count = config["task_count"] test_name = f"{self.connect_key}_shell" for task_index in range(task_count): thread = threading.Thread(target=process_shell_test, name=f"{test_name}_{task_index}", args=(self.connect_key, config, f"{test_name}_{task_index}")) thread.start() self.thread_list.append(thread) def start_file_send_test(self, config): task_count = config["task_count"] test_name = f"{self.connect_key}_filesend" for task_index in range(task_count): thread = threading.Thread(target=process_file_send_test, name=f"{test_name}_{task_index}", args=(self.connect_key, config, f"{test_name}_{task_index}")) thread.start() self.thread_list.append(thread) def start_file_recv_test(self, config): task_count = config["task_count"] test_name = f"{self.connect_key}_filerecv" for task_index in range(task_count): thread = threading.Thread(target=process_file_recv_test, name=f"{test_name}_{task_index}", args=(self.connect_key, config, f"{test_name}_{task_index}")) thread.start() self.thread_list.append(thread) def start_fport_trans_file_test(self, config): task_count = len(config["port_info"]) test_name = f"{self.connect_key}_fporttrans" for task_index in range(task_count): thread = threading.Thread(target=process_fport_trans_file_test, name=f"{test_name}_{task_index}", args=(self.connect_key, config, f"{test_name}_{task_index}", task_index)) thread.start() self.thread_list.append(thread) class UsbSession(Session): """ usb连接对应的session类 """ session_type = "usb" class TcpSession(Session): """ tcp连接对应的session类 """ session_type = "tcp" def start_tcp_connect(self): result, _ = run_cmd_block(f"hdc tconn {self.connect_key}") logger.info(f"start_tcp_connect {self.connect_key} result:{result}") RELEASE_TCONN_CMD_LIST.append(f"hdc tconn {self.connect_key} -remove") return True def start_server(): cmd = "hdc start" result, _ = run_cmd_block(cmd) return result def get_dev_list(): try: result, _ = run_cmd_block("hdc list targets") result = result.split() except (OSError, IndexError): result = ["failed to detect device"] return False, result targets = result if len(targets) == 0: logger.error(f"get device, devices list is empty") return False, [] if "[Empty]" in targets[0]: logger.error(f"get device, no devices found, devices:{targets}") return False, targets return True, targets def check_device_online(connect_key): """ 确认设备是否在线 """ result, devices = get_dev_list() if result is False: logger.error(f"get device failed, devices:{devices}") return False if connect_key in devices: return True return False def init_test_env(): """ 初始化测试环境 1、使用tmode port和fport转发,构建多session tcp测试场景 """ logger.info(f"init env") start_server() result, devices = get_dev_list() if result is False: logger.error(f"get device failed, devices:{devices}") return False device_sn = devices[0] if len(devices) > 1: # 存在多个设备连接,获取不是ip:port的设备作为待测试的设备 logger.info(f"Multiple devices are connected, devices:{devices}") for dev in devices: if ':' not in dev: device_sn = dev # 关闭设备侧监听端口 result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port close") logger.info(f"close tmode port finished") time.sleep(10) logger.info(f"get device:{device_sn}") # 开启设备侧监听端口 result, _ = run_cmd_block(f"hdc -t {device_sn} tmode port {DEVICE_LISTEN_PORT}") logger.info(f"run tmode port {DEVICE_LISTEN_PORT}, result:{result}") time.sleep(10) logger.info(f"start check device:{device_sn}") if check_device_online(device_sn) is False: logger.error(f"device {device_sn} in not online") return False logger.info(f"init_test_env finished") return True def start_usb_session_test(connect_key): """ 开始一个usb session的测试 """ logger.info(f"start_usb_session_test connect_key:{connect_key}") session = UsbSession(connect_key) session.set_test_config(USB_SESSION_TEST_CONFIG) if session.start_test() is False: logger.error(f"session.start_test failed, connect_key:{connect_key}") return False SESSION_LIST.append(session) logger.info(f"start_usb_session_test connect_key:{connect_key} finished") return True def start_local_tcp_session_test(connect_key, port_list): """ 1、遍历传入的端口号,创建fport端口转发到设备端的DEVICE_LISTEN_PORT, 2、使用tconn连接后进行测试 """ logger.info(f"start_local_tcp_session_test port_list:{port_list}") if len(TCP_SESSION_TEST_CONFIG) == 0 or len(port_list) == 0: logger.info(f"port_list:{port_list} TCP_SESSION_TEST_CONFIG:{TCP_SESSION_TEST_CONFIG}") logger.info(f"port_list or TCP_SESSION_TEST_CONFIG is empty, no need do local tcp session test, exit") return True for port in port_list: # 构建fport通道 logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT}") result, _ = run_cmd_block(f"hdc -t {connect_key} fport tcp:{port} tcp:{DEVICE_LISTEN_PORT}") logger.info(f"create fport local:{port} device:{DEVICE_LISTEN_PORT} result:{result}") RELEASE_FPORT_CMD_LIST.append(f"hdc -t {connect_key} fport rm tcp:{port} tcp:{DEVICE_LISTEN_PORT}") tcp_key = f"127.0.0.1:{port}" logger.info(f"start TcpSession connect_key:{tcp_key}") session = TcpSession(tcp_key) session.set_test_config(TCP_SESSION_TEST_CONFIG) session.start_tcp_connect() if session.start_test() is False: logger.error(f"session.start_test failed, connect_key:{tcp_key}") return False SESSION_LIST.append(session) logger.info(f"start tcp test connect_key:{tcp_key} finished") logger.info(f"start_local_tcp_session_test finished") return True def start_test(msg_queue): """ 启动测试 1、启动usb session场景的测试 2、启动通过 tmode+fport模拟的tcp session场景的测试 """ logger.info(f"start test") result, devices = get_dev_list() if result is False: logger.error(f"get device failed, devices:{devices}") return False device_sn = devices[0] DEVICE_LIST.append(device_sn) if start_usb_session_test(device_sn) is False: logger.error(f"start_usb_session_test failed, devices:{devices}") return False if start_local_tcp_session_test(device_sn, MUTIL_SESSION_TEST_PC_PORT_LIST) is False: logger.error(f"start_tcp_session_test failed, devices:{devices}") return False return True def release_resource(): """ 释放相关资源 1、断开tconn连接 2、fport转发 3、tmode port """ logger.info(f"enter release_resource") for cmd in RELEASE_TCONN_CMD_LIST: result, _ = run_cmd_block(cmd) logger.info(f"release tconn cmd:{cmd} result:{result}") for cmd in RELEASE_FPORT_CMD_LIST: result, _ = run_cmd_block(cmd) logger.info(f"release fport cmd:{cmd} result:{result}") result, _ = run_cmd_block(f"hdc -t {DEVICE_LIST[0]} tmode port close") logger.info(f"tmode port close, device:{DEVICE_LIST[0]} result:{result}") def run_stability_test(): global logger logger = logging.getLogger(__name__) logger.info(f"main resource_path:{RESOURCE_PATH}") start_time = get_time_str() if init_test_env() is False: logger.error(f"init_test_env failed") return False # 启动消息收集进程 msg_thread = threading.Thread(target=process_msg, name="msg_process", args=(TEST_THREAD_MSG_QUEUE, "msg_process")) msg_thread.start() put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"start_time": start_time}}) logger.info(f"start run test thread") if start_test(TEST_THREAD_MSG_QUEUE) is False: logger.error(f"start_test failed") stop_time = get_time_str() put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}}) TEST_THREAD_MSG_QUEUE.put(None) msg_thread.join() return False # wait all thread exit logger.info(f"wait all test thread exit") for session in SESSION_LIST: for thread in session.thread_list: thread.join() stop_time = get_time_str() put_msg(TEST_THREAD_MSG_QUEUE, {"public_info": {"stop_time": stop_time}}) release_resource() # 传递 None 参数,报告进程收到后退出 logger.info(f"main put None to TEST_THREAD_MSG_QUEUE") TEST_THREAD_MSG_QUEUE.put(None) msg_thread.join() logger.info(f"exit main, test result:{TEST_RESULT}") return TEST_RESULT if __name__ == "__main__": init_logger() run_stability_test()