• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import re
22import shutil
23import socket
24import sys
25import time
26import platform
27import argparse
28import subprocess
29import signal
30import uuid
31import json
32import stat
33from datetime import timezone
34from datetime import timedelta
35from datetime import datetime
36from tempfile import NamedTemporaryFile
37
38from _core.error import ErrorCategory
39from _core.error import ErrorMessage
40from _core.executor.bean import SuiteResult
41from _core.driver.parser_lite import ShellHandler
42from _core.exception import ParamError
43from _core.exception import ExecuteTerminate
44from _core.logger import platform_logger
45from _core.report.suite_reporter import SuiteReporter
46from _core.plugin import get_plugin
47from _core.plugin import Plugin
48from _core.constants import ModeType
49from _core.constants import CaseResult
50from _core.constants import ConfigConst
51
# Module-level logger used by the helpers in this file.
LOG = platform_logger("Utils")
53
54
def get_filename_extension(file_path):
    """Split the last component of a path into base name and extension.

    Args:
        file_path: path whose final component is inspected.

    Returns:
        Tuple ``(filename, ext)``; ``ext`` keeps its leading dot and is empty
        when the final component has no extension.
    """
    base_name = os.path.basename(file_path)
    return os.path.splitext(base_name)
59
60
def unique_id(type_name, value):
    """Build an identifier of the form ``<type_name>_<value>_<token>``.

    Args:
        type_name: first component of the identifier.
        value: second component of the identifier.

    Returns:
        The identifier string; the token is the first dash-separated field of
        a freshly generated ``uuid1``, left-padded with ``0`` to 8 characters.
    """
    token = str(uuid.uuid1()).split("-")[0]
    return "{}_{}_{:0>8}".format(type_name, value, token)
64
65
def start_standing_subprocess(cmd, pipe=subprocess.PIPE, return_result=False):
    """Start a subprocess in its own session and optionally collect its output.

    On non-Windows systems the child is started in a new session (``setsid``),
    so it leads its own process group.

    Args:
        cmd: Command (argument sequence) to start the subprocess with.
        pipe: value passed as ``stdout`` to ``subprocess.Popen``.
        return_result: when true, wait for the command to finish and return
            its decoded standard output instead of the process object.

    Returns:
        The started ``subprocess.Popen`` object when ``return_result`` is
        false; otherwise the stripped UTF-8 decoded stdout (an empty string
        when stdout was not captured through a pipe).
    """
    # start_new_session performs setsid() in the child without the
    # thread-safety problems of preexec_fn; it is not requested on Windows.
    process = subprocess.Popen(cmd, stdout=pipe, shell=False,
                               start_new_session=platform.system() != "Windows")
    if not return_result:
        return process

    # communicate() drains stdout and reaps the child, so no zombie is left.
    out, _ = process.communicate()
    if out is None:
        return ""
    return out.decode("utf-8").strip()
91
92
def stop_standing_subprocess(process):
    """Kill a subprocess started by start_standing_subprocess.

    A value that is not a ``subprocess.Popen`` is only reported with a
    warning. Any exception raised along the way is logged and swallowed.

    Args:
        process: Subprocess to terminate.
    """
    try:
        if not isinstance(process, subprocess.Popen):
            LOG.warning(f'{process} is not a subprocess.Popen')
            return
        process.kill()
    except Exception as error:
        LOG.error(f'Stop standing subprocess error, {error}')
108
109
def get_decode(stream):
    """Convert a value to text.

    ``str``/``bytes`` values are decoded as UTF-8 with undecodable bytes
    dropped; when decoding is not possible (for example the value is already
    a ``str``) and for every other type, ``str(stream)`` is returned.

    Args:
        stream: value to convert.

    Returns:
        The text representation of ``stream``.
    """
    if isinstance(stream, (str, bytes)):
        try:
            return stream.decode("utf-8", errors="ignore")
        except (ValueError, AttributeError, TypeError):
            # e.g. str has no decode(); fall through to str()
            pass
    return str(stream)
119
120
def is_proc_running(pid, name=None):
    """Check whether a process shows up in the system process listing.

    The listing is filtered through a pipeline equivalent to
    ``tasklist | findstr /B <pid>.exe`` on Windows and
    ``ps -ef | grep -v grep | grep -w <pid>`` on Linux/macOS.

    Args:
        pid: value searched for in the listing (on Windows ``.exe`` is
            appended and the result is matched at the start of a line).
        name: optional text that must also appear in the filtered output.

    Returns:
        False when the filtered output is empty; otherwise True, or, when
        ``name`` is given, whether ``name`` occurs in the output.

    Raises:
        Exception: on an unsupported operating system.
        subprocess.TimeoutExpired: when the pipeline does not finish in 60s.
    """
    sys_type = platform.system()
    if sys_type == "Windows":
        pid = "{}.exe".format(pid)
        commands = [["C:\\Windows\\System32\\tasklist"],
                    ["C:\\Windows\\System32\\findstr", "/B", "%s" % pid]]
    elif sys_type == "Linux":
        # /bin/ps -ef | /bin/grep -v grep | /bin/grep -w pid
        commands = [["/bin/ps", "-ef"],
                    ["/bin/grep", "-v", "grep"],
                    ["/bin/grep", "-w", "%s" % pid]]
    elif sys_type == "Darwin":
        commands = [["/bin/ps", "-ef"],
                    ["/usr/bin/grep", "-v", "grep"],
                    ["/usr/bin/grep", "-w", "%s" % pid]]
    else:
        raise Exception(ErrorMessage.Common.Code_0101001)

    procs = []
    upstream = None
    try:
        for command in commands:
            proc = subprocess.Popen(command, stdin=upstream,
                                    stdout=subprocess.PIPE, shell=False)
            if upstream is not None:
                # The child owns its copy now; closing ours lets the producer
                # receive SIGPIPE if the consumer exits early.
                upstream.close()
            upstream = proc.stdout
            procs.append(proc)
        (out, _) = procs[-1].communicate(timeout=60)
    finally:
        # Reap every stage so no zombie or stray child survives, including
        # when communicate() timed out.
        for proc in procs:
            if proc.poll() is None:
                proc.kill()
            proc.wait()
            if proc.stdout is not None and not proc.stdout.closed:
                proc.stdout.close()

    out = get_decode(out).strip()
    LOG.debug("Check %s proc running output: %s", pid, out)
    if out == "":
        return False
    return True if name is None else out.find(name) != -1
163
164
def exec_cmd(cmd, timeout=5 * 60, error_print=True, join_result=False, redirect=False):
    """
    Execute a command without a shell and return its decoded output.

    This is fastboot's own exe_cmd because of its peculiar way of writing
    non-error info to stderr.

    Args:
        cmd: A sequence of commands and arguments.
        timeout: timeout (seconds) for the command.
        error_print: log the error output or not.
        join_result: return stdout and stderr joined instead of preferring
            stderr.
        redirect: send stdout and stderr to a temporary file instead of
            pipes; the file content is returned as the output.
    Returns:
        The output of the command run: with ``join_result`` the stdout text
        followed by the stderr text (if any); otherwise the stderr text when
        it is non-empty, else the stdout text.
    Raises:
        subprocess.TimeoutExpired (or one of the other caught errors): after
        the child has been signalled, the original exception is re-raised.
    """
    import tempfile

    is_posix = platform.system() in ("Linux", "Darwin")
    popen_kwargs = {"shell": False}
    if is_posix:
        # own session/process group so the whole group can be signalled
        popen_kwargs["preexec_fn"] = os.setsid

    out_temp = None
    if redirect:
        # An OS pipe buffer is small; a command producing a lot of output can
        # block on it, so the output is redirected to a temporary file.
        out_temp = tempfile.SpooledTemporaryFile(max_size=10 * 1000)
        fileno = out_temp.fileno()
        popen_kwargs.update(stdout=fileno, stderr=fileno)
    else:
        popen_kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        proc = subprocess.Popen(cmd, **popen_kwargs)
        try:
            (out, err) = proc.communicate(timeout=timeout)
            if out_temp is not None:
                # communicate() returns (None, None) when nothing is piped;
                # the real output is in the temporary file.
                out_temp.seek(0)
                out, err = out_temp.read(), b""
            err = get_decode(err).strip()
            out = get_decode(out).strip()
            if err and error_print:
                LOG.exception(err, exc_info=False)
            if join_result:
                return "%s\n %s" % (out, err) if err else out
            return err if err else out
        except (TimeoutError, KeyboardInterrupt, AttributeError, ValueError,  # pylint:disable=undefined-variable
                EOFError, IOError, subprocess.TimeoutExpired):
            try:
                if is_posix:
                    os.killpg(proc.pid, signal.SIGTERM)
                else:
                    os.kill(proc.pid, signal.SIGINT)
            except OSError as kill_error:
                # The child may already be gone; do not mask the real error.
                LOG.debug("Kill process %s failed: %s", proc.pid, kill_error)
            raise
    finally:
        if out_temp is not None:
            out_temp.close()
222
223
def create_dir(path):
    """Create a directory (and its parents) unless the path already exists.

    ``~`` is expanded and the path is made absolute first.

    Args:
        path: The path of the directory to create.
    """
    target = os.path.abspath(os.path.expanduser(path))
    if os.path.exists(target):
        return
    os.makedirs(target, exist_ok=True)
233
234
def get_config_value(key, config_dict, is_list=True, default=None):
    """Get corresponding values for key in config_dict

    Args:
        key: target key in config_dict
        config_dict: dictionary that store values
        is_list: decide return values is list type or not
        default: if key not in config_dict, default value will be returned

    Returns:
        corresponding values for key. ``default`` is returned as-is when
        ``config_dict`` is not a dict. Boolean values are returned unchanged.
        A missing/``None`` value (or an empty list when ``is_list`` is false)
        yields ``default`` if it is not None, else ``[]``/``""`` depending on
        ``is_list``.
    """
    if not isinstance(config_dict, dict):
        return default

    value = config_dict.get(key, None)
    if isinstance(value, bool):
        return value

    if default is not None:
        fallback = default
    else:
        fallback = [] if is_list else ""

    if value is None:
        return fallback

    if isinstance(value, list):
        if is_list:
            return value
        # An empty list has no first element; treat it like a missing value
        # instead of raising IndexError.
        return value[0] if value else fallback
    return [value] if is_list else value
262
263
def get_file_absolute_path(input_name, paths=None, alt_dir=None):
    """Find absolute path for input_name

    The search roots are ``paths`` extended by ``_update_paths``. The name is
    tried as given and, when it starts with ``resource``/``testcases``
    followed by a path separator, also with that prefix removed. Under each
    root ``alt_dir`` (when given) is tried before the root itself.

    Args:
        input_name: the target file to search
        paths: path list for searching input_name
        alt_dir: extra dir that appended to paths

    Returns:
        absolute path for input_name

    Raises:
        ParamError: when no candidate location exists.
    """
    LOG.debug("Input name:{}, paths:{}, alt dir:{}".
              format(input_name, paths, alt_dir))
    input_name = str(input_name)
    search_roots = set(paths) if paths else set()
    _update_paths(search_roots)

    candidates = [input_name]
    for prefix in ("resource/", "testcases/", "resource\\", "testcases\\"):
        if input_name.startswith(prefix):
            candidates.append(input_name[len(prefix):])
            break

    for candidate in candidates:
        for root in search_roots:
            locations = []
            if alt_dir:
                locations.append(os.path.join(root, alt_dir, candidate))
            locations.append(os.path.join(root, candidate))
            for location in locations:
                if os.path.exists(location):
                    return os.path.abspath(location)

    # nothing found: report what was searched, then fail
    err_msg = ErrorMessage.Common.Code_0101002.format(ErrorCategory.Environment, input_name)
    LOG.warning(err_msg)
    if alt_dir:
        LOG.debug("Alt dir is %s" % alt_dir)
    LOG.debug("Paths is:")
    for root in search_roots:
        LOG.debug(root)
    raise ParamError(err_msg)
310
311
def _update_paths(paths):
    """Extend the set of search paths in place.

    For every existing path whose last component is ``resource`` or
    ``testcases`` the parent directory is added. The ``testcases`` and
    ``resource`` directories under ``Variables.exec_dir`` and
    ``Variables.top_dir``, and those two directories themselves, are always
    added.

    Args:
        paths: set of search paths; updated in place.
    """
    from xdevice import Variables
    resource_dir = "resource"
    testcases_dir = "testcases"

    parents = set()
    for path in paths:
        if not os.path.exists(path):
            continue
        head, tail = os.path.split(path)
        if not tail:
            # path ended with a separator; split once more
            head, tail = os.path.split(head)
        if tail in (resource_dir, testcases_dir):
            parents.add(head)
    paths.update(parents)

    bases = (Variables.exec_dir, Variables.top_dir)
    extra = [os.path.abspath(os.path.join(base, sub_dir))
             for base in bases
             for sub_dir in (testcases_dir, resource_dir)]
    extra.extend(bases)
    paths.update(extra)
337
338
def modify_props(device, local_prop_file, target_prop_file, new_props):
    """To change the props if it is need

    The target prop file is pulled to ``local_prop_file``, compared with
    ``new_props`` and, if anything differs, a rewritten copy is pushed back
    and given mode 644.

    Args:
        device: the device to modify props
        local_prop_file : the local file to save the old props
        target_prop_file : the target prop file to change
        new_props  : the new props (mapping of key to string value)
    Returns:
        True : prop file changed
        False : prop file no need to change
    """
    device.pull_file(target_prop_file, local_prop_file)
    flags = os.O_RDONLY
    modes = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(local_prop_file, flags, modes), "r") as old_file:
        lines = old_file.readlines()
    # Appended props must start on their own line; only add the newline when
    # the last line lacks one.
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"

    def _parse(prop_line):
        """Return (key, value) of a ``key=value`` line, or None otherwise."""
        stripped = prop_line.strip()
        if stripped.startswith("#") or stripped.find("=") <= 0:
            return None
        # split on the first '=' only, so values may themselves contain '='
        key, _, value = stripped.partition("=")
        return key, value

    old_props = {}
    for line in lines:
        parsed = _parse(line)
        if parsed is not None:
            old_props[parsed[0]] = parsed[1]

    is_changed = False
    changed_prop_key = set()
    for key, value in new_props.items():
        if key not in old_props:
            lines.append("".join([key, "=", value, '\n']))
            is_changed = True
        elif old_props.get(key) != value:
            changed_prop_key.add(key)
            is_changed = True

    if not is_changed:
        return False

    for index, line in enumerate(lines):
        parsed = _parse(line)
        if parsed is not None and parsed[0] in changed_prop_key:
            lines[index] = "".join([parsed[0], "=", new_props[parsed[0]], '\n'])

    local_temp_prop_file = NamedTemporaryFile(mode='w', prefix='build',
                                              suffix='.tmp', delete=False)
    try:
        local_temp_prop_file.writelines(lines)
        local_temp_prop_file.close()
        device.push_file(local_temp_prop_file.name, target_prop_file)
        device.execute_shell_command(" ".join(["chmod 644", target_prop_file]))
        LOG.info("Changed the system property as required successfully")
    finally:
        # delete=False: the file must be removed explicitly, also on failure
        local_temp_prop_file.close()
        os.remove(local_temp_prop_file.name)

    return True
391
392
def get_device_log_file(report_path, serial=None, log_name="device_log",
                        device_name="", module_name=None, repeat=1, repeat_round=1):
    """Build the path of a device log file and create its directory.

    Args:
        report_path: root directory of the report.
        serial: device serial; ``time.time_ns()`` is used when it is falsy.
        log_name: prefix of the log file name.
        device_name: optional name put in front of the serial.
        module_name: optional sub-folder for the module.
        repeat: total repeat count; a ``round<N>`` folder is used when > 1.
        repeat_round: current round, used for the ``round<N>`` folder name.

    Returns:
        Path of the log file (the file itself is not created).
    """
    from xdevice import Variables
    # new a module folder to save log
    path_parts = [report_path, Variables.report_vars.log_dir,
                  f"round{repeat_round}" if repeat > 1 else ""]
    if module_name:
        path_parts.append(module_name)
    log_path = os.path.join(*path_parts)
    os.makedirs(log_path, exist_ok=True)

    identifier = serial or time.time_ns()
    if device_name:
        identifier = "%s_%s" % (device_name, identifier)
    safe_identifier = str(identifier).replace(":", "_")
    device_log_file = os.path.join(
        log_path, "{}_{}.log".format(log_name, safe_identifier))
    LOG.info("Generate device log file: %s", device_log_file)
    return device_log_file
411
412
def check_result_report(report_root_dir, report_file, error_message="",
                        report_name="", module_name="", **kwargs):
    """
    Return report_file if it exists; otherwise create an empty report that
    carries error_message and return the path of that report.

    Args:
        report_root_dir: report root, used when the directory of report_file
            is not absolute.
        report_file: expected result file.
        error_message: message stored in the empty report.
        report_name: suite name; defaults to the base name of report_file.
        module_name: when set, used as module name and as report file name.
        **kwargs: ``request`` (its listeners may flush partial results) and
            ``result_kind`` (defaults to ``CaseResult.unavailable``).
    """
    if os.path.exists(report_file):
        return report_file
    suite_name = report_name or get_filename_extension(report_file)[0]

    # The suite ended abnormally: let listeners record the results of the
    # cases that already ran into the result file.
    request = kwargs.get("request")
    if request is not None:
        for listener in request.listeners:
            if hasattr(listener, "handle_half_break"):
                listener.handle_half_break(suite_name, error_message=error_message)
    if os.path.exists(report_file):
        return report_file

    LOG.info(f"{report_file} does not exist, create an empty report")
    report_dir = os.path.dirname(report_file)
    result_dir = report_dir if os.path.isabs(report_dir) \
        else os.path.join(report_root_dir, "result", report_dir)
    os.makedirs(result_dir, exist_ok=True)

    suite_result = SuiteResult()
    suite_result.suite_name = suite_name
    suite_result.stacktrace = error_message
    if module_name:
        suite_name = module_name
    # Result kind of the empty report; the default is Unavailable.
    result_kind = kwargs.get("result_kind", CaseResult.unavailable)
    SuiteReporter(
        [(suite_result, [])], suite_name, result_dir,
        modulename=module_name, message=error_message,
        result_kind=result_kind).create_empty_report()
    return "%s.xml" % os.path.join(result_dir, suite_name)
455
456
def get_sub_path(test_suite_path):
    """Return the directory part below ``<sep>tests<sep><first dir><sep>``.

    Args:
        test_suite_path: path of a test suite file.

    Returns:
        The remainder of the file's directory after the first ``tests``
        component and the directory directly under it; ``""`` when either is
        missing.
    """
    marker = "%stests%s" % (os.sep, os.sep)
    _, found, below_tests = os.path.dirname(test_suite_path).partition(marker)
    if not found:
        return ""
    _, found, remainder = below_tests.partition(os.sep)
    return remainder if found else ""
469
470
def is_config_str(content):
    """Return True when content contains both ``{`` and ``}``."""
    return "{" in content and "}" in content
473
474
def is_python_satisfied():
    """Check that the running interpreter is newer than the minimum version.

    Returns:
        True when ``sys.version_info`` compares greater than ``(3, 7, 0)``;
        otherwise an error is logged and False is returned.
    """
    mini_version = (3, 7, 0)
    if sys.version_info <= mini_version:
        LOG.error("Please use python {} or higher version to start project".format(mini_version))
        return False
    return True
481
482
def convert_ip(origin_ip):
    """Mask the two middle fields of a dotted four-field address.

    Args:
        origin_ip: address text.

    Returns:
        The stripped address with fields 2 and 3 replaced by ``*`` of equal
        length; ``origin_ip`` unchanged when it does not have four fields.
    """
    fields = origin_ip.strip().split(".")
    if len(fields) != 4:
        return origin_ip
    first, second, third, last = fields
    return ".".join([first, "*" * len(second), "*" * len(third), last])
490
491
def convert_port(port):
    """Mask a port for logging.

    Args:
        port: port number or text.

    Returns:
        For two or more characters, the first and last character with ``*``
        in between; for a single character, ``*`` followed by it; ``*`` for
        an empty value (previously this raised IndexError).
    """
    _port = str(port)
    if not _port:
        return "*"
    if len(_port) >= 2:
        return "{}{}{}".format(_port[0], "*" * (len(_port) - 2), _port[-1])
    return "*{}".format(_port)
498
499
def convert_serial(serial):
    """Mask the address and port of a ``remote_<ip>_<port>`` serial.

    Args:
        serial: device serial text.

    Returns:
        ``remote_<masked ip>_<masked port>`` for serials starting with
        ``remote_``; any other serial unchanged.
    """
    if not serial.startswith("remote_"):
        return serial
    fields = serial.split("_")
    return "remote_{}_{}".format(convert_ip(fields[1]),
                                 convert_port(fields[-1]))
505
506
def convert_mac(message):
    """Mask the value that follows ``'hcptest':'`` in a message.

    Args:
        message: text to mask; a list is returned unchanged.

    Returns:
        The message with every occurrence of the captured value replaced by
        a masked form that keeps one eighth of its length at each end. Values
        shorter than 8 characters are masked completely. The message is
        returned unchanged when neither pattern matches.
    """
    if isinstance(message, list):
        return message
    pattern = r'.+\'hcptest\':\'(.+)\''
    pattern2 = r'.+pass_through:.+\'hcptest\':\'(.+)\''
    matched = re.match(pattern, message) or re.search(pattern2, message)
    if not matched:
        return message

    secret = matched.group(1)
    keep = len(secret) // 8
    if keep:
        masked = "{}{}{}".format(secret[:keep],
                                 "*" * (len(secret) - keep * 2),
                                 secret[-keep:])
    else:
        # secret[-0:] would be the whole value, leaking it after the stars
        masked = "*" * len(secret)
    return message.replace(secret, masked)
522
523
def get_shell_handler(request, parser_type):
    """Create a ShellHandler that feeds output to a parser of parser_type.

    Every listener of the request gets ``device_sn`` set to the serial of the
    first device of the request environment. At most the first parser plugin
    found is used; a fresh instance of its class receives the suite name and
    the listeners.

    Args:
        request: test request (provides root source, listeners, config).
        parser_type: plugin id of the parser to use.

    Returns:
        ShellHandler wrapping the created parser instances.
    """
    suite_name = request.root.source.test_name
    parsers = get_plugin(Plugin.PARSER, parser_type)
    if parsers:
        parsers = parsers[:1]

    for listener in request.listeners:
        listener.device_sn = request.config.environment.devices[0].device_sn

    instances = []
    for parser in parsers:
        instance = parser.__class__()
        instance.suite_name = suite_name
        instance.listeners = request.listeners
        instances.append(instance)
    return ShellHandler(instances)
539
540
def get_kit_instances(json_config, resource_path="", testcases_path=""):
    """Instantiate the test kits declared in a parsed json config.

    Args:
        json_config: JsonParser instance; anything else yields an empty list.
        resource_path: resource directory stored in each kit's ``paths``.
        testcases_path: testcases directory stored in each kit's ``paths``.

    Returns:
        List of kit instances, each checked against its config and carrying
        the kit's ``device_name``.

    Raises:
        ParamError: when no test-kit plugin exists for a kit type.
    """
    from _core.testkit.json_parser import JsonParser
    kit_instances = []

    # check input param
    if not isinstance(json_config, JsonParser):
        return kit_instances

    # get kit instances
    for kit in json_config.config.kits:
        kit["paths"] = [resource_path, testcases_path]
        kit_type = kit.get("type", "")
        device_name = kit.get("device_name", None)
        # look the plugin up once instead of once for the test and once for
        # the use
        plugins = get_plugin(plugin_type=Plugin.TEST_KIT, plugin_id=kit_type)
        if not plugins:
            raise ParamError(ErrorMessage.Common.Code_0101003.format(kit_type))
        test_kit_instance = plugins[0].__class__()
        test_kit_instance.__check_config__(kit)
        setattr(test_kit_instance, "device_name", device_name)
        kit_instances.append(test_kit_instance)
    return kit_instances
564
565
def check_device_name(device, kit, step="setup"):
    """Tell whether a kit applies to a device according to their names.

    Args:
        device: object whose ``get("name")`` gives the device name.
        kit: kit that may carry a ``device_name`` attribute.
        step: step name used in the debug log.

    Returns:
        False only when both names are set and differ; True otherwise.
    """
    kit_device_name = getattr(kit, "device_name", None)
    device_name = device.get("name")
    both_named = bool(kit_device_name and device_name)
    if both_named and kit_device_name != device_name:
        return False

    kit_class = kit.__class__.__name__
    if both_named:
        LOG.debug("Do kit:%s %s for device:%s", kit_class, step, device_name)
    else:
        LOG.debug("Do kit:%s %s", kit_class, step)
    return True
578
579
def check_device_env_index(device, kit):
    """Tell whether a kit applies to a device according to its env index.

    Args:
        device: device object; checked only when it has an ``env_index``
            attribute, the index itself is read with ``get("env_index")``.
        kit: kit that may carry an ``env_index_list`` attribute.

    Returns:
        False when the kit has a non-empty index list, the device has a
        truthy index and that index is not in the list; True otherwise.
    """
    if not hasattr(device, "env_index"):
        return True
    allowed_indexes = getattr(kit, "env_index_list", None)
    env_index = device.get("env_index")
    excluded = allowed_indexes and env_index and \
        len(allowed_indexes) > 0 and env_index not in allowed_indexes
    return False if excluded else True
589
590
def check_path_legal(path):
    """Wrap a path in double quotes when it contains a space.

    Args:
        path: path text (may be empty or None).

    Returns:
        The quoted path when it contains a space, else ``path`` unchanged.
    """
    needs_quotes = path and " " in path
    return "\"%s\"" % path if needs_quotes else path
595
596
def get_local_ip():
    """Determine an IP address of the local host.

    Windows: the first host address starting with ``10.``, else the address
    the fully qualified host name resolves to. macOS: the address the fully
    qualified host name resolves to. Linux: the first line of
    ``/hostip/realip`` when that file exists.

    Returns:
        The address text, or ``127.0.0.1`` on other systems, when the Linux
        file is missing or unreadable, or when any error occurs.
    """
    loopback = "127.0.0.1"
    try:
        sys_type = platform.system()
        if sys_type == "Windows":
            addresses = socket.gethostbyname_ex(socket.gethostname())[2]
            for ip_add in addresses:
                if ip_add.startswith("10."):
                    return ip_add
            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        if sys_type == "Darwin":
            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        if sys_type != "Linux":
            return loopback

        real_ip = "/%s/%s" % ("hostip", "realip")
        if not os.path.exists(real_ip):
            return loopback
        try:
            import codecs
            with codecs.open(real_ip, "r", "utf-8") as srw:
                return str(srw.readlines()[0]).strip()
        except (IOError, ValueError) as error_message:
            LOG.error(error_message)
            return loopback
    except Exception as error:
        LOG.debug("Get local ip error: %s, skip!" % error)
        return loopback
634
635
class SplicingAction(argparse.Action):
    """argparse action that stores the option's values joined by spaces."""

    def __call__(self, parser, namespace, values, option_string=None):
        joined = " ".join(values)
        setattr(namespace, self.dest, joined)
639
640
def get_test_component_version(config):
    """Read the ``version`` field of ``test_component.json``.

    The file is searched under the config's resource and testcases paths.

    Args:
        config: object providing ``resource_path`` and ``testcases_path``.

    Returns:
        The version value; ``""`` in decc mode, when the field is absent, or
        when the file cannot be found or parsed (the error is logged).
    """
    if check_mode(ModeType.decc):
        return ""

    version = ""
    try:
        search_paths = [config.resource_path, config.testcases_path]
        test_file = get_file_absolute_path("test_component.json", search_paths)
        descriptor = os.open(test_file, os.O_RDONLY,
                             stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(descriptor, "r") as file_content:
            version = json.load(file_content).get("version", "")
    except (ParamError, ValueError) as error:
        LOG.error("The exception {} happened when get version".format(error))
    return version
657
658
def check_mode(mode):
    """Return whether the mode of the current context session equals mode."""
    from _core.context.center import Context
    current_mode = Context.session().mode
    return current_mode == mode
662
663
def do_module_kit_setup(request, kits):
    """Run the setup of every module kit on each device it applies to.

    Each device first gets an empty module-kit list. For every kit, a deep
    copy is recorded on and set up for each device that passes the env-index
    and device-name checks.

    Args:
        request: test request providing ``get_devices()``.
        kits: kits to set up.

    Raises:
        ExecuteTerminate: when execution has been stopped.
        ParamError: when a kit applies to no device.
    """
    for device in request.get_devices():
        setattr(device, ConfigConst.module_kits, [])

    from _core.context.center import Context
    for kit in kits:
        applied = False
        for device in request.get_devices():
            if not Context.is_executing():
                raise ExecuteTerminate()
            if not (check_device_env_index(device, kit)
                    and check_device_name(device, kit)):
                continue
            applied = True
            device_kit = copy.deepcopy(kit)
            # record the copy before setup so teardown sees it even if setup
            # fails
            getattr(device, ConfigConst.module_kits).append(device_kit)
            device_kit.__setup__(device, request=request)
        if applied:
            continue
        err_msg = ErrorMessage.Common.Code_0101004.format(kit.__class__.__name__)
        LOG.error(err_msg)
        raise ParamError(err_msg)
686
687
def do_module_kit_teardown(request):
    """Tear down the module kits recorded on each device of the request.

    After the kits of a device have been processed its module-kit list is
    reset to an empty list.

    Args:
        request: test request providing ``get_devices()``.
    """
    for device in request.get_devices():
        recorded_kits = getattr(device, ConfigConst.module_kits, [])
        for kit in recorded_kits:
            if not check_device_name(device, kit, step="teardown"):
                continue
            kit.__teardown__(device)
        setattr(device, ConfigConst.module_kits, [])
694
695
def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = time.localtime(time.time())
    return time.strftime("%Y-%m-%d %H:%M:%S", now)
701
702
def check_mode_in_sys(mode):
    """Return whether ``sys.mode`` exists and equals mode."""
    return hasattr(sys, "mode") and getattr(sys, "mode") == mode
707
708
def get_cst_time():
    """Return the current time as an aware datetime at UTC+8.

    The timezone is a fixed offset of 8 hours named ``Asia/Shanghai``.
    """
    utc_plus_8 = timedelta(hours=8)
    shanghai = timezone(utc_plus_8, name='Asia/Shanghai')
    return datetime.now(tz=shanghai)
715
716
def get_delta_time_ms(start_time):
    """Return the milliseconds elapsed from start_time until now.

    Args:
        start_time: datetime that can be subtracted from the value returned
            by ``get_cst_time()``.

    Returns:
        Elapsed time in milliseconds as a float.
    """
    elapsed = get_cst_time() - start_time
    return elapsed.total_seconds() * 1000
721
722
def get_netstat_proc_pid(device, port):
    """Extract a process id for a port from the device's netstat output.

    Runs ``netstat -atn | grep :<port>`` on the device, takes the first
    output line that mentions the port (and not ``grep``) and returns the
    part of its last column in front of the first ``/``.

    Args:
        device: object providing ``execute_shell_command``, ``log`` and
            ``get_recover_state``.
        port: port to look for.

    Returns:
        The extracted text, or ``""`` when the device lacks a required
        attribute, is not in recover state, or no line matches.
    """
    required = ("execute_shell_command", "log", "get_recover_state")
    if not all(hasattr(device, attr) for attr in required):
        return ""
    if not device.get_recover_state():
        return ""

    cmd = 'netstat -atn | grep :{}'.format(port)
    output = device.execute_shell_command(cmd).strip()
    for row in output.split("\n"):
        if str(port) not in row or "grep" in row:
            continue
        last_column = row.split()[-1]
        device.log.debug('{} proc:{}'.format(port, last_column))
        return last_column.split("/")[0]
    return ""
741
742
def get_repeat_round(d_unique_id):
    """Get the current round of a repeated execution.

    Args:
        d_unique_id: str, driver descriptor unique id
    Returns:
        The trailing number of an id shaped like ``TestSource_<a>_<b>_<n>``;
        1 when the id does not have that shape.
    """
    matched = re.match("^TestSource_.+_.+_(\\d+)$", d_unique_id)
    if matched is None:
        return 1
    return int(matched.group(1))
752
753
def calculate_elapsed_time(begin, end):
    """Describe the interval between two points in time.

    Args:
        begin: int/float/datetime, begin time
        end  : int/float/datetime, end time (same kind as begin)
    Returns:
        elapsed time description such as ``1d2h3m4s``; ``<n>ms`` for
        intervals below one second, ``0s`` for a zero interval, and an error
        description for any negative interval.
    """
    if isinstance(begin, datetime) and isinstance(end, datetime):
        # datetime objects were passed in
        total_seconds = (end - begin).total_seconds()
    else:
        # numbers of seconds were passed in
        total_seconds = end - begin
    total_seconds = float(round(total_seconds, 3))

    # Test the float, not its int() truncation, so that intervals between
    # -1 and 0 are reported as errors too.
    if total_seconds < 0:
        return f"calculate error, total seconds is {total_seconds}"

    seconds = int(total_seconds)
    if seconds == 0:
        # round() avoids losing a millisecond to float representation
        milliseconds = int(round(total_seconds * 1000))
        return "{}ms".format(milliseconds) if milliseconds > 0 else "0s"

    elapsed = []
    remainder = seconds
    for unit, size in (("d", 24 * 60 * 60), ("h", 60 * 60), ("m", 60), ("s", 1)):
        count, remainder = divmod(remainder, size)
        if count >= 1:
            elapsed.append(f"{count}{unit}")
    return "".join(elapsed)
792
793
def calculate_percent(num1, num2):
    """Calculate ``num1 / num2`` as a percentage string.

    Args:
        num1: number, the dividend
        num2: number, the divisor
    Returns:
        percentage representation. ``0%`` is returned for non-numeric input,
        when num1 is greater than num2, or when either number is 0.
    """
    from decimal import Decimal

    if not isinstance(num1, (int, float)) or not isinstance(num2, (int, float)):
        LOG.error("num1 or num2 is not a numeric type")
        return "0%"
    if num1 > num2:
        LOG.error("the dividend(num1) is greater than the divisor(num2)")
        return "0%"
    if num1 == 0 or num2 == 0:
        return "0%"
    # At most two decimals, truncated not rounded (99.999% gives 99.99%);
    # the smallest value shown is 0.01%.
    # str() of a tiny float is scientific notation ("1e-05"); going through
    # Decimal renders the same digits in plain notation so the truncation
    # below cannot produce an unparsable fragment.
    ret = format(Decimal(str(num1 / num2 * 100)), "f")
    dot = ret.find(".")
    if dot != -1:
        ret = ret[:dot + 3]
    if float(ret) < 0.01:
        ret = "0.01"
    return ret + "%"
816
817
def copy_folder(src, dst):
    """Recursively copy the files and sub-folders of src into dst.

    Missing destination folders are created; existing files are overwritten.
    An error is logged and nothing is copied when src does not exist.

    Args:
        src: source folder.
        dst: destination folder.
    """
    if not os.path.exists(src):
        LOG.error(f"copy folder error, source path '{src}' does not exist")
        return
    if not os.path.exists(dst):
        os.makedirs(dst)
    for entry in os.listdir(src):
        source_item = os.path.join(src, entry)
        target_item = os.path.join(dst, entry)
        if os.path.isfile(source_item):
            shutil.copy(source_item, target_item)
        elif os.path.isdir(source_item):
            # the recursive call creates target_item when it is missing
            copy_folder(source_item, target_item)
833
834
def show_current_environment():
    """Log (debug) the installed packages whose spec mentions xdevice or hypium.

    The package list comes from pip's internal ``freeze``; nothing further
    happens when that import is unavailable.
    """
    try:
        LOG.debug("Show the current environment. Please wait.")
        from pip._internal.operations.freeze import freeze
        for package in list(freeze()):
            if any(marker in package for marker in ("xdevice", "hypium")):
                LOG.debug(package)
    except ImportError:
        pass
845
846
def check_uitest_version(uitest_version_info: str, base_version: tuple) -> bool:
    """Check whether the reported uitest version is newer than base_version.

    The output is scanned from its last line backwards for the first line
    that starts with a four-part dotted version (``a.b.c.d``).

    Args:
        uitest_version_info: raw version output, possibly multi-line.
        base_version: version parts to compare against, as ints or numeric
            strings, e.g. ``(4, 1, 3, 9)`` or ``("4", "1", "3", "9")``.

    Returns:
        True when the found version is greater than base_version, and also
        when the output is empty or contains no version line.
    """
    if not uitest_version_info:
        return True
    for line in reversed(uitest_version_info.strip().split("\n")):
        matched = re.match(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})', line)
        if not matched:
            continue
        # Compare numerically: as strings "10" sorts before "9", and a str
        # tuple cannot be compared with an int tuple at all.
        version = tuple(int(part) for part in matched.groups())
        return version > tuple(int(part) for part in base_version)
    return True
855
856
def parse_xml_cdata(content: str) -> str:
    """
    Extract the text wrapped in a CDATA section.

    :param content: text that may contain ``<![CDATA[ ... ]]``
    :return: the text between ``<![CDATA[`` and the last ``]]`` (it may span
        several lines), or content unchanged when there is no CDATA section
    """
    if content and '<![CDATA[' in content:
        # DOTALL: CDATA payloads are often multi-line, and "." alone does not
        # match a newline
        ret = re.search(r'<!\[CDATA\[(.*)]]', content, re.DOTALL)
        if ret is not None:
            return ret.group(1)
    return content
868