#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import re
import shutil
import socket
import sys
import time
import platform
import argparse
import subprocess
import signal
import uuid
import json
import stat
from datetime import timezone
from datetime import timedelta
from datetime import datetime
from tempfile import NamedTemporaryFile

from _core.error import ErrorCategory
from _core.error import ErrorMessage
from _core.executor.bean import SuiteResult
from _core.driver.parser_lite import ShellHandler
from _core.exception import ParamError
from _core.exception import ExecuteTerminate
from _core.logger import platform_logger
from _core.report.suite_reporter import SuiteReporter
from _core.plugin import get_plugin
from _core.plugin import Plugin
from _core.constants import ModeType
from _core.constants import CaseResult
from _core.constants import ConfigConst

LOG = platform_logger("Utils")


def get_filename_extension(file_path):
    """Split the last component of file_path into (name, extension).

    Args:
        file_path: path whose final component is split

    Returns:
        tuple (filename without extension, extension including the dot)
    """
    _, fullname = os.path.split(file_path)
    filename, ext = os.path.splitext(fullname)
    return filename, ext


def unique_id(type_name, value):
    """Build "<type_name>_<value>_<8 chars>" using the first uuid1 field."""
    return "{}_{}_{:0>8}".format(type_name, value,
                                 str(uuid.uuid1()).split("-")[0])


def start_standing_subprocess(cmd, pipe=subprocess.PIPE, return_result=False):
    """Starts a non-blocking subprocess that is going to continue running after
    this function returns.

    A subprocess group is actually started by setting sid, so we can kill all
    the processes spun out from the subprocess when stopping it. This is
    necessary in case users pass in pipe commands.

    Args:
        cmd: Command to start the subprocess with.
        pipe: pipe to get execution result
        return_result: return execution result or not

    Returns:
        The subprocess that got started, or, when return_result is true,
        the stripped utf-8 decoded content read from its stdout.
    """
    sys_type = platform.system()
    # os.setsid is not used on Windows, so no preexec_fn is passed there.
    process = subprocess.Popen(cmd, stdout=pipe, shell=False,
                               preexec_fn=None if sys_type == "Windows"
                               else os.setsid)
    if not return_result:
        return process
    else:
        # Blocks until the child closes its stdout.
        rev = process.stdout.read()
        return rev.decode("utf-8").strip()


def stop_standing_subprocess(process):
    """Stops a subprocess started by start_standing_subprocess.

    Any exception raised while killing the process is logged and swallowed;
    objects that are not subprocess.Popen instances are only reported.

    Args:
        process: Subprocess to terminate.
    """
    try:
        if isinstance(process, subprocess.Popen):
            process.kill()
        else:
            LOG.warning(f'{process} is not a subprocess.Popen')
    except Exception as e:
        LOG.error(f'Stop standing subprocess error, {e}')


def get_decode(stream):
    """Return stream as text.

    Args:
        stream: bytes are decoded as utf-8 (undecodable bytes are dropped);
            every other object is converted with str()

    Returns:
        str
    """
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="ignore")
    return str(stream)


def is_proc_running(pid, name=None):
    """Check the host process list for pid (and optionally name).

    On Windows the output of tasklist is filtered with findstr for lines
    beginning with "<pid>.exe"; on Linux and macOS the output of "ps -ef"
    is filtered with grep for the word pid.

    Args:
        pid: value searched in the process list
        name: optional text that must also occur in the matching output

    Returns:
        False when nothing matches; otherwise True, or whether name occurs
        in the matching output when name is given

    Raises:
        Exception: on any other operating system
        subprocess.TimeoutExpired: if the pipeline does not finish in 60s
    """
    sys_type = platform.system()
    if sys_type == "Windows":
        pid = "{}.exe".format(pid)
        pipeline = [
            ["C:\\Windows\\System32\\tasklist"],
            ["C:\\Windows\\System32\\findstr", "/B", "%s" % pid],
        ]
    elif sys_type == "Linux" or sys_type == "Darwin":
        # /bin/ps -ef | grep -v grep | grep -w pid
        grep = "/bin/grep" if sys_type == "Linux" else "/usr/bin/grep"
        pipeline = [
            ["/bin/ps", "-ef"],
            [grep, "-v", "grep"],
            [grep, "-w", "%s" % pid],
        ]
    else:
        raise Exception(ErrorMessage.Common.Code_0101001)

    procs = []
    try:
        for command in pipeline:
            upstream = procs[-1].stdout if procs else None
            procs.append(subprocess.Popen(command, stdin=upstream,
                                          stdout=subprocess.PIPE,
                                          shell=False))
            if upstream is not None:
                # The child owns the read end now; closing our copy lets the
                # upstream process get SIGPIPE if the downstream one exits.
                upstream.close()
        (out, _) = procs[-1].communicate(timeout=60)
    finally:
        # Release pipe handles and reap every child so none is left behind.
        for proc in procs:
            if proc.stdout is not None and not proc.stdout.closed:
                proc.stdout.close()
            if proc.poll() is None:
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()

    out = get_decode(out).strip()
    LOG.debug("Check %s proc running output: %s", pid, out)
    if out == "":
        return False
    return True if name is None else out.find(name) != -1


def exec_cmd(cmd, timeout=5 * 60, error_print=True, join_result=False, redirect=False):
    """
    Executes a command without a shell and returns its decoded output.

    This is fastboot's own exe_cmd because of its peculiar way of writing
    non-error info to stderr.

    Args:
        cmd: A sequence of commands and arguments.
        timeout: timeout for exe cmd.
        error_print: print error output or not.
        join_result: join error and out
        redirect: send stdout and stderr to a temporary file instead of
            pipes; the combined content is returned as the output
    Returns:
        The output of the command run: stderr if it is not empty, else
        stdout; with join_result, stdout followed by stderr.
    Raises:
        The exception caught while waiting for the command (for example
        subprocess.TimeoutExpired) after the child has been signalled.
    """
    import tempfile

    is_posix = platform.system() in ("Linux", "Darwin")
    popen_kwargs = {"shell": False}
    if is_posix:
        # Own session/process group, so the whole group can be signalled.
        popen_kwargs["preexec_fn"] = os.setsid

    # A pipe holds only a limited amount of data; callers expecting a lot of
    # output can ask for it to be redirected to a temporary file instead.
    out_temp = tempfile.TemporaryFile() if redirect else None
    try:
        target = out_temp.fileno() if redirect else subprocess.PIPE
        proc = subprocess.Popen(cmd, stdout=target, stderr=target,
                                **popen_kwargs)
        try:
            (out, err) = proc.communicate(timeout=timeout)
            if redirect:
                # communicate() returns (None, None) when nothing is piped;
                # the real output is in the temporary file.
                out_temp.seek(0)
                out, err = out_temp.read(), b""
            err = get_decode(err).strip()
            out = get_decode(out).strip()
            if err and error_print:
                LOG.exception(err, exc_info=False)
            if join_result:
                return "%s\n %s" % (out, err) if err else out
            return err if err else out
        except (TimeoutError, KeyboardInterrupt, AttributeError, ValueError,  # pylint:disable=undefined-variable
                EOFError, IOError, subprocess.TimeoutExpired):
            if proc.poll() is None:
                try:
                    if is_posix:
                        os.killpg(proc.pid, signal.SIGTERM)
                    else:
                        os.kill(proc.pid, signal.SIGINT)
                except OSError as kill_error:
                    # Do not let a failed kill hide the original exception.
                    LOG.debug("Kill process %s error: %s", proc.pid, kill_error)
            raise
    finally:
        if out_temp is not None:
            out_temp.close()


def create_dir(path):
    """Creates a directory if it does not exist already.

    Args:
        path: The path of the directory to create ("~" is expanded).
    """
    full_path = os.path.abspath(os.path.expanduser(path))
    if not os.path.exists(full_path):
        os.makedirs(full_path, exist_ok=True)


def get_config_value(key, config_dict, is_list=True, default=None):
    """Get corresponding values for key in config_dict

    Args:
        key: target key in config_dict
        config_dict: dictionary that store values
        is_list: decide return values is list type or not
        default: if key not in config_dict, default value will be returned

    Returns:
        corresponding values for key. Booleans are returned unchanged.
        A missing value (or an empty list when is_list is false) yields
        default if it is not None, else [] or "" depending on is_list.
    """
    if not isinstance(config_dict, dict):
        return default

    value = config_dict.get(key, None)
    if isinstance(value, bool):
        return value

    fallback = default if default is not None else ([] if is_list else "")
    if value is None:
        return fallback

    if isinstance(value, list):
        if is_list:
            return value
        # An empty list has no first element to hand back.
        return value[0] if value else fallback
    return [value] if is_list else value


def get_file_absolute_path(input_name, paths=None, alt_dir=None):
    """Find absolute path for input_name

    Args:
        input_name: the target file to search
        paths: path list for searching input_name
        alt_dir: extra dir that appended to paths

    Returns:
        absolute path for input_name

    Raises:
        ParamError: if input_name is not found under any search path
    """
    LOG.debug("Input name:{}, paths:{}, alt dir:{}".
              format(input_name, paths, alt_dir))
    input_name = str(input_name)
    abs_paths = set(paths) if paths else set()
    _update_paths(abs_paths)

    # Also try the name without a leading resource/ or testcases/ component.
    _inputs = [input_name]
    for prefix in ("resource/", "testcases/", "resource\\", "testcases\\"):
        if input_name.startswith(prefix):
            _inputs.append(input_name[len(prefix):])
            break

    for _input in _inputs:
        for path in abs_paths:
            if alt_dir:
                file_path = os.path.join(path, alt_dir, _input)
                if os.path.exists(file_path):
                    return os.path.abspath(file_path)

            file_path = os.path.join(path, _input)
            if os.path.exists(file_path):
                return os.path.abspath(file_path)

    err_msg = ErrorMessage.Common.Code_0101002.format(ErrorCategory.Environment, input_name)
    LOG.warning(err_msg)
    if alt_dir:
        LOG.debug("Alt dir is %s" % alt_dir)
    LOG.debug("Paths is:")
    for path in abs_paths:
        LOG.debug(path)
    raise ParamError(err_msg)


def _update_paths(paths):
    """Extend the search path set in place.

    Adds the parent of every existing path that ends in "resource" or
    "testcases", then the resource/testcases folders under
    Variables.exec_dir and Variables.top_dir and those two dirs themselves.

    Args:
        paths: set of search paths, modified in place
    """
    from xdevice import Variables
    resource_dir = "resource"
    testcases_dir = "testcases"

    need_add_path = set()
    for path in paths:
        if not os.path.exists(path):
            continue
        head, tail = os.path.split(path)
        if not tail:
            # The path ended with a separator; split once more.
            head, tail = os.path.split(head)
        if tail in [resource_dir, testcases_dir]:
            need_add_path.add(head)
    paths.update(need_add_path)

    inner_dir = os.path.abspath(os.path.join(Variables.exec_dir,
                                             testcases_dir))
    top_inner_dir = os.path.abspath(os.path.join(Variables.top_dir,
                                                 testcases_dir))
    res_dir = os.path.abspath(os.path.join(Variables.exec_dir, resource_dir))
    top_res_dir = os.path.abspath(os.path.join(Variables.top_dir,
                                               resource_dir))
    paths.update([inner_dir, res_dir, top_inner_dir, top_res_dir,
                  Variables.exec_dir, Variables.top_dir])


def modify_props(device, local_prop_file, target_prop_file, new_props):
    """To change the props if it is need
    Args:
        device: the device to modify props
        local_prop_file : the local file to save the old props
        target_prop_file : the target prop file to change
        new_props : the new props
    Returns:
        True : prop file changed
        False : prop file no need to change
    """
    is_changed = False
    device.pull_file(target_prop_file, local_prop_file)
    old_props = {}
    changed_prop_key = []
    flags = os.O_RDONLY
    modes = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(local_prop_file, flags, modes), "r") as old_file:
        lines = old_file.readlines()
    # Terminate the last line only when needed, so appended props start on
    # their own line without growing the file by a blank line on every run.
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"
    for line in lines:
        # Split on the first "=" only: values may contain "=" themselves.
        key, sep, value = line.strip().partition("=")
        if sep and key and not key.startswith("#"):
            old_props[key] = value

    for key, value in new_props.items():
        if key not in old_props:
            lines.append("".join([key, "=", value, '\n']))
            is_changed = True
        elif old_props.get(key) != value:
            changed_prop_key.append(key)
            is_changed = True

    if is_changed:
        for index, line in enumerate(lines):
            # Use the same stripped key as when the props were parsed.
            key, sep, _ = line.strip().partition("=")
            if sep and key and key in changed_prop_key:
                lines[index] = "".join([key, "=", new_props[key], '\n'])
        local_temp_prop_file = NamedTemporaryFile(mode='w', prefix='build',
                                                  suffix='.tmp', delete=False)
        try:
            with local_temp_prop_file:
                local_temp_prop_file.writelines(lines)
            device.push_file(local_temp_prop_file.name, target_prop_file)
            device.execute_shell_command(" ".join(["chmod 644", target_prop_file]))
            LOG.info("Changed the system property as required successfully")
        finally:
            # Remove the temporary file even if pushing it failed.
            os.remove(local_temp_prop_file.name)

    return is_changed


def get_device_log_file(report_path, serial=None, log_name="device_log",
                        device_name="", module_name=None, repeat=1, repeat_round=1):
    """Create the log folder and return the device log file path.

    Args:
        report_path: root folder of the report
        serial: device serial; time.time_ns() is used when it is empty
        log_name: prefix of the log file name
        device_name: optional device name prepended to the serial
        module_name: optional sub folder for the module
        repeat: total repeat count; a "round<N>" folder is used when > 1
        repeat_round: current repeat round

    Returns:
        path of the device log file (the file itself is not created)
    """
    from xdevice import Variables
    # new a module folder to save log
    round_folder = f"round{repeat_round}" if repeat > 1 else ""
    log_path = os.path.join(report_path, Variables.report_vars.log_dir, round_folder)
    if module_name:
        log_path = os.path.join(log_path, module_name)
    os.makedirs(log_path, exist_ok=True)

    serial = serial or time.time_ns()
    if device_name:
        serial = "%s_%s" % (device_name, serial)
    device_file_name = "{}_{}.log".format(log_name, str(serial).replace(
        ":", "_"))
    device_log_file = os.path.join(log_path, device_file_name)
    LOG.info("Generate device log file: %s", device_log_file)
    return device_log_file


def check_result_report(report_root_dir, report_file, error_message="",
                        report_name="", module_name="", **kwargs):
    """
    Check whether report_file exists or not. If report_file does not exist,
    create an empty report with error_message under report_root_dir.

    Args:
        report_root_dir: root folder used when report_file is relative
        report_file: expected report file
        error_message: message stored in the empty report
        report_name: suite name; derived from report_file when empty
        module_name: module name; also used as report file name when set
        **kwargs: "request" (its listeners may flush partial results) and
            "result_kind" (defaults to CaseResult.unavailable)

    Returns:
        path of the existing or newly created report
    """
    if os.path.exists(report_file):
        return report_file
    suite_name = report_name
    if not suite_name:
        suite_name, _ = get_filename_extension(report_file)

    # The suite ended abnormally: let listeners record the results of the
    # cases that already ran into the result file.
    request = kwargs.get("request")
    if request is not None:
        for listener in request.listeners:
            if not hasattr(listener, "handle_half_break"):
                continue
            listener.handle_half_break(suite_name, error_message=error_message)
        if os.path.exists(report_file):
            return report_file

    LOG.info(f"{report_file} does not exist, create an empty report")
    report_dir = os.path.dirname(report_file)
    if os.path.isabs(report_dir):
        result_dir = report_dir
    else:
        result_dir = os.path.join(report_root_dir, "result", report_dir)
    os.makedirs(result_dir, exist_ok=True)

    suite_result = SuiteResult()
    suite_result.suite_name = suite_name
    suite_result.stacktrace = error_message
    if module_name:
        suite_name = module_name
    # Set the test result; the default result is Unavailable.
    result_kind = kwargs.get("result_kind", CaseResult.unavailable)
    suite_reporter = SuiteReporter(
        [(suite_result, [])], suite_name, result_dir,
        modulename=module_name, message=error_message, result_kind=result_kind)
    suite_reporter.create_empty_report()
    return "%s.xml" % os.path.join(result_dir, suite_name)


def get_sub_path(test_suite_path):
    """Return the directory part below "<sep>tests<sep><first folder><sep>".

    Args:
        test_suite_path: path of a test suite file

    Returns:
        the remaining sub path, or "" when the path has no "tests" folder
        or nothing below its first sub folder
    """
    pattern = "%stests%s" % (os.sep, os.sep)
    file_dir = os.path.dirname(test_suite_path)
    pos = file_dir.find(pattern)
    if -1 == pos:
        return ""

    sub_path = file_dir[pos + len(pattern):]
    pos = sub_path.find(os.sep)
    if -1 == pos:
        return ""
    return sub_path[pos + len(os.sep):]


def is_config_str(content):
    """Return True if content contains both "{" and "}"."""
    return "{" in content and "}" in content


def is_python_satisfied():
    """Return True if the interpreter is at least python 3.7.0, else log an
    error and return False."""
    mini_version = (3, 7, 0)
    if sys.version_info >= mini_version:
        return True
    LOG.error("Please use python {} or higher version to start project".format(mini_version))
    return False


def convert_ip(origin_ip):
    """Mask the two middle fields of a dotted four-field address with "*".

    Any other input is returned unchanged.
    """
    addr = origin_ip.strip().split(".")
    if len(addr) == 4:
        return "{}.{}.{}.{}".format(
            addr[0], '*' * len(addr[1]), '*' * len(addr[2]), addr[-1])
    else:
        return origin_ip


def convert_port(port):
    """Mask a port: keep the first and last character, star the rest.

    A single character is prefixed with "*"; an empty value gives "*".
    """
    _port = str(port)
    if len(_port) >= 2:
        return "{}{}{}".format(_port[0], "*" * (len(_port) - 2), _port[-1])
    # Slicing keeps an empty value from raising IndexError.
    return "*{}".format(_port[-1:])


def convert_serial(serial):
    """Mask ip and port of a "remote_<ip>_<port>" serial; other serials are
    returned unchanged."""
    if serial.startswith("remote_"):
        return "remote_{}_{}".format(convert_ip(serial.split("_")[1]),
                                     convert_port(serial.split("_")[-1]))
    return serial


def convert_mac(message):
    """Mask the value following 'hcptest': in message.

    One eighth of the value is kept at each end and the middle is replaced
    with "*"; values shorter than 8 characters are masked completely.

    Args:
        message: text to mask; non-string input is returned unchanged

    Returns:
        the masked message, or message itself when nothing matches
    """
    if not isinstance(message, str):
        return message
    pattern = r'.+\'hcptest\':\'(.+)\''
    pattern2 = r'.+pass_through:.+\'hcptest\':\'(.+)\''
    matched = re.match(pattern, message) or re.search(pattern2, message)
    if not matched:
        return message
    result = matched.group(1)
    length = len(result) // 8
    if length == 0:
        # result[-0:] would be the whole value, leaking it unmasked.
        convert_mes = "*" * len(result)
    else:
        convert_mes = "{}{}{}".format(result[0:length], "*" * (len(result) - length * 2), result[-length:])
    return message.replace(result, convert_mes)


def get_shell_handler(request, parser_type):
    """Build a ShellHandler using the first parser plugin of parser_type.

    Every listener of the request gets device_sn of the first device in
    request.config.environment.devices.

    Args:
        request: test request providing root, listeners and config
        parser_type: plugin id of the parser

    Returns:
        ShellHandler wrapping the created parser instances
    """
    suite_name = request.root.source.test_name
    parsers = get_plugin(Plugin.PARSER, parser_type)
    # Only the first registered parser is used.
    parsers = parsers[:1] if parsers else []
    parser_instances = []
    for listener in request.listeners:
        listener.device_sn = request.config.environment.devices[0].device_sn
    for parser in parsers:
        parser_instance = parser.__class__()
        parser_instance.suite_name = suite_name
        parser_instance.listeners = request.listeners
        parser_instances.append(parser_instance)
    handler = ShellHandler(parser_instances)
    return handler


def get_kit_instances(json_config, resource_path="", testcases_path=""):
    """Create a test kit instance for every kit configured in json_config.

    Args:
        json_config: JsonParser; any other type yields an empty list
        resource_path: resource folder stored in each kit's "paths"
        testcases_path: testcases folder stored in each kit's "paths"

    Returns:
        list of configured kit instances

    Raises:
        ParamError: if no plugin is registered for a kit type
    """
    from _core.testkit.json_parser import JsonParser
    kit_instances = []

    # check input param
    if not isinstance(json_config, JsonParser):
        return kit_instances

    # get kit instances
    for kit in json_config.config.kits:
        kit["paths"] = [resource_path, testcases_path]
        kit_type = kit.get("type", "")
        device_name = kit.get("device_name", None)
        test_kits = get_plugin(plugin_type=Plugin.TEST_KIT, plugin_id=kit_type)
        if not test_kits:
            raise ParamError(ErrorMessage.Common.Code_0101003.format(kit_type))
        test_kit_instance = test_kits[0].__class__()
        test_kit_instance.__check_config__(kit)
        setattr(test_kit_instance, "device_name", device_name)
        kit_instances.append(test_kit_instance)
    return kit_instances


def check_device_name(device, kit, step="setup"):
    """Return False only when kit and device both have a name and the names
    differ; otherwise log the step and return True."""
    kit_device_name = getattr(kit, "device_name", None)
    device_name = device.get("name")
    if kit_device_name and device_name and \
            kit_device_name != device_name:
        return False
    if kit_device_name and device_name:
        LOG.debug("Do kit:%s %s for device:%s",
                  kit.__class__.__name__, step, device_name)
    else:
        LOG.debug("Do kit:%s %s", kit.__class__.__name__, step)
    return True


def check_device_env_index(device, kit):
    """Return False only when the kit lists env indexes and the device's
    env_index is set but not among them."""
    if not hasattr(device, "env_index"):
        return True
    kit_device_index_list = getattr(kit, "env_index_list", None)
    env_index = device.get("env_index")
    if kit_device_index_list and env_index and \
            len(kit_device_index_list) > 0 and env_index not in kit_device_index_list:
        return False
    return True


def check_path_legal(path):
    """Wrap path in double quotes if it contains a space."""
    if path and " " in path:
        return "\"%s\"" % path
    return path


def get_local_ip():
    """Return the ip address of the host, or "127.0.0.1" as fallback.

    Windows prefers an address starting with "10."; macOS resolves the
    fully qualified host name; Linux reads the first line of
    /hostip/realip when that file exists.
    """
    try:
        sys_type = platform.system()
        if sys_type == "Windows":
            _list = socket.gethostbyname_ex(socket.gethostname())
            _list = _list[2]
            for ip_add in _list:
                if ip_add.startswith("10."):
                    return ip_add

            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        elif sys_type == "Darwin":
            hostname = socket.getfqdn(socket.gethostname())
            return socket.gethostbyname(hostname)
        elif sys_type == "Linux":
            real_ip = "/%s/%s" % ("hostip", "realip")
            if not os.path.exists(real_ip):
                return "127.0.0.1"
            try:
                with open(real_ip, "r", encoding="utf-8") as srw:
                    lines = srw.readlines()
                return str(lines[0]).strip()
            except (IOError, ValueError, IndexError) as error_message:
                # IndexError: the file exists but is empty.
                LOG.error(error_message)
                return "127.0.0.1"
        else:
            return "127.0.0.1"
    except Exception as error:
        LOG.debug("Get local ip error: %s, skip!" % error)
        return "127.0.0.1"


class SplicingAction(argparse.Action):
    """argparse action that stores the given values joined by a space."""

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, " ".join(values))


def get_test_component_version(config):
    """Read "version" from test_component.json under the config paths.

    Args:
        config: object providing resource_path and testcases_path

    Returns:
        the version, or "" in decc mode, when the file cannot be found or
        read, or when it has no version
    """
    if check_mode(ModeType.decc):
        return ""

    try:
        paths = [config.resource_path, config.testcases_path]
        test_file = get_file_absolute_path("test_component.json", paths)
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
            json_content = json.load(file_content)
            version = json_content.get("version", "")
            return version
    except (ParamError, ValueError, OSError) as error:
        LOG.error("The exception {} happened when get version".format(error))
    return ""


def check_mode(mode):
    """Return True if the mode of the current Context session equals mode."""
    from _core.context.center import Context
    return Context.session().mode == mode


def do_module_kit_setup(request, kits):
    """Run setup of every kit on each matching device of the request.

    A deep copy of the kit is stored in the device's module kit list
    before its __setup__ is called.

    Raises:
        ExecuteTerminate: if execution has been stopped
        ParamError: if a kit matched no device
    """
    for device in request.get_devices():
        setattr(device, ConfigConst.module_kits, [])

    from _core.context.center import Context
    for kit in kits:
        run_flag = False
        for device in request.get_devices():
            if not Context.is_executing():
                raise ExecuteTerminate()
            if not check_device_env_index(device, kit):
                continue
            if check_device_name(device, kit):
                run_flag = True
                kit_copy = copy.deepcopy(kit)
                module_kits = getattr(device, ConfigConst.module_kits)
                module_kits.append(kit_copy)
                kit_copy.__setup__(device, request=request)
        if not run_flag:
            err_msg = ErrorMessage.Common.Code_0101004.format(kit.__class__.__name__)
            LOG.error(err_msg)
            raise ParamError(err_msg)


def do_module_kit_teardown(request):
    """Run teardown of the module kits stored on each device, then reset
    the device's module kit list."""
    for device in request.get_devices():
        for kit in getattr(device, ConfigConst.module_kits, []):
            if check_device_name(device, kit, step="teardown"):
                kit.__teardown__(device)
        setattr(device, ConfigConst.module_kits, [])


def get_current_time():
    """Return the local time formatted as "%Y-%m-%d %H:%M:%S"."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))


def check_mode_in_sys(mode):
    """Return True if sys has a "mode" attribute equal to mode."""
    return hasattr(sys, "mode") and getattr(sys, "mode") == mode


def get_cst_time():
    """Return the current time in a fixed UTC+8 zone named Asia/Shanghai."""
    sh_tz = timezone(
        timedelta(hours=8),
        name='Asia/Shanghai',
    )
    return datetime.now(tz=sh_tz)


def get_delta_time_ms(start_time):
    """Return the milliseconds elapsed since start_time, measured against
    get_cst_time() (so start_time must be timezone-aware)."""
    end_time = get_cst_time()
    delta = (end_time - start_time).total_seconds() * 1000
    return delta


def get_netstat_proc_pid(device, port):
    """Query netstat on the device for port.

    Args:
        device: object with execute_shell_command, log, get_recover_state
        port: port to look up

    Returns:
        the part before "/" of the last column of the first matching
        netstat line, or "" if the device is unusable or nothing matches
    """
    if not hasattr(device, "execute_shell_command") or \
            not hasattr(device, "log") or \
            not hasattr(device, "get_recover_state"):
        return ""
    if not device.get_recover_state():
        return ""
    cmd = 'netstat -atn | grep :{}'.format(port)
    proc_running = device.execute_shell_command(cmd).strip()
    for data in proc_running.split("\n"):
        if str(port) in data and "grep" not in data:
            fields = data.split()
            if not fields:
                continue
            last_field = fields[-1]
            device.log.debug('{} proc:{}'.format(port, last_field))
            return last_field.split("/")[0]
    return ""


def get_repeat_round(d_unique_id):
    """Get the current repeat round.
    Args:
        d_unique_id: str, driver descriptor unique id
    Returns:
        repeat round; 1 when the id carries no round
    """
    match_result = re.match("^TestSource_.+_.+_(\\d+)$", d_unique_id)
    return int(match_result.group(1)) if match_result else 1


def calculate_elapsed_time(begin, end):
    """Calculate the elapsed time between begin and end.
    Args:
        begin: int/datetime, begin time
        end  : int/datetime, end time
    Returns:
        elapsed time description such as "1d1h1m1s", "500ms" or "0s"
    """
    elapsed = []
    if isinstance(begin, datetime) and isinstance(end, datetime):
        # datetime objects were passed in
        total_seconds = (end - begin).total_seconds()
    else:
        # plain second counts were passed in
        total_seconds = end - begin
    total_seconds = float(round(total_seconds, 3))

    # Check the float: int() truncates towards zero, so a negative
    # sub-second duration would otherwise be reported as "0s".
    if total_seconds < 0:
        return f"calculate error, total seconds is {total_seconds}"
    seconds = int(total_seconds)
    if seconds == 0:
        # round() avoids losing a millisecond to float representation.
        milliseconds = int(round((total_seconds - seconds) * 1000))
        if milliseconds > 0:
            return "{}ms".format(milliseconds)
        else:
            return "0s"
    d, s = divmod(seconds, 24 * 60 * 60)
    if d >= 1:
        elapsed.append(f"{d}d")
    h, s = divmod(s, 60 * 60)
    if h >= 1:
        elapsed.append(f"{h}h")
    m, s = divmod(s, 60)
    if m >= 1:
        elapsed.append(f"{m}m")
    if s >= 1:
        elapsed.append(f"{s}s")
    return "".join(elapsed)


def calculate_percent(num1, num2):
    """Calculate a percentage.
    Args:
        num1: number, dividend
        num2: number, divisor
    Returns:
        percentage representation; "0%" for invalid or zero input
    """
    if not isinstance(num1, (int, float)) or not isinstance(num2, (int, float)):
        LOG.error("num1 or num2 is not a numeric type")
        return "0%"
    if num1 > num2:
        LOG.error("the dividend(num1) is greater than the divisor(num2)")
        return "0%"
    if num1 == 0 or num2 == 0:
        return "0%"
    # At most two decimals are shown (truncated, so 99.999% becomes 99.99%)
    # and the smallest value shown is 0.01%.
    percent = num1 / num2 * 100
    if percent < 0.01:
        # Also covers tiny values that str() renders in scientific
        # notation, which cannot be truncated by slicing.
        return "0.01%"
    ret = str(percent)
    ret = ret[:ret.find(".") + 3]
    return ret + "%"


def copy_folder(src, dst):
    """Recursively copy the content of folder src into folder dst.

    An error is logged and nothing is copied if src does not exist.
    """
    if not os.path.exists(src):
        LOG.error(f"copy folder error, source path '{src}' does not exist")
        return
    os.makedirs(dst, exist_ok=True)
    for filename in os.listdir(src):
        fr_path = os.path.join(src, filename)
        to_path = os.path.join(dst, filename)
        if os.path.isfile(fr_path):
            shutil.copy(fr_path, to_path)
        if os.path.isdir(fr_path):
            copy_folder(fr_path, to_path)


def show_current_environment():
    """Log the installed packages whose name contains xdevice or hypium.

    Silently does nothing when pip's freeze helper cannot be imported.
    """
    try:
        LOG.debug("Show the current environment. Please wait.")
        from pip._internal.operations.freeze import freeze
        installed_packages = list(freeze())
        for package in installed_packages:
            if "xdevice" in package or "hypium" in package:
                LOG.debug(package)
    except ImportError:
        pass


def check_uitest_version(uitest_version_info: str, base_version: tuple) -> bool:
    """Check whether the reported uitest version is newer than base_version.

    The last line of uitest_version_info that starts with a four-field
    dotted version is compared numerically, field by field.

    Args:
        uitest_version_info: text containing the version
        base_version: tuple of version fields (ints or numeric strings)

    Returns:
        True if the version is greater than base_version, or if no version
        could be found
    """
    if not uitest_version_info:
        return True
    pattern = re.compile(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d+)')
    for line in reversed(uitest_version_info.strip().split("\n")):
        matched = pattern.match(line)
        if matched is None:
            continue
        try:
            base = tuple(int(part) for part in base_version)
        except (TypeError, ValueError):
            # base_version is not numeric: fall back to comparing the raw
            # string fields.
            return tuple(line.split(".")) > base_version
        # Compare as integers, so that e.g. field "10" is greater than "9".
        return tuple(int(part) for part in matched.groups()) > base
    return True


def parse_xml_cdata(content: str) -> str:
    """
    Extract the content of a CDATA section.
    :param content: text that may contain a CDATA section
    :return: the text inside the CDATA section, or content unchanged
    """
    if content and '<![CDATA[' in content:
        # DOTALL: the CDATA payload may span several lines.
        ret = re.search(r'<!\[CDATA\[(.*)]]', content, re.DOTALL)
        if ret is not None:
            return ret.group(1)
    return content