• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
import contextlib
import functools
import hashlib
import os
import posixpath
import random
import re
import shlex
import shutil
import signal
import socket
import string
import subprocess
import sys
import tempfile
import threading
import time
import unittest
37
38from datetime import datetime
39
40import adb
41
def requires_root(func):
    """Decorate a test method so that it runs while adbd is root.

    The wrapped test is skipped when the device's ``ro.debuggable`` property
    is not '1'. If the device shell user is not already root, the device is
    switched to root before the test and switched back afterwards, even when
    the test raises.

    Args:
      func: The test method to wrap; called as ``func(self, *args, **kwargs)``.

    Returns:
      The wrapping function, carrying func's name and docstring.

    Raises:
      unittest.SkipTest: The device build is not debuggable.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs while adbd is not root.

    If the device shell user is root, the device is unrooted before the test
    and switched back to root afterwards, even when the test raises.

    Args:
      func: The test method to wrap; called as ``func(self, *args, **kwargs)``.

    Returns:
      The wrapping function, carrying func's name and docstring.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Base class for test cases that talk to a device.

    The device handle is obtained once, when this class body executes, and is
    stored as a class attribute that subclasses reach through ``self.device``.
    """
    device = adb.get_device()
81
82
class AbbTest(DeviceTest):
    """Compares `adb abb` against `adb shell cmd`."""

    def test_smoke(self):
        """Run both commands without arguments and compare their results."""
        abb_result = subprocess.run(['adb', 'abb'], capture_output=True)
        cmd_result = subprocess.run(['adb', 'shell', 'cmd'],
                                    capture_output=True)

        # abb squashes all failures to 1, so only success/failure is compared
        # rather than the exact exit codes.
        abb_succeeded = abb_result.returncode == 0
        cmd_succeeded = cmd_result.returncode == 0
        self.assertEqual(abb_succeeded, cmd_succeeded)

        for stream in ('stdout', 'stderr'):
            self.assertEqual(getattr(abb_result, stream),
                             getattr(cmd_result, stream))
92
93class ForwardReverseTest(DeviceTest):
94    def _test_no_rebind(self, description, direction_list, direction,
95                       direction_no_rebind, direction_remove_all):
96        msg = direction_list()
97        self.assertEqual('', msg.strip(),
98                         description + ' list must be empty to run this test.')
99
100        # Use --no-rebind with no existing binding
101        direction_no_rebind('tcp:5566', 'tcp:6655')
102        msg = direction_list()
103        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
104
105        # Use --no-rebind with existing binding
106        with self.assertRaises(subprocess.CalledProcessError):
107            direction_no_rebind('tcp:5566', 'tcp:6677')
108        msg = direction_list()
109        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
110        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
111
112        # Use the absence of --no-rebind with existing binding
113        direction('tcp:5566', 'tcp:6677')
114        msg = direction_list()
115        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
116        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
117
118        direction_remove_all()
119        msg = direction_list()
120        self.assertEqual('', msg.strip())
121
122    def test_forward_no_rebind(self):
123        self._test_no_rebind('forward', self.device.forward_list,
124                            self.device.forward, self.device.forward_no_rebind,
125                            self.device.forward_remove_all)
126
127    def test_reverse_no_rebind(self):
128        self._test_no_rebind('reverse', self.device.reverse_list,
129                            self.device.reverse, self.device.reverse_no_rebind,
130                            self.device.reverse_remove_all)
131
132    def test_forward(self):
133        msg = self.device.forward_list()
134        self.assertEqual('', msg.strip(),
135                         'Forwarding list must be empty to run this test.')
136        self.device.forward('tcp:5566', 'tcp:6655')
137        msg = self.device.forward_list()
138        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
139        self.device.forward('tcp:7788', 'tcp:8877')
140        msg = self.device.forward_list()
141        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
142        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
143        self.device.forward_remove('tcp:5566')
144        msg = self.device.forward_list()
145        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
146        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
147        self.device.forward_remove_all()
148        msg = self.device.forward_list()
149        self.assertEqual('', msg.strip())
150
151    def test_forward_old_protocol(self):
152        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
153
154        msg = self.device.forward_list()
155        self.assertEqual('', msg.strip(),
156                         'Forwarding list must be empty to run this test.')
157
158        s = socket.create_connection(("localhost", 5037))
159        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
160        cmd = b"%04x%s" % (len(service), service)
161        s.sendall(cmd)
162
163        msg = self.device.forward_list()
164        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
165
166        self.device.forward_remove_all()
167        msg = self.device.forward_list()
168        self.assertEqual('', msg.strip())
169
170    def test_forward_tcp_port_0(self):
171        self.assertEqual('', self.device.forward_list().strip(),
172                         'Forwarding list must be empty to run this test.')
173
174        try:
175            # If resolving TCP port 0 is supported, `adb forward` will print
176            # the actual port number.
177            port = self.device.forward('tcp:0', 'tcp:8888').strip()
178            if not port:
179                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
180
181            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
182                                      self.device.forward_list()))
183        finally:
184            self.device.forward_remove_all()
185
186    def test_reverse(self):
187        msg = self.device.reverse_list()
188        self.assertEqual('', msg.strip(),
189                         'Reverse forwarding list must be empty to run this test.')
190        self.device.reverse('tcp:5566', 'tcp:6655')
191        msg = self.device.reverse_list()
192        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
193        self.device.reverse('tcp:7788', 'tcp:8877')
194        msg = self.device.reverse_list()
195        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
196        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
197        self.device.reverse_remove('tcp:5566')
198        msg = self.device.reverse_list()
199        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
200        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
201        self.device.reverse_remove_all()
202        msg = self.device.reverse_list()
203        self.assertEqual('', msg.strip())
204
205    def test_reverse_tcp_port_0(self):
206        self.assertEqual('', self.device.reverse_list().strip(),
207                         'Reverse list must be empty to run this test.')
208
209        try:
210            # If resolving TCP port 0 is supported, `adb reverse` will print
211            # the actual port number.
212            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
213            if not port:
214                raise unittest.SkipTest('Reversing tcp:0 is not available.')
215
216            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
217                                      self.device.reverse_list()))
218        finally:
219            self.device.reverse_remove_all()
220
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse

        A forward is set up on host port 12345 and a reverse pointing at host
        port 12346. The test listens on 12346, connects to 12345, sends
        b'hello' and expects the same bytes on the accepted connection.
        """
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which bindings were actually created so the finally block only
        # removes what this test set up.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        # makefile() is opened in its default text mode here,
                        # so the text read is encoded back to bytes before
                        # comparing.
                        self.assertEqual(data, server.makefile().read().encode("utf8"))
        finally:
            # Both bindings were registered under forward_spec.
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
272
273
274class ShellTest(DeviceTest):
275    def _interactive_shell(self, shell_args, input):
276        """Runs an interactive adb shell.
277
278        Args:
279          shell_args: List of string arguments to `adb shell`.
280          input: bytes input to send to the interactive shell.
281
282        Returns:
283          The remote exit code.
284
285        Raises:
286          unittest.SkipTest: The device doesn't support exit codes.
287        """
288        if not self.device.has_shell_protocol():
289            raise unittest.SkipTest('exit codes are unavailable on this device')
290
291        proc = subprocess.Popen(
292                self.device.adb_cmd + ['shell'] + shell_args,
293                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
294                stderr=subprocess.PIPE)
295        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
296        # to explicitly add an exit command to close the session from the device
297        # side, plus the necessary newline to complete the interactive command.
298        proc.communicate(input + b'; exit\n')
299        return proc.returncode
300
301    def test_cat(self):
302        """Check that we can at least cat a file."""
303        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
304        elements = out.split()
305        self.assertEqual(len(elements), 2)
306
307        uptime, idle = elements
308        self.assertGreater(float(uptime), 0.0)
309        self.assertGreater(float(idle), 0.0)
310
311    def test_throws_on_failure(self):
312        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
313
314    def test_output_not_stripped(self):
315        out = self.device.shell(['echo', 'foo'])[0]
316        self.assertEqual(out, 'foo' + self.device.linesep)
317
318    def test_shell_command_length(self):
319        # Devices that have shell_v2 should be able to handle long commands.
320        if self.device.has_shell_protocol():
321            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
322            self.assertEqual(rc, 0)
323            self.assertTrue(out == ('x' * 16384 + '\n'))
324
325    def test_shell_nocheck_failure(self):
326        rc, out, _ = self.device.shell_nocheck(['false'])
327        self.assertNotEqual(rc, 0)
328        self.assertEqual(out, '')
329
330    def test_shell_nocheck_output_not_stripped(self):
331        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
332        self.assertEqual(rc, 0)
333        self.assertEqual(out, 'foo' + self.device.linesep)
334
335    def test_can_distinguish_tricky_results(self):
336        # If result checking on ADB shell is naively implemented as
337        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
338        # output from the result for a cmd of `echo -n 1`.
339        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
340        self.assertEqual(rc, 0)
341        self.assertEqual(out, '1')
342
343    def test_line_endings(self):
344        """Ensure that line ending translation is not happening in the pty.
345
346        Bug: http://b/19735063
347        """
348        output = self.device.shell(['uname'])[0]
349        self.assertEqual(output, 'Linux' + self.device.linesep)
350
    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.

        Each flag combination is run twice through check_pty(), once with the
        test's own stdin and once with a pipe, and the resulting pair is
        compared against the expected allocation.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            # `[ -t 0 ]` is the remote command; its exit status becomes the
            # adb return code inspected below.
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None: the child inherits this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            # stdin=PIPE: the child's stdin is a pipe, never a terminal.
            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        # They also only run when this process's own stdin is a terminal.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))
412
413    def test_shell_protocol(self):
414        """Tests the shell protocol on the device.
415
416        If the device supports shell protocol, this gives us the ability
417        to separate stdout/stderr and return the exit code directly.
418
419        Bug: http://b/19734861
420        """
421        if not self.device.has_shell_protocol():
422            raise unittest.SkipTest('shell protocol unsupported on this device')
423
424        # Shell protocol should be used by default.
425        result = self.device.shell_nocheck(
426                shlex.split('echo foo; echo bar >&2; exit 17'))
427        self.assertEqual(17, result[0])
428        self.assertEqual('foo' + self.device.linesep, result[1])
429        self.assertEqual('bar' + self.device.linesep, result[2])
430
431        self.assertEqual(17, self._interactive_shell([], b'exit 17'))
432
433        # -x flag should disable shell protocol.
434        result = self.device.shell_nocheck(
435                shlex.split('-x echo foo; echo bar >&2; exit 17'))
436        self.assertEqual(0, result[0])
437        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
438        self.assertEqual('', result[2])
439
440        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
441
442    def test_non_interactive_sigint(self):
443        """Tests that SIGINT in a non-interactive shell kills the process.
444
445        This requires the shell protocol in order to detect the broken
446        pipe; raw data transfer mode will only see the break once the
447        subprocess tries to read or write.
448
449        Bug: http://b/23825725
450        """
451        if not self.device.has_shell_protocol():
452            raise unittest.SkipTest('shell protocol unsupported on this device')
453
454        # Start a long-running process.
455        sleep_proc = subprocess.Popen(
456                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
457                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
458                stderr=subprocess.STDOUT)
459        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
460        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
461        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
462
463        # Verify that the process is running, send signal, verify it stopped.
464        self.device.shell(proc_query)
465        os.kill(sleep_proc.pid, signal.SIGINT)
466        sleep_proc.communicate()
467
468        # It can take some time for the process to receive the signal and die.
469        end_time = time.time() + 3
470        while self.device.shell_nocheck(proc_query)[0] != 1:
471            self.assertFalse(time.time() > end_time,
472                             'subprocess failed to terminate in time')
473
474    def test_non_interactive_stdin(self):
475        """Tests that non-interactive shells send stdin."""
476        if not self.device.has_shell_protocol():
477            raise unittest.SkipTest('non-interactive stdin unsupported '
478                                    'on this device')
479
480        # Test both small and large inputs.
481        small_input = b'foo'
482        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
483        large_input = b'\n'.join(characters)
484
485
486        for input in (small_input, large_input):
487            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
488                                    stdin=subprocess.PIPE,
489                                    stdout=subprocess.PIPE,
490                                    stderr=subprocess.PIPE)
491            stdout, stderr = proc.communicate(input)
492            self.assertEqual(input.splitlines(), stdout.splitlines())
493            self.assertEqual(b'', stderr)
494
495    def test_sighup(self):
496        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
497        log_path = "/data/local/tmp/adb_signal_test.log"
498
499        # Clear the output file.
500        self.device.shell_nocheck(["echo", ">", log_path])
501
502        script = """
503            trap "echo SIGINT > {path}; exit 0" SIGINT
504            trap "echo SIGHUP > {path}; exit 0" SIGHUP
505            echo Waiting
506            read
507        """.format(path=log_path)
508
509        script = ";".join([x.strip() for x in script.strip().splitlines()])
510
511        process = self.device.shell_popen([script], kill_atexit=False,
512                                          stdin=subprocess.PIPE,
513                                          stdout=subprocess.PIPE)
514
515        self.assertEqual(b"Waiting\n", process.stdout.readline())
516        process.send_signal(signal.SIGINT)
517        process.wait()
518
519        # Waiting for the local adb to finish is insufficient, since it hangs
520        # up immediately.
521        time.sleep(1)
522
523        stdout, _ = self.device.shell(["cat", log_path])
524        self.assertEqual(stdout.strip(), "SIGHUP")
525
526    def test_exit_stress(self):
527        """Hammer `adb shell exit 42` with multiple threads."""
528        thread_count = 48
529        result = dict()
530        def hammer(thread_idx, thread_count, result):
531            success = True
532            for i in range(thread_idx, 240, thread_count):
533                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
534                if ret != i % 256:
535                    success = False
536                    break
537            result[thread_idx] = success
538
539        threads = []
540        for i in range(thread_count):
541            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
542            thread.start()
543            threads.append(thread)
544        for thread in threads:
545            thread.join()
546        for i, success in result.items():
547            self.assertTrue(success)
548
549    def disabled_test_parallel(self):
550        """Spawn a bunch of `adb shell` instances in parallel.
551
552        This was broken historically due to the use of select, which only works
553        for fds that are numerically less than 1024.
554
555        Bug: http://b/141955761"""
556
557        n_procs = 2048
558        procs = dict()
559        for i in range(0, n_procs):
560            procs[i] = subprocess.Popen(
561                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
562                stdin=subprocess.PIPE,
563                stdout=subprocess.PIPE
564            )
565
566        for i in range(0, n_procs):
567            procs[i].stdin.write("%d\n" % i)
568
569        for i in range(0, n_procs):
570            response = procs[i].stdout.readline()
571            assert(response == "%d\n" % i)
572
573        for i in range(0, n_procs):
574            procs[i].stdin.write("%d\n" % (i % 256))
575
576        for i in range(0, n_procs):
577            assert(procs[i].wait() == i % 256)
578
579
580class ArgumentEscapingTest(DeviceTest):
581    def test_shell_escaping(self):
582        """Make sure that argument escaping is somewhat sane."""
583
584        # http://b/19734868
585        # Note that this actually matches ssh(1)'s behavior --- it's
586        # converted to `sh -c echo hello; echo world` which sh interprets
587        # as `sh -c echo` (with an argument to that shell of "hello"),
588        # and then `echo world` back in the first shell.
589        result = self.device.shell(
590            shlex.split("sh -c 'echo hello; echo world'"))[0]
591        result = result.splitlines()
592        self.assertEqual(['', 'world'], result)
593        # If you really wanted "hello" and "world", here's what you'd do:
594        result = self.device.shell(
595            shlex.split(r'echo hello\;echo world'))[0].splitlines()
596        self.assertEqual(['hello', 'world'], result)
597
598        # http://b/15479704
599        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
600        self.assertEqual('t', result)
601        result = self.device.shell(
602            shlex.split("sh -c 'true && echo t'"))[0].strip()
603        self.assertEqual('t', result)
604
605        # http://b/20564385
606        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
607        self.assertEqual('t', result)
608        result = self.device.shell(
609            shlex.split(r'echo -n 123\;uname'))[0].strip()
610        self.assertEqual('123Linux', result)
611
612    def test_install_argument_escaping(self):
613        """Make sure that install argument escaping works."""
614        # http://b/20323053, http://b/3090932.
615        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
616            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
617                                             delete=False)
618            tf.close()
619
620            # Installing bogus .apks fails if the device supports exit codes.
621            try:
622                output = self.device.install(tf.name.decode("utf8"))
623            except subprocess.CalledProcessError as e:
624                output = e.output
625
626            self.assertIn(file_suffix, output)
627            os.remove(tf.name)
628
629
@unittest.skip("b/172372960: temporarily disabled due to flakiness")
class RootUnrootTest(DeviceTest):
    """Checks `adb root` / `adb unroot` by looking at the shell user name."""

    def _test_root(self):
        """Switch to root and verify that the shell user is 'root'.

        Returns early, without asserting, when the device reports that adbd
        cannot run as root in production builds.
        """
        message = self.device.root()
        if 'adbd cannot run as root in production builds' in message:
            return
        self.device.wait()
        self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())

    def _test_unroot(self):
        """Switch to non-root and verify that the shell user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        try:
            # Start with the transition away from the current state so that
            # both directions get exercised; any other user does nothing.
            if original_user == 'root':
                self._test_unroot()
                self._test_root()
            elif original_user == 'shell':
                self._test_root()
                self._test_unroot()
        finally:
            # Put the device back in the state it started in.
            if original_user == 'root':
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
663
664
class TcpIpTest(DeviceTest):
    """Checks argument validation of `adb tcpip`."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
675
676
class SystemPropertiesTest(DeviceTest):
    """Reads and writes system properties through the device helpers."""

    def test_get_prop(self):
        """init.svc.adbd must read back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value stored with set_prop() must be visible to getprop."""
        prop_name = 'foo.bar'
        # Reset the property first so a previous run cannot satisfy the check.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stored = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(stored, 'qux')
689
690
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes in *string*."""
    return hashlib.md5(string).hexdigest()
695
696
def get_md5_prog(device):
    """Pick the MD5 command name that works on *device*.

    Older platforms (pre-L) had the name md5 rather than md5sum, so md5sum is
    tried first and md5 is returned when that attempt raises adb.ShellError.
    """
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    else:
        return 'md5sum'
704
705
class HostFile(object):
    """A file on the host, together with its expected checksum."""

    def __init__(self, handle, checksum):
        """Store the handle and checksum; derive both paths from handle.name."""
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
712
713
class DeviceFile(object):
    """A file on the device, together with its expected checksum."""

    def __init__(self, checksum, full_path):
        """Store the checksum and path; base_name is the last path component."""
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.basename(full_path)
719
720
def make_random_host_files(in_dir, num_files):
    """Create *num_files* files of random bytes inside *in_dir*.

    Each size is picked by random.randrange(1024, 16384, 1024). The files are
    left on disk (delete=False); each is returned as a HostFile carrying the
    MD5 of the bytes written to it.
    """
    kib = 1 << 10
    smallest, largest = 1 * kib, 16 * kib

    created = []
    for _ in range(num_files):
        # Leaving the with-block flushes and closes the file; only the
        # handle's name is needed afterwards.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            payload = os.urandom(random.randrange(smallest, largest, 1024))
            file_handle.write(payload)

        created.append(HostFile(file_handle, compute_md5(payload)))
    return created
738
739
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create *num_files* files of random bytes in *in_dir* on the device.

    Args:
      device: Device object whose shell() runs commands remotely.
      in_dir: Remote directory that receives the files.
      num_files: Number of files to create.
      prefix: File name prefix; the file's index is appended to it.

    Returns:
      A list of DeviceFile objects, each holding the checksum reported by the
      device's MD5 program and the file's remote path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # Looked up on first use and then reused: probing for the MD5 program is
    # a device round trip, and its answer is the same for every file.
    md5_prog = None
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
757
758
759class FileOperationsTest:
760    class Base(DeviceTest):
761        SCRATCH_DIR = '/data/local/tmp'
762        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
763        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
764
765        def setUp(self):
766            self.previous_env = os.environ.get("ADB_COMPRESSION")
767            os.environ["ADB_COMPRESSION"] = self.compression
768
769        def tearDown(self):
770            if self.previous_env is None:
771                del os.environ["ADB_COMPRESSION"]
772            else:
773                os.environ["ADB_COMPRESSION"] = self.previous_env
774
775        def _verify_remote(self, checksum, remote_path):
776            dev_md5, _ = self.device.shell([get_md5_prog(self.device),
777                                            remote_path])[0].split()
778            self.assertEqual(checksum, dev_md5)
779
780        def _verify_local(self, checksum, local_path):
781            with open(local_path, 'rb') as host_file:
782                host_md5 = compute_md5(host_file.read())
783                self.assertEqual(host_md5, checksum)
784
785        def test_push(self):
786            """Push a randomly generated file to specified device."""
787            kbytes = 512
788            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
789            rand_str = os.urandom(1024 * kbytes)
790            tmp.write(rand_str)
791            tmp.close()
792
793            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
794            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
795
796            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
797            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
798
799            os.remove(tmp.name)
800
801        def test_push_dir(self):
802            """Push a randomly generated directory of files to the device."""
803            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
804            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
805
806            try:
807                host_dir = tempfile.mkdtemp()
808
809                # Make sure the temp directory isn't setuid, or else adb will complain.
810                os.chmod(host_dir, 0o700)
811
812                # Create 32 random files.
813                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
814                self.device.push(host_dir, self.DEVICE_TEMP_DIR)
815
816                for temp_file in temp_files:
817                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
818                                                 os.path.basename(host_dir),
819                                                 temp_file.base_name)
820                    self._verify_remote(temp_file.checksum, remote_path)
821                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
822            finally:
823                if host_dir is not None:
824                    shutil.rmtree(host_dir)
825
826        def disabled_test_push_empty(self):
827            """Push an empty directory to the device."""
828            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
829            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
830
831            try:
832                host_dir = tempfile.mkdtemp()
833
834                # Make sure the temp directory isn't setuid, or else adb will complain.
835                os.chmod(host_dir, 0o700)
836
837                # Create an empty directory.
838                empty_dir_path = os.path.join(host_dir, 'empty')
839                os.mkdir(empty_dir_path);
840
841                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
842
843                remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
844                test_empty_cmd = ["[", "-d", remote_path, "]"]
845                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
846
847                self.assertEqual(rc, 0)
848                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
849            finally:
850                if host_dir is not None:
851                    shutil.rmtree(host_dir)
852
853        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
854        def test_push_symlink(self):
855            """Push a symlink.
856
857            Bug: http://b/31491920
858            """
859            try:
860                host_dir = tempfile.mkdtemp()
861
862                # Make sure the temp directory isn't setuid, or else adb will
863                # complain.
864                os.chmod(host_dir, 0o700)
865
866                with open(os.path.join(host_dir, 'foo'), 'w') as f:
867                    f.write('foo')
868
869                symlink_path = os.path.join(host_dir, 'symlink')
870                os.symlink('foo', symlink_path)
871
872                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
873                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
874                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
875                rc, out, _ = self.device.shell_nocheck(
876                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
877                self.assertEqual(0, rc)
878                self.assertEqual(out.strip(), 'foo')
879            finally:
880                if host_dir is not None:
881                    shutil.rmtree(host_dir)
882
883        def test_multiple_push(self):
884            """Push multiple files to the device in one adb push command.
885
886            Bug: http://b/25324823
887            """
888
889            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
890            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
891
892            try:
893                host_dir = tempfile.mkdtemp()
894
895                # Create some random files and a subdirectory containing more files.
896                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
897
898                subdir = os.path.join(host_dir, 'subdir')
899                os.mkdir(subdir)
900                subdir_temp_files = make_random_host_files(in_dir=subdir,
901                                                           num_files=4)
902
903                paths = [x.full_path for x in temp_files]
904                paths.append(subdir)
905                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
906
907                for temp_file in temp_files:
908                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
909                                                 temp_file.base_name)
910                    self._verify_remote(temp_file.checksum, remote_path)
911
912                for subdir_temp_file in subdir_temp_files:
913                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
914                                                 # BROKEN: http://b/25394682
915                                                 # 'subdir';
916                                                 temp_file.base_name)
917                    self._verify_remote(temp_file.checksum, remote_path)
918
919
920                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
921            finally:
922                if host_dir is not None:
923                    shutil.rmtree(host_dir)
924
925        @requires_non_root
926        def test_push_error_reporting(self):
927            """Make sure that errors that occur while pushing a file get reported
928
929            Bug: http://b/26816782
930            """
931            with tempfile.NamedTemporaryFile() as tmp_file:
932                tmp_file.write(b'\0' * 1024 * 1024)
933                tmp_file.flush()
934                try:
935                    self.device.push(local=tmp_file.name, remote='/system/')
936                    self.fail('push should not have succeeded')
937                except subprocess.CalledProcessError as e:
938                    output = e.output
939
940                self.assertTrue(b'Permission denied' in output or
941                                b'Read-only file system' in output)
942
943        @requires_non_root
944        def test_push_directory_creation(self):
945            """Regression test for directory creation.
946
947            Bug: http://b/110953234
948            """
949            with tempfile.NamedTemporaryFile() as tmp_file:
950                tmp_file.write(b'\0' * 1024 * 1024)
951                tmp_file.flush()
952                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
953                self.device.shell(['rm', '-rf', remote_path])
954
955                remote_path += '/filename'
956                self.device.push(local=tmp_file.name, remote=remote_path)
957
958        def disabled_test_push_multiple_slash_root(self):
959            """Regression test for pushing to //data/local/tmp.
960
961            Bug: http://b/141311284
962
963            Disabled because this broken on the adbd side as well: b/141943968
964            """
965            with tempfile.NamedTemporaryFile() as tmp_file:
966                tmp_file.write('\0' * 1024 * 1024)
967                tmp_file.flush()
968                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
969                self.device.shell(['rm', '-rf', remote_path])
970                self.device.push(local=tmp_file.name, remote=remote_path)
971
972        def _test_pull(self, remote_file, checksum):
973            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
974            tmp_write.close()
975            self.device.pull(remote=remote_file, local=tmp_write.name)
976            with open(tmp_write.name, 'rb') as tmp_read:
977                host_contents = tmp_read.read()
978                host_md5 = compute_md5(host_contents)
979            self.assertEqual(checksum, host_md5)
980            os.remove(tmp_write.name)
981
982        @requires_non_root
983        def test_pull_error_reporting(self):
984            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
985            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
986
987            try:
988                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
989            except subprocess.CalledProcessError as e:
990                output = e.output
991
992            self.assertIn(b'Permission denied', output)
993
994            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
995
996        def test_pull(self):
997            """Pull a randomly generated file from specified device."""
998            kbytes = 512
999            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
1000            cmd = ['dd', 'if=/dev/urandom',
1001                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
1002                   'count={}'.format(kbytes)]
1003            self.device.shell(cmd)
1004            dev_md5, _ = self.device.shell(
1005                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
1006            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
1007            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
1008
1009        def test_pull_dir(self):
1010            """Pull a randomly generated directory of files from the device."""
1011            try:
1012                host_dir = tempfile.mkdtemp()
1013
1014                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1015                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1016
1017                # Populate device directory with random files.
1018                temp_files = make_random_device_files(
1019                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1020
1021                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1022
1023                for temp_file in temp_files:
1024                    host_path = os.path.join(
1025                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1026                        temp_file.base_name)
1027                    self._verify_local(temp_file.checksum, host_path)
1028
1029                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1030            finally:
1031                if host_dir is not None:
1032                    shutil.rmtree(host_dir)
1033
1034        def test_pull_dir_symlink(self):
1035            """Pull a directory into a symlink to a directory.
1036
1037            Bug: http://b/27362811
1038            """
1039            if os.name != 'posix':
1040                raise unittest.SkipTest('requires POSIX')
1041
1042            try:
1043                host_dir = tempfile.mkdtemp()
1044                real_dir = os.path.join(host_dir, 'dir')
1045                symlink = os.path.join(host_dir, 'symlink')
1046                os.mkdir(real_dir)
1047                os.symlink(real_dir, symlink)
1048
1049                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1050                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1051
1052                # Populate device directory with random files.
1053                temp_files = make_random_device_files(
1054                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1055
1056                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1057
1058                for temp_file in temp_files:
1059                    host_path = os.path.join(
1060                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1061                        temp_file.base_name)
1062                    self._verify_local(temp_file.checksum, host_path)
1063
1064                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1065            finally:
1066                if host_dir is not None:
1067                    shutil.rmtree(host_dir)
1068
1069        def test_pull_dir_symlink_collision(self):
1070            """Pull a directory into a colliding symlink to directory."""
1071            if os.name != 'posix':
1072                raise unittest.SkipTest('requires POSIX')
1073
1074            try:
1075                host_dir = tempfile.mkdtemp()
1076                real_dir = os.path.join(host_dir, 'real')
1077                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1078                symlink = os.path.join(host_dir, tmp_dirname)
1079                os.mkdir(real_dir)
1080                os.symlink(real_dir, symlink)
1081
1082                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1083                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1084
1085                # Populate device directory with random files.
1086                temp_files = make_random_device_files(
1087                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1088
1089                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1090
1091                for temp_file in temp_files:
1092                    host_path = os.path.join(real_dir, temp_file.base_name)
1093                    self._verify_local(temp_file.checksum, host_path)
1094
1095                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1096            finally:
1097                if host_dir is not None:
1098                    shutil.rmtree(host_dir)
1099
1100        def test_pull_dir_nonexistent(self):
1101            """Pull a directory of files from the device to a nonexistent path."""
1102            try:
1103                host_dir = tempfile.mkdtemp()
1104                dest_dir = os.path.join(host_dir, 'dest')
1105
1106                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1107                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1108
1109                # Populate device directory with random files.
1110                temp_files = make_random_device_files(
1111                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1112
1113                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1114
1115                for temp_file in temp_files:
1116                    host_path = os.path.join(dest_dir, temp_file.base_name)
1117                    self._verify_local(temp_file.checksum, host_path)
1118
1119                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1120            finally:
1121                if host_dir is not None:
1122                    shutil.rmtree(host_dir)
1123
1124        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1125        def disabled_test_pull_symlink_dir(self):
1126            """Pull a symlink to a directory of symlinks to files."""
1127            try:
1128                host_dir = tempfile.mkdtemp()
1129
1130                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1131                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1132                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1133
1134                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1135                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1136                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1137
1138                # Populate device directory with random files.
1139                temp_files = make_random_device_files(
1140                    self.device, in_dir=remote_dir, num_files=32)
1141
1142                for temp_file in temp_files:
1143                    self.device.shell(
1144                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1145                         posixpath.join(remote_links, temp_file.base_name)])
1146
1147                self.device.pull(remote=remote_symlink, local=host_dir)
1148
1149                for temp_file in temp_files:
1150                    host_path = os.path.join(
1151                        host_dir, 'symlink', temp_file.base_name)
1152                    self._verify_local(temp_file.checksum, host_path)
1153
1154                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1155            finally:
1156                if host_dir is not None:
1157                    shutil.rmtree(host_dir)
1158
1159        def test_pull_empty(self):
1160            """Pull a directory containing an empty directory from the device."""
1161            try:
1162                host_dir = tempfile.mkdtemp()
1163
1164                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1165                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1166                self.device.shell(['mkdir', '-p', remote_empty_path])
1167
1168                self.device.pull(remote=remote_empty_path, local=host_dir)
1169                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1170            finally:
1171                if host_dir is not None:
1172                    shutil.rmtree(host_dir)
1173
1174        def test_multiple_pull(self):
1175            """Pull a randomly generated directory of files from the device."""
1176
1177            try:
1178                host_dir = tempfile.mkdtemp()
1179
1180                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1181                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1182                self.device.shell(['mkdir', '-p', subdir])
1183
1184                # Create some random files and a subdirectory containing more files.
1185                temp_files = make_random_device_files(
1186                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1187
1188                subdir_temp_files = make_random_device_files(
1189                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1190
1191                paths = [x.full_path for x in temp_files]
1192                paths.append(subdir)
1193                self.device._simple_call(['pull'] + paths + [host_dir])
1194
1195                for temp_file in temp_files:
1196                    local_path = os.path.join(host_dir, temp_file.base_name)
1197                    self._verify_local(temp_file.checksum, local_path)
1198
1199                for subdir_temp_file in subdir_temp_files:
1200                    local_path = os.path.join(host_dir,
1201                                              'subdir',
1202                                              subdir_temp_file.base_name)
1203                    self._verify_local(subdir_temp_file.checksum, local_path)
1204
1205                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1206            finally:
1207                if host_dir is not None:
1208                    shutil.rmtree(host_dir)
1209
1210        def verify_sync(self, device, temp_files, device_dir):
1211            """Verifies that a list of temp files was synced to the device."""
1212            # Confirm that every file on the device mirrors that on the host.
1213            for temp_file in temp_files:
1214                device_full_path = posixpath.join(
1215                    device_dir, temp_file.base_name)
1216                dev_md5, _ = device.shell(
1217                    [get_md5_prog(self.device), device_full_path])[0].split()
1218                self.assertEqual(temp_file.checksum, dev_md5)
1219
1220        def test_sync(self):
1221            """Sync a host directory to the data partition."""
1222
1223            try:
1224                base_dir = tempfile.mkdtemp()
1225
1226                # Create mirror device directory hierarchy within base_dir.
1227                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1228                os.makedirs(full_dir_path)
1229
1230                # Create 32 random files within the host mirror.
1231                temp_files = make_random_host_files(
1232                    in_dir=full_dir_path, num_files=32)
1233
1234                # Clean up any stale files on the device.
1235                device = adb.get_device()  # pylint: disable=no-member
1236                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1237
1238                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1239                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1240                device.sync('data')
1241                if old_product_out is None:
1242                    del os.environ['ANDROID_PRODUCT_OUT']
1243                else:
1244                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1245
1246                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1247
1248                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1249            finally:
1250                if base_dir is not None:
1251                    shutil.rmtree(base_dir)
1252
1253        def test_push_sync(self):
1254            """Sync a host directory to a specific path."""
1255
1256            try:
1257                temp_dir = tempfile.mkdtemp()
1258                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1259
1260                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1261
1262                # Clean up any stale files on the device.
1263                device = adb.get_device()  # pylint: disable=no-member
1264                device.shell(['rm', '-rf', device_dir])
1265
1266                device.push(temp_dir, device_dir, sync=True)
1267
1268                self.verify_sync(device, temp_files, device_dir)
1269
1270                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1271            finally:
1272                if temp_dir is not None:
1273                    shutil.rmtree(temp_dir)
1274
1275        def test_push_sync_multiple(self):
1276            """Sync multiple host directories to a specific path."""
1277
1278            try:
1279                temp_dir = tempfile.mkdtemp()
1280                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1281
1282                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1283
1284                # Clean up any stale files on the device.
1285                device = adb.get_device()  # pylint: disable=no-member
1286                device.shell(['rm', '-rf', device_dir])
1287                device.shell(['mkdir', '-p', device_dir])
1288
1289                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
1290                device.push(host_paths, device_dir, sync=True)
1291
1292                self.verify_sync(device, temp_files, device_dir)
1293
1294                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1295            finally:
1296                if temp_dir is not None:
1297                    shutil.rmtree(temp_dir)
1298
1299
1300        def test_push_dry_run_nonexistent_file(self):
1301            """Push with dry run."""
1302
1303            for file_size in [8, 1024 * 1024]:
1304                try:
1305                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1306                    device_file = posixpath.join(device_dir, 'file')
1307
1308                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1309                    self.device.shell(['mkdir', '-p', device_dir])
1310
1311                    host_dir = tempfile.mkdtemp()
1312                    host_file = posixpath.join(host_dir, 'file')
1313
1314                    with open(host_file, "w") as f:
1315                        f.write('x' * file_size)
1316
1317                    self.device._simple_call(['push', '-n', host_file, device_file])
1318                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1319                    self.assertNotEqual(0, rc)
1320
1321                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1322                finally:
1323                    if host_dir is not None:
1324                        shutil.rmtree(host_dir)
1325
1326        def test_push_dry_run_existent_file(self):
1327            """Push with dry run."""
1328
1329            for file_size in [8, 1024 * 1024]:
1330                try:
1331                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1332                    device_file = posixpath.join(device_dir, 'file')
1333
1334                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1335                    self.device.shell(['mkdir', '-p', device_dir])
1336                    self.device.shell(['echo', 'foo', '>', device_file])
1337
1338                    host_dir = tempfile.mkdtemp()
1339                    host_file = posixpath.join(host_dir, 'file')
1340
1341                    with open(host_file, "w") as f:
1342                        f.write('x' * file_size)
1343
1344                    self.device._simple_call(['push', '-n', host_file, device_file])
1345                    stdout, stderr = self.device.shell(['cat', device_file])
1346                    self.assertEqual(stdout.strip(), "foo")
1347
1348                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1349                finally:
1350                    if host_dir is not None:
1351                        shutil.rmtree(host_dir)
1352
1353        def test_unicode_paths(self):
1354            """Ensure that we can support non-ASCII paths, even on Windows."""
1355            name = u'로보카 폴리'
1356
1357            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1358            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1359
1360            ## push.
1361            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1362            tf.close()
1363            self.device.push(tf.name, remote_path)
1364            os.remove(tf.name)
1365            self.assertFalse(os.path.exists(tf.name))
1366
1367            # Verify that the device ended up with the expected UTF-8 path
1368            output = self.device.shell(
1369                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1370            self.assertEqual(remote_path, output)
1371
1372            # pull.
1373            self.device.pull(remote_path, tf.name)
1374            self.assertTrue(os.path.exists(tf.name))
1375            os.remove(tf.name)
1376            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1377
1378
class FileOperationsTestUncompressed(FileOperationsTest.Base):
    # Read by Base.setUp, which exports it as ADB_COMPRESSION for each test.
    compression = "none"
1381
1382
class FileOperationsTestBrotli(FileOperationsTest.Base):
    # Read by Base.setUp, which exports it as ADB_COMPRESSION for each test.
    compression = "brotli"
1385
1386
class FileOperationsTestLZ4(FileOperationsTest.Base):
    # Read by Base.setUp, which exports it as ADB_COMPRESSION for each test.
    compression = "lz4"
1389
1390
class FileOperationsTestZstd(FileOperationsTest.Base):
    # Read by Base.setUp, which exports it as ADB_COMPRESSION for each test.
    compression = "zstd"
1393
1394
1395class DeviceOfflineTest(DeviceTest):
1396    def _get_device_state(self, serialno):
1397        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1398        for line in output.split('\n'):
1399            m = re.match('(\S+)\s+(\S+)', line)
1400            if m and m.group(1) == serialno:
1401                return m.group(2)
1402        return None
1403
1404    def disabled_test_killed_when_pushing_a_large_file(self):
1405        """
1406           While running adb push with a large file, kill adb server.
1407           Occasionally the device becomes offline. Because the device is still
1408           reading data without realizing that the adb server has been restarted.
1409           Test if we can bring the device online automatically now.
1410           http://b/32952319
1411        """
1412        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1413        # 1. Push a large file
1414        file_path = 'tmp_large_file'
1415        try:
1416            fh = open(file_path, 'w')
1417            fh.write('\0' * (100 * 1024 * 1024))
1418            fh.close()
1419            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1420            time.sleep(0.1)
1421            # 2. Kill the adb server
1422            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1423            subproc.terminate()
1424        finally:
1425            try:
1426                os.unlink(file_path)
1427            except:
1428                pass
1429        # 3. See if the device still exist.
1430        # Sleep to wait for the adb server exit.
1431        time.sleep(0.5)
1432        # 4. The device should be online
1433        self.assertEqual(self._get_device_state(serialno), 'device')
1434
1435    def disabled_test_killed_when_pulling_a_large_file(self):
1436        """
1437           While running adb pull with a large file, kill adb server.
1438           Occasionally the device can't be connected. Because the device is trying to
1439           send a message larger than what is expected by the adb server.
1440           Test if we can bring the device online automatically now.
1441        """
1442        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1443        file_path = 'tmp_large_file'
1444        try:
1445            # 1. Create a large file on device.
1446            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1447                               'bs=1000000', 'count=100'])
1448            # 2. Pull the large file on host.
1449            subproc = subprocess.Popen(self.device.adb_cmd +
1450                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1451            time.sleep(0.1)
1452            # 3. Kill the adb server
1453            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1454            subproc.terminate()
1455        finally:
1456            try:
1457                os.unlink(file_path)
1458            except:
1459                pass
1460        # 4. See if the device still exist.
1461        # Sleep to wait for the adb server exit.
1462        time.sleep(0.5)
1463        self.assertEqual(self._get_device_state(serialno), 'device')
1464
1465
1466    def test_packet_size_regression(self):
1467        """Test for http://b/37783561
1468
1469        Receiving packets of a length divisible by 512 but not 1024 resulted in
1470        the adb client waiting indefinitely for more input.
1471        """
1472        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1473        # Probe some surrounding values as well, for the hell of it.
1474        for base in [512] + list(range(1024, 1024 * 16, 1024)):
1475            for offset in [-6, -5, -4]:
1476                length = base + offset
1477                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1478                       'echo', 'foo']
1479                rc, stdout, _ = self.device.shell_nocheck(cmd)
1480
1481                self.assertEqual(0, rc)
1482
1483                # Output should be '\0' * length, followed by "foo\n"
1484                self.assertEqual(length, len(stdout) - 4)
1485                self.assertEqual(stdout, "\0" * length + "foo\n")
1486
1487    def test_zero_packet(self):
1488        """Test for http://b/113070258
1489
1490        Make sure that we don't blow up when sending USB transfers that line up
1491        exactly with the USB packet size.
1492        """
1493
1494        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1495        try:
1496            for size in [512, 1024]:
1497                def listener():
1498                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1499                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1500
1501                thread = threading.Thread(target=listener)
1502                thread.start()
1503
1504                # Wait a bit to let the shell command start.
1505                time.sleep(0.25)
1506
1507                sock = socket.create_connection(("localhost", local_port))
1508                with contextlib.closing(sock):
1509                    bytesWritten = sock.send(b"a" * size)
1510                    self.assertEqual(size, bytesWritten)
1511                    readBytes = sock.recv(4096)
1512                    self.assertEqual(b"foo\n", readBytes)
1513
1514                thread.join()
1515        finally:
1516            self.device.forward_remove("tcp:{}".format(local_port))
1517
1518
class SocketTest(DeviceTest):
    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """

        def adb_length_prefixed(payload):
            # Prefix the UTF-8 payload with its length as 4 hex digits.
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"

        # closing() makes sure the client socket is released even when one of
        # the assertions below fails.
        with contextlib.closing(
                socket.create_connection(("localhost", 5037))) as s:
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Only the amount of data matters, so tally the length instead of
            # repeatedly concatenating ~1 MiB of bytes.
            received_len = 0
            while True:
                read = s.recv(512)
                if not read:
                    break
                received_len += len(read)

            self.assertEqual(1024 * 1024 + len("foo\n"), received_len)
            thread.join()
1574
1575
if sys.platform == "win32":
    # Windows-only console helpers (allocate_console, console_screen,
    # read_screen) used by WindowsConsoleTest.
    # From https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    # use_last_error=True pairs with the ctypes.get_last_error() calls in the
    # errcheck helpers below.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Access, share-mode and flag values passed to CreateConsoleScreenBuffer.
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    # Sentinel that _check_invalid compares handle results against.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    # Identifiers for GetStdHandle.
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1594
1595    def _check_zero(result, func, args):
1596        if not result:
1597            raise ctypes.WinError(ctypes.get_last_error())
1598        return args
1599
1600    def _check_invalid(result, func, args):
1601        if result == INVALID_HANDLE_VALUE:
1602            raise ctypes.WinError(ctypes.get_last_error())
1603        return args
1604
    # Some ctypes.wintypes versions do not define these pointer types; add them
    # so the prototypes below can reference wintypes.LPDWORD / PSMALL_RECT.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1608
    # Pair of SHORT X/Y values; used in this file for buffer sizes, the cursor
    # position and the start coordinate of console fill/read calls.
    class COORD(ctypes.Structure):
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))
1612
    # Structure passed by reference to Get/SetConsoleScreenBufferInfoEx.
    # NOTE(review): presumably mirrors the Win32 struct of the same name, so
    # the field order and types must not be changed.
    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
        _fields_ = (('cbSize',               wintypes.ULONG),
                    ('dwSize',               COORD),
                    ('dwCursorPosition',     COORD),
                    ('wAttributes',          wintypes.WORD),
                    ('srWindow',             wintypes.SMALL_RECT),
                    ('dwMaximumWindowSize',  COORD),
                    ('wPopupAttributes',     wintypes.WORD),
                    ('bFullscreenSupported', wintypes.BOOL),
                    ('ColorTable',           wintypes.DWORD * 16))
        def __init__(self, *args, **kwds):
            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
                    *args, **kwds)
            # Every new instance records its own size in cbSize.
            self.cbSize = ctypes.sizeof(self)
1627
    # Pointer/alias types used in the prototypes below.
    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # kernel32 prototypes. Each function gets an errcheck hook so that a
    # failed call raises instead of returning an error value: handle-returning
    # functions use _check_invalid, the others use _check_zero.
    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1677
1678    @contextlib.contextmanager
1679    def allocate_console():
1680        allocated = kernel32.AllocConsole()
1681        try:
1682            yield allocated
1683        finally:
1684            if allocated:
1685                kernel32.FreeConsole()
1686
    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Context manager that temporarily activates a new console screen buffer.

        Creates a screen buffer sized ncols x nrows, makes it the active
        buffer and yields a file descriptor opened on its handle. On exit the
        saved buffer info and window of the standard output handle are
        restored and that handle is made the active buffer again.

        Args:
            ncols: Buffer width; defaults to the width (dwSize.X) reported for
                the standard output handle.
            nrows: Buffer height; defaults to the height (dwSize.Y) reported
                for the standard output handle.

        Raises:
            ValueError: If an explicitly given nrows is greater than 9999.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        # Snapshot the current stdout buffer state so it can be restored later.
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            # Resize the new buffer; its window keeps the height of the
            # original stdout window and spans the full requested width.
            new_info.dwSize = COORD(ncols, nrows)
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # Fill every cell with NUL; read_screen() strips trailing NULs
            # from each row.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                # Put the original stdout buffer settings back and reactivate it.
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # If the fd was created, close it (presumably this also releases
            # the wrapped handle -- confirm); otherwise close the raw handle.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)
1735
1736    def read_screen(fd):
1737        hScreen = msvcrt.get_osfhandle(fd)
1738        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1739        kernel32.GetConsoleScreenBufferInfoEx(
1740            hScreen, ctypes.byref(csbi))
1741        ncols = csbi.dwSize.X
1742        pos = csbi.dwCursorPosition
1743        length = ncols * pos.Y + pos.X + 1
1744        buf = (ctypes.c_wchar * length)()
1745        n = (wintypes.DWORD * 1)()
1746        kernel32.ReadConsoleOutputCharacterW(
1747            hScreen, buf, length, COORD(0,0), n)
1748        lines = [buf[i:i+ncols].rstrip(u'\0')
1749                    for i in range(0, n[0], ncols)]
1750        return u'\n'.join(lines)
1751
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        echo_command = ['echo', '"' + unicode_string + '"']

        # allocate_console() provides a console window if we don't have one;
        # this isn't necessary when already run from a console window, which
        # is typical. console_screen() then switches to a temporary console
        # buffer. Passing ncols=len(unicode_string) would also work, but it
        # causes the window to flash as it is resized and is likely
        # unnecessary given the typical console window size.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # Use device.shell_popen(), which unlike device.shell() does not
            # use a pipe, so adb can detect that stdout is a console.
            adb_process = self.device.shell_popen(echo_command)
            adb_process.wait()
            # Read back what adb wrote to the temporary console buffer.
            console_output = read_screen(screen)
            self.assertEqual(unicode_string, console_output)
1774
1775
def main():
    """Seed the RNG and run every test in this module.

    Prints a notice instead of running anything when adb.get_devices()
    reports no devices.
    """
    random.seed(0)
    if len(adb.get_devices()) == 0:
        print('Test suite must be run with attached devices')
        return

    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(loader.loadTestsFromName(__name__))
1783
1784
# Run the suite only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
1787