1import os 2import re 3import time 4import logging 5import posixpath 6import subprocess 7import tarfile 8import tempfile 9import threading 10from collections import namedtuple 11 12from devlib.host import LocalConnection, PACKAGE_BIN_DIRECTORY 13from devlib.module import get_module 14from devlib.platform import Platform 15from devlib.exception import TargetError, TargetNotRespondingError, TimeoutError 16from devlib.utils.ssh import SshConnection 17from devlib.utils.android import AdbConnection, AndroidProperties, LogcatMonitor, adb_command, adb_disconnect 18from devlib.utils.misc import memoized, isiterable, convert_new_lines, merge_lists 19from devlib.utils.misc import ABI_MAP, get_cpu_name, ranges_to_list, escape_double_quotes 20from devlib.utils.types import integer, boolean, bitmask, identifier, caseless_string 21 22 23FSTAB_ENTRY_REGEX = re.compile(r'(\S+) on (.+) type (\S+) \((\S+)\)') 24ANDROID_SCREEN_STATE_REGEX = re.compile('(?:mPowerState|mScreenOn|Display Power: state)=([0-9]+|true|false|ON|OFF)', 25 re.IGNORECASE) 26ANDROID_SCREEN_RESOLUTION_REGEX = re.compile(r'mUnrestrictedScreen=\(\d+,\d+\)' 27 r'\s+(?P<width>\d+)x(?P<height>\d+)') 28DEFAULT_SHELL_PROMPT = re.compile(r'^.*(shell|root)@.*:/\S* [#$] ', 29 re.MULTILINE) 30KVERSION_REGEX =re.compile( 31 r'(?P<version>\d+)(\.(?P<major>\d+)(\.(?P<minor>\d+)(-rc(?P<rc>\d+))?)?)?(.*-g(?P<sha1>[0-9a-fA-F]{7,}))?' 32) 33 34GOOGLE_DNS_SERVER_ADDRESS = '8.8.8.8' 35 36 37class Target(object): 38 39 path = None 40 os = None 41 42 default_modules = [ 43 'hotplug', 44 'cpufreq', 45 'cpuidle', 46 'cgroups', 47 'hwmon', 48 ] 49 50 @property 51 def core_names(self): 52 return self.platform.core_names 53 54 @property 55 def core_clusters(self): 56 return self.platform.core_clusters 57 58 @property 59 def big_core(self): 60 return self.platform.big_core 61 62 @property 63 def little_core(self): 64 return self.platform.little_core 65 66 @property 67 def is_connected(self): 68 return self.conn is not None 69 70 @property 71 def connected_as_root(self): 72 if self._connected_as_root is None: 73 result = self.execute('id') 74 self._connected_as_root = 'uid=0(' in result 75 return self._connected_as_root 76 77 @property 78 @memoized 79 def is_rooted(self): 80 if self.connected_as_root: 81 return True 82 try: 83 self.execute('ls /', timeout=2, as_root=True) 84 return True 85 except (TargetError, TimeoutError): 86 return False 87 88 @property 89 @memoized 90 def needs_su(self): 91 return not self.connected_as_root and self.is_rooted 92 93 @property 94 @memoized 95 def kernel_version(self): 96 return KernelVersion(self.execute('{} uname -r -v'.format(self.busybox)).strip()) 97 98 @property 99 def os_version(self): # pylint: disable=no-self-use 100 return {} 101 102 @property 103 def abi(self): # pylint: disable=no-self-use 104 return None 105 106 @property 107 def supported_abi(self): 108 return [self.abi] 109 110 @property 111 @memoized 112 def cpuinfo(self): 113 return Cpuinfo(self.execute('cat /proc/cpuinfo')) 114 115 @property 116 @memoized 117 def number_of_cpus(self): 118 num_cpus = 0 119 corere = re.compile(r'^\s*cpu\d+\s*$') 120 output = self.execute('ls /sys/devices/system/cpu') 121 for entry in output.split(): 122 if corere.match(entry): 123 num_cpus += 1 124 return num_cpus 125 126 @property 127 @memoized 128 def config(self): 129 try: 130 return KernelConfig(self.execute('zcat /proc/config.gz')) 131 except TargetError: 132 for path in ['/boot/config', '/boot/config-$(uname -r)']: 133 try: 134 return KernelConfig(self.execute('cat {}'.format(path))) 135 except TargetError: 136 pass 137 return KernelConfig('') 138 139 @property 140 @memoized 141 def user(self): 142 return self.getenv('USER') 143 144 @property 145 def conn(self): 146 if self._connections: 147 tid = id(threading.current_thread()) 148 if tid not in self._connections: 149 self._connections[tid] = self.get_connection() 150 return self._connections[tid] 151 else: 152 return None 153 154 @property 155 def shutils(self): 156 if self._shutils is None: 157 self._setup_shutils() 158 return self._shutils 159 160 def __init__(self, 161 connection_settings=None, 162 platform=None, 163 working_directory=None, 164 executables_directory=None, 165 connect=True, 166 modules=None, 167 load_default_modules=True, 168 shell_prompt=DEFAULT_SHELL_PROMPT, 169 conn_cls=None, 170 ): 171 self._connected_as_root = None 172 self.connection_settings = connection_settings or {} 173 # Set self.platform: either it's given directly (by platform argument) 174 # or it's given in the connection_settings argument 175 # If neither, create default Platform() 176 if platform is None: 177 self.platform = self.connection_settings.get('platform', Platform()) 178 else: 179 self.platform = platform 180 # Check if the user hasn't given two different platforms 181 if 'platform' in self.connection_settings: 182 if connection_settings['platform'] is not platform: 183 raise TargetError('Platform specified in connection_settings ' 184 '({}) differs from that directly passed ' 185 '({})!)' 186 .format(connection_settings['platform'], 187 self.platform)) 188 self.connection_settings['platform'] = self.platform 189 self.working_directory = working_directory 190 self.executables_directory = executables_directory 191 self.modules = modules or [] 192 self.load_default_modules = load_default_modules 193 self.shell_prompt = shell_prompt 194 self.conn_cls = conn_cls 195 self.logger = logging.getLogger(self.__class__.__name__) 196 self._installed_binaries = {} 197 self._installed_modules = {} 198 self._cache = {} 199 self._connections = {} 200 self._shutils = None 201 self.busybox = None 202 203 if load_default_modules: 204 module_lists = [self.default_modules] 205 else: 206 module_lists = [] 207 module_lists += [self.modules, self.platform.modules] 208 self.modules = merge_lists(*module_lists, duplicates='first') 209 self._update_modules('early') 210 if connect: 211 self.connect() 212 213 # connection and initialization 214 215 def connect(self, timeout=None): 216 self.platform.init_target_connection(self) 217 tid = id(threading.current_thread()) 218 self._connections[tid] = self.get_connection(timeout=timeout) 219 self._resolve_paths() 220 self.execute('mkdir -p {}'.format(self.working_directory)) 221 self.execute('mkdir -p {}'.format(self.executables_directory)) 222 self.busybox = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, self.abi, 'busybox')) 223 self.platform.update_from_target(self) 224 self._update_modules('connected') 225 if self.platform.big_core and self.load_default_modules: 226 self._install_module(get_module('bl')) 227 228 def disconnect(self): 229 for conn in self._connections.itervalues(): 230 conn.close() 231 self._connections = {} 232 233 def get_connection(self, timeout=None): 234 if self.conn_cls == None: 235 raise ValueError('Connection class not specified on Target creation.') 236 return self.conn_cls(timeout=timeout, **self.connection_settings) # pylint: disable=not-callable 237 238 def setup(self, executables=None): 239 self._setup_shutils() 240 241 for host_exe in (executables or []): # pylint: disable=superfluous-parens 242 self.install(host_exe) 243 244 # Check for platform dependent setup procedures 245 self.platform.setup(self) 246 247 # Initialize modules which requires Buxybox (e.g. shutil dependent tasks) 248 self._update_modules('setup') 249 250 def reboot(self, hard=False, connect=True, timeout=180): 251 if hard: 252 if not self.has('hard_reset'): 253 raise TargetError('Hard reset not supported for this target.') 254 self.hard_reset() # pylint: disable=no-member 255 else: 256 if not self.is_connected: 257 message = 'Cannot reboot target becuase it is disconnected. ' +\ 258 'Either connect() first, or specify hard=True ' +\ 259 '(in which case, a hard_reset module must be installed)' 260 raise TargetError(message) 261 self.reset() 262 # Wait a fixed delay before starting polling to give the target time to 263 # shut down, otherwise, might create the connection while it's still shutting 264 # down resulting in subsequenct connection failing. 265 self.logger.debug('Waiting for target to power down...') 266 reset_delay = 20 267 time.sleep(reset_delay) 268 timeout = max(timeout - reset_delay, 10) 269 if self.has('boot'): 270 self.boot() # pylint: disable=no-member 271 self._connected_as_root = None 272 if connect: 273 self.connect(timeout=timeout) 274 275 # file transfer 276 277 def push(self, source, dest, timeout=None): 278 return self.conn.push(source, dest, timeout=timeout) 279 280 def pull(self, source, dest, timeout=None): 281 return self.conn.pull(source, dest, timeout=timeout) 282 283 def get_directory(self, source_dir, dest): 284 """ Pull a directory from the device, after compressing dir """ 285 # Create all file names 286 tar_file_name = source_dir.lstrip(self.path.sep).replace(self.path.sep, '.') 287 # Host location of dir 288 outdir = os.path.join(dest, tar_file_name) 289 # Host location of archive 290 tar_file_name = '{}.tar'.format(tar_file_name) 291 tempfile = os.path.join(dest, tar_file_name) 292 293 # Does the folder exist? 294 self.execute('ls -la {}'.format(source_dir)) 295 # Try compressing the folder 296 try: 297 self.execute('{} tar -cvf {} {}'.format(self.busybox, tar_file_name, 298 source_dir)) 299 except TargetError: 300 self.logger.debug('Failed to run tar command on target! ' \ 301 'Not pulling directory {}'.format(source_dir)) 302 # Pull the file 303 os.mkdir(outdir) 304 self.pull(tar_file_name, tempfile ) 305 # Decompress 306 f = tarfile.open(tempfile, 'r') 307 f.extractall(outdir) 308 os.remove(tempfile) 309 310 # execution 311 312 def execute(self, command, timeout=None, check_exit_code=True, as_root=False): 313 return self.conn.execute(command, timeout, check_exit_code, as_root) 314 315 def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False): 316 return self.conn.background(command, stdout, stderr, as_root) 317 318 def invoke(self, binary, args=None, in_directory=None, on_cpus=None, 319 as_root=False, timeout=30): 320 """ 321 Executes the specified binary under the specified conditions. 322 323 :binary: binary to execute. Must be present and executable on the device. 324 :args: arguments to be passed to the binary. The can be either a list or 325 a string. 326 :in_directory: execute the binary in the specified directory. This must 327 be an absolute path. 328 :on_cpus: taskset the binary to these CPUs. This may be a single ``int`` (in which 329 case, it will be interpreted as the mask), a list of ``ints``, in which 330 case this will be interpreted as the list of cpus, or string, which 331 will be interpreted as a comma-separated list of cpu ranges, e.g. 332 ``"0,4-7"``. 333 :as_root: Specify whether the command should be run as root 334 :timeout: If the invocation does not terminate within this number of seconds, 335 a ``TimeoutError`` exception will be raised. Set to ``None`` if the 336 invocation should not timeout. 337 338 :returns: output of command. 339 """ 340 command = binary 341 if args: 342 if isiterable(args): 343 args = ' '.join(args) 344 command = '{} {}'.format(command, args) 345 if on_cpus: 346 on_cpus = bitmask(on_cpus) 347 command = '{} taskset 0x{:x} {}'.format(self.busybox, on_cpus, command) 348 if in_directory: 349 command = 'cd {} && {}'.format(in_directory, command) 350 return self.execute(command, as_root=as_root, timeout=timeout) 351 352 def background_invoke(self, binary, args=None, in_directory=None, 353 on_cpus=None, as_root=False): 354 """ 355 Executes the specified binary as a background task under the 356 specified conditions. 357 358 :binary: binary to execute. Must be present and executable on the device. 359 :args: arguments to be passed to the binary. The can be either a list or 360 a string. 361 :in_directory: execute the binary in the specified directory. This must 362 be an absolute path. 363 :on_cpus: taskset the binary to these CPUs. This may be a single ``int`` (in which 364 case, it will be interpreted as the mask), a list of ``ints``, in which 365 case this will be interpreted as the list of cpus, or string, which 366 will be interpreted as a comma-separated list of cpu ranges, e.g. 367 ``"0,4-7"``. 368 :as_root: Specify whether the command should be run as root 369 370 :returns: the subprocess instance handling that command 371 """ 372 command = binary 373 if args: 374 if isiterable(args): 375 args = ' '.join(args) 376 command = '{} {}'.format(command, args) 377 if on_cpus: 378 on_cpus = bitmask(on_cpus) 379 command = '{} taskset 0x{:x} {}'.format(self.busybox, on_cpus, command) 380 if in_directory: 381 command = 'cd {} && {}'.format(in_directory, command) 382 return self.background(command, as_root=as_root) 383 384 def kick_off(self, command, as_root=False): 385 raise NotImplementedError() 386 387 # sysfs interaction 388 389 def read_value(self, path, kind=None): 390 output = self.execute('cat \'{}\''.format(path), as_root=self.needs_su).strip() # pylint: disable=E1103 391 if kind: 392 return kind(output) 393 else: 394 return output 395 396 def read_int(self, path): 397 return self.read_value(path, kind=integer) 398 399 def read_bool(self, path): 400 return self.read_value(path, kind=boolean) 401 402 def write_value(self, path, value, verify=True): 403 value = str(value) 404 self.execute('echo {} > \'{}\''.format(value, path), check_exit_code=False, as_root=True) 405 if verify: 406 output = self.read_value(path) 407 if not output == value: 408 message = 'Could not set the value of {} to "{}" (read "{}")'.format(path, value, output) 409 raise TargetError(message) 410 411 def reset(self): 412 try: 413 self.execute('reboot', as_root=self.needs_su, timeout=2) 414 except (TargetError, TimeoutError, subprocess.CalledProcessError): 415 # on some targets "reboot" doesn't return gracefully 416 pass 417 self._connected_as_root = None 418 419 def check_responsive(self): 420 try: 421 self.conn.execute('ls /', timeout=5) 422 except (TimeoutError, subprocess.CalledProcessError): 423 raise TargetNotRespondingError(self.conn.name) 424 425 # process management 426 427 def kill(self, pid, signal=None, as_root=False): 428 signal_string = '-s {}'.format(signal) if signal else '' 429 self.execute('kill {} {}'.format(signal_string, pid), as_root=as_root) 430 431 def killall(self, process_name, signal=None, as_root=False): 432 for pid in self.get_pids_of(process_name): 433 try: 434 self.kill(pid, signal=signal, as_root=as_root) 435 except TargetError: 436 pass 437 438 def get_pids_of(self, process_name): 439 raise NotImplementedError() 440 441 def ps(self, **kwargs): 442 raise NotImplementedError() 443 444 # files 445 446 def file_exists(self, filepath): 447 command = 'if [ -e \'{}\' ]; then echo 1; else echo 0; fi' 448 output = self.execute(command.format(filepath), as_root=self.is_rooted) 449 return boolean(output.strip()) 450 451 def directory_exists(self, filepath): 452 output = self.execute('if [ -d \'{}\' ]; then echo 1; else echo 0; fi'.format(filepath)) 453 # output from ssh my contain part of the expression in the buffer, 454 # split out everything except the last word. 455 return boolean(output.split()[-1]) # pylint: disable=maybe-no-member 456 457 def list_file_systems(self): 458 output = self.execute('mount') 459 fstab = [] 460 for line in output.split('\n'): 461 line = line.strip() 462 if not line: 463 continue 464 match = FSTAB_ENTRY_REGEX.search(line) 465 if match: 466 fstab.append(FstabEntry(match.group(1), match.group(2), 467 match.group(3), match.group(4), 468 None, None)) 469 else: # assume pre-M Android 470 fstab.append(FstabEntry(*line.split())) 471 return fstab 472 473 def list_directory(self, path, as_root=False): 474 raise NotImplementedError() 475 476 def get_workpath(self, name): 477 return self.path.join(self.working_directory, name) 478 479 def tempfile(self, prefix='', suffix=''): 480 names = tempfile._get_candidate_names() # pylint: disable=W0212 481 for _ in xrange(tempfile.TMP_MAX): 482 name = names.next() 483 path = self.get_workpath(prefix + name + suffix) 484 if not self.file_exists(path): 485 return path 486 raise IOError('No usable temporary filename found') 487 488 def remove(self, path, as_root=False): 489 self.execute('rm -rf {}'.format(path), as_root=as_root) 490 491 # misc 492 def core_cpus(self, core): 493 return [i for i, c in enumerate(self.core_names) if c == core] 494 495 def list_online_cpus(self, core=None): 496 path = self.path.join('/sys/devices/system/cpu/online') 497 output = self.read_value(path) 498 all_online = ranges_to_list(output) 499 if core: 500 cpus = self.core_cpus(core) 501 if not cpus: 502 raise ValueError(core) 503 return [o for o in all_online if o in cpus] 504 else: 505 return all_online 506 507 def list_offline_cpus(self): 508 online = self.list_online_cpus() 509 return [c for c in xrange(self.number_of_cpus) 510 if c not in online] 511 512 def getenv(self, variable): 513 return self.execute('echo ${}'.format(variable)).rstrip('\r\n') 514 515 def capture_screen(self, filepath): 516 raise NotImplementedError() 517 518 def install(self, filepath, timeout=None, with_name=None): 519 raise NotImplementedError() 520 521 def uninstall(self, name): 522 raise NotImplementedError() 523 524 def get_installed(self, name, search_system_binaries=True): 525 # Check user installed binaries first 526 if self.file_exists(self.executables_directory): 527 if name in self.list_directory(self.executables_directory): 528 return self.path.join(self.executables_directory, name) 529 # Fall back to binaries in PATH 530 if search_system_binaries: 531 for path in self.getenv('PATH').split(self.path.pathsep): 532 try: 533 if name in self.list_directory(path): 534 return self.path.join(path, name) 535 except TargetError: 536 pass # directory does not exist or no executable premssions 537 538 which = get_installed 539 540 def install_if_needed(self, host_path, search_system_binaries=True): 541 542 binary_path = self.get_installed(os.path.split(host_path)[1], 543 search_system_binaries=search_system_binaries) 544 if not binary_path: 545 binary_path = self.install(host_path) 546 return binary_path 547 548 def is_installed(self, name): 549 return bool(self.get_installed(name)) 550 551 def bin(self, name): 552 return self._installed_binaries.get(name, name) 553 554 def has(self, modname): 555 return hasattr(self, identifier(modname)) 556 557 def lsmod(self): 558 lines = self.execute('lsmod').splitlines() 559 entries = [] 560 for line in lines[1:]: # first line is the header 561 if not line.strip(): 562 continue 563 parts = line.split() 564 name = parts[0] 565 size = int(parts[1]) 566 use_count = int(parts[2]) 567 if len(parts) > 3: 568 used_by = ''.join(parts[3:]).split(',') 569 else: 570 used_by = [] 571 entries.append(LsmodEntry(name, size, use_count, used_by)) 572 return entries 573 574 def insmod(self, path): 575 target_path = self.get_workpath(os.path.basename(path)) 576 self.push(path, target_path) 577 self.execute('insmod {}'.format(target_path), as_root=True) 578 579 580 def extract(self, path, dest=None): 581 """ 582 Extact the specified on-target file. The extraction method to be used 583 (unzip, gunzip, bunzip2, or tar) will be based on the file's extension. 584 If ``dest`` is specified, it must be an existing directory on target; 585 the extracted contents will be placed there. 586 587 Note that, depending on the archive file format (and therfore the 588 extraction method used), the original archive file may or may not exist 589 after the extraction. 590 591 The return value is the path to the extracted contents. In case of 592 gunzip and bunzip2, this will be path to the extracted file; for tar 593 and uzip, this will be the directory with the extracted file(s) 594 (``dest`` if it was specified otherwise, the directory that cotained 595 the archive). 596 597 """ 598 for ending in ['.tar.gz', '.tar.bz', '.tar.bz2', 599 '.tgz', '.tbz', '.tbz2']: 600 if path.endswith(ending): 601 return self._extract_archive(path, 'tar xf {} -C {}', dest) 602 603 ext = self.path.splitext(path)[1] 604 if ext in ['.bz', '.bz2']: 605 return self._extract_file(path, 'bunzip2 -f {}', dest) 606 elif ext == '.gz': 607 return self._extract_file(path, 'gunzip -f {}', dest) 608 elif ext == '.zip': 609 return self._extract_archive(path, 'unzip {} -d {}', dest) 610 else: 611 raise ValueError('Unknown compression format: {}'.format(ext)) 612 613 def sleep(self, duration): 614 timeout = duration + 10 615 self.execute('sleep {}'.format(duration), timeout=timeout) 616 617 def read_tree_values_flat(self, path, depth=1, check_exit_code=True): 618 command = 'read_tree_values {} {}'.format(path, depth) 619 output = self._execute_util(command, as_root=self.is_rooted, 620 check_exit_code=check_exit_code) 621 result = {} 622 for entry in output.strip().split('\n'): 623 if ':' not in entry: 624 continue 625 path, value = entry.strip().split(':', 1) 626 result[path] = value 627 return result 628 629 def read_tree_values(self, path, depth=1, dictcls=dict, check_exit_code=True): 630 value_map = self.read_tree_values_flat(path, depth, check_exit_code) 631 return _build_path_tree(value_map, path, self.path.sep, dictcls) 632 633 # internal methods 634 635 def _setup_shutils(self): 636 shutils_ifile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils.in') 637 shutils_ofile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils') 638 shell_path = '/bin/sh' 639 if self.os == 'android': 640 shell_path = '/system/bin/sh' 641 with open(shutils_ifile) as fh: 642 lines = fh.readlines() 643 with open(shutils_ofile, 'w') as ofile: 644 for line in lines: 645 line = line.replace("__DEVLIB_SHELL__", shell_path) 646 line = line.replace("__DEVLIB_BUSYBOX__", self.busybox) 647 ofile.write(line) 648 self._shutils = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils')) 649 650 def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False): 651 command = '{} {}'.format(self.shutils, command) 652 return self.conn.execute(command, timeout, check_exit_code, as_root) 653 654 def _extract_archive(self, path, cmd, dest=None): 655 cmd = '{} ' + cmd # busybox 656 if dest: 657 extracted = dest 658 else: 659 extracted = self.path.dirname(path) 660 cmdtext = cmd.format(self.busybox, path, extracted) 661 self.execute(cmdtext) 662 return extracted 663 664 def _extract_file(self, path, cmd, dest=None): 665 cmd = '{} ' + cmd # busybox 666 cmdtext = cmd.format(self.busybox, path) 667 self.execute(cmdtext) 668 extracted = self.path.splitext(path)[0] 669 if dest: 670 self.execute('mv -f {} {}'.format(extracted, dest)) 671 if dest.endswith('/'): 672 extracted = self.path.join(dest, self.path.basename(extracted)) 673 else: 674 extracted = dest 675 return extracted 676 677 def _update_modules(self, stage): 678 for mod in self.modules: 679 if isinstance(mod, dict): 680 mod, params = mod.items()[0] 681 else: 682 params = {} 683 mod = get_module(mod) 684 if not mod.stage == stage: 685 continue 686 if mod.probe(self): 687 self._install_module(mod, **params) 688 else: 689 msg = 'Module {} is not supported by the target'.format(mod.name) 690 if self.load_default_modules: 691 self.logger.debug(msg) 692 else: 693 self.logger.warning(msg) 694 695 def _install_module(self, mod, **params): 696 if mod.name not in self._installed_modules: 697 self.logger.debug('Installing module {}'.format(mod.name)) 698 mod.install(self, **params) 699 self._installed_modules[mod.name] = mod 700 else: 701 self.logger.debug('Module {} is already installed.'.format(mod.name)) 702 703 def _resolve_paths(self): 704 raise NotImplementedError() 705 706 def is_network_connected(self): 707 self.logger.debug('Checking for internet connectivity...') 708 709 timeout_s = 5 710 # It would be nice to use busybox for this, but that means we'd need 711 # root (ping is usually setuid so it can open raw sockets to send ICMP) 712 command = 'ping -q -c 1 -w {} {} 2>&1'.format(timeout_s, 713 GOOGLE_DNS_SERVER_ADDRESS) 714 715 # We'll use our own retrying mechanism (rather than just using ping's -c 716 # to send multiple packets) so that we don't slow things down in the 717 # 'good' case where the first packet gets echoed really quickly. 718 attempts = 5 719 for _ in range(attempts): 720 try: 721 self.execute(command) 722 return True 723 except TargetError as e: 724 err = str(e).lower() 725 if '100% packet loss' in err: 726 # We sent a packet but got no response. 727 # Try again - we don't want this to fail just because of a 728 # transient drop in connection quality. 729 self.logger.debug('No ping response from {} after {}s' 730 .format(GOOGLE_DNS_SERVER_ADDRESS, timeout_s)) 731 continue 732 elif 'network is unreachable' in err: 733 # No internet connection at all, we can fail straight away 734 self.logger.debug('Network unreachable') 735 return False 736 else: 737 # Something else went wrong, we don't know what, raise an 738 # error. 739 raise 740 741 self.logger.debug('Failed to ping {} after {} attempts'.format( 742 GOOGLE_DNS_SERVER_ADDRESS, attempts)) 743 return False 744 745class LinuxTarget(Target): 746 747 path = posixpath 748 os = 'linux' 749 750 @property 751 @memoized 752 def abi(self): 753 value = self.execute('uname -m').strip() 754 for abi, architectures in ABI_MAP.iteritems(): 755 if value in architectures: 756 result = abi 757 break 758 else: 759 result = value 760 return result 761 762 @property 763 @memoized 764 def os_version(self): 765 os_version = {} 766 try: 767 command = 'ls /etc/*-release /etc*-version /etc/*_release /etc/*_version 2>/dev/null' 768 version_files = self.execute(command, check_exit_code=False).strip().split() 769 for vf in version_files: 770 name = self.path.basename(vf) 771 output = self.read_value(vf) 772 os_version[name] = output.strip().replace('\n', ' ') 773 except TargetError: 774 raise 775 return os_version 776 777 @property 778 @memoized 779 # There is currently no better way to do this cross platform. 780 # ARM does not have dmidecode 781 def model(self): 782 if self.file_exists("/proc/device-tree/model"): 783 raw_model = self.execute("cat /proc/device-tree/model") 784 return '_'.join(raw_model.split()[:2]) 785 return None 786 787 def __init__(self, 788 connection_settings=None, 789 platform=None, 790 working_directory=None, 791 executables_directory=None, 792 connect=True, 793 modules=None, 794 load_default_modules=True, 795 shell_prompt=DEFAULT_SHELL_PROMPT, 796 conn_cls=SshConnection, 797 ): 798 super(LinuxTarget, self).__init__(connection_settings=connection_settings, 799 platform=platform, 800 working_directory=working_directory, 801 executables_directory=executables_directory, 802 connect=connect, 803 modules=modules, 804 load_default_modules=load_default_modules, 805 shell_prompt=shell_prompt, 806 conn_cls=conn_cls) 807 808 def connect(self, timeout=None): 809 super(LinuxTarget, self).connect(timeout=timeout) 810 811 def kick_off(self, command, as_root=False): 812 command = 'sh -c "{}" 1>/dev/null 2>/dev/null &'.format(escape_double_quotes(command)) 813 return self.conn.execute(command, as_root=as_root) 814 815 def get_pids_of(self, process_name): 816 """Returns a list of PIDs of all processes with the specified name.""" 817 # result should be a column of PIDs with the first row as "PID" header 818 result = self.execute('ps -C {} -o pid'.format(process_name), # NOQA 819 check_exit_code=False).strip().split() 820 if len(result) >= 2: # at least one row besides the header 821 return map(int, result[1:]) 822 else: 823 return [] 824 825 def ps(self, **kwargs): 826 command = 'ps -eo user,pid,ppid,vsize,rss,wchan,pcpu,state,fname' 827 lines = iter(convert_new_lines(self.execute(command)).split('\n')) 828 lines.next() # header 829 830 result = [] 831 for line in lines: 832 parts = re.split(r'\s+', line, maxsplit=8) 833 if parts and parts != ['']: 834 result.append(PsEntry(*(parts[0:1] + map(int, parts[1:5]) + parts[5:]))) 835 836 if not kwargs: 837 return result 838 else: 839 filtered_result = [] 840 for entry in result: 841 if all(getattr(entry, k) == v for k, v in kwargs.iteritems()): 842 filtered_result.append(entry) 843 return filtered_result 844 845 def list_directory(self, path, as_root=False): 846 contents = self.execute('ls -1 {}'.format(path), as_root=as_root) 847 return [x.strip() for x in contents.split('\n') if x.strip()] 848 849 def install(self, filepath, timeout=None, with_name=None): # pylint: disable=W0221 850 destpath = self.path.join(self.executables_directory, 851 with_name and with_name or self.path.basename(filepath)) 852 self.push(filepath, destpath) 853 self.execute('chmod a+x {}'.format(destpath), timeout=timeout) 854 self._installed_binaries[self.path.basename(destpath)] = destpath 855 return destpath 856 857 def uninstall(self, name): 858 path = self.path.join(self.executables_directory, name) 859 self.remove(path) 860 861 def capture_screen(self, filepath): 862 if not self.is_installed('scrot'): 863 self.logger.debug('Could not take screenshot as scrot is not installed.') 864 return 865 try: 866 867 tmpfile = self.tempfile() 868 self.execute('DISPLAY=:0.0 scrot {}'.format(tmpfile)) 869 self.pull(tmpfile, filepath) 870 self.remove(tmpfile) 871 except TargetError as e: 872 if "Can't open X dispay." not in e.message: 873 raise e 874 message = e.message.split('OUTPUT:', 1)[1].strip() # pylint: disable=no-member 875 self.logger.debug('Could not take screenshot: {}'.format(message)) 876 877 def _resolve_paths(self): 878 if self.working_directory is None: 879 if self.connected_as_root: 880 self.working_directory = '/root/devlib-target' 881 else: 882 self.working_directory = '/home/{}/devlib-target'.format(self.user) 883 if self.executables_directory is None: 884 self.executables_directory = self.path.join(self.working_directory, 'bin') 885 886 887class AndroidTarget(Target): 888 889 path = posixpath 890 os = 'android' 891 ls_command = '' 892 893 @property 894 @memoized 895 def abi(self): 896 return self.getprop()['ro.product.cpu.abi'].split('-')[0] 897 898 @property 899 @memoized 900 def supported_abi(self): 901 props = self.getprop() 902 result = [props['ro.product.cpu.abi']] 903 if 'ro.product.cpu.abi2' in props: 904 result.append(props['ro.product.cpu.abi2']) 905 if 'ro.product.cpu.abilist' in props: 906 for abi in props['ro.product.cpu.abilist'].split(','): 907 if abi not in result: 908 result.append(abi) 909 910 mapped_result = [] 911 for supported_abi in result: 912 for abi, architectures in ABI_MAP.iteritems(): 913 found = False 914 if supported_abi in architectures and abi not in mapped_result: 915 mapped_result.append(abi) 916 found = True 917 break 918 if not found and supported_abi not in mapped_result: 919 mapped_result.append(supported_abi) 920 return mapped_result 921 922 @property 923 @memoized 924 def os_version(self): 925 os_version = {} 926 for k, v in self.getprop().iteritems(): 927 if k.startswith('ro.build.version'): 928 part = k.split('.')[-1] 929 os_version[part] = v 930 return os_version 931 932 @property 933 def adb_name(self): 934 return self.conn.device 935 936 @property 937 @memoized 938 def android_id(self): 939 """ 940 Get the device's ANDROID_ID. Which is 941 942 "A 64-bit number (as a hex string) that is randomly generated when the user 943 first sets up the device and should remain constant for the lifetime of the 944 user's device." 945 946 .. note:: This will get reset on userdata erasure. 947 948 """ 949 output = self.execute('content query --uri content://settings/secure --projection value --where "name=\'android_id\'"').strip() 950 return output.split('value=')[-1] 951 952 @property 953 @memoized 954 def model(self): 955 try: 956 return self.getprop(prop='ro.product.device') 957 except KeyError: 958 return None 959 960 @property 961 @memoized 962 def screen_resolution(self): 963 output = self.execute('dumpsys window') 964 match = ANDROID_SCREEN_RESOLUTION_REGEX.search(output) 965 if match: 966 return (int(match.group('width')), 967 int(match.group('height'))) 968 else: 969 return (0, 0) 970 971 def __init__(self, 972 connection_settings=None, 973 platform=None, 974 working_directory=None, 975 executables_directory=None, 976 connect=True, 977 modules=None, 978 load_default_modules=True, 979 shell_prompt=DEFAULT_SHELL_PROMPT, 980 conn_cls=AdbConnection, 981 package_data_directory="/data/data", 982 ): 983 super(AndroidTarget, self).__init__(connection_settings=connection_settings, 984 platform=platform, 985 working_directory=working_directory, 986 executables_directory=executables_directory, 987 connect=connect, 988 modules=modules, 989 load_default_modules=load_default_modules, 990 shell_prompt=shell_prompt, 991 conn_cls=conn_cls) 992 self.package_data_directory = package_data_directory 993 self.clear_logcat_lock = threading.Lock() 994 995 def reset(self, fastboot=False): # pylint: disable=arguments-differ 996 try: 997 self.execute('reboot {}'.format(fastboot and 'fastboot' or ''), 998 as_root=self.needs_su, timeout=2) 999 except (TargetError, TimeoutError, subprocess.CalledProcessError): 1000 # on some targets "reboot" doesn't return gracefully 1001 pass 1002 self._connected_as_root = None 1003 1004 def wait_boot_complete(self, timeout=10): 1005 start = time.time() 1006 boot_completed = boolean(self.getprop('sys.boot_completed')) 1007 while not boot_completed and timeout >= time.time() - start: 1008 time.sleep(5) 1009 boot_completed = boolean(self.getprop('sys.boot_completed')) 1010 if not boot_completed: 1011 raise TargetError('Connected but Android did not fully boot.') 1012 1013 def connect(self, timeout=10, check_boot_completed=True): # pylint: disable=arguments-differ 1014 device = self.connection_settings.get('device') 1015 if device and ':' in device: 1016 # ADB does not automatically remove a network device from it's 1017 # devices list when the connection is broken by the remote, so the 1018 # adb connection may have gone "stale", resulting in adb blocking 1019 # indefinitely when making calls to the device. To avoid this, 1020 # always disconnect first. 1021 adb_disconnect(device) 1022 super(AndroidTarget, self).connect(timeout=timeout) 1023 1024 if check_boot_completed: 1025 self.wait_boot_complete(timeout) 1026 1027 def setup(self, executables=None): 1028 super(AndroidTarget, self).setup(executables) 1029 self.execute('mkdir -p {}'.format(self._file_transfer_cache)) 1030 1031 def kick_off(self, command, as_root=None): 1032 """ 1033 Like execute but closes adb session and returns immediately, leaving the command running on the 1034 device (this is different from execute(background=True) which keeps adb connection open and returns 1035 a subprocess object). 1036 """ 1037 if as_root is None: 1038 as_root = self.needs_su 1039 try: 1040 command = 'cd {} && {} nohup {} &'.format(self.working_directory, self.busybox, command) 1041 output = self.execute(command, timeout=1, as_root=as_root) 1042 except TimeoutError: 1043 pass 1044 1045 def __setup_list_directory(self): 1046 # In at least Linaro Android 16.09 (which was their first Android 7 release) and maybe 1047 # AOSP 7.0 as well, the ls command was changed. 1048 # Previous versions default to a single column listing, which is nice and easy to parse. 1049 # Newer versions default to a multi-column listing, which is not, but it does support 1050 # a '-1' option to get into single column mode. Older versions do not support this option 1051 # so we try the new version, and if it fails we use the old version. 1052 self.ls_command = 'ls -1' 1053 try: 1054 self.execute('ls -1 {}'.format(self.working_directory), as_root=False) 1055 except TargetError: 1056 self.ls_command = 'ls' 1057 1058 def list_directory(self, path, as_root=False): 1059 if self.ls_command == '': 1060 self.__setup_list_directory() 1061 contents = self.execute('{} {}'.format(self.ls_command, path), as_root=as_root) 1062 return [x.strip() for x in contents.split('\n') if x.strip()] 1063 1064 def install(self, filepath, timeout=None, with_name=None): # pylint: disable=W0221 1065 ext = os.path.splitext(filepath)[1].lower() 1066 if ext == '.apk': 1067 return self.install_apk(filepath, timeout) 1068 else: 1069 return self.install_executable(filepath, with_name) 1070 1071 def uninstall(self, name): 1072 if self.package_is_installed(name): 1073 self.uninstall_package(name) 1074 else: 1075 self.uninstall_executable(name) 1076 1077 def get_pids_of(self, process_name): 1078 result = [] 1079 search_term = process_name[-15:] 1080 for entry in self.ps(): 1081 if search_term in entry.name: 1082 result.append(entry.pid) 1083 return result 1084 1085 def ps(self, **kwargs): 1086 lines = iter(convert_new_lines(self.execute('ps')).split('\n')) 1087 lines.next() # header 1088 result = [] 1089 for line in lines: 1090 parts = line.split(None, 8) 1091 if not parts: 1092 continue 1093 if len(parts) == 8: 1094 # wchan was blank; insert an empty field where it should be. 1095 parts.insert(5, '') 1096 result.append(PsEntry(*(parts[0:1] + map(int, parts[1:5]) + parts[5:]))) 1097 if not kwargs: 1098 return result 1099 else: 1100 filtered_result = [] 1101 for entry in result: 1102 if all(getattr(entry, k) == v for k, v in kwargs.iteritems()): 1103 filtered_result.append(entry) 1104 return filtered_result 1105 1106 def capture_screen(self, filepath): 1107 on_device_file = self.path.join(self.working_directory, 'screen_capture.png') 1108 self.execute('screencap -p {}'.format(on_device_file)) 1109 self.pull(on_device_file, filepath) 1110 self.remove(on_device_file) 1111 1112 def push(self, source, dest, as_root=False, timeout=None): # pylint: disable=arguments-differ 1113 if not as_root: 1114 self.conn.push(source, dest, timeout=timeout) 1115 else: 1116 device_tempfile = self.path.join(self._file_transfer_cache, source.lstrip(self.path.sep)) 1117 self.execute("mkdir -p '{}'".format(self.path.dirname(device_tempfile))) 1118 self.conn.push(source, device_tempfile, timeout=timeout) 1119 self.execute("cp '{}' '{}'".format(device_tempfile, dest), as_root=True) 1120 1121 def pull(self, source, dest, as_root=False, timeout=None): # pylint: disable=arguments-differ 1122 if not as_root: 1123 self.conn.pull(source, dest, timeout=timeout) 1124 else: 1125 device_tempfile = self.path.join(self._file_transfer_cache, source.lstrip(self.path.sep)) 1126 self.execute("mkdir -p '{}'".format(self.path.dirname(device_tempfile))) 1127 self.execute("cp '{}' '{}'".format(source, device_tempfile), as_root=True) 1128 self.execute("chmod 0644 '{}'".format(device_tempfile), as_root=True) 1129 self.conn.pull(device_tempfile, dest, timeout=timeout) 1130 1131 # Android-specific 1132 1133 def swipe_to_unlock(self, direction="diagonal"): 1134 width, height = self.screen_resolution 1135 command = 'input swipe {} {} {} {}' 1136 if direction == "diagonal": 1137 start = 100 1138 stop = width - start 1139 swipe_height = height * 2 // 3 1140 self.execute(command.format(start, swipe_height, stop, 0)) 1141 elif direction == "horizontal": 1142 swipe_height = height * 2 // 3 1143 start = 100 1144 stop = width - start 1145 self.execute(command.format(start, swipe_height, stop, swipe_height)) 1146 elif direction == "vertical": 1147 swipe_middle = width / 2 1148 swipe_height = height * 2 // 3 1149 self.execute(command.format(swipe_middle, swipe_height, swipe_middle, 0)) 1150 else: 1151 raise TargetError("Invalid swipe direction: {}".format(direction)) 1152 1153 def getprop(self, prop=None): 1154 props = AndroidProperties(self.execute('getprop')) 1155 if prop: 1156 return props[prop] 1157 return props 1158 1159 def is_installed(self, name): 1160 return super(AndroidTarget, self).is_installed(name) or self.package_is_installed(name) 1161 1162 def package_is_installed(self, package_name): 1163 return package_name in self.list_packages() 1164 1165 def list_packages(self): 1166 output = self.execute('pm list packages') 1167 output = output.replace('package:', '') 1168 return output.split() 1169 1170 def get_package_version(self, package): 1171 output = self.execute('dumpsys package {}'.format(package)) 1172 for line in convert_new_lines(output).split('\n'): 1173 if 'versionName' in line: 1174 return line.split('=', 1)[1] 1175 return None 1176 1177 def get_sdk_version(self): 1178 try: 1179 return int(self.getprop('ro.build.version.sdk')) 1180 except (ValueError, TypeError): 1181 return None 1182 1183 def install_apk(self, filepath, timeout=None, replace=False, allow_downgrade=False): # pylint: disable=W0221 1184 ext = os.path.splitext(filepath)[1].lower() 1185 if ext == '.apk': 1186 flags = [] 1187 if replace: 1188 flags.append('-r') # Replace existing APK 1189 if allow_downgrade: 1190 flags.append('-d') # Install the APK even if a newer version is already installed 1191 if self.get_sdk_version() >= 23: 1192 flags.append('-g') # Grant all runtime permissions 1193 self.logger.debug("Replace APK = {}, ADB flags = '{}'".format(replace, ' '.join(flags))) 1194 return adb_command(self.adb_name, "install {} '{}'".format(' '.join(flags), filepath), timeout=timeout) 1195 else: 1196 raise TargetError('Can\'t install {}: unsupported format.'.format(filepath)) 1197 1198 def install_executable(self, filepath, with_name=None): 1199 self._ensure_executables_directory_is_writable() 1200 executable_name = with_name or os.path.basename(filepath) 1201 on_device_file = self.path.join(self.working_directory, executable_name) 1202 on_device_executable = self.path.join(self.executables_directory, executable_name) 1203 self.push(filepath, on_device_file) 1204 if on_device_file != on_device_executable: 1205 self.execute('cp {} {}'.format(on_device_file, on_device_executable), as_root=self.needs_su) 1206 self.remove(on_device_file, as_root=self.needs_su) 1207 self.execute("chmod 0777 '{}'".format(on_device_executable), as_root=self.needs_su) 1208 self._installed_binaries[executable_name] = on_device_executable 1209 return on_device_executable 1210 1211 def uninstall_package(self, package): 1212 adb_command(self.adb_name, "uninstall {}".format(package), timeout=30) 1213 1214 def uninstall_executable(self, executable_name): 1215 on_device_executable = self.path.join(self.executables_directory, executable_name) 1216 self._ensure_executables_directory_is_writable() 1217 self.remove(on_device_executable, as_root=self.needs_su) 1218 1219 def dump_logcat(self, filepath, filter=None, append=False, timeout=30): # pylint: disable=redefined-builtin 1220 op = '>>' if append else '>' 1221 filtstr = ' -s {}'.format(filter) if filter else '' 1222 command = 'logcat -d{} {} {}'.format(filtstr, op, filepath) 1223 adb_command(self.adb_name, command, timeout=timeout) 1224 1225 def clear_logcat(self): 1226 with self.clear_logcat_lock: 1227 adb_command(self.adb_name, 'logcat -c', timeout=30) 1228 1229 def get_logcat_monitor(self, regexps=None): 1230 return LogcatMonitor(self, regexps) 1231 1232 def adb_kill_server(self, timeout=30): 1233 adb_command(self.adb_name, 'kill-server', timeout) 1234 1235 def adb_wait_for_device(self, timeout=30): 1236 adb_command(self.adb_name, 'wait-for-device', timeout) 1237 1238 def adb_reboot_bootloader(self, timeout=30): 1239 adb_command(self.adb_name, 'reboot-bootloader', timeout) 1240 1241 def adb_root(self, enable=True, force=False): 1242 if enable: 1243 if self._connected_as_root and not force: 1244 return 1245 adb_command(self.adb_name, 'root', timeout=30) 1246 self._connected_as_root = True 1247 return 1248 adb_command(self.adb_name, 'unroot', timeout=30) 1249 self._connected_as_root = False 1250 1251 def is_screen_on(self): 1252 output = self.execute('dumpsys power') 1253 match = ANDROID_SCREEN_STATE_REGEX.search(output) 1254 if match: 1255 return boolean(match.group(1)) 1256 else: 1257 raise TargetError('Could not establish screen state.') 1258 1259 def ensure_screen_is_on(self): 1260 if not self.is_screen_on(): 1261 self.execute('input keyevent 26') 1262 1263 def ensure_screen_is_off(self): 1264 if self.is_screen_on(): 1265 self.execute('input keyevent 26') 1266 1267 def set_auto_brightness(self, auto_brightness): 1268 cmd = 'settings put system screen_brightness_mode {}' 1269 self.execute(cmd.format(int(boolean(auto_brightness)))) 1270 1271 def get_auto_brightness(self): 1272 cmd = 'settings get system screen_brightness_mode' 1273 return boolean(self.execute(cmd).strip()) 1274 1275 def set_brightness(self, value): 1276 if not 0 <= value <= 255: 1277 msg = 'Invalid brightness "{}"; Must be between 0 and 255' 1278 raise ValueError(msg.format(value)) 1279 self.set_auto_brightness(False) 1280 cmd = 'settings put system screen_brightness {}' 1281 self.execute(cmd.format(int(value))) 1282 1283 def get_brightness(self): 1284 cmd = 'settings get system screen_brightness' 1285 return integer(self.execute(cmd).strip()) 1286 1287 def get_airplane_mode(self): 1288 cmd = 'settings get global airplane_mode_on' 1289 return boolean(self.execute(cmd).strip()) 1290 1291 def set_airplane_mode(self, mode): 1292 root_required = self.get_sdk_version() > 23 1293 if root_required and not self.is_rooted: 1294 raise TargetError('Root is required to toggle airplane mode on Android 7+') 1295 mode = int(boolean(mode)) 1296 cmd = 'settings put global airplane_mode_on {}' 1297 self.execute(cmd.format(mode)) 1298 self.execute('am broadcast -a android.intent.action.AIRPLANE_MODE ' 1299 '--ez state {}'.format(mode), as_root=root_required) 1300 1301 def get_auto_rotation(self): 1302 cmd = 'settings get system accelerometer_rotation' 1303 return boolean(self.execute(cmd).strip()) 1304 1305 def set_auto_rotation(self, autorotate): 1306 cmd = 'settings put system accelerometer_rotation {}' 1307 self.execute(cmd.format(int(boolean(autorotate)))) 1308 1309 def set_natural_rotation(self): 1310 self.set_rotation(0) 1311 1312 def set_left_rotation(self): 1313 self.set_rotation(1) 1314 1315 def set_inverted_rotation(self): 1316 self.set_rotation(2) 1317 1318 def set_right_rotation(self): 1319 self.set_rotation(3) 1320 1321 def get_rotation(self): 1322 cmd = 'settings get system user_rotation' 1323 return int(self.execute(cmd).strip()) 1324 1325 def set_rotation(self, rotation): 1326 if not 0 <= rotation <= 3: 1327 raise ValueError('Rotation value must be between 0 and 3') 1328 self.set_auto_rotation(False) 1329 cmd = 'settings put system user_rotation {}' 1330 self.execute(cmd.format(rotation)) 1331 1332 def homescreen(self): 1333 self.execute('am start -a android.intent.action.MAIN -c android.intent.category.HOME') 1334 1335 def _resolve_paths(self): 1336 if self.working_directory is None: 1337 self.working_directory = '/data/local/tmp/devlib-target' 1338 self._file_transfer_cache = self.path.join(self.working_directory, '.file-cache') 1339 if self.executables_directory is None: 1340 self.executables_directory = '/data/local/tmp/bin' 1341 1342 def _ensure_executables_directory_is_writable(self): 1343 matched = [] 1344 for entry in self.list_file_systems(): 1345 if self.executables_directory.rstrip('/').startswith(entry.mount_point): 1346 matched.append(entry) 1347 if matched: 1348 entry = sorted(matched, key=lambda x: len(x.mount_point))[-1] 1349 if 'rw' not in entry.options: 1350 self.execute('mount -o rw,remount {} {}'.format(entry.device, 1351 entry.mount_point), 1352 as_root=True) 1353 else: 1354 message = 'Could not find mount point for executables directory {}' 1355 raise TargetError(message.format(self.executables_directory)) 1356 1357 _charging_enabled_path = '/sys/class/power_supply/battery/charging_enabled' 1358 1359 @property 1360 def charging_enabled(self): 1361 """ 1362 Whether drawing power to charge the battery is enabled 1363 1364 Not all devices have the ability to enable/disable battery charging 1365 (e.g. because they don't have a battery). In that case, 1366 ``charging_enabled`` is None. 1367 """ 1368 if not self.file_exists(self._charging_enabled_path): 1369 return None 1370 return self.read_bool(self._charging_enabled_path) 1371 1372 @charging_enabled.setter 1373 def charging_enabled(self, enabled): 1374 """ 1375 Enable/disable drawing power to charge the battery 1376 1377 Not all devices have this facility. In that case, do nothing. 1378 """ 1379 if not self.file_exists(self._charging_enabled_path): 1380 return 1381 self.write_value(self._charging_enabled_path, int(bool(enabled))) 1382 1383FstabEntry = namedtuple('FstabEntry', ['device', 'mount_point', 'fs_type', 'options', 'dump_freq', 'pass_num']) 1384PsEntry = namedtuple('PsEntry', 'user pid ppid vsize rss wchan pc state name') 1385LsmodEntry = namedtuple('LsmodEntry', ['name', 'size', 'use_count', 'used_by']) 1386 1387 1388class Cpuinfo(object): 1389 1390 @property 1391 @memoized 1392 def architecture(self): 1393 for section in self.sections: 1394 if 'CPU architecture' in section: 1395 return section['CPU architecture'] 1396 if 'architecture' in section: 1397 return section['architecture'] 1398 1399 @property 1400 @memoized 1401 def cpu_names(self): 1402 cpu_names = [] 1403 global_name = None 1404 for section in self.sections: 1405 if 'processor' in section: 1406 if 'CPU part' in section: 1407 cpu_names.append(_get_part_name(section)) 1408 elif 'model name' in section: 1409 cpu_names.append(_get_model_name(section)) 1410 else: 1411 cpu_names.append(None) 1412 elif 'CPU part' in section: 1413 global_name = _get_part_name(section) 1414 return [caseless_string(c or global_name) for c in cpu_names] 1415 1416 def __init__(self, text): 1417 self.sections = None 1418 self.text = None 1419 self.parse(text) 1420 1421 @memoized 1422 def get_cpu_features(self, cpuid=0): 1423 global_features = [] 1424 for section in self.sections: 1425 if 'processor' in section: 1426 if int(section.get('processor')) != cpuid: 1427 continue 1428 if 'Features' in section: 1429 return section.get('Features').split() 1430 elif 'flags' in section: 1431 return section.get('flags').split() 1432 elif 'Features' in section: 1433 global_features = section.get('Features').split() 1434 elif 'flags' in section: 1435 global_features = section.get('flags').split() 1436 return global_features 1437 1438 def parse(self, text): 1439 self.sections = [] 1440 current_section = {} 1441 self.text = text.strip() 1442 for line in self.text.split('\n'): 1443 line = line.strip() 1444 if line: 1445 key, value = line.split(':', 1) 1446 current_section[key.strip()] = value.strip() 1447 else: # not line 1448 self.sections.append(current_section) 1449 current_section = {} 1450 self.sections.append(current_section) 1451 1452 def __str__(self): 1453 return 'CpuInfo({})'.format(self.cpu_names) 1454 1455 __repr__ = __str__ 1456 1457 1458class KernelVersion(object): 1459 """ 1460 Class representing the version of a target kernel 1461 1462 Not expected to work for very old (pre-3.0) kernel version numbers. 1463 1464 :ivar release: Version number/revision string. Typical output of 1465 ``uname -r`` 1466 :type release: str 1467 :ivar version: Extra version info (aside from ``release``) reported by 1468 ``uname`` 1469 :type version: str 1470 :ivar version_number: Main version number (e.g. 3 for Linux 3.18) 1471 :type version_number: int 1472 :ivar major: Major version number (e.g. 18 for Linux 3.18) 1473 :type major: int 1474 :ivar minor: Minor version number for stable kernels (e.g. 9 for 4.9.9). May 1475 be None 1476 :type minor: int 1477 :ivar rc: Release candidate number (e.g. 3 for Linux 4.9-rc3). May be None. 1478 :type rc: int 1479 :ivar sha1: Kernel git revision hash, if available (otherwise None) 1480 :type sha1: str 1481 1482 :ivar parts: Tuple of version number components. Can be used for 1483 lexicographically comparing kernel versions. 1484 :type parts: tuple(int) 1485 """ 1486 def __init__(self, version_string): 1487 if ' #' in version_string: 1488 release, version = version_string.split(' #') 1489 self.release = release 1490 self.version = version 1491 elif version_string.startswith('#'): 1492 self.release = '' 1493 self.version = version_string 1494 else: 1495 self.release = version_string 1496 self.version = '' 1497 1498 self.version_number = None 1499 self.major = None 1500 self.minor = None 1501 self.sha1 = None 1502 self.rc = None 1503 match = KVERSION_REGEX.match(version_string) 1504 if match: 1505 groups = match.groupdict() 1506 self.version_number = int(groups['version']) 1507 self.major = int(groups['major']) 1508 if groups['minor'] is not None: 1509 self.minor = int(groups['minor']) 1510 if groups['rc'] is not None: 1511 self.rc = int(groups['rc']) 1512 if groups['sha1'] is not None: 1513 self.sha1 = match.group('sha1') 1514 1515 self.parts = (self.version_number, self.major, self.minor) 1516 1517 def __str__(self): 1518 return '{} {}'.format(self.release, self.version) 1519 1520 __repr__ = __str__ 1521 1522 1523class KernelConfig(object): 1524 1525 not_set_regex = re.compile(r'# (\S+) is not set') 1526 1527 @staticmethod 1528 def get_config_name(name): 1529 name = name.upper() 1530 if not name.startswith('CONFIG_'): 1531 name = 'CONFIG_' + name 1532 return name 1533 1534 def iteritems(self): 1535 return self._config.iteritems() 1536 1537 def __init__(self, text): 1538 self.text = text 1539 self._config = {} 1540 for line in text.split('\n'): 1541 line = line.strip() 1542 if line.startswith('#'): 1543 match = self.not_set_regex.search(line) 1544 if match: 1545 self._config[match.group(1)] = 'n' 1546 elif '=' in line: 1547 name, value = line.split('=', 1) 1548 self._config[name.strip()] = value.strip() 1549 1550 def get(self, name): 1551 return self._config.get(self.get_config_name(name)) 1552 1553 def like(self, name): 1554 regex = re.compile(name, re.I) 1555 result = {} 1556 for k, v in self._config.iteritems(): 1557 if regex.search(k): 1558 result[k] = v 1559 return result 1560 1561 def is_enabled(self, name): 1562 return self.get(name) == 'y' 1563 1564 def is_module(self, name): 1565 return self.get(name) == 'm' 1566 1567 def is_not_set(self, name): 1568 return self.get(name) == 'n' 1569 1570 def has(self, name): 1571 return self.get(name) in ['m', 'y'] 1572 1573 1574class LocalLinuxTarget(LinuxTarget): 1575 1576 def __init__(self, 1577 connection_settings=None, 1578 platform=None, 1579 working_directory=None, 1580 executables_directory=None, 1581 connect=True, 1582 modules=None, 1583 load_default_modules=True, 1584 shell_prompt=DEFAULT_SHELL_PROMPT, 1585 conn_cls=LocalConnection, 1586 ): 1587 super(LocalLinuxTarget, self).__init__(connection_settings=connection_settings, 1588 platform=platform, 1589 working_directory=working_directory, 1590 executables_directory=executables_directory, 1591 connect=connect, 1592 modules=modules, 1593 load_default_modules=load_default_modules, 1594 shell_prompt=shell_prompt, 1595 conn_cls=conn_cls) 1596 1597 def _resolve_paths(self): 1598 if self.working_directory is None: 1599 self.working_directory = '/tmp' 1600 if self.executables_directory is None: 1601 self.executables_directory = '/tmp' 1602 1603 1604def _get_model_name(section): 1605 name_string = section['model name'] 1606 parts = name_string.split('@')[0].strip().split() 1607 return ' '.join([p for p in parts 1608 if '(' not in p and p != 'CPU']) 1609 1610 1611def _get_part_name(section): 1612 implementer = section.get('CPU implementer', '0x0') 1613 part = section['CPU part'] 1614 variant = section.get('CPU variant', '0x0') 1615 name = get_cpu_name(*map(integer, [implementer, part, variant])) 1616 if name is None: 1617 name = '{}/{}/{}'.format(implementer, part, variant) 1618 return name 1619 1620 1621def _build_path_tree(path_map, basepath, sep=os.path.sep, dictcls=dict): 1622 """ 1623 Convert a flat mapping of paths to values into a nested structure of 1624 dict-line object (``dict``'s by default), mirroring the directory hierarchy 1625 represented by the paths relative to ``basepath``. 1626 1627 """ 1628 def process_node(node, path, value): 1629 parts = path.split(sep, 1) 1630 if len(parts) == 1: # leaf 1631 node[parts[0]] = value 1632 else: # branch 1633 if parts[0] not in node: 1634 node[parts[0]] = dictcls() 1635 process_node(node[parts[0]], parts[1], value) 1636 1637 relpath_map = {os.path.relpath(p, basepath): v 1638 for p, v in path_map.iteritems()} 1639 1640 if len(relpath_map) == 1 and relpath_map.keys()[0] == '.': 1641 result = relpath_map.values()[0] 1642 else: 1643 result = dictcls() 1644 for path, value in relpath_map.iteritems(): 1645 process_node(result, path, value) 1646 1647 return result 1648