1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) Huawei Device Co., Ltd. 2025. All right reserved. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import hashlib 20import json 21import os 22import re 23import shlex 24import shutil 25import subprocess 26import tarfile 27import time 28import zipfile 29from enum import Enum 30from typing import List 31 32import psutil 33import requests 34import urllib3 35 36from _core.constants import CaseResult, Cluster, ConfigConst 37from _core.error import Error, ErrorMessage 38from _core.executor.bean import SuiteResult 39from _core.report.suite_reporter import SuiteReporter 40from _core.utils import get_local_ip 41from xdevice import Variables 42from xdevice import platform_logger 43 44LOG = platform_logger("Cluster") 45urllib3.disable_warnings() 46 47 48class MatchPattern(Enum): 49 contains = 0 50 equals = 1 51 ends_with = 2 52 starts_with = 3 53 54 55class MatchException: 56 57 def __init__(self, error: Error, keyword: str, pattern: MatchPattern): 58 self.error = error 59 self.keyword = keyword 60 self.pattern = pattern 61 62 63class SVN: 64 65 def __init__(self, url, username, password, project_path): 66 self.url = url.replace(" ", "%20") 67 self.password = password 68 self.project_path = project_path 69 self.usr_pwd = f"--username {username} --password {password}" 70 71 def cleanup(self): 72 cmd = f"svn cleanup {self.project_path}" 73 self.execute_svn_command(cmd) 74 75 def revert(self): 76 cmd = f"svn revert -R {self.project_path}" 77 self.execute_svn_command(cmd) 78 79 def checkout(self): 80 cmd = f"svn checkout --non-interactive {self.url} {self.project_path} {self.usr_pwd}" 81 self.execute_svn_command(cmd) 82 83 def update(self): 84 self.cleanup() 85 self.revert() 86 cmd = f"svn update --non-interactive {self.project_path} {self.usr_pwd}" 87 self.execute_svn_command(cmd) 88 89 def execute_svn_command(self, cmd: str): 90 if Utils.which("svn") is None: 91 raise Exception(ErrorMessage.Cluster.Code_0104022.format("svn")) 92 log_cmd = cmd.replace(self.password, Cluster.stars) 93 exceptions = [ 94 MatchException( 95 ErrorMessage.Cluster.Code_0104014, 96 "svn: E", 97 MatchPattern.starts_with 98 ) 99 ] 100 Utils.execute_command_with_logback(cmd, log_cmd=log_cmd, exceptions=exceptions) 101 LOG.info("svn工程下载或更新完成") 102 103 104class Utils: 105 106 @staticmethod 107 def delete_file(pth: str): 108 ret = True 109 try: 110 if os.path.isdir(pth): 111 shutil.rmtree(pth) 112 if os.path.isfile(pth): 113 os.remove(pth) 114 except Exception as e: 115 LOG.error(e) 116 ret = False 117 return ret 118 119 @staticmethod 120 def hash_message(message: str): 121 return hashlib.sha1(message.encode("utf-8")).hexdigest() 122 123 @staticmethod 124 def kill_process_and_child(pid): 125 """终止进程及其子进程""" 126 try: 127 process = psutil.Process(pid) 128 children = process.children(recursive=True)[::-1] 129 if len(children) > 0: 130 # 递归查找子进程 131 for child in children: 132 Utils.kill_process_and_child(child.pid) 133 except psutil.NoSuchProcess: 134 pass 135 # 终止进程 136 Utils.kill_process_by_pid(pid) 137 138 @staticmethod 139 def kill_process_by_pid(pid): 140 """使用进程id终止进程""" 141 cmd = "taskkill /t /f /pid %d" if os.name == "nt" else "kill -9 %d" 142 Utils.execute_command(cmd % pid, timeout=5) 143 144 @staticmethod 145 def kill_process_by_pobj(p_obj): 146 """使用进程对象终止进程""" 147 if p_obj.is_alive(): 148 try: 149 p_obj.terminate() 150 except PermissionError: 151 Utils.kill_process_by_pid(p_obj.pid) 152 p_obj.join() 153 154 @staticmethod 155 def match_exception(line: str, exceptions: List[MatchException] = None): 156 if exceptions is None: 157 return 158 for exp in exceptions: 159 if not isinstance(exp, MatchException): 160 continue 161 k, p = exp.keyword, exp.pattern 162 if p == MatchPattern.contains and k in line \ 163 or p == MatchPattern.equals and k == line \ 164 or p == MatchPattern.ends_with and line.endswith(k) \ 165 or p == MatchPattern.starts_with and line.startswith(k): 166 # 抛出指定异常信息 167 raise Exception(exp.error.format(line)) 168 169 @staticmethod 170 def start_a_subprocess(cmd: str, cwd: str = None): 171 """使用subprocess拉起子进程执行命令""" 172 if cwd is not None and not os.path.exists(cwd): 173 raise Exception("current directory does not exist") 174 if os.name != "nt": 175 cmd = shlex.split(cmd) 176 return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) 177 178 @staticmethod 179 def execute_command(cmd: str, cwd: str = None, timeout: float = 10): 180 """执行命令,适用于执行时间较短和输出内容较少的命令""" 181 LOG.info(f"execute command: {cmd}") 182 output = "" 183 proc = None 184 try: 185 proc = Utils.start_a_subprocess(cmd, cwd=cwd) 186 output, _ = proc.communicate(timeout=timeout) 187 output = output.decode() 188 except subprocess.TimeoutExpired as e: 189 proc.kill() 190 output = str(e) 191 except UnicodeDecodeError: 192 output = output.decode("gb2312") 193 output = output.replace("\r", "") 194 LOG.info(f"output: {output}") 195 return output 196 197 @staticmethod 198 def execute_command_with_logback(cmd: str, cwd: str = None, timeout: float = None, log_cmd: str = None, 199 exceptions: List[MatchException] = None): 200 """执行命令,并记录其运行输出 201 cmd: execute command 202 cwd: execute command in directory 203 timeout: execute command with timeout(seconds) 204 log_cmd: display command in log 205 exceptions: match exceptions 206 """ 207 is_timeout = False 208 start_time = time.time() 209 proc = Utils.start_a_subprocess(cmd, cwd=cwd) 210 LOG.info(f"execute command: {cmd if log_cmd is None else log_cmd}") 211 if timeout is not None: 212 LOG.info(f"execute timeout: {timeout}") 213 LOG.info(f"process id: {proc.pid}") 214 try: 215 while True: 216 # 程序退出码 217 exit_code = proc.poll() 218 if exit_code is not None: 219 break 220 if timeout is not None and time.time() - start_time >= timeout: 221 is_timeout = True 222 proc.kill() 223 break 224 line = proc.stdout.readline().strip() 225 try: 226 if not line: 227 continue 228 line = line.decode("utf-8", "ignore") 229 LOG.debug(line) 230 Utils.match_exception(line, exceptions=exceptions) 231 except UnicodeDecodeError: 232 LOG.debug(line.decode("gb2312")) 233 except Exception as e: 234 LOG.error(e) 235 finally: 236 proc.kill() 237 return exit_code, is_timeout 238 239 @staticmethod 240 def create_zip(src: str, out: str, exclude: List[str] = None, include: List[str] = None): 241 """创建zip压缩文件 242 src: 目标路径(文件或目录) 243 out: 生成压缩文件路径 244 exclude: 过滤文件列表,可用正则表达式,如["path/*"] 245 include: 添加文件列表,可用正则表达式,如["path/*"] 246 @return: 成功返回True,反之返回False 247 """ 248 if not os.path.exists(src): 249 return False 250 with zipfile.ZipFile(out, "w") as zip_out: 251 if os.path.isfile(src): 252 zip_out.write(src, os.path.basename(src), zipfile.ZIP_DEFLATED) 253 if os.path.isdir(src): 254 if exclude: 255 exclude = re.compile("|".join(exclude)) 256 if include: 257 include = re.compile("|".join(include)) 258 for top, _, files in os.walk(src): 259 if not files: 260 continue 261 for file_name in files: 262 file_path = os.path.join(top, file_name) 263 if file_path == out: 264 continue 265 temp_path = file_path.replace(src + os.sep, "").replace("\\", "/") 266 if exclude and re.match(exclude, temp_path) is not None: 267 continue 268 if include and re.match(include, temp_path) is None: 269 continue 270 zip_out.write(file_path, temp_path, zipfile.ZIP_DEFLATED) 271 return True 272 273 @staticmethod 274 def extract_tgz(src: str, to_path: str): 275 """解压tar.gz压缩文件 276 src: 压缩文件路径 277 to_path: 压缩文件解压路径 278 @return: 成功返回True,反之返回False 279 """ 280 ret = False 281 if not tarfile.is_tarfile(src): 282 return ret 283 os.makedirs(to_path, exist_ok=True) 284 with tarfile.open(src) as tgz_file: 285 for file in tgz_file.getnames(): 286 tgz_file.extract(file, to_path) 287 ret = True 288 return ret 289 290 @staticmethod 291 def extract_zip(src: str, to_path: str): 292 """解压zip压缩文件 293 src: 压缩文件路径 294 to_path: 压缩文件解压路径 295 @return: 成功返回True,反之返回False 296 """ 297 ret = False 298 if not zipfile.is_zipfile(src): 299 return ret 300 os.makedirs(to_path, exist_ok=True) 301 with zipfile.ZipFile(src) as zip_file: 302 infolist = zip_file.infolist() 303 # 检测点1:检测文件个数是否大于预期值 304 file_count = len(infolist) 305 if file_count >= 100 * 10000: 306 raise IOError(ErrorMessage.Cluster.Code_0104023.format(src, file_count)) 307 # 检测点2:检查第一层解压文件总大小是否超过设定的上限值5GB 308 total_size = sum(info.file_size for info in infolist) 309 if total_size > 5 * (1 << 30): 310 raise IOError(ErrorMessage.Cluster.Code_0104024.format(src, total_size)) 311 # 检查点3:检查第一层解压文件总大小是否超过磁盘剩余空间 312 if total_size >= psutil.disk_usage(to_path).free: 313 raise IOError(ErrorMessage.Cluster.Code_0104025.format(src, total_size)) 314 # 所有检查点均通过,解压所有文件 315 for filename in zip_file.namelist(): 316 zip_file.extract(filename, to_path) 317 ret = True 318 return ret 319 320 @staticmethod 321 def which(cmd: str): 322 return shutil.which(cmd) or shutil.which(cmd + ".exe") 323 324 325def create_empty_result(report_path, case_name: str, error_message: str): 326 """创建空的用例结果xml""" 327 LOG.info(f"case name: {case_name}, error message: {error_message}") 328 result_path = os.path.join(report_path, "result") 329 os.makedirs(result_path, exist_ok=True) 330 331 suite_result = SuiteResult() 332 suite_result.suite_name = case_name 333 suite_result.stacktrace = error_message 334 suite_reporter = SuiteReporter( 335 [(suite_result, [])], case_name, result_path, 336 message=error_message, result_kind=CaseResult.unavailable) 337 suite_reporter.create_empty_report() 338 339 340def do_request(url, headers=None, body=None, method="POST", **kwargs): 341 if headers is None: 342 headers = {} 343 if body is None: 344 body = {} 345 timeout = kwargs.get("timeout", (10, 60)) 346 verify = kwargs.get("verify", False) 347 348 rsp = None 349 try: 350 rsp = requests.request(method, url, json=body, headers=headers, timeout=timeout, verify=verify) 351 code = rsp.status_code 352 rsp_msg = rsp.content.decode() 353 except Exception as e: 354 code = 400 355 rsp_msg = json.dumps({"status": "failed", "message": str(e)}) 356 finally: 357 if rsp is not None: 358 rsp.close() 359 return code, rsp_msg 360 361 362def console_create_task(template_file): 363 """提供接口在xdevice命令行中创建测试任务""" 364 if not os.path.exists(template_file): 365 LOG.error(ErrorMessage.Cluster.Code_0104027.format(template_file)) 366 return False 367 try: 368 with open(template_file, encoding="utf-8") as json_file: 369 body = json.load(json_file) 370 from .models import TaskTemplate 371 TaskTemplate.model_validate(body) 372 except Exception as e: 373 LOG.error(ErrorMessage.Cluster.Code_0104028) 374 LOG.error(e) 375 return False 376 cluster = Variables.config.cluster 377 url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/task/create" 378 LOG.info(f"create the controller task with body: {body}") 379 code, rsp_msg = do_request(url, body=body) 380 LOG.info(f"create the controller task response code: {code}") 381 LOG.info(f"create the controller task response body: {rsp_msg}") 382 if code == 201 and json.loads(rsp_msg).get("status") == "ok": 383 LOG.info("create the controller task successfully") 384 else: 385 LOG.error(ErrorMessage.Cluster.Code_0104029.format(rsp_msg)) 386 return True 387 388 389def console_list_devices(): 390 cluster = Variables.config.cluster 391 url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/device/list-all" 392 code, rsp_msg = do_request(url, method="GET") 393 if code != 200: 394 LOG.error(ErrorMessage.Cluster.Code_0104030.format(rsp_msg)) 395 return 396 print("Controller devices:") 397 result = json.loads(rsp_msg).get("result", []) 398 if not result: 399 print("No data") 400 return 401 line = "{:<3} {:<15} {:<32} {:<12} {:<10} {:<10} {:<15}" 402 print(line.format("ID", "IP", "SN", "OS", "Type", "State", "Usage")) 403 for r in result: 404 print(line.format(r.get("id"), r.get("ip"), r.get("sn"), r.get("os"), 405 r.get("type"), r.get("state"), r.get("usage_state"))) 406 407 408def console_list_task(task_id: str = ""): 409 if task_id: 410 uri = f"/controller/v1/task/{task_id}/list" 411 else: 412 uri = "/controller/v1/task/list-all" 413 cluster = Variables.config.cluster 414 url = cluster.get(ConfigConst.control_service_url) + uri 415 code, rsp_msg = do_request(url, method="GET") 416 if code != 200: 417 LOG.error(ErrorMessage.Cluster.Code_0104031.format(rsp_msg)) 418 return 419 print("Controller tasks:") 420 result = json.loads(rsp_msg).get("result", []) 421 if not result: 422 print("No data") 423 return 424 line = "{:<26} {:<10} {}" 425 print(line.format("TaskID", "State", "ReportUrl")) 426 for r in result: 427 print(line.format(r.get("id"), r.get("state"), r.get("report_url"))) 428 429 430def report_worker_device(sn, model, version, device_os, device_type, device_state): 431 cluster = Variables.config.cluster 432 if cluster.get(ConfigConst.service_mode) != Cluster.worker: 433 return 434 service_port = cluster.get(ConfigConst.service_port) or Cluster.service_port 435 url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/device/add" 436 local_ip = get_local_ip() 437 body = [{ 438 "ip": local_ip, 439 "sn": sn, 440 "os": device_os if device_os != "default" else "device", 441 "type": device_type if device_type != "default" else "phone", 442 "model": model, 443 "version": version, 444 "worker_url": f"http://{local_ip}:{service_port}", 445 "state": device_state.value if isinstance(device_state, Enum) else device_state 446 }] 447 LOG.info(f"report worker device: {body}") 448 code, rsp_msg = do_request(url, body=body) 449 LOG.info(f"report worker device response code: {code}") 450 LOG.info(f"report worker device response body: {rsp_msg}") 451 if code == 200: 452 LOG.info("report worker device successfully") 453 else: 454 LOG.error(ErrorMessage.Cluster.Code_0104032.format(rsp_msg)) 455 456 457def upload_task_end(task_id, block_id, report_path): 458 cluster = Variables.config.cluster 459 url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/task/upload-end" 460 data = {"task_id": task_id, "block_id": block_id} 461 LOG.info(f"upload task end: {data}") 462 if os.path.exists(report_path): 463 filename = block_id + ".zip" 464 report_zip = os.path.join(report_path, filename) 465 Utils.create_zip( 466 report_path, 467 report_zip, 468 exclude=["log/task_log.log"], 469 include=["details/*", "log/*", "result/*"] 470 ) 471 with open(report_zip, "rb") as file_fd: 472 files = {"file": (filename, file_fd, "application/zip")} 473 rsp = requests.post(url, data=data, files=files, timeout=30) 474 else: 475 rsp = requests.post(url, data=data, timeout=30) 476 code, rsp_msg = rsp.status_code, rsp.content.decode() 477 rsp.close() 478 LOG.info(f"upload task end response code: {code}") 479 LOG.info(f"upload task end response body: {rsp_msg}") 480 if code == 200: 481 LOG.info("upload task end successfully") 482 else: 483 LOG.error(ErrorMessage.Cluster.Code_0104033.format(rsp_msg)) 484