#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) Huawei Device Co., Ltd. 2025. All right reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import unquote

import requests
from filelock import FileLock

from _core.constants import Cluster, FilePermission
from _core.error import ErrorMessage
from xdevice import platform_logger
from .models import TaskInfo
from .utils import SVN, Utils, create_empty_result

LOG = platform_logger("Cluster")
thread_pool = ThreadPoolExecutor()


class Runner:
    """Prepare the test project of a cluster task and locate its test cases.

    The project is obtained according to ``task_info.project.mode``
    ("local", "git", "svn" or "http"); the resulting directory is stored in
    ``self.project_path``.
    """

    def __init__(self, task_info: TaskInfo):
        """Store the task description and initialise the working paths.

        Args:
            task_info: Task description; ``cases``, ``task_id`` and
                ``project`` are read from it.
        """
        self.task_info = task_info

        self.case_names = self.task_info.cases
        self.task_id = self.task_info.task_id
        self.project_path = ""
        self.report_path = ""

    def mark_cases_error(self, case_names, error_message):
        """Call ``create_empty_result`` for every case in *case_names*.

        Args:
            case_names: Iterable of case names to mark.
            error_message: Message passed through to ``create_empty_result``.
        """
        for case_name in case_names:
            create_empty_result(self.report_path, case_name, error_message)

    def run(self):
        """Run the task. This implementation does nothing."""
        pass

    def prepare_project(self):
        """Prepare the test project and set ``self.project_path``.

        Raises:
            Exception: If the mode is unknown (Code_0104002), the prepared
                project path does not exist (Code_0104003), or the configured
                relative path does not exist inside it (Code_0104004).
        """
        project = self.task_info.project
        url = project.url.strip()
        mode = project.mode.strip().lower()
        branch = project.branch.strip()
        username = project.username.strip()
        # The password is delivered base64-encoded.
        password = base64.b64decode(project.password).decode()
        relative_path = project.relative_path.strip()

        if mode == "local":
            self.project_path = url
        elif mode == "git":
            self.__prepare_git_project(url, username, password, branch)
        elif mode == "svn":
            self.__prepare_svn_project(url, username, password)
        elif mode == "http":
            self.__download_file(url)
        else:
            raise Exception(ErrorMessage.Cluster.Code_0104002.format(mode))

        if not self.project_path or not os.path.exists(self.project_path):
            raise Exception(ErrorMessage.Cluster.Code_0104003.format(self.project_path))
        if relative_path:
            project_relative_path = os.path.join(self.project_path, relative_path)
            if not os.path.exists(project_relative_path):
                raise Exception(ErrorMessage.Cluster.Code_0104004.format(project_relative_path))
            # Use the path that was just validated, not the bare relative
            # path (which would be resolved against the current directory).
            self.project_path = project_relative_path

    def __prepare_git_project(self, url, username, password, branch):
        """Clone or update a git project and set ``self.project_path``.

        Args:
            url: Repository URL; must start with ``http://`` or ``https://``
                when a fresh clone is needed.
            username: Account embedded into the clone URL.
            password: Plain-text password embedded into the clone URL.
            branch: Branch to clone.

        Raises:
            Exception: If an argument is empty (Code_0104005..Code_0104008),
                the URL scheme is unsupported (Code_0104009), or the git
                command exits with a non-zero code (Code_0104010).
        """
        if not url:
            raise Exception(ErrorMessage.Cluster.Code_0104005)
        if not username:
            raise Exception(ErrorMessage.Cluster.Code_0104006)
        if not password:
            raise Exception(ErrorMessage.Cluster.Code_0104007)
        if not branch:
            raise Exception(ErrorMessage.Cluster.Code_0104008)
        LOG.info("准备git测试工程,等待获取文件锁")
        with FileLock(os.path.join(Cluster.project_root_path, ".lock_file")):
            LOG.info("准备git测试工程,已获取文件锁")
            local_path = self.__find_project_record(url, "git", branch=branch)
            if os.path.exists(local_path):
                cmd = "git pull -f"
                exit_code, _ = Utils.execute_command_with_logback(cmd, cwd=local_path)
            else:
                head, tail = "", ""
                if url.startswith("http://"):
                    head, tail = "http://", url[7:]
                elif url.startswith("https://"):
                    head, tail = "https://", url[8:]
                if not head:
                    raise Exception(ErrorMessage.Cluster.Code_0104009.format(url))
                usr_pwd_url = head + f"{username}:{password}@" + tail
                # NOTE(review): the command is assembled as a single string
                # from task-supplied values; confirm how
                # Utils.execute_command_with_logback executes it and whether
                # these values need quoting.
                cmd = f"git clone {usr_pwd_url} -b {branch} {local_path}"
                cwd = Cluster.project_root_path
                # Mask the password in the command that gets logged.
                log_cmd = cmd.replace(password, Cluster.stars)
                exit_code, _ = Utils.execute_command_with_logback(cmd, cwd=cwd, log_cmd=log_cmd)
                if exit_code == 0:
                    # Write the record with the same normalisation that
                    # __find_project_record applies (trailing "/" removed),
                    # otherwise the clone is never found again.
                    record_url = f"{url.rstrip('/')} -branch {branch}"
                    self.__make_project_record(local_path, record_url)
        LOG.info("准备git测试工程,已释放文件锁")
        if exit_code != 0:
            raise Exception(ErrorMessage.Cluster.Code_0104010)
        LOG.info("准备git测试工程完成")
        self.project_path = local_path

    def __prepare_svn_project(self, url, username, password):
        """Check out or update an svn project and set ``self.project_path``.

        Args:
            url: Repository URL.
            username: svn account.
            password: Plain-text svn password.

        Raises:
            Exception: If an argument is empty (Code_0104011..Code_0104013).
        """
        if not url:
            raise Exception(ErrorMessage.Cluster.Code_0104011)
        if not username:
            raise Exception(ErrorMessage.Cluster.Code_0104012)
        if not password:
            raise Exception(ErrorMessage.Cluster.Code_0104013)
        LOG.info("")
        LOG.info("查找svn工程下载记录,等待获取文件锁")
        # The root lock only guards the record lookup ...
        with FileLock(os.path.join(Cluster.project_root_path, ".lock_file")):
            LOG.info("查找svn工程下载记录,已获取文件锁")
            local_path = self.__find_project_record(url, "svn")
        LOG.info("查找svn工程下载记录,已释放文件锁")

        LOG.info("准备svn测试工程,等待获取文件锁")
        # ... the checkout/update itself is guarded by a per-project lock.
        with FileLock(os.path.join(local_path, ".lock_file")):
            LOG.info("准备svn测试工程,已获取文件锁")
            svn = SVN(url, username, password, local_path)
            if self.__get_svn_info(local_path) is None:
                # Not a working copy yet: clean the directory and check out.
                self.__clean_project(local_path)
                LOG.info("正在检出工程文件到本地,请等待...")
                svn.checkout()
            else:
                LOG.info("正在更新工程文件到本地,请等待...")
                svn.update()
            # Add an interval between updates.
            time.sleep(5)
        LOG.info("准备svn测试工程,已释放文件锁")
        LOG.info("准备svn测试工程完成")
        self.project_path = local_path

    def __download_file(self, url):
        """Download a test file over HTTP and extract it if it is an archive.

        ``self.project_path`` is set to the directory the file is saved in.
        When the file already exists and the server reports no size or the
        same size, the download is skipped.

        Args:
            url: Address of the file to download.

        Raises:
            Exception: If *url* is empty (Code_0104015), the response status
                is not 200 (Code_0104016), the downloaded size differs from
                ``Content-Length`` (Code_0104017), or extracting a
                ``.tar.gz``/``.tgz``/``.zip`` file fails (Code_0104018).
        """
        if not url:
            raise Exception(ErrorMessage.Cluster.Code_0104015)
        with FileLock(os.path.join(Cluster.project_root_path, ".lock_file")):
            file_name = url.rstrip("/").split("/")[-1]
            save_path = self.__find_project_record(url, "http")
            save_file = os.path.join(save_path, file_name)
            self.project_path = save_path
            LOG.info(f"download file from {url}")
            # NOTE(review): verify=False disables TLS certificate verification.
            rsp = requests.get(url, stream=True, timeout=10, verify=False)
            try:
                if rsp.status_code != 200:
                    raise Exception(ErrorMessage.Cluster.Code_0104016.format(rsp.content.decode()))
                file_size = int(rsp.headers.get("Content-Length", "0"))
                if file_size > 0:
                    LOG.info(f"download file size: {file_size}")
                if os.path.exists(save_file):
                    actual_size = os.path.getsize(save_file)
                    LOG.info(f"exist file: {save_file}")
                    LOG.info(f"exist file size: {actual_size}")
                    # No size reported by the server, or the sizes match.
                    if file_size == 0 or file_size == actual_size:
                        return
                    LOG.info("different file size, delete the file and download it again")
                    os.remove(save_file)
                save_file_fd = os.open(save_file, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644)
                with os.fdopen(save_file_fd, "wb+") as s_file:
                    for chunk in rsp.iter_content(chunk_size=4096):
                        s_file.write(chunk)
                    s_file.flush()
                if file_size != 0 and file_size != os.path.getsize(save_file):
                    raise Exception(ErrorMessage.Cluster.Code_0104017)
                LOG.info(f"download file success, path: {save_file}")

                # Each flag is True only when the file is an archive of that
                # type AND extraction reported failure (the extract helpers
                # presumably return a truthy value on success).
                tgz_failed = file_name.endswith((".tar.gz", ".tgz")) \
                    and not Utils.extract_tgz(save_file, save_path)
                zip_failed = file_name.endswith(".zip") \
                    and not Utils.extract_zip(save_file, save_path)
                if tgz_failed or zip_failed:
                    raise Exception(ErrorMessage.Cluster.Code_0104018.format(save_file))
            finally:
                rsp.close()

    def __find_project_record(self, repo_url: str, manage_mode: str, branch="master"):
        """Find the local copy of a project, or choose a new directory for it.

        Every directory below ``Cluster.project_root_path`` is inspected for a
        ``project_record.txt`` whose first line equals the record of this
        project. For svn, directories without a record are probed with
        ``svn info`` and get a record written when they are working copies.

        Args:
            repo_url: Repository or file URL; a trailing "/" is ignored.
            manage_mode: "git", "svn" or "http".
            branch: Branch name; only used for the git record.

        Returns:
            Path of the matching directory, or of a new
            ``<timestamp>_<name>`` directory. The new directory is created
            (and its record written) only for "http" and "svn".
        """
        local_path = ""
        repo_url = repo_url.rstrip("/")
        record_url = repo_url
        if manage_mode == "git":
            record_url = f"{repo_url} -branch {branch}"
        LOG.info(f"工程类型: {manage_mode},地址: {repo_url}")

        for timestamp_file in os.listdir(Cluster.project_root_path):
            timestamp_path = os.path.join(Cluster.project_root_path, timestamp_file)
            if os.path.isfile(timestamp_path):
                continue
            LOG.info("")
            LOG.info(f"当前目录:{timestamp_path}")
            record_txt = os.path.join(timestamp_path, "project_record.txt")
            if os.path.exists(record_txt):
                LOG.info("发现工程下载记录文件")
                with open(record_txt, encoding="utf-8") as record_file:
                    record = record_file.readline().strip()
                LOG.info(f"记录文件的内容:'{record}'")
                if record == record_url:
                    # The recorded URL matches the requested one.
                    LOG.info("记录文件内容与现仓库地址相符,查找到工程在本地的拷贝的路径")
                    local_path = timestamp_path
                    break
                LOG.info("记录文件内容与现仓库地址不相符")
            else:
                LOG.info("缺失下载记录文件")
                if manage_mode in ["http", "git"]:
                    continue
                LOG.info("检查是否为svn工程的拷贝")
                url = self.__get_svn_info(timestamp_path)
                if url is None:
                    LOG.info("不是svn工程的拷贝")
                    continue
                LOG.info(f"是svn工程的拷贝,对应的svn仓库地址:{url}")
                self.__make_project_record(timestamp_path, url)
                if url == record_url:
                    LOG.info("检查结果与现仓库地址相符,查找到工程在本地的拷贝的路径")
                    local_path = timestamp_path
                    break
        # No record found: pick a new directory.
        if local_path == "":
            section = repo_url.split("/")[-1].replace(" ", "")
            # Drop a trailing .git / archive suffix from the directory name.
            ret = re.search(r'(.*)\.(?:git|tar\.gz|tgz|zip)$', section)
            if ret:
                section = ret.group(1)
            timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
            local_path = os.path.join(Cluster.project_root_path, f"{timestamp}_{section}")
            if manage_mode in ["http", "svn"]:
                os.makedirs(local_path, exist_ok=True)
                self.__make_project_record(local_path, record_url)
            # Presumably keeps the second-resolution timestamp of the next
            # generated directory name distinct from this one.
            time.sleep(1)
        return local_path

    @staticmethod
    def __make_project_record(project_path, record_url):
        """Write *record_url* to ``project_record.txt`` in *project_path*.

        An existing record file is truncated and overwritten.
        """
        record_txt = os.path.join(project_path, "project_record.txt")
        if os.path.exists(record_txt):
            LOG.info(f"修改下载记录:{record_txt}")
        else:
            LOG.info(f"新建下载记录:{record_txt}")
        txt_fd = os.open(record_txt, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, FilePermission.mode_644)
        with os.fdopen(txt_fd, "w", encoding="utf-8") as record_file:
            record_file.write(record_url)

    @staticmethod
    def __clean_project(project_path):
        """Delete everything in *project_path* except the record and lock files."""
        LOG.info(f"clean project files, path: {project_path}")
        if os.name == "nt":
            # While cleaning, the .svn folder may be held open by the
            # TSVNCache.exe process.
            Utils.execute_command("taskkill /f /t /im TSVNCache.exe")
        for file in os.listdir(project_path):
            if file in ["project_record.txt", ".lock_file"]:
                continue
            temp_path = os.path.join(project_path, file)
            Utils.delete_file(temp_path)

    @staticmethod
    def __get_svn_info(path):
        """Return the repository URL reported by ``svn info`` for *path*.

        Returns:
            The percent-decoded URL, or ``None`` when the output contains no
            "Revision" or no line starting with "URL".
        """
        repository = None
        Utils.execute_command(f"svn cleanup {path}")
        output = Utils.execute_command("svn info", cwd=path).strip("\n")
        if output.find("Revision") != -1:
            for line in output.split("\n"):
                if line.startswith("URL"):
                    # Decode percent-encoded characters (e.g. Chinese) in
                    # the svn URL.
                    repository = unquote(line.split(" ")[1], encoding="utf-8")
                    break
        if repository is None:
            LOG.info(f"execute svn info's output: {output}")
        return repository

    def find_testcase_json(self):
        """Locate the json files of the task's test cases.

        ``<project_path>/testcases`` is walked recursively; a file
        ``<name>.json`` belongs to the case ``<name>``.

        Returns:
            dict mapping case name to json path. When the task names cases,
            every named case is present and maps to "" if no file was found;
            otherwise every json file found is returned.
        """
        case_names = self.case_names
        in_path = os.path.join(self.project_path, "testcases")
        results = {case_name: "" for case_name in case_names}
        pending = set(case_names)
        for top, _, filenames in os.walk(in_path):
            for filename in filenames:
                if not filename.endswith(".json"):
                    continue
                case_name = filename[:-5]
                case_path = os.path.join(top, filename)
                # The task does not specify case names: collect everything.
                if not case_names:
                    results[case_name] = case_path
                    continue
                # The task specifies case names: keep the first match only.
                if case_name in pending:
                    results[case_name] = case_path
                    pending.remove(case_name)
                    if not pending:
                        break
            # All requested cases have been located: stop walking.
            if case_names and not pending:
                break
        return results