• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import platform
21import subprocess
22import signal
23import stat
24from tempfile import NamedTemporaryFile
25
26from xdevice import ShellHandler
27from xdevice import platform_logger
28from xdevice import get_plugin
29from xdevice import Plugin
30
# Module-level logger shared by the helpers in this file; "Utils" is the name
# handed to xdevice's platform_logger factory.
LOG = platform_logger("Utils")
32
33
def get_filename_extension(file_path):
    """Split the last component of a path into name and extension.

    Args:
        file_path: Path whose final component is examined.

    Returns:
        A ``(filename, ext)`` tuple. ``ext`` keeps its leading dot and is an
        empty string when the final component has no extension.
    """
    base_name = os.path.basename(file_path)
    stem, extension = os.path.splitext(base_name)
    return stem, extension
38
39
def start_standing_subprocess(cmd, pipe=subprocess.PIPE, return_result=False):
    """Starts a subprocess, optionally waiting for it and returning its output.

    On non-Windows systems the child is started in a new session (setsid), so
    it becomes the leader of its own process group. Windows has no setsid, so
    no new session is requested there.

    Args:
        cmd: Command (argument list) to start the subprocess with; it is run
            without a shell.
        pipe: Target for the child's stdout, ``subprocess.PIPE`` by default.
        return_result: When False, return immediately with the running
            process. When True, block until the child exits and return what
            it wrote to stdout.

    Returns:
        The ``subprocess.Popen`` object when ``return_result`` is False.
        Otherwise the child's stdout decoded as UTF-8 and stripped, or an
        empty string when stdout was not captured through a pipe.
    """
    # start_new_session performs the same setsid() call as the former
    # preexec_fn=os.setsid, without the thread-safety caveats of preexec_fn.
    process = subprocess.Popen(
        cmd, stdout=pipe, shell=False,
        start_new_session=platform.system() != "Windows")
    if not return_result:
        return process

    # communicate() drains stdout, closes the pipe and reaps the child, so a
    # finished command does not linger as a zombie with an open descriptor.
    output, _ = process.communicate()
    if output is None:
        # stdout was redirected somewhere other than a pipe: nothing to read.
        return ""
    return output.decode("utf-8").strip()
65
66
def stop_standing_subprocess(process):
    """Stops a subprocess started by start_standing_subprocess.

    Sends SIGINT on Windows and SIGTERM elsewhere to the process itself
    (``os.kill`` on ``process.pid``). Stopping is best-effort: failures to
    deliver the signal are logged and not raised. This covers a process that
    has already exited, a missing ``pid`` attribute and permission errors.

    Args:
        process: Subprocess to terminate.
    """
    stop_signal = signal.SIGINT if platform.system() == "Windows" \
        else signal.SIGTERM
    try:
        os.kill(process.pid, stop_signal)
    except (PermissionError, ProcessLookupError, AttributeError,
            FileNotFoundError, SystemError) as error:
        # ProcessLookupError: the target already exited and was reaped, so
        # there is nothing left to stop.
        LOG.error("stop standing subprocess error '%s'", error)
83
84
def get_decode(stream):
    """Return ``stream`` as text.

    bytes are decoded as UTF-8 with undecodable bytes dropped. Every other
    value, including str, is converted with ``str()``.

    Args:
        stream: Value to convert.

    Returns:
        The text form of ``stream``.
    """
    if not isinstance(stream, (str, bytes)):
        return str(stream)
    try:
        return stream.decode("utf-8", errors="ignore")
    except (ValueError, AttributeError, TypeError):
        # str has no decode(); fall back to the plain conversion.
        return str(stream)
94
95
def is_proc_running(pid, name=None):
    """Checks the system process list for an entry matching ``pid``.

    Runs ``tasklist | findstr <pid>`` on Windows and
    ``ps -ef | grep -v grep | grep -w <pid>`` elsewhere. The match is textual:
    any process-list line containing ``pid`` (as a whole word on non-Windows)
    counts, not only lines whose PID column equals it.

    Args:
        pid: Process id to look for.
        name: Optional text that must also appear in the matched output.

    Returns:
        False when no line matches. Otherwise True, or, when ``name`` is
        given, whether ``name`` occurs in the matched output.
    """
    if platform.system() == "Windows":
        pipeline = [
            ["C:\\Windows\\System32\\tasklist"],
            ["C:\\Windows\\System32\\findstr", "%s" % pid],
        ]
    else:
        # /bin/ps -ef | /bin/grep -v grep | /bin/grep -w pid
        pipeline = [
            ["/bin/ps", "-ef"],
            ["/bin/grep", "-v", "grep"],
            ["/bin/grep", "-w", "%s" % pid],
        ]

    stages = []
    upstream_out = None
    for command in pipeline:
        stage = subprocess.Popen(command, stdin=upstream_out,
                                 stdout=subprocess.PIPE, shell=False)
        if upstream_out is not None:
            # The child holds its own copy of the pipe; drop ours so the
            # descriptor is not leaked and the producer sees SIGPIPE if the
            # consumer exits early.
            upstream_out.close()
        upstream_out = stage.stdout
        stages.append(stage)

    (out, _) = stages[-1].communicate()
    # Reap the producers so they do not linger as zombies.
    for stage in stages[:-1]:
        stage.wait()

    out = get_decode(out).strip()
    LOG.debug("check %s proc running output: %s", pid, out)
    if out == "":
        return False
    return True if name is None else out.find(name) != -1
123
124
def create_dir(path):
    """Ensure a directory exists at ``path``.

    ``~`` is expanded and the path is made absolute first. Nothing is done
    when something already exists at that location.

    Args:
        path: The path of the directory to create.
    """
    target = os.path.abspath(os.path.expanduser(path))
    if os.path.exists(target):
        return
    os.makedirs(target, exist_ok=True)
134
135
def _split_prop_line(line):
    """Return ``(key, value)`` for a ``key=value`` property line, else None.

    Comment lines (starting with ``#``) and lines without a key before the
    first ``=`` yield None. Only the first ``=`` separates key from value, so
    values may themselves contain ``=``.
    """
    stripped = line.strip()
    if stripped.startswith("#") or stripped.find("=") <= 0:
        return None
    key, _, value = stripped.partition("=")
    return key, value


def modify_props(device, local_prop_file, target_prop_file, new_props):
    """To change the props if need

    Pulls ``target_prop_file`` from the device into ``local_prop_file`` and
    compares it with ``new_props``. If any key is missing or has a different
    value, an updated copy is written and pushed back to the device, then its
    mode is set to 644.

    Args:
        device: the device to modify props
        local_prop_file : the local file to save the old props
        target_prop_file : the target prop file to change
        new_props  : the new props, a mapping of key to string value
    Returns:
        True : prop file changed
        False : prop file no need to change
    """
    device.pull_file(target_prop_file, local_prop_file)
    flags = os.O_RDONLY
    modes = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(local_prop_file, flags, modes), "r") as old_file:
        lines = old_file.readlines()
    # Terminate the last line only when needed, so appended props start on
    # their own line without growing a blank line on every modification.
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"

    old_props = {}
    for line in lines:
        parsed = _split_prop_line(line)
        if parsed is not None:
            old_props[parsed[0]] = parsed[1]

    changed_keys = set()
    is_changed = False
    for key, value in new_props.items():
        if key not in old_props:
            lines.append("".join([key, "=", value, "\n"]))
            is_changed = True
        elif old_props[key] != value:
            changed_keys.add(key)
            is_changed = True

    if not is_changed:
        return False

    # Rewrite existing entries in place, using the same parsing as above so
    # both passes agree on what a line's key is.
    for index, line in enumerate(lines):
        parsed = _split_prop_line(line)
        if parsed is not None and parsed[0] in changed_keys:
            key = parsed[0]
            lines[index] = "".join([key, "=", new_props[key], "\n"])

    with NamedTemporaryFile(mode='w', prefix='build', suffix='.tmp',
                            delete=False) as temp_prop_file:
        temp_prop_file.writelines(lines)
    try:
        device.push_file(temp_prop_file.name, target_prop_file)
        device.execute_shell_command(" ".join(["chmod 644", target_prop_file]))
        LOG.info("Changed the system property as required successfully")
    finally:
        # Remove the temporary copy even if pushing to the device failed.
        os.remove(temp_prop_file.name)

    return True
189
190
def convert_ip(origin_ip):
    """Mask the two middle fields of a dotted four-field address.

    Args:
        origin_ip: Address text; surrounding whitespace is ignored when
            splitting it into fields.

    Returns:
        The address with the second and third fields replaced by ``*`` of the
        same length. If the text does not have exactly four dot-separated
        fields, ``origin_ip`` is returned unchanged.
    """
    fields = origin_ip.strip().split(".")
    if len(fields) != 4:
        return origin_ip
    first, second, third, last = fields
    return ".".join([first, "*" * len(second), "*" * len(third), last])
198
199
def convert_port(port):
    """Mask a port number for logging.

    Args:
        port: Port as an int or string; it is converted with ``str()``.

    Returns:
        For two or more characters, the first and last characters with every
        character in between replaced by ``*``. For a single character,
        ``*`` followed by that character. For an empty string, ``*``.
    """
    text = str(port)
    if not text:
        # Nothing to show; previously this raised IndexError.
        return "*"
    if len(text) >= 2:
        return "{}{}{}".format(text[0], "*" * (len(text) - 2), text[-1])
    return "*{}".format(text[-1])
206
207
def convert_serial(serial):
    """Mask a device serial for logging.

    Args:
        serial: Serial string.

    Returns:
        * ``local_...``: the prefix followed by ``*`` for every remaining
          character.
        * ``remote_...``: the prefix followed by the masked form (see
          convert_ip) of the text between the first and second underscore.
        * otherwise: the first and last third of the serial kept and the
          middle replaced by ``*``. Serials shorter than three characters are
          masked completely.
    """
    if serial.startswith("local_"):
        return "local_" + "*" * (len(serial) - 6)
    if serial.startswith("remote_"):
        return "remote_" + convert_ip(serial.split("_")[1])

    keep = len(serial) // 3
    if keep == 0:
        # serial[-0:] would be the whole string and leak the serial, so mask
        # every character instead.
        return "*" * len(serial)
    return serial[:keep] + "*" * (len(serial) - keep * 2) + serial[-keep:]
216
217
def get_shell_handler(request, parser_type):
    """Build a ShellHandler backed by a fresh parser of ``parser_type``.

    At most the first parser plugin returned by ``get_plugin`` is used. A new
    instance of its class is created and given the suite name and the
    request's listeners. Every listener is also tagged with the serial number
    of the first device in the request's environment.

    Args:
        request: Test request providing ``root.source.test_name``,
            ``listeners`` and ``config.environment.devices``.
        parser_type: Parser plugin identifier passed to ``get_plugin``.

    Returns:
        A ShellHandler wrapping the created parser instances.
    """
    test_name = request.root.source.test_name
    plugins = get_plugin(Plugin.PARSER, parser_type)
    if plugins:
        # Only the first registered parser is used.
        plugins = plugins[:1]

    for listener in request.listeners:
        listener.device_sn = request.config.environment.devices[0].device_sn

    instances = []
    for plugin in plugins:
        # Instantiate a new parser rather than reusing the plugin object.
        instance = plugin.__class__()
        instance.suite_name = test_name
        instance.listeners = request.listeners
        instances.append(instance)
    return ShellHandler(instances)
233
234
def check_path_legal(path):
    """Wrap a path in double quotes when it contains a space.

    Args:
        path: Path string; may be empty or None.

    Returns:
        ``path`` surrounded by double quotes if it contains a space,
        otherwise ``path`` unchanged (including empty or None values).
    """
    if not path or " " not in path:
        return path
    return "\"%s\"" % path
239