• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import importlib
20import os
21import re
22import secrets
23import socket
24import sys
25
26from xdevice import calculate_elapsed_time
27from xdevice import get_decode
28from xdevice import ParamError
29from xdevice import DeviceConnectorType
30
31from devicetest.core.error_message import ErrorMessage
32from devicetest.core.exception import DeviceTestError
33from devicetest.log.logger import DeviceTestLog as log
34
35
36def clean_sys_resource(file_path=None, file_base_name=None):
37    """
38    clean sys.path/sys.modules resource
39    :param file_path: sys path
40    :param file_base_name: module name
41    :return: None
42    """
43    if file_path in sys.path:
44        sys.path.remove(file_path)
45
46    if file_base_name in sys.modules:
47        del sys.modules[file_base_name]
48
49
50def get_base_name(file_abs_path, is_abs_name=False):
51    """
52    Args:
53        file_abs_path: str , file path
54        is_abs_name  : bool,
55    Returns:
56        file base name
57    Example:
58        input: D:/xdevice/decc.py
59        if is_abs_name return: D:/xdevice/decc, else return: decc
60    """
61    if isinstance(file_abs_path, str):
62        base_name = file_abs_path if is_abs_name else os.path.basename(
63            file_abs_path)
64        file_base_name, _ = os.path.splitext(base_name)
65        return file_base_name
66    return None
67
68
69def get_dir_path(file_path):
70    if isinstance(file_path, str):
71        if os.path.exists(file_path):
72            return os.path.dirname(file_path)
73    return None
74
75
76def import_from_file(file_path, file_base_name):
77    if file_path in sys.path:
78        sys.path.remove(file_path)
79
80    sys.path.insert(0, file_path)
81    if file_base_name in sys.modules:
82        del sys.modules[file_base_name]
83
84    try:
85        importlib.import_module(file_base_name)
86    except Exception as exception:
87        file_abs_path = os.path.join(file_path, file_base_name)
88        error_msg = "Can't load file {}".format(file_abs_path)
89        log.error(error_msg, is_traceback=True)
90        raise ImportError(error_msg) from exception
91    return getattr(sys.modules[file_base_name], file_base_name)
92
93
94def get_forward_ports(self=None):
95    try:
96        ports_list = []
97        if hasattr(self, "is_oh") or self.usb_type == DeviceConnectorType.hdc:
98            # get hdc
99            cmd = "fport ls"
100        else:
101            cmd = "forward --list"
102        out = get_decode(self.connector_command(cmd)).strip()
103        clean_lines = out.split('\n')
104        for line_text in clean_lines:
105            # clear reverse port first  Example: 'tcp:8011 tcp:9963'     [Reverse]
106            if "Reverse" in line_text and "fport" in cmd:
107                connector_tokens = line_text.split()
108                self.connector_command(["fport", "rm",
109                                        connector_tokens[0].replace("'", ""),
110                                        connector_tokens[1].replace("'", "")])
111                continue
112            connector_tokens = line_text.split("tcp:")
113            if len(connector_tokens) != 3:
114                continue
115            ports_list.append(int(connector_tokens[1]))
116        return ports_list
117    except Exception:
118        log.error(ErrorMessage.Error_01208.Message.en,
119                  error_no=ErrorMessage.Error_01208.Code)
120        return []
121
122
123def is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool:
124    """端口是否空闲"""
125    s = None
126    is_idle = False
127    try:
128        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129        s.settimeout(3)
130        s.connect((host, port))
131    except Exception:
132        # 已知会抛出ConnectionRefusedError和TimeoutError两种
133        is_idle = True
134    finally:
135        if s is not None:
136            s.close()
137    return is_idle
138
139
140def get_forward_port(self, host=None, port=None):
141    try:
142        ports_list = get_forward_ports(self)
143
144        port = 9999 - secrets.randbelow(99)
145        cnt = 0
146        while cnt < 10 and port > 1024:
147            if port not in ports_list and is_port_idle(host, port):
148                cnt += 1
149                break
150
151            port -= 1
152        return port
153    except Exception as error:
154        log.error(ErrorMessage.Error_01208.Message.en,
155                  error_no=ErrorMessage.Error_01208.Code)
156        raise DeviceTestError(ErrorMessage.Error_01208.Topic) from error
157
158
159def get_local_ip_address():
160    """
161    查询本机ip地址
162    :return: ip
163    """
164    ip = "127.0.0.1"
165    return ip
166
167
168def calculate_execution_time(begin, end):
169    """计算时间间隔
170    Args:
171        begin: datetime, begin time
172        end  : datetime, end time
173    Returns:
174        elapsed time description
175    """
176    return calculate_elapsed_time(begin, end)
177
178
179def compare_version(version, base_version: tuple, rex: str):
180    """比较两个版本号的大小,若version版本大于等于base_version,返回True
181    Args:
182        version: str, version
183        rex: version style rex
184        base_version: list, base_version
185    Example:
186        version: "4.0.0.1" base_version:[4.0.0.0]
187        if version bigger than base_version or equal to base_version, return True, else return False
188    """
189    version = version.strip()
190    if re.match(rex, version):
191        version = tuple(version.split("."))
192        if version > base_version:
193            return True
194    return False
195
196
197class DeviceFileUtils:
198    @staticmethod
199    def check_remote_file_is_exist(_ad, remote_file):
200        # test -f remotepath judge file exists.
201        # if exist,return 0,else return None
202        # 判断设备中文件是否存在
203        ret = _ad.execute_shell_command("test -f %s && echo 0" % remote_file)
204        if ret != "" \
205                and len(str(ret).split()) \
206                != 0 and str(ret).split()[0] == "0":
207            return True
208        return False
209
210    @staticmethod
211    def check_remote_dict_is_exist(_ad, remote_file):
212        # test -f remotepath judge folder exists.
213        # if exist,return 0,else return None
214        # 判断设备中文件夹是否存在
215        ret = _ad.execute_shell_command(
216            "test -d {} && echo 0".format(remote_file))
217        if ret != "" \
218                and len(str(ret).split()) != 0 and str(ret).split()[0] == "0":
219            return True
220        return False
221
222
223def compare_text(text, expect_text, fuzzy):
224    """支持多种匹配方式的文本匹配"""
225    if fuzzy is None or fuzzy.startswith("equal"):
226        result = (expect_text == text)
227    elif fuzzy == "starts_with":
228        result = text.startswith(expect_text)
229    elif fuzzy == "ends_with":
230        result = text.endswith(expect_text)
231    elif fuzzy == "contains":
232        result = expect_text in text
233    elif fuzzy == "regexp":
234        result = re.search(expect_text, text)
235        result = False if result is None else True
236    else:
237        raise ParamError("expected [equal, starts_with, ends_with, contains], get [{}]".format(fuzzy))
238    return result
239
240
241def get_process_pid(device, process_name):
242    cmd = "ps -ef | grep '{}'".format(process_name)
243    ret = device.execute_shell_command(cmd)
244    ret = ret.strip()
245    pids = ret.split("\n")
246    for pid in pids:
247        if "grep" not in pid:
248            pid = pid.split()
249            return pid[1]
250    return None
251