1# 2# Copyright (C) 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16from __future__ import annotations 17 18import atexit 19import base64 20import logging 21import os 22import re 23import subprocess 24from typing import Any, Callable 25 26 27class FindDeviceError(RuntimeError): 28 pass 29 30 31class DeviceNotFoundError(FindDeviceError): 32 def __init__(self, serial: str) -> None: 33 self.serial = serial 34 super(DeviceNotFoundError, self).__init__( 35 'No device with serial {}'.format(serial)) 36 37 38class NoUniqueDeviceError(FindDeviceError): 39 def __init__(self) -> None: 40 super(NoUniqueDeviceError, self).__init__('No unique device') 41 42 43class ShellError(RuntimeError): 44 def __init__( 45 self, cmd: list[str], stdout: str, stderr: str, exit_code: int 46 ) -> None: 47 super(ShellError, self).__init__( 48 '`{0}` exited with code {1}'.format(cmd, exit_code)) 49 self.cmd = cmd 50 self.stdout = stdout 51 self.stderr = stderr 52 self.exit_code = exit_code 53 54 55def get_devices(adb_path: str = 'adb') -> list[str]: 56 with open(os.devnull, 'wb') as devnull: 57 subprocess.check_call([adb_path, 'start-server'], stdout=devnull, 58 stderr=devnull) 59 out = split_lines( 60 subprocess.check_output([adb_path, 'devices']).decode('utf-8')) 61 62 # The first line of `adb devices` just says "List of attached devices", so 63 # skip that. 64 devices = [] 65 for line in out[1:]: 66 if not line.strip(): 67 continue 68 if 'offline' in line: 69 continue 70 71 serial, _ = re.split(r'\s+', line, maxsplit=1) 72 devices.append(serial) 73 return devices 74 75 76def _get_unique_device( 77 product: str | None = None, adb_path: str = 'adb' 78) -> AndroidDevice: 79 devices = get_devices(adb_path=adb_path) 80 if len(devices) != 1: 81 raise NoUniqueDeviceError() 82 return AndroidDevice(devices[0], product, adb_path) 83 84 85def _get_device_by_serial( 86 serial: str, product: str | None = None, adb_path: str = 'adb' 87) -> AndroidDevice: 88 for device in get_devices(adb_path=adb_path): 89 if device == serial: 90 return AndroidDevice(serial, product, adb_path) 91 raise DeviceNotFoundError(serial) 92 93 94def get_device( 95 serial: str | None = None, product: str | None = None, adb_path: str = 'adb' 96) -> AndroidDevice: 97 """Get a uniquely identified AndroidDevice if one is available. 98 99 Raises: 100 DeviceNotFoundError: 101 The serial specified by `serial` or $ANDROID_SERIAL is not 102 connected. 103 104 NoUniqueDeviceError: 105 Neither `serial` nor $ANDROID_SERIAL was set, and the number of 106 devices connected to the system is not 1. Having 0 connected 107 devices will also result in this error. 108 109 Returns: 110 An AndroidDevice associated with the first non-None identifier in the 111 following order of preference: 112 113 1) The `serial` argument. 114 2) The environment variable $ANDROID_SERIAL. 115 3) The single device connnected to the system. 116 """ 117 if serial is not None: 118 return _get_device_by_serial(serial, product, adb_path) 119 120 android_serial = os.getenv('ANDROID_SERIAL') 121 if android_serial is not None: 122 return _get_device_by_serial(android_serial, product, adb_path) 123 124 return _get_unique_device(product, adb_path=adb_path) 125 126 127def _get_device_by_type(flag: str, adb_path: str) -> AndroidDevice: 128 with open(os.devnull, 'wb') as devnull: 129 subprocess.check_call([adb_path, 'start-server'], stdout=devnull, 130 stderr=devnull) 131 try: 132 serial = subprocess.check_output( 133 [adb_path, flag, 'get-serialno']).decode('utf-8').strip() 134 except subprocess.CalledProcessError: 135 raise RuntimeError('adb unexpectedly returned nonzero') 136 if serial == 'unknown': 137 raise NoUniqueDeviceError() 138 return _get_device_by_serial(serial, adb_path=adb_path) 139 140 141def get_usb_device(adb_path: str = 'adb') -> AndroidDevice: 142 """Get the unique USB-connected AndroidDevice if it is available. 143 144 Raises: 145 NoUniqueDeviceError: 146 0 or multiple devices are connected via USB. 147 148 Returns: 149 An AndroidDevice associated with the unique USB-connected device. 150 """ 151 return _get_device_by_type('-d', adb_path=adb_path) 152 153 154def get_emulator_device(adb_path: str = 'adb') -> AndroidDevice: 155 """Get the unique emulator AndroidDevice if it is available. 156 157 Raises: 158 NoUniqueDeviceError: 159 0 or multiple emulators are running. 160 161 Returns: 162 An AndroidDevice associated with the unique running emulator. 163 """ 164 return _get_device_by_type('-e', adb_path=adb_path) 165 166 167def split_lines(s: str) -> list[str]: 168 """Splits lines in a way that works even on Windows and old devices. 169 170 Windows will see \r\n instead of \n, old devices do the same, old devices 171 on Windows will see \r\r\n. 172 """ 173 # rstrip is used here to workaround a difference between splitlines and 174 # re.split: 175 # >>> 'foo\n'.splitlines() 176 # ['foo'] 177 # >>> re.split(r'\n', 'foo\n') 178 # ['foo', ''] 179 return re.split(r'[\r\n]+', s.rstrip()) 180 181 182def version(adb_path: list[str] | None = None) -> int: 183 """Get the version of adb (in terms of ADB_SERVER_VERSION).""" 184 185 adb_path = adb_path if adb_path is not None else ['adb'] 186 version_output = subprocess.check_output(adb_path + ['version'], encoding='utf-8') 187 pattern = r'^Android Debug Bridge version 1.0.(\d+)$' 188 result = re.match(pattern, version_output.splitlines()[0]) 189 if not result: 190 return 0 191 return int(result.group(1)) 192 193 194class AndroidDevice(object): 195 # Delimiter string to indicate the start of the exit code. 196 _RETURN_CODE_DELIMITER = 'x' 197 198 # Follow any shell command with this string to get the exit 199 # status of a program since this isn't propagated by adb. 200 # 201 # The delimiter is needed because `printf 1; echo $?` would print 202 # "10", and we wouldn't be able to distinguish the exit code. 203 _RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)] 204 205 # Maximum search distance from the output end to find the delimiter. 206 # adb on Windows returns \r\n even if adbd returns \n. Some old devices 207 # seem to actually return \r\r\n. 208 _RETURN_CODE_SEARCH_LENGTH = len( 209 '{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER)) 210 211 def __init__( 212 self, serial: str | None, product: str | None = None, adb_path: str = 'adb' 213 ) -> None: 214 self.serial = serial 215 self.product = product 216 self.adb_path = adb_path 217 self.adb_cmd = [adb_path] 218 219 if self.serial is not None: 220 self.adb_cmd.extend(['-s', self.serial]) 221 if self.product is not None: 222 self.adb_cmd.extend(['-p', self.product]) 223 self._linesep: str | None = None 224 self._features: list[str] | None = None 225 226 @property 227 def linesep(self) -> str: 228 if self._linesep is None: 229 self._linesep = subprocess.check_output( 230 self.adb_cmd + ['shell', 'echo'], encoding='utf-8') 231 return self._linesep 232 233 @property 234 def features(self) -> list[str]: 235 if self._features is None: 236 try: 237 self._features = split_lines(self._simple_call(['features'])) 238 except subprocess.CalledProcessError: 239 self._features = [] 240 return self._features 241 242 def has_shell_protocol(self) -> bool: 243 return version(self.adb_cmd) >= 35 and 'shell_v2' in self.features 244 245 def _make_shell_cmd(self, user_cmd: list[str]) -> list[str]: 246 command = self.adb_cmd + ['shell'] + user_cmd 247 if not self.has_shell_protocol(): 248 command += self._RETURN_CODE_PROBE 249 return command 250 251 def _parse_shell_output(self, out: str) -> tuple[int, str]: 252 """Finds the exit code string from shell output. 253 254 Args: 255 out: Shell output string. 256 257 Returns: 258 An (exit_code, output_string) tuple. The output string is 259 cleaned of any additional stuff we appended to find the 260 exit code. 261 262 Raises: 263 RuntimeError: Could not find the exit code in |out|. 264 """ 265 search_text = out 266 if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH: 267 # We don't want to search over massive amounts of data when we know 268 # the part we want is right at the end. 269 search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:] 270 partition = search_text.rpartition(self._RETURN_CODE_DELIMITER) 271 if partition[1] == '': 272 raise RuntimeError('Could not find exit status in shell output.') 273 result = int(partition[2]) 274 # partition[0] won't contain the full text if search_text was 275 # truncated, pull from the original string instead. 276 out = out[:-len(partition[1]) - len(partition[2])] 277 return result, out 278 279 def _simple_call(self, cmd: list[str]) -> str: 280 logging.info(' '.join(self.adb_cmd + cmd)) 281 return subprocess.check_output( 282 self.adb_cmd + cmd, stderr=subprocess.STDOUT).decode('utf-8') 283 284 def shell(self, cmd: list[str]) -> tuple[str, str]: 285 """Calls `adb shell` 286 287 Args: 288 cmd: command to execute as a list of strings. 289 290 Returns: 291 A (stdout, stderr) tuple. Stderr may be combined into stdout 292 if the device doesn't support separate streams. 293 294 Raises: 295 ShellError: the exit code was non-zero. 296 """ 297 exit_code, stdout, stderr = self.shell_nocheck(cmd) 298 if exit_code != 0: 299 raise ShellError(cmd, stdout, stderr, exit_code) 300 return stdout, stderr 301 302 def shell_nocheck(self, cmd: list[str]) -> tuple[int, str, str]: 303 """Calls `adb shell` 304 305 Args: 306 cmd: command to execute as a list of strings. 307 308 Returns: 309 An (exit_code, stdout, stderr) tuple. Stderr may be combined 310 into stdout if the device doesn't support separate streams. 311 """ 312 cmd = self._make_shell_cmd(cmd) 313 logging.info(' '.join(cmd)) 314 p = subprocess.Popen( 315 cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8') 316 stdout, stderr = p.communicate() 317 if self.has_shell_protocol(): 318 exit_code = p.returncode 319 else: 320 exit_code, stdout = self._parse_shell_output(stdout) 321 return exit_code, stdout, stderr 322 323 def shell_popen( 324 self, 325 cmd: list[str], 326 kill_atexit: bool = True, 327 preexec_fn: Callable[[], None] | None = None, 328 creationflags: int = 0, 329 **kwargs: Any, 330 ) -> subprocess.Popen[Any]: 331 """Calls `adb shell` and returns a handle to the adb process. 332 333 This function provides direct access to the subprocess used to run the 334 command, without special return code handling. Users that need the 335 return value must retrieve it themselves. 336 337 Args: 338 cmd: Array of command arguments to execute. 339 kill_atexit: Whether to kill the process upon exiting. 340 preexec_fn: Argument forwarded to subprocess.Popen. 341 creationflags: Argument forwarded to subprocess.Popen. 342 **kwargs: Arguments forwarded to subprocess.Popen. 343 344 Returns: 345 subprocess.Popen handle to the adb shell instance 346 """ 347 348 command = self.adb_cmd + ['shell'] + cmd 349 350 # Make sure a ctrl-c in the parent script doesn't kill gdbserver. 351 if os.name == 'nt': 352 creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP 353 else: 354 if preexec_fn is None: 355 preexec_fn = os.setpgrp 356 elif preexec_fn is not os.setpgrp: 357 fn = preexec_fn 358 def _wrapper() -> None: 359 fn() 360 os.setpgrp() 361 preexec_fn = _wrapper 362 363 p = subprocess.Popen(command, creationflags=creationflags, 364 preexec_fn=preexec_fn, **kwargs) 365 366 if kill_atexit: 367 atexit.register(p.kill) 368 369 return p 370 371 def install(self, filename: str, replace: bool = False) -> str: 372 cmd = ['install'] 373 if replace: 374 cmd.append('-r') 375 cmd.append(filename) 376 return self._simple_call(cmd) 377 378 def push(self, local: str | list[str], remote: str, sync: bool = False) -> str: 379 """Transfer a local file or directory to the device. 380 381 Args: 382 local: The local file or directory to transfer. 383 remote: The remote path to which local should be transferred. 384 sync: If True, only transfers files that are newer on the host than 385 those on the device. If False, transfers all files. 386 387 Returns: 388 Output of the command. 389 """ 390 cmd = ['push'] 391 if sync: 392 cmd.append('--sync') 393 394 if isinstance(local, str): 395 cmd.extend([local, remote]) 396 else: 397 cmd.extend(local) 398 cmd.append(remote) 399 400 return self._simple_call(cmd) 401 402 def pull(self, remote: str, local: str) -> str: 403 return self._simple_call(['pull', remote, local]) 404 405 def sync(self, directory: str | None = None) -> str: 406 cmd = ['sync'] 407 if directory is not None: 408 cmd.append(directory) 409 return self._simple_call(cmd) 410 411 def tcpip(self, port: str) -> str: 412 return self._simple_call(['tcpip', port]) 413 414 def usb(self) -> str: 415 return self._simple_call(['usb']) 416 417 def reboot(self) -> str: 418 return self._simple_call(['reboot']) 419 420 def remount(self) -> str: 421 return self._simple_call(['remount']) 422 423 def root(self) -> str: 424 return self._simple_call(['root']) 425 426 def unroot(self) -> str: 427 return self._simple_call(['unroot']) 428 429 def connect(self, host: str) -> str: 430 return self._simple_call(['connect', host]) 431 432 def disconnect(self, host: str) -> str: 433 return self._simple_call(['disconnect', host]) 434 435 def forward(self, local: str, remote: str) -> str: 436 return self._simple_call(['forward', local, remote]) 437 438 def forward_list(self) -> str: 439 return self._simple_call(['forward', '--list']) 440 441 def forward_no_rebind(self, local: str, remote: str) -> str: 442 return self._simple_call(['forward', '--no-rebind', local, remote]) 443 444 def forward_remove(self, local: str) -> str: 445 return self._simple_call(['forward', '--remove', local]) 446 447 def forward_remove_all(self) -> str: 448 return self._simple_call(['forward', '--remove-all']) 449 450 def reverse(self, remote: str, local: str) -> str: 451 return self._simple_call(['reverse', remote, local]) 452 453 def reverse_list(self) -> str: 454 return self._simple_call(['reverse', '--list']) 455 456 def reverse_no_rebind(self, local: str, remote: str) -> str: 457 return self._simple_call(['reverse', '--no-rebind', local, remote]) 458 459 def reverse_remove_all(self) -> str: 460 return self._simple_call(['reverse', '--remove-all']) 461 462 def reverse_remove(self, remote: str) -> str: 463 return self._simple_call(['reverse', '--remove', remote]) 464 465 def wait(self) -> str: 466 return self._simple_call(['wait-for-device']) 467 468 def get_prop(self, prop_name: str) -> str | None: 469 output = split_lines(self.shell(['getprop', prop_name])[0]) 470 if len(output) != 1: 471 raise RuntimeError('Too many lines in getprop output:\n' + 472 '\n'.join(output)) 473 value = output[0] 474 if not value.strip(): 475 return None 476 return value 477 478 def set_prop(self, prop_name: str, value: str) -> None: 479 self.shell(['setprop', prop_name, value]) 480 481 def logcat(self) -> str: 482 """Returns the contents of logcat.""" 483 return self._simple_call(['logcat', '-d']) 484 485 def clear_logcat(self) -> None: 486 """Clears the logcat buffer.""" 487 self._simple_call(['logcat', '-c']) 488