1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2024-2025 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import re 19import os 20import logging 21import signal 22import time 23from typing import Union, Optional 24from pathlib import Path 25from subprocess import Popen, PIPE 26from threading import Thread, Timer 27from dataclasses import dataclass 28from tempfile import mktemp 29from vmb.helpers import Singleton 30 31log = logging.getLogger('vmb') 32tm_re = re.compile( 33 r"(?:Elapsed.*\(h:mm:ss or m:ss\)|Real time)" 34 r"[^:]*:\s*(?:(\d*):)?(\d*)(?:.(\d*))?") 35rss_re = re.compile(r"(?:Maximum resident set size|Max RSS)[^:]*:\s*(\d*)") 36 37 38@dataclass 39class ShellResult: 40 41 # Default initial result is 'failure' 42 ret: int = -13 43 out: str = '' 44 err: str = '' 45 tm: float = 0.0 46 rss: int = 0 47 48 def grep(self, regex: str) -> str: 49 out = self.out.split("\n") 50 err = self.err.split("\n") 51 for line in out + err: 52 m = re.search(regex, line) 53 if m: 54 if len(m.groups()) < 1: 55 return m.group() 56 return m.group(1) 57 return '' 58 59 def replace_out(self, regex: re.Pattern, repl: str = '') -> None: 60 out = self.out.split("\n") 61 new_lines = [regex.sub(repl, line) for line in out if line.strip()] 62 self.out = "\n".join(new_lines) 63 64 def set_ret_val(self) -> None: 65 if not self.out: 66 log.error("No shell output") 67 self.ret = -13 68 matches = re.search(r"__RET_VAL__=(\d*)", self.out) 69 if not matches: 70 log.error("No shell ret val; out:") 71 self.ret = -13 72 else: 73 try: 74 self.ret = int(matches.groups()[0]) 75 except ValueError: 76 log.error('Error parsing return code') 77 self.ret = -14 78 79 def set_time(self) -> None: 80 # expecting output of '\time -v' to stderr 81 if not self.err: 82 return 83 tm_val = re.search(tm_re, self.err) 84 if tm_val: 85 tmp = tm_val.groups() 86 if tmp[0] is None: 87 self.tm = float(str(tmp[1]) + "." + tmp[2]) 88 else: 89 self.tm = int(tmp[0]) * 60 + float(str(tmp[1]) + "." + tmp[2]) 90 self.tm = round(self.tm, 5) 91 else: 92 self.tm = 0.0 93 self.tm *= 1e9 94 rss_val = re.search(rss_re, self.err) 95 if rss_val: 96 self.rss = int(rss_val.group(1)) 97 else: 98 self.rss = 0 99 100 def log_output(self) -> None: 101 if self.ret != 0: 102 if self.out: 103 log.error(self.out) 104 err = self.err.split("\n")[:3] if self.err else [] 105 for line in err: 106 log.error(line.strip()) 107 else: 108 if self.out: 109 log.debug(self.out) 110 111 112class ShellBase(metaclass=Singleton): 113 114 def __init__(self, timeout: Optional[float] = None) -> None: 115 self._timeout = timeout 116 self.taskset = '' 117 118 @staticmethod 119 def timed_cmd(cmd: str) -> str: 120 return f"\\time -v env {cmd}" 121 122 def run(self, 123 cmd: str, 124 measure_time: bool = False, 125 timeout: Optional[float] = None, 126 cwd: str = '') -> ShellResult: 127 raise NotImplementedError 128 129 def run_async(self, cmd: str) -> None: 130 raise NotImplementedError 131 132 def run_syslog(self, cmd: str, 133 finished_marker: str, 134 measure_time: bool = False, 135 timeout: Optional[float] = None, 136 cwd: str = '', 137 ping_interval: int = 5, 138 tag: str = 'VMB') -> ShellResult: 139 raise NotImplementedError 140 141 def push(self, 142 src: Union[str, Path], 143 dst: Union[str, Path]) -> ShellResult: 144 raise NotImplementedError 145 146 def pull(self, 147 src: Union[str, Path], 148 dst: Union[str, Path]) -> ShellResult: 149 raise NotImplementedError 150 151 def install(self, package: Union[str, Path], name: str = '') -> ShellResult: 152 raise NotImplementedError 153 154 def get_filesize(self, filepath: Union[str, Path]) -> int: 155 if os.path.exists(str(filepath)): 156 return os.stat(str(filepath)).st_size 157 return 0 158 159 def grep_output(self, cmd: str, regex: str) -> str: 160 r = self.run(cmd=cmd) 161 return r.grep(regex) 162 163 def set_affinity(self, arg: str) -> None: 164 """Set affinity mask for processes. 165 166 Effective only on devices, so hardcoding path 167 """ 168 self.taskset = f'/system/bin/taskset -a {arg}' 169 170 171class ShellUnix(ShellBase): 172 173 def __init__(self, timeout: Optional[float] = None) -> None: 174 super().__init__(timeout=timeout) 175 176 def run(self, 177 cmd: str, 178 measure_time: bool = False, 179 timeout: Optional[float] = None, 180 cwd: str = '') -> ShellResult: 181 return self.__run( 182 cmd, measure_time=measure_time, timeout=timeout, cwd=cwd) 183 184 def push(self, 185 src: Union[str, Path], 186 dst: Union[str, Path]) -> ShellResult: 187 raise NotImplementedError 188 189 def pull(self, 190 src: Union[str, Path], 191 dst: Union[str, Path]) -> ShellResult: 192 raise NotImplementedError 193 194 def run_syslog(self, cmd: str, 195 finished_marker: str, 196 measure_time: bool = False, 197 timeout: Optional[float] = None, 198 cwd: str = '', 199 ping_interval: int = 5, 200 tag: str = 'VMB') -> ShellResult: 201 raise NotImplementedError 202 203 def run_async(self, cmd: str) -> None: 204 def run_shell(): 205 # pylint: disable-next=all 206 return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) # NOQA 207 208 log.debug('Async cmd: %s', cmd) 209 async_trhead = Thread(target=run_shell) 210 async_trhead.daemon = True 211 async_trhead.start() 212 213 def __run(self, 214 cmd: str, 215 measure_time: bool = False, 216 timeout: Optional[float] = None, 217 cwd: str = '') -> ShellResult: 218 if measure_time: 219 cmd = self.timed_cmd(cmd) 220 result = self.__exec_process(cmd, cwd=cwd, timeout=timeout) 221 if measure_time: 222 result.set_time() 223 result.log_output() 224 return result 225 226 def __exec_process(self, cmd: str, cwd: str = '', 227 timeout: Optional[float] = None) -> ShellResult: 228 result = ShellResult() 229 # Note: self._timeout=None so default behaivior is to wait forever 230 to = timeout if timeout else self._timeout 231 if timeout is not None and self._timeout is not None: 232 to = max(timeout, self._timeout) 233 log.debug(cmd) 234 log.trace('CWD="%s" Timeout=[%s]', cwd, to) 235 # pylint: disable-next=all 236 with Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, # NOQA 237 cwd=(cwd if cwd else None), 238 preexec_fn=os.setsid) as proc: 239 if to is not None: 240 timer = Timer(to, 241 lambda x: os.killpg( 242 os.getpgid(x.pid), signal.SIGKILL), [proc]) 243 timer.start() 244 out, err = proc.communicate(timeout=to) 245 if to is not None: 246 timer.cancel() 247 ret_code = proc.poll() 248 if ret_code is not None: 249 result.ret = ret_code 250 result.out = out.decode('utf-8', errors='replace') 251 result.err = err.decode('utf-8', errors='replace') 252 return result 253 254 255class ShellDevice(ShellBase): 256 def __init__(self, 257 dev_sh: str, 258 timeout: Optional[float] = None, 259 tmp_dir: str = '/data/local/tmp/vmb',) -> None: 260 super().__init__(timeout=timeout) 261 self._sh = ShellUnix() 262 self._devsh = dev_sh 263 self.tmp_dir = tmp_dir 264 self.stderr_out = os.path.join(tmp_dir, 'vmb-stderr.out') 265 266 def run(self, cmd: str, 267 measure_time: bool = False, 268 timeout: Optional[float] = None, 269 cwd: str = '') -> ShellResult: 270 redir = '' 271 if measure_time: 272 cmd = f"\\time -v {self.taskset} env {cmd}" 273 redir = f' 2>{self.stderr_out}' 274 cwd = f'cd {cwd}; ' if cwd else '' 275 res = self._sh.run( 276 f"{self._devsh} shell '{cwd}({cmd}){redir}; echo __RET_VAL__=$?'", 277 timeout=timeout, 278 measure_time=False) 279 res.set_ret_val() 280 if measure_time: 281 stderr_host = mktemp(prefix='vmb-') 282 self.pull(self.stderr_out, stderr_host) 283 self._sh.run(f"{self._devsh} shell 'rm -f {self.stderr_out}'") 284 if not Path(stderr_host).exists(): 285 res.err = 'Pull from device failed' 286 return res 287 with open(stderr_host, 'r', encoding="utf-8") as f: 288 res.err = f.read() 289 self._sh.run(f'rm -f {stderr_host}') 290 res.set_time() 291 else: 292 res.err = '' 293 return res 294 295 def run_syslog(self, cmd: str, 296 finished_marker: str, 297 measure_time: bool = False, 298 timeout: Optional[float] = None, 299 cwd: str = '', 300 ping_interval: int = 5, 301 tag: str = 'VMB') -> ShellResult: 302 raise NotImplementedError 303 304 def run_async(self, cmd: str) -> None: 305 self._sh.run_async(f"{self._devsh} shell '{cmd}'") 306 307 def get_filesize(self, filepath: Union[str, Path]) -> int: 308 res = self.run(f"stat -c '%s' {filepath}") 309 if res.ret == 0 and res.out: 310 return int(res.out.split("\n")[0]) 311 return 0 312 313 def push(self, 314 src: Union[str, Path], 315 dst: Union[str, Path]) -> ShellResult: 316 raise NotImplementedError 317 318 def pull(self, 319 src: Union[str, Path], 320 dst: Union[str, Path]) -> ShellResult: 321 raise NotImplementedError 322 323 def install(self, package: Union[str, Path], name: str = '') -> ShellResult: 324 raise NotImplementedError 325 326 def mk_tmp_dir(self): 327 res = self.run(f'mkdir -p {self.tmp_dir}') 328 if res.ret != 0: 329 raise RuntimeError('Device connection failed!\n' 330 f'{res.out}\n{res.err}') 331 332 333class ShellAdb(ShellDevice): 334 binname = f"a{'d'}b" 335 336 def __init__(self, 337 dev_serial: str = '', 338 timeout: Optional[float] = None, 339 tmp_dir: str = '/data/local/tmp/vmb') -> None: 340 super().__init__( 341 f"{os.environ.get(self.binname.upper(), self.binname)}", 342 timeout=timeout, 343 tmp_dir=tmp_dir) 344 if dev_serial: 345 self._devsh = f'{self._devsh} -s {dev_serial}' 346 self.mk_tmp_dir() 347 348 def push(self, 349 src: Union[str, Path], 350 dst: Union[str, Path]) -> ShellResult: 351 return self._sh.run(f'{self._devsh} push {src} {dst}', 352 measure_time=False) 353 354 def pull(self, 355 src: Union[str, Path], 356 dst: Union[str, Path]) -> ShellResult: 357 return self._sh.run(f'{self._devsh} pull {src} {dst}', 358 measure_time=False) 359 360 def install(self, package: Union[str, Path], name: str = '') -> ShellResult: 361 raise NotImplementedError 362 363 364class ShellHdc(ShellDevice): 365 # hardcoded tag and app name for now 366 hilog_re = re.compile(r'^.*com.example.helllopanda/VMB: ') 367 368 def __init__(self, 369 dev_serial: str = '', 370 dev_host: str = '', 371 timeout: Optional[float] = None, 372 tmp_dir: str = '/data/local/tmp/vmb') -> None: 373 # -l0 because of HDC mutex file permission messages 374 # -p (undocumented) due to poor hdc performance 375 super().__init__(f"{os.environ.get('HDC', 'hdc')} -p -l0", 376 timeout=timeout, 377 tmp_dir=tmp_dir) 378 if dev_serial: 379 self._devsh = f'{self._devsh} -t {dev_serial}' 380 if dev_host: 381 self._devsh = f'{self._devsh} -s {dev_host}' 382 self.mk_tmp_dir() 383 384 def push(self, 385 src: Union[str, Path], 386 dst: Union[str, Path]) -> ShellResult: 387 return self._sh.run(f'{self._devsh} file send {src} {dst}', 388 measure_time=False) 389 390 def pull(self, 391 src: Union[str, Path], 392 dst: Union[str, Path]) -> ShellResult: 393 return self._sh.run(f'{self._devsh} file recv {src} {dst}', 394 measure_time=False) 395 396 def install(self, package: Union[str, Path], name: str = '') -> ShellResult: 397 if name: 398 self._sh.run(f'{self._devsh} uninstall {name}', measure_time=False) 399 return self._sh.run(f'{self._devsh} aa install {package}', measure_time=False) 400 401 def grab_log(self, tag: str, finished_marker: str) -> Optional[ShellResult]: 402 opts = f' -T {tag}' if tag else '' 403 res = self.run(f'hilog -x{opts}') 404 if res.grep(finished_marker): 405 # success. strip hilog data 406 res.replace_out(self.hilog_re) 407 return res 408 return None 409 410 def run_syslog(self, cmd: str, 411 finished_marker: str, 412 measure_time: bool = False, 413 timeout: Optional[float] = None, 414 cwd: str = '', 415 ping_interval: int = 5, 416 tag: str = 'VMB') -> ShellResult: 417 self.run('rm -f /data/log/faultlog/faultlogger/*') 418 self.run('hilog -r') # clear log buffer 419 res = self.run(cmd=cmd, measure_time=measure_time, cwd=cwd) 420 if res.ret != 0: 421 log.error('Command failed. Skippping results.') 422 return res 423 res_log = None 424 if 0 == ping_interval: # synchronous cmd 425 res_log = self.grab_log(tag, finished_marker) 426 else: # async cmd 427 to = 30 if timeout is None else timeout 428 elapsed = 0 429 while elapsed < to: 430 log.debug("Waiting %d sec for [%s]", ping_interval, finished_marker) 431 time.sleep(ping_interval) 432 elapsed += ping_interval 433 res_log = self.grab_log(tag, finished_marker) 434 if res_log: 435 break 436 if res_log: 437 res.out = res_log.out 438 return res 439 # error. save full log 440 res.ret = 1 441 try: 442 res.out = self.run('cat /data/log/faultlog/faultlogger/* | head -40').out 443 except Exception: # pylint: disable=broad-exception-caught 444 log.warning('Error getting fault logs!') 445 return res 446