• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from __future__ import annotations
17
18import atexit
19import base64
20import logging
21import os
22import re
23import subprocess
24from typing import Any, Callable
25
26
27class FindDeviceError(RuntimeError):
28    pass
29
30
31class DeviceNotFoundError(FindDeviceError):
32    def __init__(self, serial: str) -> None:
33        self.serial = serial
34        super(DeviceNotFoundError, self).__init__(
35            'No device with serial {}'.format(serial))
36
37
38class NoUniqueDeviceError(FindDeviceError):
39    def __init__(self) -> None:
40        super(NoUniqueDeviceError, self).__init__('No unique device')
41
42
43class ShellError(RuntimeError):
44    def __init__(
45        self, cmd: list[str], stdout: str, stderr: str, exit_code: int
46    ) -> None:
47        super(ShellError, self).__init__(
48            '`{0}` exited with code {1}'.format(cmd, exit_code))
49        self.cmd = cmd
50        self.stdout = stdout
51        self.stderr = stderr
52        self.exit_code = exit_code
53
54
55def get_devices(adb_path: str = 'adb') -> list[str]:
56    with open(os.devnull, 'wb') as devnull:
57        subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
58                              stderr=devnull)
59    out = split_lines(
60        subprocess.check_output([adb_path, 'devices']).decode('utf-8'))
61
62    # The first line of `adb devices` just says "List of attached devices", so
63    # skip that.
64    devices = []
65    for line in out[1:]:
66        if not line.strip():
67            continue
68        if 'offline' in line:
69            continue
70
71        serial, _ = re.split(r'\s+', line, maxsplit=1)
72        devices.append(serial)
73    return devices
74
75
76def _get_unique_device(
77    product: str | None = None, adb_path: str = 'adb'
78) -> AndroidDevice:
79    devices = get_devices(adb_path=adb_path)
80    if len(devices) != 1:
81        raise NoUniqueDeviceError()
82    return AndroidDevice(devices[0], product, adb_path)
83
84
85def _get_device_by_serial(
86    serial: str, product: str | None = None, adb_path: str = 'adb'
87) -> AndroidDevice:
88    for device in get_devices(adb_path=adb_path):
89        if device == serial:
90            return AndroidDevice(serial, product, adb_path)
91    raise DeviceNotFoundError(serial)
92
93
94def get_device(
95    serial: str | None = None, product: str | None = None, adb_path: str = 'adb'
96) -> AndroidDevice:
97    """Get a uniquely identified AndroidDevice if one is available.
98
99    Raises:
100        DeviceNotFoundError:
101            The serial specified by `serial` or $ANDROID_SERIAL is not
102            connected.
103
104        NoUniqueDeviceError:
105            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
106            devices connected to the system is not 1. Having 0 connected
107            devices will also result in this error.
108
109    Returns:
110        An AndroidDevice associated with the first non-None identifier in the
111        following order of preference:
112
113        1) The `serial` argument.
114        2) The environment variable $ANDROID_SERIAL.
115        3) The single device connnected to the system.
116    """
117    if serial is not None:
118        return _get_device_by_serial(serial, product, adb_path)
119
120    android_serial = os.getenv('ANDROID_SERIAL')
121    if android_serial is not None:
122        return _get_device_by_serial(android_serial, product, adb_path)
123
124    return _get_unique_device(product, adb_path=adb_path)
125
126
127def _get_device_by_type(flag: str, adb_path: str) -> AndroidDevice:
128    with open(os.devnull, 'wb') as devnull:
129        subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
130                              stderr=devnull)
131    try:
132        serial = subprocess.check_output(
133            [adb_path, flag, 'get-serialno']).decode('utf-8').strip()
134    except subprocess.CalledProcessError:
135        raise RuntimeError('adb unexpectedly returned nonzero')
136    if serial == 'unknown':
137        raise NoUniqueDeviceError()
138    return _get_device_by_serial(serial, adb_path=adb_path)
139
140
141def get_usb_device(adb_path: str = 'adb') -> AndroidDevice:
142    """Get the unique USB-connected AndroidDevice if it is available.
143
144    Raises:
145        NoUniqueDeviceError:
146            0 or multiple devices are connected via USB.
147
148    Returns:
149        An AndroidDevice associated with the unique USB-connected device.
150    """
151    return _get_device_by_type('-d', adb_path=adb_path)
152
153
154def get_emulator_device(adb_path: str = 'adb') -> AndroidDevice:
155    """Get the unique emulator AndroidDevice if it is available.
156
157    Raises:
158        NoUniqueDeviceError:
159            0 or multiple emulators are running.
160
161    Returns:
162        An AndroidDevice associated with the unique running emulator.
163    """
164    return _get_device_by_type('-e', adb_path=adb_path)
165
166
167def split_lines(s: str) -> list[str]:
168    """Splits lines in a way that works even on Windows and old devices.
169
170    Windows will see \r\n instead of \n, old devices do the same, old devices
171    on Windows will see \r\r\n.
172    """
173    # rstrip is used here to workaround a difference between splitlines and
174    # re.split:
175    # >>> 'foo\n'.splitlines()
176    # ['foo']
177    # >>> re.split(r'\n', 'foo\n')
178    # ['foo', '']
179    return re.split(r'[\r\n]+', s.rstrip())
180
181
182def version(adb_path: list[str] | None = None) -> int:
183    """Get the version of adb (in terms of ADB_SERVER_VERSION)."""
184
185    adb_path = adb_path if adb_path is not None else ['adb']
186    version_output = subprocess.check_output(adb_path + ['version'], encoding='utf-8')
187    pattern = r'^Android Debug Bridge version 1.0.(\d+)$'
188    result = re.match(pattern, version_output.splitlines()[0])
189    if not result:
190        return 0
191    return int(result.group(1))
192
193
194class AndroidDevice(object):
195    # Delimiter string to indicate the start of the exit code.
196    _RETURN_CODE_DELIMITER = 'x'
197
198    # Follow any shell command with this string to get the exit
199    # status of a program since this isn't propagated by adb.
200    #
201    # The delimiter is needed because `printf 1; echo $?` would print
202    # "10", and we wouldn't be able to distinguish the exit code.
203    _RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)]
204
205    # Maximum search distance from the output end to find the delimiter.
206    # adb on Windows returns \r\n even if adbd returns \n. Some old devices
207    # seem to actually return \r\r\n.
208    _RETURN_CODE_SEARCH_LENGTH = len(
209        '{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER))
210
211    def __init__(
212        self, serial: str | None, product: str | None = None, adb_path: str = 'adb'
213    ) -> None:
214        self.serial = serial
215        self.product = product
216        self.adb_path = adb_path
217        self.adb_cmd = [adb_path]
218
219        if self.serial is not None:
220            self.adb_cmd.extend(['-s', self.serial])
221        if self.product is not None:
222            self.adb_cmd.extend(['-p', self.product])
223        self._linesep: str | None = None
224        self._features: list[str] | None = None
225
226    @property
227    def linesep(self) -> str:
228        if self._linesep is None:
229            self._linesep = subprocess.check_output(
230                self.adb_cmd + ['shell', 'echo'], encoding='utf-8')
231        return self._linesep
232
233    @property
234    def features(self) -> list[str]:
235        if self._features is None:
236            try:
237                self._features = split_lines(self._simple_call(['features']))
238            except subprocess.CalledProcessError:
239                self._features = []
240        return self._features
241
242    def has_shell_protocol(self) -> bool:
243        return version(self.adb_cmd) >= 35 and 'shell_v2' in self.features
244
245    def _make_shell_cmd(self, user_cmd: list[str]) -> list[str]:
246        command = self.adb_cmd + ['shell'] + user_cmd
247        if not self.has_shell_protocol():
248            command += self._RETURN_CODE_PROBE
249        return command
250
251    def _parse_shell_output(self, out: str) -> tuple[int, str]:
252        """Finds the exit code string from shell output.
253
254        Args:
255            out: Shell output string.
256
257        Returns:
258            An (exit_code, output_string) tuple. The output string is
259            cleaned of any additional stuff we appended to find the
260            exit code.
261
262        Raises:
263            RuntimeError: Could not find the exit code in |out|.
264        """
265        search_text = out
266        if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
267            # We don't want to search over massive amounts of data when we know
268            # the part we want is right at the end.
269            search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
270        partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
271        if partition[1] == '':
272            raise RuntimeError('Could not find exit status in shell output.')
273        result = int(partition[2])
274        # partition[0] won't contain the full text if search_text was
275        # truncated, pull from the original string instead.
276        out = out[:-len(partition[1]) - len(partition[2])]
277        return result, out
278
279    def _simple_call(self, cmd: list[str]) -> str:
280        logging.info(' '.join(self.adb_cmd + cmd))
281        return subprocess.check_output(
282            self.adb_cmd + cmd, stderr=subprocess.STDOUT).decode('utf-8')
283
284    def shell(self, cmd: list[str]) -> tuple[str, str]:
285        """Calls `adb shell`
286
287        Args:
288            cmd: command to execute as a list of strings.
289
290        Returns:
291            A (stdout, stderr) tuple. Stderr may be combined into stdout
292            if the device doesn't support separate streams.
293
294        Raises:
295            ShellError: the exit code was non-zero.
296        """
297        exit_code, stdout, stderr = self.shell_nocheck(cmd)
298        if exit_code != 0:
299            raise ShellError(cmd, stdout, stderr, exit_code)
300        return stdout, stderr
301
302    def shell_nocheck(self, cmd: list[str]) -> tuple[int, str, str]:
303        """Calls `adb shell`
304
305        Args:
306            cmd: command to execute as a list of strings.
307
308        Returns:
309            An (exit_code, stdout, stderr) tuple. Stderr may be combined
310            into stdout if the device doesn't support separate streams.
311        """
312        cmd = self._make_shell_cmd(cmd)
313        logging.info(' '.join(cmd))
314        p = subprocess.Popen(
315            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
316        stdout, stderr = p.communicate()
317        if self.has_shell_protocol():
318            exit_code = p.returncode
319        else:
320            exit_code, stdout = self._parse_shell_output(stdout)
321        return exit_code, stdout, stderr
322
323    def shell_popen(
324        self,
325        cmd: list[str],
326        kill_atexit: bool = True,
327        preexec_fn: Callable[[], None] | None = None,
328        creationflags: int = 0,
329        **kwargs: Any,
330    ) -> subprocess.Popen[Any]:
331        """Calls `adb shell` and returns a handle to the adb process.
332
333        This function provides direct access to the subprocess used to run the
334        command, without special return code handling. Users that need the
335        return value must retrieve it themselves.
336
337        Args:
338            cmd: Array of command arguments to execute.
339            kill_atexit: Whether to kill the process upon exiting.
340            preexec_fn: Argument forwarded to subprocess.Popen.
341            creationflags: Argument forwarded to subprocess.Popen.
342            **kwargs: Arguments forwarded to subprocess.Popen.
343
344        Returns:
345            subprocess.Popen handle to the adb shell instance
346        """
347
348        command = self.adb_cmd + ['shell'] + cmd
349
350        # Make sure a ctrl-c in the parent script doesn't kill gdbserver.
351        if os.name == 'nt':
352            creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
353        else:
354            if preexec_fn is None:
355                preexec_fn = os.setpgrp
356            elif preexec_fn is not os.setpgrp:
357                fn = preexec_fn
358                def _wrapper() -> None:
359                    fn()
360                    os.setpgrp()
361                preexec_fn = _wrapper
362
363        p = subprocess.Popen(command, creationflags=creationflags,
364                             preexec_fn=preexec_fn, **kwargs)
365
366        if kill_atexit:
367            atexit.register(p.kill)
368
369        return p
370
371    def install(self, filename: str, replace: bool = False) -> str:
372        cmd = ['install']
373        if replace:
374            cmd.append('-r')
375        cmd.append(filename)
376        return self._simple_call(cmd)
377
378    def push(self, local: str | list[str], remote: str, sync: bool = False) -> str:
379        """Transfer a local file or directory to the device.
380
381        Args:
382            local: The local file or directory to transfer.
383            remote: The remote path to which local should be transferred.
384            sync: If True, only transfers files that are newer on the host than
385                  those on the device. If False, transfers all files.
386
387        Returns:
388            Output of the command.
389        """
390        cmd = ['push']
391        if sync:
392            cmd.append('--sync')
393
394        if isinstance(local, str):
395            cmd.extend([local, remote])
396        else:
397            cmd.extend(local)
398            cmd.append(remote)
399
400        return self._simple_call(cmd)
401
402    def pull(self, remote: str, local: str) -> str:
403        return self._simple_call(['pull', remote, local])
404
405    def sync(self, directory: str | None = None) -> str:
406        cmd = ['sync']
407        if directory is not None:
408            cmd.append(directory)
409        return self._simple_call(cmd)
410
411    def tcpip(self, port: str) -> str:
412        return self._simple_call(['tcpip', port])
413
414    def usb(self) -> str:
415        return self._simple_call(['usb'])
416
417    def reboot(self) -> str:
418        return self._simple_call(['reboot'])
419
420    def remount(self) -> str:
421        return self._simple_call(['remount'])
422
423    def root(self) -> str:
424        return self._simple_call(['root'])
425
426    def unroot(self) -> str:
427        return self._simple_call(['unroot'])
428
429    def connect(self, host: str) -> str:
430        return self._simple_call(['connect', host])
431
432    def disconnect(self, host: str) -> str:
433        return self._simple_call(['disconnect', host])
434
435    def forward(self, local: str, remote: str) -> str:
436        return self._simple_call(['forward', local, remote])
437
438    def forward_list(self) -> str:
439        return self._simple_call(['forward', '--list'])
440
441    def forward_no_rebind(self, local: str, remote: str) -> str:
442        return self._simple_call(['forward', '--no-rebind', local, remote])
443
444    def forward_remove(self, local: str) -> str:
445        return self._simple_call(['forward', '--remove', local])
446
447    def forward_remove_all(self) -> str:
448        return self._simple_call(['forward', '--remove-all'])
449
450    def reverse(self, remote: str, local: str) -> str:
451        return self._simple_call(['reverse', remote, local])
452
453    def reverse_list(self) -> str:
454        return self._simple_call(['reverse', '--list'])
455
456    def reverse_no_rebind(self, local: str, remote: str) -> str:
457        return self._simple_call(['reverse', '--no-rebind', local, remote])
458
459    def reverse_remove_all(self) -> str:
460        return self._simple_call(['reverse', '--remove-all'])
461
462    def reverse_remove(self, remote: str) -> str:
463        return self._simple_call(['reverse', '--remove', remote])
464
465    def wait(self) -> str:
466        return self._simple_call(['wait-for-device'])
467
468    def get_prop(self, prop_name: str) -> str | None:
469        output = split_lines(self.shell(['getprop', prop_name])[0])
470        if len(output) != 1:
471            raise RuntimeError('Too many lines in getprop output:\n' +
472                               '\n'.join(output))
473        value = output[0]
474        if not value.strip():
475            return None
476        return value
477
478    def set_prop(self, prop_name: str, value: str) -> None:
479        self.shell(['setprop', prop_name, value])
480
481    def logcat(self) -> str:
482        """Returns the contents of logcat."""
483        return self._simple_call(['logcat', '-d'])
484
485    def clear_logcat(self) -> None:
486        """Clears the logcat buffer."""
487        self._simple_call(['logcat', '-c'])
488