• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) Huawei Device Co., Ltd. 2025. All right reserved.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import hashlib
20import json
21import os
22import re
23import shlex
24import shutil
25import subprocess
26import tarfile
27import time
28import zipfile
29from enum import Enum
30from typing import List
31
32import psutil
33import requests
34import urllib3
35
36from _core.constants import CaseResult, Cluster, ConfigConst
37from _core.error import Error, ErrorMessage
38from _core.executor.bean import SuiteResult
39from _core.report.suite_reporter import SuiteReporter
40from _core.utils import get_local_ip
41from xdevice import Variables
42from xdevice import platform_logger
43
# Module-level logger shared by every helper in this file.
LOG = platform_logger("Cluster")
# Turn off urllib3 warnings for the whole process (do_request below defaults to verify=False).
urllib3.disable_warnings()
46
47
class MatchPattern(Enum):
    """How a MatchException keyword is compared against one line of command output."""

    # keyword appears anywhere in the line
    contains = 0
    # keyword is exactly the line
    equals = 1
    # line ends with the keyword
    ends_with = 2
    # line starts with the keyword
    starts_with = 3
53
54
class MatchException:
    """Rule that ties an error message template to a keyword found in command output.

    error: message template; Utils.match_exception calls ``error.format(line)`` on a hit
    keyword: text searched for in each output line
    pattern: MatchPattern member selecting how keyword and line are compared
    """

    def __init__(self, error: Error, keyword: str, pattern: MatchPattern):
        self.error, self.keyword, self.pattern = error, keyword, pattern
61
62
class SVN:
    """Drive the ``svn`` command-line client for a single working copy.

    url: repository url (spaces are percent-encoded)
    username / password: credentials appended to checkout and update commands
    project_path: local working-copy directory
    """

    def __init__(self, url, username, password, project_path):
        self.url = url.replace(" ", "%20")
        self.password = password
        self.project_path = project_path
        # Quoted form is what actually appears in the command line, so it is
        # also what has to be masked before the command is logged.
        self._quoted_password = self._quote(password)
        self.usr_pwd = f"--username {self._quote(username)} --password {self._quoted_password}"

    @staticmethod
    def _quote(value) -> str:
        """Quote one argument so it survives as a single token in the command string.

        Utils.start_a_subprocess tokenises with shlex on non-Windows systems and
        hands the raw string to the OS on Windows, hence the two quoting rules.
        Values without special characters are returned unchanged.
        """
        text = str(value)
        if os.name == "nt":
            return subprocess.list2cmdline([text])
        return shlex.quote(text)

    def _mask_password(self, cmd: str) -> str:
        """Return *cmd* with the password replaced by Cluster.stars for logging."""
        for secret in (self._quoted_password, self.password):
            # An empty secret would make str.replace insert stars between
            # every character; a None secret would raise TypeError.
            if secret:
                cmd = cmd.replace(secret, Cluster.stars)
        return cmd

    def cleanup(self):
        """Run ``svn cleanup`` on the working copy."""
        cmd = f"svn cleanup {self._quote(self.project_path)}"
        self.execute_svn_command(cmd)

    def revert(self):
        """Recursively revert local modifications in the working copy."""
        cmd = f"svn revert -R {self._quote(self.project_path)}"
        self.execute_svn_command(cmd)

    def checkout(self):
        """Check the repository url out into the working copy directory."""
        cmd = (f"svn checkout --non-interactive {self._quote(self.url)} "
               f"{self._quote(self.project_path)} {self.usr_pwd}")
        self.execute_svn_command(cmd)

    def update(self):
        """Clean up, revert and then update the working copy."""
        self.cleanup()
        self.revert()
        cmd = f"svn update --non-interactive {self._quote(self.project_path)} {self.usr_pwd}"
        self.execute_svn_command(cmd)

    def execute_svn_command(self, cmd: str):
        """Execute one svn command and log its output with the password masked.

        Raises Exception when no ``svn`` executable can be found.
        NOTE(review): the exit code returned by the command runner is ignored,
        so the completion message below is logged even when svn failed.
        """
        if Utils.which("svn") is None:
            raise Exception(ErrorMessage.Cluster.Code_0104022.format("svn"))
        log_cmd = self._mask_password(cmd)
        exceptions = [
            MatchException(
                ErrorMessage.Cluster.Code_0104014,
                "svn: E",
                MatchPattern.starts_with
            )
        ]
        Utils.execute_command_with_logback(cmd, log_cmd=log_cmd, exceptions=exceptions)
        LOG.info("svn工程下载或更新完成")
102
103
class Utils:
    """Stateless helpers for file removal, process control, command execution and archives."""

    @staticmethod
    def delete_file(pth: str):
        """Delete a file, a symbolic link or a whole directory tree.

        pth: path to delete; a path that does not exist is not an error
        @return: True unless the removal raised, in which case the error is logged
        """
        try:
            # Links are unlinked rather than followed: shutil.rmtree refuses
            # to operate on a symbolic link to a directory.
            if os.path.islink(pth) or os.path.isfile(pth):
                os.remove(pth)
            elif os.path.isdir(pth):
                shutil.rmtree(pth)
        except Exception as e:
            LOG.error(e)
            return False
        return True

    @staticmethod
    def hash_message(message: str):
        """Return the SHA-1 hex digest of the UTF-8 encoded message."""
        return hashlib.sha1(message.encode("utf-8")).hexdigest()

    @staticmethod
    def kill_process_and_child(pid):
        """Kill a process and all of its descendant processes."""
        try:
            # recursive=True already yields the complete subtree, so there is
            # no need to enumerate the children of every child again.
            children = psutil.Process(pid).children(recursive=True)
        except psutil.NoSuchProcess:
            children = []
        for child in reversed(children):
            Utils.kill_process_by_pid(child.pid)
        # finally kill the process itself
        Utils.kill_process_by_pid(pid)

    @staticmethod
    def kill_process_by_pid(pid):
        """Kill a process by its process id (taskkill on Windows, kill -9 elsewhere)."""
        cmd = "taskkill /t /f /pid %d" if os.name == "nt" else "kill -9 %d"
        Utils.execute_command(cmd % pid, timeout=5)

    @staticmethod
    def kill_process_by_pobj(p_obj):
        """Terminate a process through its process object and wait for it.

        p_obj: object exposing is_alive/terminate/join/pid
               (presumably a multiprocessing.Process - confirm against callers)
        """
        if p_obj.is_alive():
            try:
                p_obj.terminate()
            except PermissionError:
                Utils.kill_process_by_pid(p_obj.pid)
        p_obj.join()

    @staticmethod
    def match_exception(line: str, exceptions: List["MatchException"] = None):
        """Raise the configured error when *line* matches one of the rules.

        line: one decoded line of command output
        exceptions: rules to test; entries that are not MatchException are skipped
        Raises Exception built from ``rule.error.format(line)`` on the first match.
        """
        if not exceptions:
            return
        for exp in exceptions:
            if not isinstance(exp, MatchException):
                continue
            keyword, pattern = exp.keyword, exp.pattern
            if pattern == MatchPattern.contains:
                matched = keyword in line
            elif pattern == MatchPattern.equals:
                matched = keyword == line
            elif pattern == MatchPattern.ends_with:
                matched = line.endswith(keyword)
            elif pattern == MatchPattern.starts_with:
                matched = line.startswith(keyword)
            else:
                matched = False
            if matched:
                # raise the error message configured for this rule
                raise Exception(exp.error.format(line))

    @staticmethod
    def start_a_subprocess(cmd: str, cwd: str = None):
        """Start a child process for *cmd* with stderr merged into a piped stdout.

        cmd: command line; tokenised with shlex on non-Windows systems
        cwd: working directory for the child, must exist when given
        @return: the subprocess.Popen object
        """
        if cwd is not None and not os.path.exists(cwd):
            raise Exception("current directory does not exist")
        if os.name != "nt":
            cmd = shlex.split(cmd)
        return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd)

    @staticmethod
    def _decode_output(raw: bytes) -> str:
        """Decode command output as UTF-8, falling back to GB2312 when that fails."""
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("gb2312", "replace")

    @staticmethod
    def execute_command(cmd: str, cwd: str = None, timeout: float = 10):
        """Run a short command with little output and return that output.

        cmd: execute command
        cwd: execute command in directory
        timeout: seconds to wait; on expiry the process is killed and the
                 text of the TimeoutExpired error is returned instead
        @return: combined stdout/stderr text with carriage returns removed
        """
        LOG.info(f"execute command: {cmd}")
        proc = Utils.start_a_subprocess(cmd, cwd=cwd)
        try:
            raw_output, _ = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired as e:
            proc.kill()
            # Reap the killed child and release its pipe, as the subprocess
            # documentation recommends after a timeout.
            proc.communicate()
            output = str(e)
        else:
            output = Utils._decode_output(raw_output)
        output = output.replace("\r", "")
        LOG.info(f"output: {output}")
        return output

    @staticmethod
    def execute_command_with_logback(cmd: str, cwd: str = None, timeout: float = None, log_cmd: str = None,
                                     exceptions: List["MatchException"] = None):
        """Run a command and log its output line by line while it is running.

        cmd: execute command
        cwd: execute command in directory
        timeout: execute command with timeout(seconds)
        log_cmd: display command in log
        exceptions: match exceptions
        @return: (exit_code, is_timeout); exit_code is None when the timeout fired

        NOTE(review): readline() blocks, so the timeout is only checked between
        lines, and output still buffered when the process exits is not read.
        NOTE(review): an error raised by match_exception is logged here, not
        propagated to the caller - confirm that this is intended.
        """
        is_timeout = False
        exit_code = None
        start_time = time.time()
        proc = Utils.start_a_subprocess(cmd, cwd=cwd)
        LOG.info(f"execute command: {cmd if log_cmd is None else log_cmd}")
        if timeout is not None:
            LOG.info(f"execute timeout: {timeout}")
        LOG.info(f"process id: {proc.pid}")
        try:
            while True:
                # exit code of the process, None while it is still running
                exit_code = proc.poll()
                if exit_code is not None:
                    break
                if timeout is not None and time.time() - start_time >= timeout:
                    is_timeout = True
                    proc.kill()
                    break
                raw_line = proc.stdout.readline().strip()
                if not raw_line:
                    continue
                try:
                    line = Utils._decode_output(raw_line)
                    LOG.debug(line)
                    Utils.match_exception(line, exceptions=exceptions)
                except Exception as e:
                    LOG.error(e)
        finally:
            proc.kill()
            # Release the pipe and reap the child so that neither a file
            # descriptor nor a zombie process is left behind.
            if proc.stdout is not None:
                proc.stdout.close()
            proc.wait()
        return exit_code, is_timeout

    @staticmethod
    def create_zip(src: str, out: str, exclude: List[str] = None, include: List[str] = None):
        """Create a zip archive from a file or a directory.

        src: source path (file or directory)
        out: path of the archive to create
        exclude: regular expressions of archive names to skip, e.g. ["path/*"]
        include: regular expressions of archive names to keep, e.g. ["path/*"]
        @return: True on success, False when src does not exist
        """
        if not os.path.exists(src):
            return False
        with zipfile.ZipFile(out, "w") as zip_out:
            if os.path.isfile(src):
                zip_out.write(src, os.path.basename(src), zipfile.ZIP_DEFLATED)
            if os.path.isdir(src):
                exclude_re = re.compile("|".join(exclude)) if exclude else None
                include_re = re.compile("|".join(include)) if include else None
                out_abs = os.path.abspath(out)
                for top, _, files in os.walk(src):
                    for file_name in files:
                        file_path = os.path.join(top, file_name)
                        # never pack the archive into itself
                        if os.path.abspath(file_path) == out_abs:
                            continue
                        # relpath only strips the leading src part, also when
                        # src ends with a separator or repeats further down
                        arc_name = os.path.relpath(file_path, src).replace("\\", "/")
                        if exclude_re and exclude_re.match(arc_name) is not None:
                            continue
                        if include_re and include_re.match(arc_name) is None:
                            continue
                        zip_out.write(file_path, arc_name, zipfile.ZIP_DEFLATED)
        return True

    @staticmethod
    def _is_within(root: str, target: str) -> bool:
        """Tell whether *target* is *root* itself or located below it."""
        root, target = os.path.normcase(root), os.path.normcase(target)
        try:
            return os.path.commonpath([root, target]) == root
        except ValueError:
            # e.g. paths on different drives on Windows
            return False

    @staticmethod
    def extract_tgz(src: str, to_path: str):
        """Extract a tar.gz archive.

        src: path of the archive
        to_path: directory to extract into (created when missing)
        @return: True on success, False when src is not an existing tar file
        Raises IOError when a member name would be written outside to_path.
        """
        if not os.path.isfile(src) or not tarfile.is_tarfile(src):
            return False
        os.makedirs(to_path, exist_ok=True)
        root = os.path.realpath(to_path)
        # Use the stdlib "data" extraction filter where the running Python has it.
        extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        with tarfile.open(src) as tgz_file:
            members = tgz_file.getmembers()
            # Validate every name before anything is written to disk.
            for member in members:
                target = os.path.realpath(os.path.join(root, member.name))
                if not Utils._is_within(root, target):
                    raise IOError(f"unsafe member path in {src}: {member.name}")
            for member in members:
                tgz_file.extract(member, to_path, **extract_kwargs)
        return True

    @staticmethod
    def extract_zip(src: str, to_path: str):
        """Extract a zip archive after a few size sanity checks.

        src: path of the archive
        to_path: directory to extract into (created when missing)
        @return: True on success, False when src is not a zip file
        Raises IOError when one of the checks below fails.
        """
        if not zipfile.is_zipfile(src):
            return False
        os.makedirs(to_path, exist_ok=True)
        with zipfile.ZipFile(src) as zip_file:
            infolist = zip_file.infolist()
            # check 1: the number of entries must stay below the expected maximum
            file_count = len(infolist)
            if file_count >= 100 * 10000:
                raise IOError(ErrorMessage.Cluster.Code_0104023.format(src, file_count))
            # check 2: the total uncompressed size must not exceed the 5GB limit
            total_size = sum(info.file_size for info in infolist)
            if total_size > 5 * (1 << 30):
                raise IOError(ErrorMessage.Cluster.Code_0104024.format(src, total_size))
            # check 3: the total uncompressed size must fit into the free disk space
            if total_size >= psutil.disk_usage(to_path).free:
                raise IOError(ErrorMessage.Cluster.Code_0104025.format(src, total_size))
            # all checks passed, extract every entry
            for info in infolist:
                zip_file.extract(info, to_path)
        return True

    @staticmethod
    def which(cmd: str):
        """Return the path of executable *cmd* (also tried with ".exe"), or None."""
        return shutil.which(cmd) or shutil.which(cmd + ".exe")
323
324
def create_empty_result(report_path, case_name: str, error_message: str):
    """Create an empty case result report under ``<report_path>/result``.

    report_path: report root directory; the "result" sub directory is created if missing
    case_name: used as suite name and as report name
    error_message: stored as stacktrace of the suite and as message of the report
    """
    LOG.info(f"case name: {case_name}, error message: {error_message}")
    result_dir = os.path.join(report_path, "result")
    os.makedirs(result_dir, exist_ok=True)

    empty_suite = SuiteResult()
    empty_suite.suite_name = case_name
    empty_suite.stacktrace = error_message

    # a single suite without any case results, reported with kind "unavailable"
    reporter = SuiteReporter(
        [(empty_suite, [])],
        case_name,
        result_dir,
        message=error_message,
        result_kind=CaseResult.unavailable,
    )
    reporter.create_empty_report()
338
339
def do_request(url, headers=None, body=None, method="POST", **kwargs):
    """Send an HTTP request with a JSON body and return ``(status_code, body_text)``.

    url: request url
    headers: request headers, defaults to an empty dict
    body: object sent as JSON, defaults to an empty dict
    method: HTTP method name
    kwargs: only "timeout" (default (10, 60)) and "verify" (default False) are read
    @return: status code and response text; when the request itself fails the
             code is 400 and the text is a JSON document describing the error
    """
    if headers is None:
        headers = {}
    if body is None:
        body = {}
    timeout = kwargs.get("timeout", (10, 60))
    verify = kwargs.get("verify", False)

    rsp = None
    try:
        rsp = requests.request(method, url, json=body, headers=headers, timeout=timeout, verify=verify)
        code = rsp.status_code
        # Undecodable bytes are replaced instead of raising, otherwise a
        # successful response would be reported as a failed request below.
        rsp_msg = rsp.content.decode("utf-8", "replace")
    except Exception as e:
        code = 400
        rsp_msg = json.dumps({"status": "failed", "message": str(e)})
    finally:
        if rsp is not None:
            rsp.close()
    return code, rsp_msg
360
361
def console_create_task(template_file):
    """Create a test task on the controller from the xdevice command line.

    template_file: path of a JSON task template
    @return: False when the template is missing or fails to load/validate,
             True once the create request has been sent
    NOTE(review): True is also returned when the controller rejects the task;
    that case is only logged - confirm this is what callers expect.
    """
    if not os.path.exists(template_file):
        LOG.error(ErrorMessage.Cluster.Code_0104027.format(template_file))
        return False
    try:
        with open(template_file, encoding="utf-8") as json_file:
            body = json.load(json_file)
        from .models import TaskTemplate
        TaskTemplate.model_validate(body)
    except Exception as e:
        LOG.error(ErrorMessage.Cluster.Code_0104028)
        LOG.error(e)
        return False
    cluster = Variables.config.cluster
    url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/task/create"
    LOG.info(f"create the controller task with body: {body}")
    code, rsp_msg = do_request(url, body=body)
    LOG.info(f"create the controller task response code: {code}")
    LOG.info(f"create the controller task response body: {rsp_msg}")
    created = False
    if code == 201:
        try:
            created = json.loads(rsp_msg).get("status") == "ok"
        except (ValueError, AttributeError):
            # body is not JSON or not a JSON object: treat it as a failure
            # instead of crashing the console
            created = False
    if created:
        LOG.info("create the controller task successfully")
    else:
        LOG.error(ErrorMessage.Cluster.Code_0104029.format(rsp_msg))
    return True
387
388
def console_list_devices():
    """Print a table of all devices known to the controller."""
    cluster = Variables.config.cluster
    url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/device/list-all"
    code, rsp_msg = do_request(url, method="GET")
    if code != 200:
        LOG.error(ErrorMessage.Cluster.Code_0104030.format(rsp_msg))
        return
    print("Controller devices:")
    try:
        result = json.loads(rsp_msg).get("result", [])
    except (ValueError, AttributeError):
        # response body is not a JSON object
        LOG.error(ErrorMessage.Cluster.Code_0104030.format(rsp_msg))
        return
    if not result:
        print("No data")
        return
    line = "{:<3}  {:<15}  {:<32}  {:<12}  {:<10}  {:<10}  {:<15}"
    print(line.format("ID", "IP", "SN", "OS", "Type", "State", "Usage"))
    columns = ("id", "ip", "sn", "os", "type", "state", "usage_state")
    for r in result:
        # str() keeps the width specifiers usable for missing (None) fields,
        # which would otherwise raise TypeError
        print(line.format(*(str(r.get(column)) for column in columns)))
406
407
def console_list_task(task_id: str = ""):
    """Print a table of controller tasks.

    task_id: when given, only this task is queried, otherwise all tasks
    """
    if task_id:
        uri = f"/controller/v1/task/{task_id}/list"
    else:
        uri = "/controller/v1/task/list-all"
    cluster = Variables.config.cluster
    url = cluster.get(ConfigConst.control_service_url) + uri
    code, rsp_msg = do_request(url, method="GET")
    if code != 200:
        LOG.error(ErrorMessage.Cluster.Code_0104031.format(rsp_msg))
        return
    print("Controller tasks:")
    try:
        result = json.loads(rsp_msg).get("result", [])
    except (ValueError, AttributeError):
        # response body is not a JSON object
        LOG.error(ErrorMessage.Cluster.Code_0104031.format(rsp_msg))
        return
    if not result:
        print("No data")
        return
    line = "{:<26}  {:<10}  {}"
    print(line.format("TaskID", "State", "ReportUrl"))
    columns = ("id", "state", "report_url")
    for r in result:
        # str() keeps the width specifiers usable for missing (None) fields,
        # which would otherwise raise TypeError
        print(line.format(*(str(r.get(column)) for column in columns)))
428
429
def report_worker_device(sn, model, version, device_os, device_type, device_state):
    """Report one device of this worker to the controller.

    Does nothing unless the configured service mode is Cluster.worker.
    device_os / device_type: the value "default" is reported as "device" / "phone"
    device_state: an Enum member (its value is sent) or a plain value
    """
    cluster = Variables.config.cluster
    if cluster.get(ConfigConst.service_mode) != Cluster.worker:
        return
    service_port = cluster.get(ConfigConst.service_port) or Cluster.service_port
    url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/device/add"
    local_ip = get_local_ip()

    reported_os = device_os
    if device_os == "default":
        reported_os = "device"
    reported_type = device_type
    if device_type == "default":
        reported_type = "phone"
    if isinstance(device_state, Enum):
        reported_state = device_state.value
    else:
        reported_state = device_state

    device = {
        "ip": local_ip,
        "sn": sn,
        "os": reported_os,
        "type": reported_type,
        "model": model,
        "version": version,
        "worker_url": f"http://{local_ip}:{service_port}",
        "state": reported_state,
    }
    # the request body is a list holding this single device
    body = [device]
    LOG.info(f"report worker device: {body}")
    code, rsp_msg = do_request(url, body=body)
    LOG.info(f"report worker device response code: {code}")
    LOG.info(f"report worker device response body: {rsp_msg}")
    if code != 200:
        LOG.error(ErrorMessage.Cluster.Code_0104032.format(rsp_msg))
        return
    LOG.info("report worker device successfully")
455
456
def upload_task_end(task_id, block_id, report_path):
    """Tell the controller that a task block has ended and upload its report.

    task_id / block_id: sent as form fields
    report_path: report directory; when it exists, its details/log/result files
                 (without log/task_log.log) are zipped to <block_id>.zip inside
                 that directory and attached to the request
    """
    cluster = Variables.config.cluster
    url = cluster.get(ConfigConst.control_service_url) + "/controller/v1/task/upload-end"
    data = {"task_id": task_id, "block_id": block_id}
    LOG.info(f"upload task end: {data}")
    if not os.path.exists(report_path):
        # nothing to attach, send the form fields only
        rsp = requests.post(url, data=data, timeout=30)
    else:
        filename = block_id + ".zip"
        report_zip = os.path.join(report_path, filename)
        Utils.create_zip(
            report_path,
            report_zip,
            exclude=["log/task_log.log"],
            include=["details/*", "log/*", "result/*"]
        )
        with open(report_zip, "rb") as file_fd:
            rsp = requests.post(
                url,
                data=data,
                files={"file": (filename, file_fd, "application/zip")},
                timeout=30,
            )
    code = rsp.status_code
    rsp_msg = rsp.content.decode()
    rsp.close()
    LOG.info(f"upload task end response code: {code}")
    LOG.info(f"upload task end response body: {rsp_msg}")
    if code != 200:
        LOG.error(ErrorMessage.Cluster.Code_0104033.format(rsp_msg))
        return
    LOG.info("upload task end successfully")
484