• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2024-2025 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import re
19import os
20import logging
21import signal
22import time
23from typing import Union, Optional
24from pathlib import Path
25from subprocess import Popen, PIPE
26from threading import Thread, Timer
27from dataclasses import dataclass
28from tempfile import mktemp
29from vmb.helpers import Singleton
30
# Logger shared by this module; the name 'vmb' is fixed.
log = logging.getLogger('vmb')

# Elapsed-time line: a label ("Elapsed ... (h:mm:ss or m:ss)" or "Real time"),
# anything up to the next colon, then up to three numeric fields captured as
# groups 1-3.  NOTE(review): the separator before group 3 is an unescaped
# '.', so it matches ':' as well as a literal dot.
_TM_LABEL = r"(?:Elapsed.*\(h:mm:ss or m:ss\)|Real time)"
_TM_VALUE = r"[^:]*:\s*(?:(\d*):)?(\d*)(?:.(\d*))?"
tm_re = re.compile(_TM_LABEL + _TM_VALUE)

# Peak-memory line: label "Maximum resident set size" or "Max RSS",
# the number after the colon is captured as group 1.
_RSS_LABEL = r"(?:Maximum resident set size|Max RSS)"
_RSS_VALUE = r"[^:]*:\s*(\d*)"
rss_re = re.compile(_RSS_LABEL + _RSS_VALUE)
36
37
@dataclass
class ShellResult:
    """Outcome of a single shell command.

    Holds the exit code, the captured stdout/stderr text and, optionally,
    the elapsed time and peak memory parsed from stderr by `set_time`.
    """

    # Default initial result is 'failure'
    ret: int = -13
    out: str = ''
    err: str = ''
    # Elapsed time; `set_time` stores the parsed seconds multiplied by 1e9
    tm: float = 0.0
    # Number parsed from the resident-set-size line of stderr
    rss: int = 0

    def grep(self, regex: str) -> str:
        """Search stdout lines, then stderr lines, for *regex*.

        Returns group 1 of the first matching line if the pattern defines
        groups, otherwise the whole match; '' when no line matches.
        """
        for line in self.out.split("\n") + self.err.split("\n"):
            m = re.search(regex, line)
            if m:
                return m.group(1) if m.groups() else m.group()
        return ''

    def replace_out(self, regex: re.Pattern, repl: str = '') -> None:
        """Apply ``regex.sub(repl, ...)`` to every non-blank stdout line.

        Blank (whitespace-only) lines are dropped from `out`.
        """
        kept = (line for line in self.out.split("\n") if line.strip())
        self.out = "\n".join(regex.sub(repl, line) for line in kept)

    def set_ret_val(self) -> None:
        """Set `ret` from the ``__RET_VAL__=<n>`` marker found in stdout.

        `ret` becomes -13 if there is no output or no marker, and -14 if
        the marker carries no parsable number.
        """
        if not self.out:
            log.error("No shell output")
            self.ret = -13
            # Nothing to search: avoid logging a second, redundant error
            return
        matches = re.search(r"__RET_VAL__=(\d*)", self.out)
        if not matches:
            log.error("No shell ret val; out:")
            self.ret = -13
            return
        try:
            self.ret = int(matches.group(1))
        except ValueError:
            # `\d*` may match an empty string
            log.error('Error parsing return code')
            self.ret = -14

    @staticmethod
    def _elapsed_seconds(match: re.Match) -> float:
        """Convert a `tm_re` match into seconds.

        Handles ``ss``, ``ss.ff``, ``m:ss.ff`` and ``h:mm:ss``; missing or
        empty fields count as zero.
        """
        first, second, third = match.groups()
        # The separator in front of group 3 is matched by an unescaped '.'
        # in tm_re, so it can be ':' -- in that case the three fields are
        # hours, minutes and seconds rather than minutes, seconds, fraction.
        if third is not None and match.string[match.start(3) - 1] == ':':
            return float(int(first or 0) * 3600
                         + int(second or 0) * 60
                         + int(third or 0))
        seconds = float(f"{second or '0'}.{third or '0'}")
        if first:
            seconds = int(first) * 60 + seconds
        return seconds

    def set_time(self) -> None:
        """Fill `tm` and `rss` from the timing report in stderr.

        Does nothing when stderr is empty.  If the elapsed-time or the
        memory line is absent, the corresponding field is reset to zero.
        """
        # expecting output of '\time -v' to stderr
        if not self.err:
            return
        tm_val = re.search(tm_re, self.err)
        if tm_val:
            self.tm = round(self._elapsed_seconds(tm_val), 5)
        else:
            self.tm = 0.0
        self.tm *= 1e9
        rss_val = re.search(rss_re, self.err)
        # `(\d*)` may capture an empty string: treat it as zero
        self.rss = int(rss_val.group(1) or 0) if rss_val else 0

    def log_output(self) -> None:
        """Log the result: errors on failure, stdout at debug on success.

        On failure the whole stdout and only the first three stderr lines
        are logged.
        """
        if self.ret != 0:
            if self.out:
                log.error(self.out)
            err = self.err.split("\n")[:3] if self.err else []
            for line in err:
                log.error(line.strip())
        elif self.out:
            log.debug(self.out)
110
111
class ShellBase(metaclass=Singleton):
    """Interface shared by all shells.

    Stores a default timeout and an (initially empty) taskset prefix.
    Command execution and file transfer are left to subclasses; only
    `get_filesize`, `grep_output` and `set_affinity` are implemented here.
    NOTE(review): instantiation is governed by the project's `Singleton`
    metaclass, which is not visible in this file.
    """

    def __init__(self, timeout: Optional[float] = None) -> None:
        self.taskset = ''
        self._timeout = timeout

    @staticmethod
    def timed_cmd(cmd: str) -> str:
        """Wrap *cmd* so that it is executed under ``\\time -v env``."""
        prefix = "\\time -v env"
        return f"{prefix} {cmd}"

    def run(
        self,
        cmd: str,
        measure_time: bool = False,
        timeout: Optional[float] = None,
        cwd: str = '',
    ) -> ShellResult:
        """Execute *cmd*; to be provided by a subclass."""
        raise NotImplementedError

    def run_async(self, cmd: str) -> None:
        """Start *cmd* without waiting; to be provided by a subclass."""
        raise NotImplementedError

    def run_syslog(
        self,
        cmd: str,
        finished_marker: str,
        measure_time: bool = False,
        timeout: Optional[float] = None,
        cwd: str = '',
        ping_interval: int = 5,
        tag: str = 'VMB',
    ) -> ShellResult:
        """Execute *cmd* and read its results from the system log.

        To be provided by a subclass.
        """
        raise NotImplementedError

    def push(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> ShellResult:
        """Copy *src* to *dst*; to be provided by a subclass."""
        raise NotImplementedError

    def pull(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> ShellResult:
        """Copy *src* back to *dst*; to be provided by a subclass."""
        raise NotImplementedError

    def install(self, package: Union[str, Path], name: str = '') -> ShellResult:
        """Install *package*; to be provided by a subclass."""
        raise NotImplementedError

    def get_filesize(self, filepath: Union[str, Path]) -> int:
        """Return the size of the local file in bytes, or 0 if it is absent."""
        path = str(filepath)
        return os.stat(path).st_size if os.path.exists(path) else 0

    def grep_output(self, cmd: str, regex: str) -> str:
        """Run *cmd* and return `ShellResult.grep(regex)` of its result."""
        return self.run(cmd=cmd).grep(regex)

    def set_affinity(self, arg: str) -> None:
        """Set affinity mask for processes.

        Effective only on devices, so hardcoding path
        """
        self.taskset = f'/system/bin/taskset -a {arg}'
169
170
class ShellUnix(ShellBase):
    """Shell that runs commands on the local host via ``Popen(shell=True)``."""

    def __init__(self, timeout: Optional[float] = None) -> None:
        super().__init__(timeout=timeout)

    def run(self,
            cmd: str,
            measure_time: bool = False,
            timeout: Optional[float] = None,
            cwd: str = '') -> ShellResult:
        """Run *cmd* synchronously and return its `ShellResult`.

        Args:
            cmd: shell command line.
            measure_time: wrap the command with `timed_cmd` and parse the
                timing report from stderr into the result.
            timeout: per-call timeout in seconds (see `__exec_process`
                for how it combines with the instance default).
            cwd: working directory; '' keeps the current one.
        """
        return self.__run(
            cmd, measure_time=measure_time, timeout=timeout, cwd=cwd)

    def push(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Not implemented for the local shell."""
        raise NotImplementedError

    def pull(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Not implemented for the local shell."""
        raise NotImplementedError

    def run_syslog(self, cmd: str,
                   finished_marker: str,
                   measure_time: bool = False,
                   timeout: Optional[float] = None,
                   cwd: str = '',
                   ping_interval: int = 5,
                   tag: str = 'VMB') -> ShellResult:
        """Not implemented for the local shell."""
        raise NotImplementedError

    def run_async(self, cmd: str) -> None:
        """Spawn *cmd* from a daemon thread and return immediately.

        The process handle is not kept and its output is never read.
        """
        def run_shell():
            # pylint: disable-next=all
            return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)  # NOQA

        log.debug('Async cmd: %s', cmd)
        async_thread = Thread(target=run_shell)
        async_thread.daemon = True
        async_thread.start()

    @staticmethod
    def _kill_group(proc: Popen) -> None:
        """SIGKILL the whole process group of *proc*, if it still exists."""
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except ProcessLookupError:
            # The process finished on its own just before we got here
            pass

    def __run(self,
              cmd: str,
              measure_time: bool = False,
              timeout: Optional[float] = None,
              cwd: str = '') -> ShellResult:
        """Execute *cmd*, optionally timed, and log its output."""
        if measure_time:
            cmd = self.timed_cmd(cmd)
        result = self.__exec_process(cmd, cwd=cwd, timeout=timeout)
        if measure_time:
            result.set_time()
        result.log_output()
        return result

    def __exec_process(self, cmd: str, cwd: str = '',
                       timeout: Optional[float] = None) -> ShellResult:
        """Run *cmd* in its own session and collect exit code and output.

        The effective timeout is the larger of *timeout* and the instance
        default when both are set, otherwise whichever one is set.  When
        it elapses a watchdog timer SIGKILLs the whole process group.
        Any exception raised by ``communicate`` (including
        ``TimeoutExpired``) is propagated after the group has been killed.
        """
        result = ShellResult()
        # Note: self._timeout=None so default behavior is to wait forever
        to = timeout if timeout else self._timeout
        if timeout is not None and self._timeout is not None:
            to = max(timeout, self._timeout)
        log.debug(cmd)
        log.trace('CWD="%s" Timeout=[%s]', cwd, to)
        # pylint: disable-next=all
        with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE,  # NOQA
                   cwd=(cwd if cwd else None),
                   preexec_fn=os.setsid) as proc:
            watchdog = None
            if to is not None:
                watchdog = Timer(to, self._kill_group, [proc])
                watchdog.start()
            try:
                out, err = proc.communicate(timeout=to)
            except BaseException:
                # The child runs in its own session, so nothing else will
                # stop it; kill it now instead of relying on the watchdog
                # winning the race, otherwise Popen.__exit__ may block.
                self._kill_group(proc)
                raise
            finally:
                if watchdog is not None:
                    watchdog.cancel()
            ret_code = proc.poll()
            if ret_code is not None:
                result.ret = ret_code
            result.out = out.decode('utf-8', errors='replace')
            result.err = err.decode('utf-8', errors='replace')
        return result
253
254
class ShellDevice(ShellBase):
    """Base for shells that run commands on a device via a host-side tool.

    Commands are sent as ``<dev_sh> shell '<cmd>'`` through a `ShellUnix`.
    File transfer (`push`/`pull`) is left to subclasses.
    """

    def __init__(self,
                 dev_sh: str,
                 timeout: Optional[float] = None,
                 tmp_dir: str = '/data/local/tmp/vmb',) -> None:
        super().__init__(timeout=timeout)
        self._sh = ShellUnix()
        self._devsh = dev_sh
        self.tmp_dir = tmp_dir
        # Device-side file that receives stderr of timed commands
        self.stderr_out = os.path.join(tmp_dir, 'vmb-stderr.out')

    def run(self, cmd: str,
            measure_time: bool = False,
            timeout: Optional[float] = None,
            cwd: str = '') -> ShellResult:
        """Run *cmd* on the device.

        The exit code is taken from a ``__RET_VAL__=$?`` marker echoed
        after the command.  With *measure_time* the command runs under
        ``\\time -v`` (plus the taskset prefix, if any), its stderr is
        redirected to a file on the device, pulled to the host and parsed
        for time and memory.  Without it `err` is always ''.

        If the stderr file cannot be pulled, `err` is set to
        'Pull from device failed' and no timing is parsed.
        """
        redir = ''
        if measure_time:
            cmd = f"\\time -v {self.taskset} env {cmd}"
            redir = f' 2>{self.stderr_out}'
        cwd = f'cd {cwd}; ' if cwd else ''
        res = self._sh.run(
            f"{self._devsh} shell '{cwd}({cmd}){redir}; echo __RET_VAL__=$?'",
            timeout=timeout,
            measure_time=False)
        res.set_ret_val()
        if not measure_time:
            res.err = ''
            return res
        # A private temporary directory replaces the race-prone mktemp()
        # and is removed automatically, whatever happens below.
        # pylint: disable-next=import-outside-toplevel
        from tempfile import TemporaryDirectory
        with TemporaryDirectory(prefix='vmb-') as host_dir:
            stderr_host = Path(host_dir) / 'vmb-stderr.out'
            self.pull(self.stderr_out, stderr_host)
            self._sh.run(f"{self._devsh} shell 'rm -f {self.stderr_out}'")
            if not stderr_host.exists():
                res.err = 'Pull from device failed'
                return res
            # Same lenient decoding as ShellUnix uses for process output
            res.err = stderr_host.read_text(encoding='utf-8',
                                            errors='replace')
        res.set_time()
        return res

    def run_syslog(self, cmd: str,
                   finished_marker: str,
                   measure_time: bool = False,
                   timeout: Optional[float] = None,
                   cwd: str = '',
                   ping_interval: int = 5,
                   tag: str = 'VMB') -> ShellResult:
        """Not implemented at this level."""
        raise NotImplementedError

    def run_async(self, cmd: str) -> None:
        """Start *cmd* on the device without waiting for it."""
        self._sh.run_async(f"{self._devsh} shell '{cmd}'")

    def get_filesize(self, filepath: Union[str, Path]) -> int:
        """Return the size reported by ``stat -c '%s'`` on the device.

        Returns 0 if the command fails or its first output line is not
        a number.
        """
        res = self.run(f"stat -c '%s' {filepath}")
        if res.ret != 0 or not res.out:
            return 0
        first_line = res.out.split("\n")[0]
        try:
            return int(first_line)
        except ValueError:
            log.warning('Unexpected stat output: %s', first_line)
            return 0

    def push(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Not implemented at this level."""
        raise NotImplementedError

    def pull(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Not implemented at this level."""
        raise NotImplementedError

    def install(self, package: Union[str, Path], name: str = '') -> ShellResult:
        """Not implemented at this level."""
        raise NotImplementedError

    def mk_tmp_dir(self):
        """Create `tmp_dir` on the device.

        Raises:
            RuntimeError: if the ``mkdir -p`` command does not return 0.
        """
        res = self.run(f'mkdir -p {self.tmp_dir}')
        if res.ret != 0:
            raise RuntimeError('Device connection failed!\n'
                               f'{res.out}\n{res.err}')
331
332
class ShellAdb(ShellDevice):
    """Device shell driven by the tool named in `binname`.

    The executable is taken from the environment variable whose name is
    `binname` in upper case, falling back to `binname` itself.
    """

    binname = f"a{'d'}b"

    def __init__(self,
                 dev_serial: str = '',
                 timeout: Optional[float] = None,
                 tmp_dir: str = '/data/local/tmp/vmb') -> None:
        executable = os.environ.get(self.binname.upper(), self.binname)
        super().__init__(f"{executable}", timeout=timeout, tmp_dir=tmp_dir)
        if dev_serial:
            # Address one particular device
            self._devsh = f'{self._devsh} -s {dev_serial}'
        self.mk_tmp_dir()

    def push(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Run the tool's ``push <src> <dst>`` command."""
        command = f'{self._devsh} push {src} {dst}'
        return self._sh.run(command, measure_time=False)

    def pull(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Run the tool's ``pull <src> <dst>`` command."""
        command = f'{self._devsh} pull {src} {dst}'
        return self._sh.run(command, measure_time=False)

    def install(self, package: Union[str, Path], name: str = '') -> ShellResult:
        """Not implemented for this shell."""
        raise NotImplementedError
362
363
class ShellHdc(ShellDevice):
    """Device shell driven by ``hdc`` (overridable via the HDC env var)."""

    # hardcoded tag and app name for now
    hilog_re = re.compile(r'^.*com.example.helllopanda/VMB: ')

    def __init__(self,
                 dev_serial: str = '',
                 dev_host: str = '',
                 timeout: Optional[float] = None,
                 tmp_dir: str = '/data/local/tmp/vmb') -> None:
        # -l0 because of HDC mutex file permission messages
        # -p (undocumented) due to poor hdc performance
        executable = os.environ.get('HDC', 'hdc')
        super().__init__(f"{executable} -p -l0",
                         timeout=timeout,
                         tmp_dir=tmp_dir)
        if dev_serial:
            self._devsh = f'{self._devsh} -t {dev_serial}'
        if dev_host:
            self._devsh = f'{self._devsh} -s {dev_host}'
        self.mk_tmp_dir()

    def push(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Run ``file send <src> <dst>``."""
        command = f'{self._devsh} file send {src} {dst}'
        return self._sh.run(command, measure_time=False)

    def pull(self,
             src: Union[str, Path],
             dst: Union[str, Path]) -> ShellResult:
        """Run ``file recv <src> <dst>``."""
        command = f'{self._devsh} file recv {src} {dst}'
        return self._sh.run(command, measure_time=False)

    def install(self, package: Union[str, Path], name: str = '') -> ShellResult:
        """Install *package*; if *name* is given, uninstall it first."""
        if name:
            self._sh.run(f'{self._devsh} uninstall {name}', measure_time=False)
        command = f'{self._devsh} aa install {package}'
        return self._sh.run(command, measure_time=False)

    def grab_log(self, tag: str, finished_marker: str) -> Optional[ShellResult]:
        """Dump hilog and return it if it contains *finished_marker*.

        On success the `hilog_re` prefix is stripped from every line of
        the returned result; otherwise None is returned.
        """
        tag_opt = f' -T {tag}' if tag else ''
        dump = self.run(f'hilog -x{tag_opt}')
        if not dump.grep(finished_marker):
            return None
        # success. strip hilog data
        dump.replace_out(self.hilog_re)
        return dump

    def _poll_log(self, tag: str, finished_marker: str,
                  timeout: Optional[float],
                  ping_interval: int) -> Optional[ShellResult]:
        """Sleep and check the log every *ping_interval* seconds.

        Stops once the marker is seen or the accumulated sleep time
        reaches *timeout* (30 when None).
        """
        limit = 30 if timeout is None else timeout
        waited = 0
        found = None
        while waited < limit:
            log.debug("Waiting  %d sec for [%s]", ping_interval, finished_marker)
            time.sleep(ping_interval)
            waited += ping_interval
            found = self.grab_log(tag, finished_marker)
            if found:
                break
        return found

    def run_syslog(self, cmd: str,
                   finished_marker: str,
                   measure_time: bool = False,
                   timeout: Optional[float] = None,
                   cwd: str = '',
                   ping_interval: int = 5,
                   tag: str = 'VMB') -> ShellResult:
        """Run *cmd* and take its output from hilog.

        Fault logs and the hilog buffer are cleared first.  If *cmd*
        fails its result is returned as is.  Otherwise the log is read
        once (``ping_interval == 0``) or polled until *finished_marker*
        shows up.  When the marker is never found, `ret` is set to 1 and
        `out` to the first 40 lines of the fault logs.
        """
        self.run('rm -f /data/log/faultlog/faultlogger/*')
        self.run('hilog -r')  # clear log buffer
        res = self.run(cmd=cmd, measure_time=measure_time, cwd=cwd)
        if res.ret != 0:
            log.error('Command failed. Skippping results.')
            return res
        if ping_interval == 0:  # synchronous cmd
            res_log = self.grab_log(tag, finished_marker)
        else:  # async cmd
            res_log = self._poll_log(tag, finished_marker,
                                     timeout, ping_interval)
        if res_log:
            res.out = res_log.out
            return res
        # error. save full log
        res.ret = 1
        try:
            fault = self.run('cat /data/log/faultlog/faultlogger/* | head -40')
            res.out = fault.out
        except Exception:  # pylint: disable=broad-exception-caught
            log.warning('Error getting fault logs!')
        return res
446