• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import io
23import os
24import posixpath
25import random
26import re
27import shlex
28import shutil
29import signal
30import socket
31import string
32import subprocess
33import sys
34import tempfile
35import threading
36import time
37import unittest
38
39import adb_host_pb2 as adb_host_proto
40
41from datetime import datetime
42
43import adb
44
45def requires_non_root(func):
46    def wrapper(self, *args):
47        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
48        if was_root:
49            self.device.unroot()
50            self.device.wait()
51
52        try:
53            func(self, *args)
54        finally:
55            if was_root:
56                self.device.root()
57                self.device.wait()
58
59    return wrapper
60
61
62class DeviceTest(unittest.TestCase):
63    def setUp(self) -> None:
64        self.device = adb.get_device()
65
66
67class AbbTest(DeviceTest):
68    def test_smoke(self):
69        abb = subprocess.run(['adb', 'abb'], capture_output=True)
70        cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True)
71
72        # abb squashes all failures to 1.
73        self.assertEqual(abb.returncode == 0, cmd.returncode == 0)
74        self.assertEqual(abb.stdout, cmd.stdout)
75        self.assertEqual(abb.stderr, cmd.stderr)
76
77class ForwardReverseTest(DeviceTest):
78    def _test_no_rebind(self, description, direction_list, direction,
79                       direction_no_rebind, direction_remove_all):
80        msg = direction_list()
81        self.assertEqual('', msg.strip(),
82                         description + ' list must be empty to run this test.')
83
84        # Use --no-rebind with no existing binding
85        direction_no_rebind('tcp:5566', 'tcp:6655')
86        msg = direction_list()
87        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
88
89        # Use --no-rebind with existing binding
90        with self.assertRaises(subprocess.CalledProcessError):
91            direction_no_rebind('tcp:5566', 'tcp:6677')
92        msg = direction_list()
93        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
94        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
95
96        # Use the absence of --no-rebind with existing binding
97        direction('tcp:5566', 'tcp:6677')
98        msg = direction_list()
99        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
100        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
101
102        direction_remove_all()
103        msg = direction_list()
104        self.assertEqual('', msg.strip())
105
106    def test_forward_no_rebind(self):
107        self._test_no_rebind('forward', self.device.forward_list,
108                            self.device.forward, self.device.forward_no_rebind,
109                            self.device.forward_remove_all)
110
111    def test_reverse_no_rebind(self):
112        self._test_no_rebind('reverse', self.device.reverse_list,
113                            self.device.reverse, self.device.reverse_no_rebind,
114                            self.device.reverse_remove_all)
115
116    def test_forward(self):
117        msg = self.device.forward_list()
118        self.assertEqual('', msg.strip(),
119                         'Forwarding list must be empty to run this test.')
120        self.device.forward('tcp:5566', 'tcp:6655')
121        msg = self.device.forward_list()
122        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
123        self.device.forward('tcp:7788', 'tcp:8877')
124        msg = self.device.forward_list()
125        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
126        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
127        self.device.forward_remove('tcp:5566')
128        msg = self.device.forward_list()
129        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
130        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
131        self.device.forward_remove_all()
132        msg = self.device.forward_list()
133        self.assertEqual('', msg.strip())
134
135    def test_forward_old_protocol(self):
136        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
137
138        msg = self.device.forward_list()
139        self.assertEqual('', msg.strip(),
140                         'Forwarding list must be empty to run this test.')
141
142        with socket.create_connection(("localhost", 5037)) as s:
143            service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
144            cmd = b"%04x%s" % (len(service), service)
145            s.sendall(cmd)
146
147        msg = self.device.forward_list()
148        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
149
150        self.device.forward_remove_all()
151        msg = self.device.forward_list()
152        self.assertEqual('', msg.strip())
153
154    def test_forward_tcp_port_0(self):
155        self.assertEqual('', self.device.forward_list().strip(),
156                         'Forwarding list must be empty to run this test.')
157
158        try:
159            # If resolving TCP port 0 is supported, `adb forward` will print
160            # the actual port number.
161            port = self.device.forward('tcp:0', 'tcp:8888').strip()
162            if not port:
163                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
164
165            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
166                                      self.device.forward_list()))
167        finally:
168            self.device.forward_remove_all()
169
170    def test_reverse(self):
171        msg = self.device.reverse_list()
172        self.assertEqual('', msg.strip(),
173                         'Reverse forwarding list must be empty to run this test.')
174        self.device.reverse('tcp:5566', 'tcp:6655')
175        msg = self.device.reverse_list()
176        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
177        self.device.reverse('tcp:7788', 'tcp:8877')
178        msg = self.device.reverse_list()
179        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
180        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
181        self.device.reverse_remove('tcp:5566')
182        msg = self.device.reverse_list()
183        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
184        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
185        self.device.reverse_remove_all()
186        msg = self.device.reverse_list()
187        self.assertEqual('', msg.strip())
188
189    def test_reverse_tcp_port_0(self):
190        self.assertEqual('', self.device.reverse_list().strip(),
191                         'Reverse list must be empty to run this test.')
192
193        try:
194            # If resolving TCP port 0 is supported, `adb reverse` will print
195            # the actual port number.
196            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
197            if not port:
198                raise unittest.SkipTest('Reversing tcp:0 is not available.')
199
200            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
201                                      self.device.reverse_list()))
202        finally:
203            self.device.reverse_remove_all()
204
205    def test_forward_reverse_echo(self):
206        """Send data through adb forward and read it back via adb reverse"""
207        forward_port = 12345
208        reverse_port = forward_port + 1
209        forward_spec = 'tcp:' + str(forward_port)
210        reverse_spec = 'tcp:' + str(reverse_port)
211        forward_setup = False
212        reverse_setup = False
213
214        try:
215            # listen on localhost:forward_port, connect to remote:forward_port
216            self.device.forward(forward_spec, forward_spec)
217            forward_setup = True
218            # listen on remote:forward_port, connect to localhost:reverse_port
219            self.device.reverse(forward_spec, reverse_spec)
220            reverse_setup = True
221
222            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
223            with contextlib.closing(listener):
224                # Use SO_REUSEADDR so that subsequent runs of the test can grab
225                # the port even if it is in TIME_WAIT.
226                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227
228                # Listen on localhost:reverse_port before connecting to
229                # localhost:forward_port because that will cause adb to connect
230                # back to localhost:reverse_port.
231                listener.bind(('127.0.0.1', reverse_port))
232                listener.listen(4)
233
234                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
235                with contextlib.closing(client):
236                    # Connect to the listener.
237                    client.connect(('127.0.0.1', forward_port))
238
239                    # Accept the client connection.
240                    accepted_connection, addr = listener.accept()
241                    with contextlib.closing(accepted_connection) as server:
242                        data = b'hello'
243
244                        # Send data into the port setup by adb forward.
245                        client.sendall(data)
246                        # Explicitly close() so that server gets EOF.
247                        client.close()
248
249                        # Verify that the data came back via adb reverse.
250                        self.assertEqual(data, server.makefile().read().encode("utf8"))
251        finally:
252            if reverse_setup:
253                self.device.reverse_remove(forward_spec)
254            if forward_setup:
255                self.device.forward_remove(forward_spec)
256
257
258class ShellTest(DeviceTest):
259    def _interactive_shell(self, shell_args, input):
260        """Runs an interactive adb shell.
261
262        Args:
263          shell_args: List of string arguments to `adb shell`.
264          input: bytes input to send to the interactive shell.
265
266        Returns:
267          The remote exit code.
268
269        Raises:
270          unittest.SkipTest: The device doesn't support exit codes.
271        """
272        if not self.device.has_shell_protocol():
273            raise unittest.SkipTest('exit codes are unavailable on this device')
274
275        proc = subprocess.Popen(
276                self.device.adb_cmd + ['shell'] + shell_args,
277                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
278                stderr=subprocess.PIPE)
279        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
280        # to explicitly add an exit command to close the session from the device
281        # side, plus the necessary newline to complete the interactive command.
282        proc.communicate(input + b'; exit\n')
283        return proc.returncode
284
285    def test_cat(self):
286        """Check that we can at least cat a file."""
287        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
288        elements = out.split()
289        self.assertEqual(len(elements), 2)
290
291        uptime, idle = elements
292        self.assertGreater(float(uptime), 0.0)
293        self.assertGreater(float(idle), 0.0)
294
295    def test_throws_on_failure(self):
296        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
297
298    def test_output_not_stripped(self):
299        out = self.device.shell(['echo', 'foo'])[0]
300        self.assertEqual(out, 'foo' + self.device.linesep)
301
302    def test_shell_command_length(self):
303        # Devices that have shell_v2 should be able to handle long commands.
304        if self.device.has_shell_protocol():
305            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
306            self.assertEqual(rc, 0)
307            self.assertTrue(out == ('x' * 16384 + '\n'))
308
309    def test_shell_nocheck_failure(self):
310        rc, out, _ = self.device.shell_nocheck(['false'])
311        self.assertNotEqual(rc, 0)
312        self.assertEqual(out, '')
313
314    def test_shell_nocheck_output_not_stripped(self):
315        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
316        self.assertEqual(rc, 0)
317        self.assertEqual(out, 'foo' + self.device.linesep)
318
319    def test_can_distinguish_tricky_results(self):
320        # If result checking on ADB shell is naively implemented as
321        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
322        # output from the result for a cmd of `echo -n 1`.
323        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
324        self.assertEqual(rc, 0)
325        self.assertEqual(out, '1')
326
327    def test_line_endings(self):
328        """Ensure that line ending translation is not happening in the pty.
329
330        Bug: http://b/19735063
331        """
332        output = self.device.shell(['uname'])[0]
333        self.assertEqual(output, 'Linux' + self.device.linesep)
334
335    def test_pty_logic(self):
336        """Tests that a PTY is allocated when it should be.
337
338        PTY allocation behavior should match ssh.
339        """
340        def check_pty(args):
341            """Checks adb shell PTY allocation.
342
343            Tests |args| for terminal and non-terminal stdin.
344
345            Args:
346                args: -Tt args in a list (e.g. ['-t', '-t']).
347
348            Returns:
349                A tuple (<terminal>, <non-terminal>). True indicates
350                the corresponding shell allocated a remote PTY.
351            """
352            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
353
354            terminal = subprocess.Popen(
355                    test_cmd, stdin=None,
356                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
357            terminal.communicate()
358
359            non_terminal = subprocess.Popen(
360                    test_cmd, stdin=subprocess.PIPE,
361                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
362            non_terminal.communicate()
363
364            return (terminal.returncode == 0, non_terminal.returncode == 0)
365
366        # -T: never allocate PTY.
367        self.assertEqual((False, False), check_pty(['-T']))
368
369        # These tests require a new device.
370        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
371            # No args: PTY only if stdin is a terminal and shell is interactive,
372            # which is difficult to reliably test from a script.
373            self.assertEqual((False, False), check_pty([]))
374
375            # -t: PTY if stdin is a terminal.
376            self.assertEqual((True, False), check_pty(['-t']))
377
378        # -t -t: always allocate PTY.
379        self.assertEqual((True, True), check_pty(['-t', '-t']))
380
381        # -tt: always allocate PTY, POSIX style (http://b/32216152).
382        self.assertEqual((True, True), check_pty(['-tt']))
383
384        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
385        # we follow the man page instead.
386        self.assertEqual((True, True), check_pty(['-ttt']))
387
388        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
389        self.assertEqual((True, True), check_pty(['-ttx']))
390
391        # -Ttt: -tt cancels out -T.
392        self.assertEqual((True, True), check_pty(['-Ttt']))
393
394        # -ttT: -T cancels out -tt.
395        self.assertEqual((False, False), check_pty(['-ttT']))
396
397    def test_shell_protocol(self):
398        """Tests the shell protocol on the device.
399
400        If the device supports shell protocol, this gives us the ability
401        to separate stdout/stderr and return the exit code directly.
402
403        Bug: http://b/19734861
404        """
405        if not self.device.has_shell_protocol():
406            raise unittest.SkipTest('shell protocol unsupported on this device')
407
408        # Shell protocol should be used by default.
409        result = self.device.shell_nocheck(
410                shlex.split('echo foo; echo bar >&2; exit 17'))
411        self.assertEqual(17, result[0])
412        self.assertEqual('foo' + self.device.linesep, result[1])
413        self.assertEqual('bar' + self.device.linesep, result[2])
414
415        self.assertEqual(17, self._interactive_shell([], b'exit 17'))
416
417        # -x flag should disable shell protocol.
418        result = self.device.shell_nocheck(
419                shlex.split('-x echo foo; echo bar >&2; exit 17'))
420        self.assertEqual(0, result[0])
421        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
422        self.assertEqual('', result[2])
423
424        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
425
426    def test_non_interactive_sigint(self):
427        """Tests that SIGINT in a non-interactive shell kills the process.
428
429        This requires the shell protocol in order to detect the broken
430        pipe; raw data transfer mode will only see the break once the
431        subprocess tries to read or write.
432
433        Bug: http://b/23825725
434        """
435        if not self.device.has_shell_protocol():
436            raise unittest.SkipTest('shell protocol unsupported on this device')
437
438        # Start a long-running process.
439        sleep_proc = subprocess.Popen(
440                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
441                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
442                stderr=subprocess.STDOUT)
443        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
444        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
445        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
446
447        # Verify that the process is running, send signal, verify it stopped.
448        self.device.shell(proc_query)
449        os.kill(sleep_proc.pid, signal.SIGINT)
450        sleep_proc.communicate()
451
452        # It can take some time for the process to receive the signal and die.
453        end_time = time.time() + 3
454        while self.device.shell_nocheck(proc_query)[0] != 1:
455            self.assertFalse(time.time() > end_time,
456                             'subprocess failed to terminate in time')
457
458    def test_non_interactive_stdin(self):
459        """Tests that non-interactive shells send stdin."""
460        if not self.device.has_shell_protocol():
461            raise unittest.SkipTest('non-interactive stdin unsupported '
462                                    'on this device')
463
464        # Test both small and large inputs.
465        small_input = b'foo'
466        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
467        large_input = b'\n'.join(characters)
468
469
470        for input in (small_input, large_input):
471            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
472                                    stdin=subprocess.PIPE,
473                                    stdout=subprocess.PIPE,
474                                    stderr=subprocess.PIPE)
475            stdout, stderr = proc.communicate(input)
476            self.assertEqual(input.splitlines(), stdout.splitlines())
477            self.assertEqual(b'', stderr)
478
479    def test_sighup(self):
480        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
481        log_path = "/data/local/tmp/adb_signal_test.log"
482
483        # Clear the output file.
484        self.device.shell_nocheck(["echo", ">", log_path])
485
486        script = """
487            trap "echo SIGINT > {path}; exit 0" SIGINT
488            trap "echo SIGHUP > {path}; exit 0" SIGHUP
489            echo Waiting
490            read
491        """.format(path=log_path)
492
493        script = ";".join([x.strip() for x in script.strip().splitlines()])
494
495        with self.device.shell_popen([script], kill_atexit=False,
496                                     stdin=subprocess.PIPE,
497                                     stdout=subprocess.PIPE) as process:
498
499            self.assertEqual(b"Waiting\n", process.stdout.readline())
500            process.send_signal(signal.SIGINT)
501            process.wait()
502
503        # Waiting for the local adb to finish is insufficient, since it hangs
504        # up immediately.
505        time.sleep(1)
506
507        stdout, _ = self.device.shell(["cat", log_path])
508        self.assertEqual(stdout.strip(), "SIGHUP")
509
510    # Temporarily disabled because it seems to cause later instability.
511    # http://b/228114748
512    def disabled_test_exit_stress(self):
513        """Hammer `adb shell exit 42` with multiple threads."""
514        thread_count = 48
515        result = dict()
516        def hammer(thread_idx, thread_count, result):
517            success = True
518            for i in range(thread_idx, 240, thread_count):
519                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
520                if ret != i % 256:
521                    success = False
522                    break
523            result[thread_idx] = success
524
525        threads = []
526        for i in range(thread_count):
527            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
528            thread.start()
529            threads.append(thread)
530        for thread in threads:
531            thread.join()
532        for i, success in result.items():
533            self.assertTrue(success)
534
535    def disabled_test_parallel(self):
536        """Spawn a bunch of `adb shell` instances in parallel.
537
538        This was broken historically due to the use of select, which only works
539        for fds that are numerically less than 1024.
540
541        Bug: http://b/141955761"""
542
543        n_procs = 2048
544        procs = dict()
545        for i in range(0, n_procs):
546            procs[i] = subprocess.Popen(
547                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
548                stdin=subprocess.PIPE,
549                stdout=subprocess.PIPE
550            )
551
552        for i in range(0, n_procs):
553            procs[i].stdin.write("%d\n" % i)
554
555        for i in range(0, n_procs):
556            response = procs[i].stdout.readline()
557            assert(response == "%d\n" % i)
558
559        for i in range(0, n_procs):
560            procs[i].stdin.write("%d\n" % (i % 256))
561
562        for i in range(0, n_procs):
563            assert(procs[i].wait() == i % 256)
564
565
566class ArgumentEscapingTest(DeviceTest):
567    def test_shell_escaping(self):
568        """Make sure that argument escaping is somewhat sane."""
569
570        # http://b/19734868
571        # Note that this actually matches ssh(1)'s behavior --- it's
572        # converted to `sh -c echo hello; echo world` which sh interprets
573        # as `sh -c echo` (with an argument to that shell of "hello"),
574        # and then `echo world` back in the first shell.
575        result = self.device.shell(
576            shlex.split("sh -c 'echo hello; echo world'"))[0]
577        result = result.splitlines()
578        self.assertEqual(['', 'world'], result)
579        # If you really wanted "hello" and "world", here's what you'd do:
580        result = self.device.shell(
581            shlex.split(r'echo hello\;echo world'))[0].splitlines()
582        self.assertEqual(['hello', 'world'], result)
583
584        # http://b/15479704
585        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
586        self.assertEqual('t', result)
587        result = self.device.shell(
588            shlex.split("sh -c 'true && echo t'"))[0].strip()
589        self.assertEqual('t', result)
590
591        # http://b/20564385
592        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
593        self.assertEqual('t', result)
594        result = self.device.shell(
595            shlex.split(r'echo -n 123\;uname'))[0].strip()
596        self.assertEqual('123Linux', result)
597
598    def test_install_argument_escaping(self):
599        """Make sure that install argument escaping works."""
600        # http://b/20323053, http://b/3090932.
601        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
602            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
603                                             delete=False)
604            tf.close()
605
606            # Installing bogus .apks fails if the device supports exit codes.
607            try:
608                output = self.device.install(tf.name.decode("utf8"))
609            except subprocess.CalledProcessError as e:
610                output = e.output
611
612            self.assertIn(file_suffix, output)
613            os.remove(tf.name)
614
615
616class RootUnrootTest(DeviceTest):
617    def _test_root(self):
618        message = self.device.root()
619        if 'adbd cannot run as root in production builds' in message:
620            return
621        self.device.wait()
622        self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
623
624    def _test_unroot(self):
625        self.device.unroot()
626        self.device.wait()
627        self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
628
629    def test_root_unroot(self):
630        """Make sure that adb root and adb unroot work, using id(1)."""
631        if self.device.get_prop('ro.debuggable') != '1':
632            raise unittest.SkipTest('requires rootable build')
633
634        original_user = self.device.shell(['id', '-un'])[0].strip()
635        try:
636            if original_user == 'root':
637                self._test_unroot()
638                self._test_root()
639            elif original_user == 'shell':
640                self._test_root()
641                self._test_unroot()
642        finally:
643            if original_user == 'root':
644                self.device.root()
645            else:
646                self.device.unroot()
647            self.device.wait()
648
649
650class TcpIpTest(DeviceTest):
651    def test_tcpip_failure_raises(self):
652        """adb tcpip requires a port.
653
654        Bug: http://b/22636927
655        """
656        self.assertRaises(
657            subprocess.CalledProcessError, self.device.tcpip, '')
658        self.assertRaises(
659            subprocess.CalledProcessError, self.device.tcpip, 'foo')
660
661
662class SystemPropertiesTest(DeviceTest):
663    def test_get_prop(self):
664        self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
665
666    def test_set_prop(self):
667        # debug.* prop does not require root privileges
668        prop_name = 'debug.foo'
669        self.device.shell(['setprop', prop_name, '""'])
670
671        val = random.random()
672        self.device.set_prop(prop_name, str(val))
673        self.assertEqual(
674            self.device.shell(['getprop', prop_name])[0].strip(), str(val))
675
676
677def compute_md5(string):
678    hsh = hashlib.md5()
679    hsh.update(string)
680    return hsh.hexdigest()
681
682
683class HostFile(object):
684    def __init__(self, handle, checksum):
685        self.handle = handle
686        self.checksum = checksum
687        self.full_path = handle.name
688        self.base_name = os.path.basename(self.full_path)
689
690
691class DeviceFile(object):
692    def __init__(self, checksum, full_path):
693        self.checksum = checksum
694        self.full_path = full_path
695        self.base_name = posixpath.basename(self.full_path)
696
697
698def make_random_host_files(in_dir, num_files):
699    min_size = 1 * (1 << 10)
700    max_size = 16 * (1 << 10)
701
702    files = []
703    for _ in range(num_files):
704        file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
705
706        size = random.randrange(min_size, max_size, 1024)
707        rand_str = os.urandom(size)
708        file_handle.write(rand_str)
709        file_handle.flush()
710        file_handle.close()
711
712        md5 = compute_md5(rand_str)
713        files.append(HostFile(file_handle, md5))
714    return files
715
716
717def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
718    min_size = 1 * (1 << 10)
719    max_size = 16 * (1 << 10)
720
721    files = []
722    for file_num in range(num_files):
723        size = random.randrange(min_size, max_size, 1024)
724
725        base_name = prefix + str(file_num)
726        full_path = posixpath.join(in_dir, base_name)
727
728        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
729                      'bs={}'.format(size), 'count=1'])
730        dev_md5, _ = device.shell(['md5sum', full_path])[0].split()
731
732        files.append(DeviceFile(dev_md5, full_path))
733    return files
734
735
736class FileOperationsTest:
737    class Base(DeviceTest):
738        SCRATCH_DIR = '/data/local/tmp'
739        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
740        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
741
742        def setUp(self):
743            super().setUp()
744            self.previous_env = os.environ.get("ADB_COMPRESSION")
745            os.environ["ADB_COMPRESSION"] = self.compression
746
747        def tearDown(self):
748            if self.previous_env is None:
749                del os.environ["ADB_COMPRESSION"]
750            else:
751                os.environ["ADB_COMPRESSION"] = self.previous_env
752
753        def _verify_remote(self, checksum, remote_path):
754            dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split()
755            self.assertEqual(checksum, dev_md5)
756
757        def _verify_local(self, checksum, local_path):
758            with open(local_path, 'rb') as host_file:
759                host_md5 = compute_md5(host_file.read())
760                self.assertEqual(host_md5, checksum)
761
762        def test_push(self):
763            """Push a randomly generated file to specified device."""
764            kbytes = 512
765            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
766            rand_str = os.urandom(1024 * kbytes)
767            tmp.write(rand_str)
768            tmp.close()
769
770            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
771            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
772
773            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
774            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
775
776            os.remove(tmp.name)
777
778        def test_push_dir(self):
779            """Push a randomly generated directory of files to the device."""
780            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
781            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
782
783            try:
784                host_dir = tempfile.mkdtemp()
785
786                # Make sure the temp directory isn't setuid, or else adb will complain.
787                os.chmod(host_dir, 0o700)
788
789                # Create 32 random files.
790                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
791                self.device.push(host_dir, self.DEVICE_TEMP_DIR)
792
793                for temp_file in temp_files:
794                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
795                                                 os.path.basename(host_dir),
796                                                 temp_file.base_name)
797                    self._verify_remote(temp_file.checksum, remote_path)
798                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
799            finally:
800                if host_dir is not None:
801                    shutil.rmtree(host_dir)
802
803        def disabled_test_push_empty(self):
804            """Push an empty directory to the device."""
805            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
806            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
807
808            try:
809                host_dir = tempfile.mkdtemp()
810
811                # Make sure the temp directory isn't setuid, or else adb will complain.
812                os.chmod(host_dir, 0o700)
813
814                # Create an empty directory.
815                empty_dir_path = os.path.join(host_dir, 'empty')
816                os.mkdir(empty_dir_path);
817
818                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
819
820                remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
821                test_empty_cmd = ["[", "-d", remote_path, "]"]
822                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
823
824                self.assertEqual(rc, 0)
825                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
826            finally:
827                if host_dir is not None:
828                    shutil.rmtree(host_dir)
829
830        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
831        def test_push_symlink(self):
832            """Push a symlink.
833
834            Bug: http://b/31491920
835            """
836            try:
837                host_dir = tempfile.mkdtemp()
838
839                # Make sure the temp directory isn't setuid, or else adb will
840                # complain.
841                os.chmod(host_dir, 0o700)
842
843                with open(os.path.join(host_dir, 'foo'), 'w') as f:
844                    f.write('foo')
845
846                symlink_path = os.path.join(host_dir, 'symlink')
847                os.symlink('foo', symlink_path)
848
849                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
850                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
851                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
852                rc, out, _ = self.device.shell_nocheck(
853                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
854                self.assertEqual(0, rc)
855                self.assertEqual(out.strip(), 'foo')
856            finally:
857                if host_dir is not None:
858                    shutil.rmtree(host_dir)
859
860        def test_multiple_push(self):
861            """Push multiple files to the device in one adb push command.
862
863            Bug: http://b/25324823
864            """
865
866            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
867            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
868
869            try:
870                host_dir = tempfile.mkdtemp()
871
872                # Create some random files and a subdirectory containing more files.
873                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
874
875                subdir = os.path.join(host_dir, 'subdir')
876                os.mkdir(subdir)
877                subdir_temp_files = make_random_host_files(in_dir=subdir,
878                                                           num_files=4)
879
880                paths = [x.full_path for x in temp_files]
881                paths.append(subdir)
882                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
883
884                for temp_file in temp_files:
885                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
886                                                 temp_file.base_name)
887                    self._verify_remote(temp_file.checksum, remote_path)
888
889                for subdir_temp_file in subdir_temp_files:
890                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
891                                                 # BROKEN: http://b/25394682
892                                                 # 'subdir';
893                                                 temp_file.base_name)
894                    self._verify_remote(temp_file.checksum, remote_path)
895
896
897                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
898            finally:
899                if host_dir is not None:
900                    shutil.rmtree(host_dir)
901
902        @requires_non_root
903        def test_push_error_reporting(self):
904            """Make sure that errors that occur while pushing a file get reported
905
906            Bug: http://b/26816782
907            """
908            with tempfile.NamedTemporaryFile() as tmp_file:
909                tmp_file.write(b'\0' * 1024 * 1024)
910                tmp_file.flush()
911                try:
912                    self.device.push(local=tmp_file.name, remote='/system/')
913                    self.fail('push should not have succeeded')
914                except subprocess.CalledProcessError as e:
915                    output = e.output
916
917                self.assertTrue(b'Permission denied' in output or
918                                b'Read-only file system' in output)
919
920        @requires_non_root
921        def test_push_directory_creation(self):
922            """Regression test for directory creation.
923
924            Bug: http://b/110953234
925            """
926            with tempfile.NamedTemporaryFile() as tmp_file:
927                tmp_file.write(b'\0' * 1024 * 1024)
928                tmp_file.flush()
929                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
930                self.device.shell(['rm', '-rf', remote_path])
931
932                remote_path += '/filename'
933                self.device.push(local=tmp_file.name, remote=remote_path)
934
935        def disabled_test_push_multiple_slash_root(self):
936            """Regression test for pushing to //data/local/tmp.
937
938            Bug: http://b/141311284
939
940            Disabled because this broken on the adbd side as well: b/141943968
941            """
942            with tempfile.NamedTemporaryFile() as tmp_file:
943                tmp_file.write(b'\0' * 1024 * 1024)
944                tmp_file.flush()
945                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
946                self.device.shell(['rm', '-rf', remote_path])
947                self.device.push(local=tmp_file.name, remote=remote_path)
948
949        def _test_pull(self, remote_file, checksum):
950            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
951            tmp_write.close()
952            self.device.pull(remote=remote_file, local=tmp_write.name)
953            with open(tmp_write.name, 'rb') as tmp_read:
954                host_contents = tmp_read.read()
955                host_md5 = compute_md5(host_contents)
956            self.assertEqual(checksum, host_md5)
957            os.remove(tmp_write.name)
958
959        @requires_non_root
960        def test_pull_error_reporting(self):
961            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
962            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
963
964            try:
965                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
966            except subprocess.CalledProcessError as e:
967                output = e.output
968
969            self.assertIn(b'Permission denied', output)
970
971            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
972
973        def test_pull(self):
974            """Pull a randomly generated file from specified device."""
975            kbytes = 512
976            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
977            cmd = ['dd', 'if=/dev/urandom',
978                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
979                   'count={}'.format(kbytes)]
980            self.device.shell(cmd)
981            dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split()
982            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
983            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
984
985        def test_pull_dir(self):
986            """Pull a randomly generated directory of files from the device."""
987            try:
988                host_dir = tempfile.mkdtemp()
989
990                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
991                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
992
993                # Populate device directory with random files.
994                temp_files = make_random_device_files(
995                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
996
997                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
998
999                for temp_file in temp_files:
1000                    host_path = os.path.join(
1001                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1002                        temp_file.base_name)
1003                    self._verify_local(temp_file.checksum, host_path)
1004
1005                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1006            finally:
1007                if host_dir is not None:
1008                    shutil.rmtree(host_dir)
1009
1010        def test_pull_dir_symlink(self):
1011            """Pull a directory into a symlink to a directory.
1012
1013            Bug: http://b/27362811
1014            """
1015            if os.name != 'posix':
1016                raise unittest.SkipTest('requires POSIX')
1017
1018            try:
1019                host_dir = tempfile.mkdtemp()
1020                real_dir = os.path.join(host_dir, 'dir')
1021                symlink = os.path.join(host_dir, 'symlink')
1022                os.mkdir(real_dir)
1023                os.symlink(real_dir, symlink)
1024
1025                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1026                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1027
1028                # Populate device directory with random files.
1029                temp_files = make_random_device_files(
1030                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1031
1032                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1033
1034                for temp_file in temp_files:
1035                    host_path = os.path.join(
1036                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1037                        temp_file.base_name)
1038                    self._verify_local(temp_file.checksum, host_path)
1039
1040                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1041            finally:
1042                if host_dir is not None:
1043                    shutil.rmtree(host_dir)
1044
1045        def test_pull_dir_symlink_collision(self):
1046            """Pull a directory into a colliding symlink to directory."""
1047            if os.name != 'posix':
1048                raise unittest.SkipTest('requires POSIX')
1049
1050            try:
1051                host_dir = tempfile.mkdtemp()
1052                real_dir = os.path.join(host_dir, 'real')
1053                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1054                symlink = os.path.join(host_dir, tmp_dirname)
1055                os.mkdir(real_dir)
1056                os.symlink(real_dir, symlink)
1057
1058                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1059                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1060
1061                # Populate device directory with random files.
1062                temp_files = make_random_device_files(
1063                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1064
1065                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1066
1067                for temp_file in temp_files:
1068                    host_path = os.path.join(real_dir, temp_file.base_name)
1069                    self._verify_local(temp_file.checksum, host_path)
1070
1071                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1072            finally:
1073                if host_dir is not None:
1074                    shutil.rmtree(host_dir)
1075
1076        def test_pull_dir_nonexistent(self):
1077            """Pull a directory of files from the device to a nonexistent path."""
1078            try:
1079                host_dir = tempfile.mkdtemp()
1080                dest_dir = os.path.join(host_dir, 'dest')
1081
1082                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1083                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1084
1085                # Populate device directory with random files.
1086                temp_files = make_random_device_files(
1087                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1088
1089                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1090
1091                for temp_file in temp_files:
1092                    host_path = os.path.join(dest_dir, temp_file.base_name)
1093                    self._verify_local(temp_file.checksum, host_path)
1094
1095                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1096            finally:
1097                if host_dir is not None:
1098                    shutil.rmtree(host_dir)
1099
1100        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1101        def disabled_test_pull_symlink_dir(self):
1102            """Pull a symlink to a directory of symlinks to files."""
1103            try:
1104                host_dir = tempfile.mkdtemp()
1105
1106                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1107                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1108                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1109
1110                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1111                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1112                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1113
1114                # Populate device directory with random files.
1115                temp_files = make_random_device_files(
1116                    self.device, in_dir=remote_dir, num_files=32)
1117
1118                for temp_file in temp_files:
1119                    self.device.shell(
1120                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1121                         posixpath.join(remote_links, temp_file.base_name)])
1122
1123                self.device.pull(remote=remote_symlink, local=host_dir)
1124
1125                for temp_file in temp_files:
1126                    host_path = os.path.join(
1127                        host_dir, 'symlink', temp_file.base_name)
1128                    self._verify_local(temp_file.checksum, host_path)
1129
1130                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1131            finally:
1132                if host_dir is not None:
1133                    shutil.rmtree(host_dir)
1134
1135        def test_pull_empty(self):
1136            """Pull a directory containing an empty directory from the device."""
1137            try:
1138                host_dir = tempfile.mkdtemp()
1139
1140                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1141                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1142                self.device.shell(['mkdir', '-p', remote_empty_path])
1143
1144                self.device.pull(remote=remote_empty_path, local=host_dir)
1145                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1146            finally:
1147                if host_dir is not None:
1148                    shutil.rmtree(host_dir)
1149
1150        def test_multiple_pull(self):
1151            """Pull a randomly generated directory of files from the device."""
1152
1153            try:
1154                host_dir = tempfile.mkdtemp()
1155
1156                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1157                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1158                self.device.shell(['mkdir', '-p', subdir])
1159
1160                # Create some random files and a subdirectory containing more files.
1161                temp_files = make_random_device_files(
1162                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1163
1164                subdir_temp_files = make_random_device_files(
1165                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1166
1167                paths = [x.full_path for x in temp_files]
1168                paths.append(subdir)
1169                self.device._simple_call(['pull'] + paths + [host_dir])
1170
1171                for temp_file in temp_files:
1172                    local_path = os.path.join(host_dir, temp_file.base_name)
1173                    self._verify_local(temp_file.checksum, local_path)
1174
1175                for subdir_temp_file in subdir_temp_files:
1176                    local_path = os.path.join(host_dir,
1177                                              'subdir',
1178                                              subdir_temp_file.base_name)
1179                    self._verify_local(subdir_temp_file.checksum, local_path)
1180
1181                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1182            finally:
1183                if host_dir is not None:
1184                    shutil.rmtree(host_dir)
1185
1186        def verify_sync(self, device, temp_files, device_dir):
1187            """Verifies that a list of temp files was synced to the device."""
1188            # Confirm that every file on the device mirrors that on the host.
1189            for temp_file in temp_files:
1190                device_full_path = posixpath.join(
1191                    device_dir, temp_file.base_name)
1192                dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split()
1193                self.assertEqual(temp_file.checksum, dev_md5)
1194
1195        def do_test_sync(self, partition):
1196            """Sync a host directory to the given partition."""
1197
1198            try:
1199                base_dir = tempfile.mkdtemp()
1200
1201                # Create mirror device directory hierarchy within base_dir.
1202                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1203                os.makedirs(full_dir_path)
1204
1205                # Create 32 random files within the host mirror.
1206                temp_files = make_random_host_files(
1207                    in_dir=full_dir_path, num_files=32)
1208
1209                # Clean up any stale files on the device.
1210                device = adb.get_device()  # pylint: disable=no-member
1211                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1212
1213                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1214                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1215                device.sync(partition)
1216                if old_product_out is None:
1217                    del os.environ['ANDROID_PRODUCT_OUT']
1218                else:
1219                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1220
1221                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1222
1223                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1224            finally:
1225                if base_dir is not None:
1226                    shutil.rmtree(base_dir)
1227
1228        def test_sync_data(self):
1229            """Sync a host directory to the data partition."""
1230
1231            self.do_test_sync('data')
1232
1233        def test_sync_slash_data(self):
1234            """Sync a host directory to the data partition with a leading slash."""
1235
1236            self.do_test_sync('/data')
1237
1238        def test_push_sync(self):
1239            """Sync a host directory to a specific path."""
1240
1241            try:
1242                temp_dir = tempfile.mkdtemp()
1243                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1244
1245                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1246
1247                # Clean up any stale files on the device.
1248                device = adb.get_device()  # pylint: disable=no-member
1249                device.shell(['rm', '-rf', device_dir])
1250
1251                device.push(temp_dir, device_dir, sync=True)
1252
1253                self.verify_sync(device, temp_files, device_dir)
1254
1255                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1256            finally:
1257                if temp_dir is not None:
1258                    shutil.rmtree(temp_dir)
1259
1260        def test_push_sync_multiple(self):
1261            """Sync multiple host directories to a specific path."""
1262
1263            try:
1264                temp_dir = tempfile.mkdtemp()
1265                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1266
1267                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1268
1269                # Clean up any stale files on the device.
1270                device = adb.get_device()  # pylint: disable=no-member
1271                device.shell(['rm', '-rf', device_dir])
1272                device.shell(['mkdir', '-p', device_dir])
1273
1274                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
1275                device.push(host_paths, device_dir, sync=True)
1276
1277                self.verify_sync(device, temp_files, device_dir)
1278
1279                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1280            finally:
1281                if temp_dir is not None:
1282                    shutil.rmtree(temp_dir)
1283
1284
1285        def test_push_dry_run_nonexistent_file(self):
1286            """Push with dry run (non-existent file)."""
1287
1288            for file_size in [8, 1024 * 1024]:
1289                try:
1290                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1291                    device_file = posixpath.join(device_dir, 'file')
1292
1293                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1294                    self.device.shell(['mkdir', '-p', device_dir])
1295
1296                    host_dir = tempfile.mkdtemp()
1297                    host_file = posixpath.join(host_dir, 'file')
1298
1299                    with open(host_file, "w") as f:
1300                        f.write('x' * file_size)
1301
1302                    self.device._simple_call(['push', '-n', host_file, device_file])
1303                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1304                    self.assertNotEqual(0, rc)
1305
1306                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1307                finally:
1308                    if host_dir is not None:
1309                        shutil.rmtree(host_dir)
1310
1311        def test_push_dry_run_existent_file(self):
1312            """Push with dry run."""
1313
1314            for file_size in [8, 1024 * 1024]:
1315                try:
1316                    host_dir = tempfile.mkdtemp()
1317                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1318                    device_file = posixpath.join(device_dir, 'file')
1319
1320                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1321                    self.device.shell(['mkdir', '-p', device_dir])
1322                    self.device.shell(['echo', 'foo', '>', device_file])
1323
1324                    host_file = posixpath.join(host_dir, 'file')
1325
1326                    with open(host_file, "w") as f:
1327                        f.write('x' * file_size)
1328
1329                    self.device._simple_call(['push', '-n', host_file, device_file])
1330                    stdout, stderr = self.device.shell(['cat', device_file])
1331                    self.assertEqual(stdout.strip(), "foo")
1332
1333                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1334                finally:
1335                    if host_dir is not None:
1336                        shutil.rmtree(host_dir)
1337
1338        def test_unicode_paths(self):
1339            """Ensure that we can support non-ASCII paths, even on Windows."""
1340            name = u'로보카 폴리'
1341
1342            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1343            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1344
1345            ## push.
1346            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1347            tf.close()
1348            self.device.push(tf.name, remote_path)
1349            os.remove(tf.name)
1350            self.assertFalse(os.path.exists(tf.name))
1351
1352            # Verify that the device ended up with the expected UTF-8 path
1353            output = self.device.shell(
1354                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1355            self.assertEqual(remote_path, output)
1356
1357            # pull.
1358            self.device.pull(remote_path, tf.name)
1359            self.assertTrue(os.path.exists(tf.name))
1360            os.remove(tf.name)
1361            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1362
1363
1364class FileOperationsTestUncompressed(FileOperationsTest.Base):
1365    compression = "none"
1366
1367
1368class FileOperationsTestBrotli(FileOperationsTest.Base):
1369    compression = "brotli"
1370
1371
1372class FileOperationsTestLZ4(FileOperationsTest.Base):
1373    compression = "lz4"
1374
1375
1376class FileOperationsTestZstd(FileOperationsTest.Base):
1377    compression = "zstd"
1378
1379
1380class DeviceOfflineTest(DeviceTest):
1381    def _get_device_state(self, serialno):
1382        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1383        for line in output.split('\n'):
1384            m = re.match('(\S+)\s+(\S+)', line)
1385            if m and m.group(1) == serialno:
1386                return m.group(2)
1387        return None
1388
1389    def disabled_test_killed_when_pushing_a_large_file(self):
1390        """
1391           While running adb push with a large file, kill adb server.
1392           Occasionally the device becomes offline. Because the device is still
1393           reading data without realizing that the adb server has been restarted.
1394           Test if we can bring the device online automatically now.
1395           http://b/32952319
1396        """
1397        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1398        # 1. Push a large file
1399        file_path = 'tmp_large_file'
1400        try:
1401            fh = open(file_path, 'w')
1402            fh.write('\0' * (100 * 1024 * 1024))
1403            fh.close()
1404            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1405            time.sleep(0.1)
1406            # 2. Kill the adb server
1407            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1408            subproc.terminate()
1409        finally:
1410            try:
1411                os.unlink(file_path)
1412            except:
1413                pass
1414        # 3. See if the device still exist.
1415        # Sleep to wait for the adb server exit.
1416        time.sleep(0.5)
1417        # 4. The device should be online
1418        self.assertEqual(self._get_device_state(serialno), 'device')
1419
1420    def disabled_test_killed_when_pulling_a_large_file(self):
1421        """
1422           While running adb pull with a large file, kill adb server.
1423           Occasionally the device can't be connected. Because the device is trying to
1424           send a message larger than what is expected by the adb server.
1425           Test if we can bring the device online automatically now.
1426        """
1427        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1428        file_path = 'tmp_large_file'
1429        try:
1430            # 1. Create a large file on device.
1431            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1432                               'bs=1000000', 'count=100'])
1433            # 2. Pull the large file on host.
1434            subproc = subprocess.Popen(self.device.adb_cmd +
1435                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1436            time.sleep(0.1)
1437            # 3. Kill the adb server
1438            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1439            subproc.terminate()
1440        finally:
1441            try:
1442                os.unlink(file_path)
1443            except:
1444                pass
1445        # 4. See if the device still exist.
1446        # Sleep to wait for the adb server exit.
1447        time.sleep(0.5)
1448        self.assertEqual(self._get_device_state(serialno), 'device')
1449
1450
1451    def test_packet_size_regression(self):
1452        """Test for http://b/37783561
1453
1454        Receiving packets of a length divisible by 512 but not 1024 resulted in
1455        the adb client waiting indefinitely for more input.
1456        """
1457        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1458        # Probe some surrounding values as well, for the hell of it.
1459        for base in [512] + list(range(1024, 1024 * 16, 1024)):
1460            for offset in [-6, -5, -4]:
1461                length = base + offset
1462                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1463                       'echo', 'foo']
1464                rc, stdout, _ = self.device.shell_nocheck(cmd)
1465
1466                self.assertEqual(0, rc)
1467
1468                # Output should be '\0' * length, followed by "foo\n"
1469                self.assertEqual(length, len(stdout) - 4)
1470                self.assertEqual(stdout, "\0" * length + "foo\n")
1471
1472    def test_zero_packet(self):
1473        """Test for http://b/113070258
1474
1475        Make sure that we don't blow up when sending USB transfers that line up
1476        exactly with the USB packet size.
1477        """
1478
1479        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1480        try:
1481            for size in [512, 1024]:
1482                def listener():
1483                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1484                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1485
1486                thread = threading.Thread(target=listener)
1487                thread.start()
1488
1489                # Wait a bit to let the shell command start.
1490                time.sleep(0.25)
1491
1492                sock = socket.create_connection(("localhost", local_port))
1493                with contextlib.closing(sock):
1494                    bytesWritten = sock.send(b"a" * size)
1495                    self.assertEqual(size, bytesWritten)
1496                    readBytes = sock.recv(4096)
1497                    self.assertEqual(b"foo\n", readBytes)
1498
1499                thread.join()
1500        finally:
1501            self.device.forward_remove("tcp:{}".format(local_port))
1502
1503
1504class SocketTest(DeviceTest):
1505    def test_socket_flush(self):
1506        """Test that we handle socket closure properly.
1507
1508        If we're done writing to a socket, closing before the other end has
1509        closed will send a TCP_RST if we have incoming data queued up, which
1510        may result in data that we've written being discarded.
1511
1512        Bug: http://b/74616284
1513        """
1514        def adb_length_prefixed(string):
1515            encoded = string.encode("utf8")
1516            result = b"%04x%s" % (len(encoded), encoded)
1517            return result
1518
1519        if "ANDROID_SERIAL" in os.environ:
1520            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
1521        else:
1522            transport_string = "host:transport-any"
1523
1524        with socket.create_connection(("localhost", 5037)) as s:
1525
1526            s.sendall(adb_length_prefixed(transport_string))
1527            response = s.recv(4)
1528            self.assertEqual(b"OKAY", response)
1529
1530            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
1531            s.sendall(adb_length_prefixed(shell_string))
1532
1533            response = s.recv(4)
1534            self.assertEqual(b"OKAY", response)
1535
1536            # Spawn a thread that dumps garbage into the socket until failure.
1537            def spam():
1538                buf = b"\0" * 16384
1539                try:
1540                    while True:
1541                        s.sendall(buf)
1542                except Exception as ex:
1543                    print(ex)
1544
1545            thread = threading.Thread(target=spam)
1546            thread.start()
1547
1548            time.sleep(1)
1549
1550            received = b""
1551            while True:
1552                read = s.recv(512)
1553                if len(read) == 0:
1554                    break
1555                received += read
1556
1557        self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
1558        thread.join()
1559
1560
1561class FramebufferTest(DeviceTest):
1562    def test_framebuffer(self):
1563        """Test that we get something from the framebuffer service."""
1564        output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"])
1565        self.assertFalse(len(output) == 0)
1566
1567
1568if sys.platform == "win32":
1569    # From https://stackoverflow.com/a/38749458
1570    import os
1571    import contextlib
1572    import msvcrt
1573    import ctypes
1574    from ctypes import wintypes
1575
1576    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
1577
1578    GENERIC_READ  = 0x80000000
1579    GENERIC_WRITE = 0x40000000
1580    FILE_SHARE_READ  = 1
1581    FILE_SHARE_WRITE = 2
1582    CONSOLE_TEXTMODE_BUFFER = 1
1583    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
1584    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
1585    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1586
1587    def _check_zero(result, func, args):
1588        if not result:
1589            raise ctypes.WinError(ctypes.get_last_error())
1590        return args
1591
1592    def _check_invalid(result, func, args):
1593        if result == INVALID_HANDLE_VALUE:
1594            raise ctypes.WinError(ctypes.get_last_error())
1595        return args
1596
1597    if not hasattr(wintypes, 'LPDWORD'): # Python 2
1598        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
1599        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1600
1601    class COORD(ctypes.Structure):
1602        _fields_ = (('X', wintypes.SHORT),
1603                    ('Y', wintypes.SHORT))
1604
1605    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1606        _fields_ = (('cbSize',               wintypes.ULONG),
1607                    ('dwSize',               COORD),
1608                    ('dwCursorPosition',     COORD),
1609                    ('wAttributes',          wintypes.WORD),
1610                    ('srWindow',             wintypes.SMALL_RECT),
1611                    ('dwMaximumWindowSize',  COORD),
1612                    ('wPopupAttributes',     wintypes.WORD),
1613                    ('bFullscreenSupported', wintypes.BOOL),
1614                    ('ColorTable',           wintypes.DWORD * 16))
1615        def __init__(self, *args, **kwds):
1616            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1617                    *args, **kwds)
1618            self.cbSize = ctypes.sizeof(self)
1619
1620    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1621                                        CONSOLE_SCREEN_BUFFER_INFOEX)
1622    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1623
1624    kernel32.GetStdHandle.errcheck = _check_invalid
1625    kernel32.GetStdHandle.restype = wintypes.HANDLE
1626    kernel32.GetStdHandle.argtypes = (
1627        wintypes.DWORD,) # _In_ nStdHandle
1628
1629    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1630    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1631    kernel32.CreateConsoleScreenBuffer.argtypes = (
1632        wintypes.DWORD,        # _In_       dwDesiredAccess
1633        wintypes.DWORD,        # _In_       dwShareMode
1634        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
1635        wintypes.DWORD,        # _In_       dwFlags
1636        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData
1637
1638    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1639    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1640        wintypes.HANDLE,               # _In_  hConsoleOutput
1641        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1642
1643    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1644    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1645        wintypes.HANDLE,               # _In_  hConsoleOutput
1646        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo
1647
1648    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1649    kernel32.SetConsoleWindowInfo.argtypes = (
1650        wintypes.HANDLE,      # _In_ hConsoleOutput
1651        wintypes.BOOL,        # _In_ bAbsolute
1652        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1653
1654    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1655    kernel32.FillConsoleOutputCharacterW.argtypes = (
1656        wintypes.HANDLE,  # _In_  hConsoleOutput
1657        wintypes.WCHAR,   # _In_  cCharacter
1658        wintypes.DWORD,   # _In_  nLength
1659        COORD,            # _In_  dwWriteCoord
1660        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1661
1662    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1663    kernel32.ReadConsoleOutputCharacterW.argtypes = (
1664        wintypes.HANDLE,  # _In_  hConsoleOutput
1665        wintypes.LPWSTR,  # _Out_ lpCharacter
1666        wintypes.DWORD,   # _In_  nLength
1667        COORD,            # _In_  dwReadCoord
1668        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1669
1670    @contextlib.contextmanager
1671    def allocate_console():
1672        allocated = kernel32.AllocConsole()
1673        try:
1674            yield allocated
1675        finally:
1676            if allocated:
1677                kernel32.FreeConsole()
1678
1679    @contextlib.contextmanager
1680    def console_screen(ncols=None, nrows=None):
1681        info = CONSOLE_SCREEN_BUFFER_INFOEX()
1682        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1683        nwritten = (wintypes.DWORD * 1)()
1684        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1685        kernel32.GetConsoleScreenBufferInfoEx(
1686               hStdOut, ctypes.byref(info))
1687        if ncols is None:
1688            ncols = info.dwSize.X
1689        if nrows is None:
1690            nrows = info.dwSize.Y
1691        elif nrows > 9999:
1692            raise ValueError('nrows must be 9999 or less')
1693        fd_screen = None
1694        hScreen = kernel32.CreateConsoleScreenBuffer(
1695                    GENERIC_READ | GENERIC_WRITE,
1696                    FILE_SHARE_READ | FILE_SHARE_WRITE,
1697                    None, CONSOLE_TEXTMODE_BUFFER, None)
1698        try:
1699            fd_screen = msvcrt.open_osfhandle(
1700                            hScreen, os.O_RDWR | os.O_BINARY)
1701            kernel32.GetConsoleScreenBufferInfoEx(
1702                   hScreen, ctypes.byref(new_info))
1703            new_info.dwSize = COORD(ncols, nrows)
1704            new_info.srWindow = wintypes.SMALL_RECT(
1705                    Left=0, Top=0, Right=(ncols - 1),
1706                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1707            kernel32.SetConsoleScreenBufferInfoEx(
1708                    hScreen, ctypes.byref(new_info))
1709            kernel32.SetConsoleWindowInfo(hScreen, True,
1710                    ctypes.byref(new_info.srWindow))
1711            kernel32.FillConsoleOutputCharacterW(
1712                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1713            kernel32.SetConsoleActiveScreenBuffer(hScreen)
1714            try:
1715                yield fd_screen
1716            finally:
1717                kernel32.SetConsoleScreenBufferInfoEx(
1718                    hStdOut, ctypes.byref(info))
1719                kernel32.SetConsoleWindowInfo(hStdOut, True,
1720                        ctypes.byref(info.srWindow))
1721                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1722        finally:
1723            if fd_screen is not None:
1724                os.close(fd_screen)
1725            else:
1726                kernel32.CloseHandle(hScreen)
1727
1728    def read_screen(fd):
1729        hScreen = msvcrt.get_osfhandle(fd)
1730        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1731        kernel32.GetConsoleScreenBufferInfoEx(
1732            hScreen, ctypes.byref(csbi))
1733        ncols = csbi.dwSize.X
1734        pos = csbi.dwCursorPosition
1735        length = ncols * pos.Y + pos.X + 1
1736        buf = (ctypes.c_wchar * length)()
1737        n = (wintypes.DWORD * 1)()
1738        kernel32.ReadConsoleOutputCharacterW(
1739            hScreen, buf, length, COORD(0,0), n)
1740        lines = [buf[i:i+ncols].rstrip(u'\0')
1741                    for i in range(0, n[0], ncols)]
1742        return u'\n'.join(lines)
1743
1744@unittest.skipUnless(sys.platform == "win32", "requires Windows")
1745class WindowsConsoleTest(DeviceTest):
1746    def test_unicode_output(self):
1747        """Test Unicode command line parameters and Unicode console window output.
1748
1749        Bug: https://issuetracker.google.com/issues/111972753
1750        """
1751        # If we don't have a console window, allocate one. This isn't necessary if we're already
1752        # being run from a console window, which is typical.
1753        with allocate_console() as allocated_console:
1754            # Create a temporary console buffer and switch to it. We could also pass a parameter of
1755            # ncols=len(unicode_string), but it causes the window to flash as it is resized and
1756            # likely unnecessary given the typical console window size.
1757            with console_screen(nrows=1000) as screen:
1758                unicode_string = u'로보카 폴리'
1759                # Run adb and allow it to detect that stdout is a console, not a pipe, by using
1760                # device.shell_popen() which does not use a pipe, unlike device.shell().
1761                process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
1762                process.wait()
1763                # Read what was written by adb to the temporary console buffer.
1764                console_output = read_screen(screen)
1765                self.assertEqual(unicode_string, console_output)
1766
1767class DevicesListing(DeviceTest):
1768
1769    serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8")
1770    # def get_serial(self):
1771    #     return subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip().decode("utf-8")
1772
1773    def test_devices(self):
1774        with subprocess.Popen(['adb', 'devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1775            lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines()))
1776            self.assertEqual(len(lines), 3)
1777            line = lines[1]
1778            self.assertTrue(self.serial in line)
1779            self.assertFalse("{" in line)
1780            self.assertFalse("}" in line)
1781            self.assertTrue("device" in line)
1782            self.assertFalse("product" in line)
1783            self.assertFalse("transport" in line)
1784
1785    def test_devices_l(self):
1786        with subprocess.Popen(['adb', 'devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1787            lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines()))
1788            self.assertEqual(len(lines), 3)
1789            line = lines[1]
1790            self.assertTrue(self.serial in line)
1791            self.assertFalse("{" in line)
1792            self.assertFalse("}" in line)
1793            self.assertTrue("device" in line)
1794            self.assertTrue("product" in line)
1795            self.assertTrue("transport" in line)
1796
1797    def test_track_devices(self):
1798        with subprocess.Popen(['adb', 'track-devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1799            with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
1800                output_size = int(reader.read(4), 16)
1801                output = reader.read(output_size)
1802                self.assertFalse("{" in output)
1803                self.assertFalse("}" in output)
1804                self.assertTrue(self.serial in output)
1805                self.assertTrue("device" in output)
1806                self.assertFalse("product" in output)
1807                self.assertFalse("transport" in output)
1808            proc.terminate()
1809
1810    def test_track_devices_l(self):
1811        with subprocess.Popen(['adb', 'track-devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1812            with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
1813                output_size = int(reader.read(4), 16)
1814                output = reader.read(output_size)
1815                self.assertFalse("{" in output)
1816                self.assertFalse("}" in output)
1817                self.assertTrue(self.serial in output)
1818                self.assertTrue("device" in output)
1819                self.assertTrue("product" in output)
1820                self.assertTrue("transport" in output)
1821            proc.terminate()
1822
1823    def test_track_devices_proto_text(self):
1824        with subprocess.Popen(['adb', 'track-devices', '--proto-text'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1825            with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
1826                output_size = int(reader.read(4), 16)
1827                output = reader.read(output_size)
1828                self.assertTrue("{" in output)
1829                self.assertTrue("}" in output)
1830                self.assertTrue(self.serial in output)
1831                self.assertTrue("device" in output)
1832                self.assertTrue("product" in output)
1833                self.assertTrue("connection_type" in output)
1834            proc.terminate()
1835
1836    def test_track_devices_proto_binary(self):
1837        with subprocess.Popen(['adb', 'track-devices', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1838
1839            output_size = int(proc.stdout.read(4).decode("utf-8"), 16)
1840            proto = proc.stdout.read(output_size)
1841
1842            devices = adb_host_proto.Devices()
1843            devices.ParseFromString(proto)
1844
1845            device = devices.device[0]
1846            self.assertTrue(device.serial == self.serial)
1847            self.assertFalse(device.bus_address == "")
1848            self.assertFalse(device.product == "")
1849            self.assertFalse(device.model == "")
1850            self.assertFalse(device.device == "")
1851            self.assertTrue(device.negotiated_speed == int(device.negotiated_speed))
1852            self.assertTrue(int(device.negotiated_speed) != 0)
1853            self.assertTrue(device.max_speed == int(device.max_speed))
1854            self.assertTrue(int(device.max_speed) != 0)
1855            self.assertTrue(device.transport_id == int(device.transport_id))
1856
1857            proc.terminate()
1858
1859def invoke(*args):
1860    print(args)
1861    try:
1862        output = subprocess.check_output(args, stderr=subprocess.STDOUT).strip().decode("utf-8")
1863        print(output)
1864        return output
1865    except subprocess.CalledProcessError as e:
1866        return "ErrorCode " + str(e.returncode) + ":" + e.output.decode("utf-8")
1867
1868
1869class DevicesListing(DeviceTest):
1870
1871    def test_track_app_appinfo(self):
1872        subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app1.apk']).strip().decode("utf-8")
1873        subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app2.apk']).strip().decode("utf-8")
1874        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.MainActivity']).strip().decode("utf-8")
1875        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app2/.MainActivity']).strip().decode("utf-8")
1876        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.OwnProcessActivity']).strip().decode("utf-8")
1877        with subprocess.Popen(['adb', 'track-app', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1878            output_size = int(proc.stdout.read(4).decode("utf-8"), 16)
1879            proto = proc.stdout.read(output_size)
1880
1881            apps = proto_track_app.AppProcesses()
1882            apps.ParseFromString(proto)
1883
1884            foundAdbAppDefProc = False
1885            foundAdbAppOwnProc = False
1886            for app in apps.process:
1887                if (app.process_name == "adb.test.process.name"):
1888                    foundAdbAppDefProc = True
1889                    self.assertTrue(app.debuggable)
1890                    self.assertTrue("adb.test.app1" in app.package_names)
1891                    self.assertTrue("adb.test.app2" in app.package_names)
1892
1893                if (app.process_name == "adb.test.own.process"):
1894                    foundAdbAppOwnProc = True
1895                    self.assertTrue(app.debuggable)
1896                    self.assertTrue("adb.test.app1" in app.package_names)
1897
1898            self.assertTrue(foundAdbAppDefProc)
1899            self.assertTrue(foundAdbAppOwnProc)
1900            proc.terminate()
1901
1902class ServerStatus(unittest.TestCase):
1903    def test_server_status(self):
1904        with subprocess.Popen(['adb', 'server-status'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
1905            lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines()))
1906            self.assertTrue("usb_backend" in lines[0])
1907            self.assertTrue("mdns_backend" in lines[1])
1908            self.assertTrue("version" in lines[2])
1909            self.assertTrue("build" in lines[3])
1910            self.assertTrue("executable_absolute_path" in lines[4])
1911            self.assertTrue("log_absolute_path" in lines[5])
1912            self.assertTrue("os" in lines[6])
1913            self.assertTrue("trace_level" in lines[7])
1914            self.assertTrue("burst_mode" in lines[8])
1915
1916
1917class DetachSingleServer(unittest.TestCase):
1918    serial = invoke("adb", "get-serialno")
1919
1920    def wait_for_device(self):
1921        count = 0
1922        while True:
1923            devices = invoke("adb", "devices")
1924            if (self.serial in devices and "attached" in devices):
1925                return
1926            count = count + 1
1927            if count > 10:
1928                return
1929
1930    def test_detach_then_attach(self):
1931        # Check device is there with comm working
1932        who = invoke("adb", "shell", "whoami")
1933        self.assertTrue(who == "shell" or who == "root")
1934        devices = invoke("adb", "devices")
1935        self.assertFalse("detached" in devices, devices)
1936        self.assertTrue(self.serial in devices, devices)
1937
1938        invoke("adb", "detach")
1939
1940        # Verify detach did not remove the device from list
1941        devices = invoke("adb", "devices")
1942        self.assertTrue(self.serial in devices, devices)
1943        self.assertTrue("detached" in devices, devices)
1944
1945        # Verify detach makes device unreachable
1946        who = invoke("adb", "shell", "whoami")
1947        self.assertFalse(who == "shell" or who == "root", who)
1948
1949        # Re-attach
1950        invoke("adb", "attach")
1951        time.sleep(2)
1952        self.wait_for_device()
1953
1954        # Check devices is there
1955        devices = invoke("adb", "devices")
1956        self.assertTrue(self.serial in devices, devices)
1957        self.assertFalse("detached" in devices, devices)
1958
1959        # Check device comm was started
1960        who = invoke("adb", "shell", "whoami")
1961        self.assertTrue(who == "shell" or who == "root", who)
1962
1963    def tearDown(self):
1964        invoke("adb", "kill-server")
1965
1966class DetachMultiServer(unittest.TestCase):
1967    server1_port = "5038"
1968    server2_port = "5039"
1969    env_var_detached = "ADB_LIBUSB_START_DETACHED"
1970    serial = invoke("adb", "get-serialno")
1971
1972    def wait_for_device(self, server_id):
1973        count = 0
1974        while True:
1975            devices = invoke("adb", "-P", server_id, "devices")
1976            if (self.serial in devices and "attached" in devices):
1977                return
1978            count = count + 1
1979            if count > 10:
1980                return
1981
1982    def test_device_exchange(self):
1983       # Enable once we support invoke with env variable
1984       return
1985       # Start two "detached" servers with ADB_LIBUSB_START_DETACHED
1986       # Attach device to server 1, test it.
1987       # Detach device from server 1.
1988       # Attach device to server 2. test it.
1989
1990       # Make sure everything is clean
1991       invoke("adb", "-P", self.server1_port, "start-server")
1992       invoke("adb", "-P", self.server2_port, "start-server")
1993
1994       # Make sure server1 sees device as detached
1995       devices1= invoke("adb", "-P", self.server1_port, "devices")
1996       self.assertTrue("detached" in devices1)
1997       self.assertTrue(self.serial in devices1)
1998
1999       # Make sure server2 sees device as detached
2000       devices2= invoke("adb", "-P", self.server2_port, "devices")
2001       self.assertTrue("detached" in devices2)
2002       self.assertTrue(self.serial in devices2)
2003
2004       # Attach device to server 1. Verify.
2005       invoke("adb", "-P", self.server1_port, "attach")
2006       time.sleep(4)
2007       self.wait_for_device(self.server1_port)
2008
2009       devices1= invoke("adb", "-P", self.server1_port, "devices")
2010       self.assertFalse("detached" in devices1)
2011       self.assertTrue(self.serial in devices1)
2012
2013       # Make sure server 1 can comm with device
2014       who = invoke("adb", "-P", self.server1_port, "shell", "whoami")
2015       self.assertTrue(who == "shell" or who == "root")
2016
2017       # Now detach and make sure device cannot comm
2018       invoke("adb", "-P", self.server1_port, "detach")
2019       who = invoke("adb", "-P", self.server1_port, "shell", "whoami")
2020       self.assertFalse(who == "shell" or who == "root")
2021       devices1= invoke("adb", "-P", self.server1_port, "devices")
2022       self.assertTrue("detached" in devices1)
2023       self.assertTrue(self.serial in devices1)
2024
2025       # Give device to server2
2026       invoke("adb", "-P", self.server2_port, "attach")
2027       time.sleep(2)
2028       self.wait_for_device(self.server2_port)
2029       devices2= invoke("adb", "-P", self.server2_port, "devices")
2030       self.assertFalse("detached" in devices2)
2031       self.assertTrue(self.serial in devices2)
2032
2033       # Test that sever2 can comm with device
2034       who = invoke("adb", "-P", self.server2_port, "shell", "whoami")
2035       self.assertTrue(who == "shell" or who == "root")
2036
2037       # Detach device from server2. Verify.
2038       invoke("adb", "-P", self.server2_port, "detach")
2039       devices2= invoke("adb", "-P", self.server2_port, "devices")
2040       self.assertTrue("detached" in devices2)
2041       self.assertTrue(self.serial in devices2)
2042
2043       # Verify server2 cannot comm with device
2044       who = invoke("adb", "-P", self.server2_port, "shell", "whoami")
2045       self.assertFalse(who == "shell" or who == "root")
2046
2047    def setUp(self):
2048       os.environ[self.env_var_detached] = "1"
2049       invoke("adb", "kill-server")
2050
2051    def tearDown(self):
2052       invoke("adb", "-P", self.server1_port, "kill-server")
2053       invoke("adb", "-P", self.server2_port, "kill-server")
2054       del os.environ[self.env_var_detached]
2055
2056class OneDevice(unittest.TestCase):
2057
2058    serial = invoke("adb", "get-serialno")
2059    owner_server_port = "14424"
2060
2061    def test_one_device(self):
2062        invoke("adb", "kill-server")
2063        invoke("adb", "--one-device", self.serial, "-P", self.owner_server_port, "start-server")
2064        devices = invoke("adb", "devices")
2065        owned_devices = invoke("adb",  "-P", "14424", "devices")
2066        self.assertTrue(self.serial in owned_devices)
2067        self.assertFalse(self.serial in devices)
2068
2069    def tearDown(self):
2070        invoke("adb",  "-P", self.owner_server_port, "kill-server")
2071        invoke("adb",  "kill-server")
2072
2073class Debugger(unittest.TestCase):
2074
2075    PKG_NAME = "adb.test.app1"
2076    PROCESS_NAME = "adb.test.process.name"
2077    APP_PORT = "8000"
2078    HANDSHAKE = "JDWP-Handshake"
2079
2080    def test_denied_debugger_on_frozen_app(self):
2081        # TODO: Enable once we have a test runner that allows to debug tests.
2082        # -> JAVA
2083
2084        # Install app
2085        apk = self.PKG_NAME.replace(".", "_") + ".apk"
2086        invoke('adb', 'install', '-r', '-t', apk)
2087
2088        # Start app
2089        target = self.PKG_NAME + '/.MainActivity'
2090        invoke('adb', 'shell', 'am', 'start', '-W', target)
2091
2092        # Assert that debugger is allowed
2093        pid = invoke("adb", "shell", "pidof", self.PROCESS_NAME)
2094        self.assertTrue(pid.isdigit(), pid)
2095        invoke("adb", "forward", "tcp:" + self.APP_PORT, "jdwp:" + pid)
2096        # Connect to debugger
2097        sock = socket.socket()
2098        sock.connect(("localhost", int(self.APP_PORT)))
2099        sock.send(self.HANDSHAKE.encode('utf-8'))
2100        resp = sock.recv(len(self.HANDSHAKE))
2101        self.assertTrue(resp.decode("utf-8") == self.HANDSHAKE)
2102        sock.close()
2103
2104        # Freeze app (adb shell am freeze <APK_NAME>)
2105        invoke("adb", "shell", "am", "freeze", self.PROCESS_NAME)
2106
2107        # Asset that debugger is denied
2108        connection_refused = False
2109        try:
2110            sock = socket.socket()
2111            sock.connect(("localhost", int(self.APP_PORT)))
2112        except socket.error as e:
2113            connection_refused = True
2114        self.assertTrue(connection_refused, connection_refused)
2115
2116        # Unfreeze app (adb shell am unfreeze <APK_NAME>)
2117        invoke("adb", "shell", "am", "unfreeze", self.PROCESS_NAME)
2118
2119        # Assert that debugger is allowed
2120        sock = socket.socket()
2121        sock.connect(("localhost", int(self.APP_PORT)))
2122        sock.send(self.HANDSHAKE.encode("utf-8"))
2123        resp = sock.recv(len(self.HANDSHAKE)).decode("utf-8")
2124        self.assertTrue(resp == self.HANDSHAKE, resp)
2125        sock.close()
2126
2127    def tearDown(self):
2128        invoke("adb", "forward", "--remove-all")
2129
2130if __name__ == '__main__':
2131    random.seed(0)
2132    unittest.main()
2133