1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import importlib 20import os 21import re 22import secrets 23import socket 24import sys 25 26from xdevice import calculate_elapsed_time 27from xdevice import get_decode 28from xdevice import ParamError 29from xdevice import DeviceConnectorType 30 31from devicetest.core.error_message import ErrorMessage 32from devicetest.core.exception import DeviceTestError 33from devicetest.log.logger import DeviceTestLog as log 34 35 36def clean_sys_resource(file_path=None, file_base_name=None): 37 """ 38 clean sys.path/sys.modules resource 39 :param file_path: sys path 40 :param file_base_name: module name 41 :return: None 42 """ 43 if file_path in sys.path: 44 sys.path.remove(file_path) 45 46 if file_base_name in sys.modules: 47 del sys.modules[file_base_name] 48 49 50def get_base_name(file_abs_path, is_abs_name=False): 51 """ 52 Args: 53 file_abs_path: str , file path 54 is_abs_name : bool, 55 Returns: 56 file base name 57 Example: 58 input: D:/xdevice/decc.py 59 if is_abs_name return: D:/xdevice/decc, else return: decc 60 """ 61 if isinstance(file_abs_path, str): 62 base_name = file_abs_path if is_abs_name else os.path.basename( 63 file_abs_path) 64 file_base_name, _ = os.path.splitext(base_name) 65 return file_base_name 66 return None 67 68 69def get_dir_path(file_path): 70 if isinstance(file_path, str): 71 if os.path.exists(file_path): 72 return os.path.dirname(file_path) 73 return None 74 75 76def import_from_file(file_path, file_base_name): 77 if file_path in sys.path: 78 sys.path.remove(file_path) 79 80 sys.path.insert(0, file_path) 81 if file_base_name in sys.modules: 82 del sys.modules[file_base_name] 83 84 try: 85 importlib.import_module(file_base_name) 86 except Exception as exception: 87 file_abs_path = os.path.join(file_path, file_base_name) 88 error_msg = "Can't load file {}".format(file_abs_path) 89 log.error(error_msg, is_traceback=True) 90 raise ImportError(error_msg) from exception 91 return getattr(sys.modules[file_base_name], file_base_name) 92 93 94def get_forward_ports(self=None): 95 try: 96 ports_list = [] 97 if hasattr(self, "is_oh") or self.usb_type == DeviceConnectorType.hdc: 98 # get hdc 99 cmd = "fport ls" 100 else: 101 cmd = "forward --list" 102 out = get_decode(self.connector_command(cmd)).strip() 103 clean_lines = out.split('\n') 104 for line_text in clean_lines: 105 # clear reverse port first Example: 'tcp:8011 tcp:9963' [Reverse] 106 if "Reverse" in line_text and "fport" in cmd: 107 connector_tokens = line_text.split() 108 self.connector_command(["fport", "rm", 109 connector_tokens[0].replace("'", ""), 110 connector_tokens[1].replace("'", "")]) 111 continue 112 connector_tokens = line_text.split("tcp:") 113 if len(connector_tokens) != 3: 114 continue 115 ports_list.append(int(connector_tokens[1])) 116 return ports_list 117 except Exception: 118 log.error(ErrorMessage.Error_01208.Message.en, 119 error_no=ErrorMessage.Error_01208.Code) 120 return [] 121 122 123def is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool: 124 """端口是否空闲""" 125 s = None 126 is_idle = False 127 try: 128 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 129 s.settimeout(3) 130 s.connect((host, port)) 131 except Exception: 132 # 已知会抛出ConnectionRefusedError和TimeoutError两种 133 is_idle = True 134 finally: 135 if s is not None: 136 s.close() 137 return is_idle 138 139 140def get_forward_port(self, host=None, port=None): 141 try: 142 ports_list = get_forward_ports(self) 143 144 port = 9999 - secrets.randbelow(99) 145 cnt = 0 146 while cnt < 10 and port > 1024: 147 if port not in ports_list and is_port_idle(host, port): 148 cnt += 1 149 break 150 151 port -= 1 152 return port 153 except Exception as error: 154 log.error(ErrorMessage.Error_01208.Message.en, 155 error_no=ErrorMessage.Error_01208.Code) 156 raise DeviceTestError(ErrorMessage.Error_01208.Topic) from error 157 158 159def get_local_ip_address(): 160 """ 161 查询本机ip地址 162 :return: ip 163 """ 164 ip = "127.0.0.1" 165 return ip 166 167 168def calculate_execution_time(begin, end): 169 """计算时间间隔 170 Args: 171 begin: datetime, begin time 172 end : datetime, end time 173 Returns: 174 elapsed time description 175 """ 176 return calculate_elapsed_time(begin, end) 177 178 179def compare_version(version, base_version: tuple, rex: str): 180 """比较两个版本号的大小,若version版本大于等于base_version,返回True 181 Args: 182 version: str, version 183 rex: version style rex 184 base_version: list, base_version 185 Example: 186 version: "4.0.0.1" base_version:[4.0.0.0] 187 if version bigger than base_version or equal to base_version, return True, else return False 188 """ 189 version = version.strip() 190 if re.match(rex, version): 191 version = tuple(version.split(".")) 192 if version > base_version: 193 return True 194 return False 195 196 197class DeviceFileUtils: 198 @staticmethod 199 def check_remote_file_is_exist(_ad, remote_file): 200 # test -f remotepath judge file exists. 201 # if exist,return 0,else return None 202 # 判断设备中文件是否存在 203 ret = _ad.execute_shell_command("test -f %s && echo 0" % remote_file) 204 if ret != "" \ 205 and len(str(ret).split()) \ 206 != 0 and str(ret).split()[0] == "0": 207 return True 208 return False 209 210 @staticmethod 211 def check_remote_dict_is_exist(_ad, remote_file): 212 # test -f remotepath judge folder exists. 213 # if exist,return 0,else return None 214 # 判断设备中文件夹是否存在 215 ret = _ad.execute_shell_command( 216 "test -d {} && echo 0".format(remote_file)) 217 if ret != "" \ 218 and len(str(ret).split()) != 0 and str(ret).split()[0] == "0": 219 return True 220 return False 221 222 223def compare_text(text, expect_text, fuzzy): 224 """支持多种匹配方式的文本匹配""" 225 if fuzzy is None or fuzzy.startswith("equal"): 226 result = (expect_text == text) 227 elif fuzzy == "starts_with": 228 result = text.startswith(expect_text) 229 elif fuzzy == "ends_with": 230 result = text.endswith(expect_text) 231 elif fuzzy == "contains": 232 result = expect_text in text 233 elif fuzzy == "regexp": 234 result = re.search(expect_text, text) 235 result = False if result is None else True 236 else: 237 raise ParamError("expected [equal, starts_with, ends_with, contains], get [{}]".format(fuzzy)) 238 return result 239 240 241def get_process_pid(device, process_name): 242 cmd = "ps -ef | grep '{}'".format(process_name) 243 ret = device.execute_shell_command(cmd) 244 ret = ret.strip() 245 pids = ret.split("\n") 246 for pid in pids: 247 if "grep" not in pid: 248 pid = pid.split() 249 return pid[1] 250 return None 251