• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
def requires_root(func):
    """Decorate a test method so that it runs while the device user is root.

    The wrapped test is skipped when the `ro.debuggable` property is not '1'.
    If `id -un` on the device does not report 'root', the wrapper calls
    `self.device.root()` before the test and `self.device.unroot()` after it,
    even when the test raises.

    Args:
      func: test method taking `self` (an object with a `device` attribute).

    Returns:
      The wrapping function, which forwards all arguments to `func` and
      returns its result.

    Raises:
      unittest.SkipTest: The build is not rootable.
    """
    import functools

    # wraps() keeps the test's name and docstring visible in test reports.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs while the device user is not root.

    If `id -un` on the device reports 'root', the wrapper calls
    `self.device.unroot()` before the test and `self.device.root()` after it,
    even when the test raises.

    Args:
      func: test method taking `self` (an object with a `device` attribute).

    Returns:
      The wrapping function, which forwards all arguments to `func` and
      returns its result.
    """
    import functools

    # wraps() keeps the test's name and docstring visible in test reports.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Common base class giving every test case a `device` attribute."""

    # Evaluated once, when this class body executes, and shared by all
    # subclasses through normal attribute lookup.
    device = adb.get_device()
81
82
class AbbTest(DeviceTest):
    """Compares `adb abb` against `adb shell cmd`."""

    def test_smoke(self):
        """Both commands must agree on success/failure, stdout and stderr."""

        def run(argv):
            return subprocess.run(argv, capture_output=True)

        abb_result = run(['adb', 'abb'])
        cmd_result = run(['adb', 'shell', 'cmd'])

        # abb squashes all failures to 1, so only compare success vs. failure
        # rather than the exact exit codes.
        abb_ok = abb_result.returncode == 0
        cmd_ok = cmd_result.returncode == 0
        self.assertEqual(abb_ok, cmd_ok)
        self.assertEqual(abb_result.stdout, cmd_result.stdout)
        self.assertEqual(abb_result.stderr, cmd_result.stderr)
92
class ForwardReverseTest(DeviceTest):
    """Tests for the forward/reverse port-mapping calls on the device object."""

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Shared body for the forward and reverse --no-rebind tests.

        Args:
          description: word used in the precondition failure message.
          direction_list: callable returning the current mapping list text.
          direction: callable(local, remote) creating/replacing a mapping.
          direction_no_rebind: callable(local, remote) that must fail when
            the local spec is already bound.
          direction_remove_all: callable removing every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        # The list was empty on entry, so removing everything on the way out
        # (even after a failed assertion) restores the starting state.
        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the --no-rebind scenario against the forward calls."""
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind scenario against the reverse calls."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forwards, remove one, then remove all, checking the list."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Don't leave mappings behind for later tests if a check fails.
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_old_protocol(self):
        """Create a forward by writing a host-serial request to port 5037."""
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')

        # Request framing: four hex digits of length followed by the payload.
        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
        cmd = b"%04x%s" % (len(service), service)

        try:
            # closing() guarantees the socket is released; it stays open until
            # the forward has been observed in the list.
            connection = socket.create_connection(("localhost", 5037))
            with contextlib.closing(connection) as s:
                s.sendall(cmd)

                msg = self.device.forward_list()
                self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        finally:
            self.device.forward_remove_all()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check that the reported port shows in the list."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverses, remove one, then remove all, checking the list."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Don't leave mappings behind for later tests if a check fails.
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check that the reported port shows in the list."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, _ = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        # Read raw bytes so the comparison does not depend on
                        # a text decode/encode round trip.
                        with server.makefile('rb') as reader:
                            received = reader.read()
                        self.assertEqual(data, received)
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
272
273
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: bytes input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + b'; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """device.shell must raise adb.ShellError for the `false` command."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """device.shell must return the trailing line separator untouched."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """Echo a 16384-character argument when shell protocol is available."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            # assertEqual (rather than assertTrue on ==) reports the actual
            # mismatch when the output is wrong.
            self.assertEqual(out, 'x' * 16384 + '\n')

    def test_shell_nocheck_failure(self):
        """shell_nocheck reports a non-zero code and no stdout for `false`."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck must return the trailing line separator untouched."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code must not be mistaken for one."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], b'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = b'foo'
        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
        large_input = b'\n'.join(characters)

        # `data` rather than `input`, to avoid shadowing the builtin.
        for data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(data)
            self.assertEqual(data.splitlines(), stdout.splitlines())
            self.assertEqual(b'', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script to a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual(b"Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    # Temporarily disabled because it seems to cause later instability.
    # http://b/228114748
    def disabled_test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()
        def hammer(thread_idx, thread_count, result):
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        for success in result.values():
            self.assertTrue(success)

    def disabled_test_parallel(self):
        """Spawn a bunch of `adb shell` instances in parallel.

        This was broken historically due to the use of select, which only works
        for fds that are numerically less than 1024.

        Bug: http://b/141955761"""

        n_procs = 2048
        procs = dict()
        for i in range(0, n_procs):
            procs[i] = subprocess.Popen(
                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE
            )

        # The pipes are opened in binary mode, so everything written and read
        # is bytes; flush so the buffered data actually reaches the child.
        for i in range(0, n_procs):
            procs[i].stdin.write(b"%d\n" % i)
            procs[i].stdin.flush()

        for i in range(0, n_procs):
            response = procs[i].stdout.readline()
            self.assertEqual(b"%d\n" % i, response)

        for i in range(0, n_procs):
            procs[i].stdin.write(b"%d\n" % (i % 256))
            procs[i].stdin.flush()

        for i in range(0, n_procs):
            self.assertEqual(i % 256, procs[i].wait())
580
581
class ArgumentEscapingTest(DeviceTest):
    """Tests how arguments are quoted on their way to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            # The finally clause removes the host file even when the install
            # call or the assertion raises.
            try:
                # Installing bogus .apks fails if the device supports exit codes.
                try:
                    output = self.device.install(tf.name.decode("utf8"))
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                os.remove(tf.name)
630
631
@unittest.skip("b/172372960: temporarily disabled due to flakiness")
class RootUnrootTest(DeviceTest):
    """Round-trips the device between the root and shell users."""

    def _test_root(self):
        """Switch to root and check `id -un`, unless the build refuses."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', user)

    def _test_unroot(self):
        """Switch away from root and check that `id -un` reports 'shell'."""
        self.device.unroot()
        self.device.wait()
        user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        started_as_root = original_user == 'root'
        try:
            # Visit the "other" state first, then come back.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back the way it was found.
            if started_as_root:
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
665
666
class TcpIpTest(DeviceTest):
    """Tests for the device object's tcpip call."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Neither an empty string nor a non-numeric value is accepted.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
677
678
class SystemPropertiesTest(DeviceTest):
    """Tests reading and writing system properties through the device."""

    def test_get_prop(self):
        """The `init.svc.adbd` property must read back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """set_prop must change the value that `getprop` reports."""
        prop_name = 'foo.bar'
        # Start from an empty value so a stale 'qux' cannot satisfy the check.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        getprop_stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_stdout.strip(), 'qux')
691
692
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given bytes."""
    return hashlib.md5(string).hexdigest()
697
698
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum."""
    # Probe by running md5sum on a file; a shell error selects the old name.
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    return 'md5sum'
706
707
class HostFile(object):
    """A file on the host, described by its handle, path and checksum."""

    def __init__(self, handle, checksum):
        """Record `handle`, `checksum`, and the path details from handle.name."""
        path = handle.name
        # os.path.split() yields (directory, final component).
        _, leaf = os.path.split(path)

        self.handle = handle
        self.checksum = checksum
        self.full_path = path
        self.base_name = leaf
714
715
class DeviceFile(object):
    """A file on the device, described by its POSIX path and checksum."""

    def __init__(self, checksum, full_path):
        """Record `checksum`, `full_path`, and the path's final component."""
        # posixpath.split() yields (directory, final component).
        _, leaf = posixpath.split(full_path)

        self.checksum = checksum
        self.full_path = full_path
        self.base_name = leaf
721
722
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files of random bytes inside `in_dir`.

    Each file is a multiple of 1 KiB in size, from 1 KiB up to (but not
    including) 16 KiB, and is left on disk for the caller to remove.

    Returns:
      A list of HostFile objects carrying each file's handle and MD5.
    """
    kib = 1 << 10
    min_size = 1 * kib
    max_size = 16 * kib

    def make_one():
        # delete=False keeps the file on disk after the handle is closed.
        with tempfile.NamedTemporaryFile(dir=in_dir, delete=False) as handle:
            size = random.randrange(min_size, max_size, 1024)
            payload = os.urandom(size)
            handle.write(payload)
        return HostFile(handle, compute_md5(payload))

    return [make_one() for _ in range(num_files)]
740
741
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create `num_files` files of random bytes in `in_dir` on the device.

    Each file is written with `dd` from /dev/urandom as a single block whose
    size is a multiple of 1 KiB, from 1 KiB up to (but not including) 16 KiB.
    Files are named `prefix` followed by their index.

    Args:
      device: object providing shell(cmd), used to run commands remotely.
      in_dir: device directory in which the files are created.
      num_files: number of files to create.
      prefix: file name prefix.

    Returns:
      A list of DeviceFile objects carrying each file's MD5 and full path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    if num_files <= 0:
        return files

    # The checksum program is the same for every file, and finding it costs a
    # device shell call, so look it up once rather than per file.
    md5_prog = get_md5_prog(device)

    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
759
760
761class FileOperationsTest:
762    class Base(DeviceTest):
763        SCRATCH_DIR = '/data/local/tmp'
764        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
765        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
766
767        def setUp(self):
768            self.previous_env = os.environ.get("ADB_COMPRESSION")
769            os.environ["ADB_COMPRESSION"] = self.compression
770
771        def tearDown(self):
772            if self.previous_env is None:
773                del os.environ["ADB_COMPRESSION"]
774            else:
775                os.environ["ADB_COMPRESSION"] = self.previous_env
776
777        def _verify_remote(self, checksum, remote_path):
778            dev_md5, _ = self.device.shell([get_md5_prog(self.device),
779                                            remote_path])[0].split()
780            self.assertEqual(checksum, dev_md5)
781
782        def _verify_local(self, checksum, local_path):
783            with open(local_path, 'rb') as host_file:
784                host_md5 = compute_md5(host_file.read())
785                self.assertEqual(host_md5, checksum)
786
787        def test_push(self):
788            """Push a randomly generated file to specified device."""
789            kbytes = 512
790            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
791            rand_str = os.urandom(1024 * kbytes)
792            tmp.write(rand_str)
793            tmp.close()
794
795            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
796            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
797
798            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
799            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
800
801            os.remove(tmp.name)
802
803        def test_push_dir(self):
804            """Push a randomly generated directory of files to the device."""
805            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
806            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
807
808            try:
809                host_dir = tempfile.mkdtemp()
810
811                # Make sure the temp directory isn't setuid, or else adb will complain.
812                os.chmod(host_dir, 0o700)
813
814                # Create 32 random files.
815                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
816                self.device.push(host_dir, self.DEVICE_TEMP_DIR)
817
818                for temp_file in temp_files:
819                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
820                                                 os.path.basename(host_dir),
821                                                 temp_file.base_name)
822                    self._verify_remote(temp_file.checksum, remote_path)
823                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
824            finally:
825                if host_dir is not None:
826                    shutil.rmtree(host_dir)
827
828        def disabled_test_push_empty(self):
829            """Push an empty directory to the device."""
830            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
831            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
832
833            try:
834                host_dir = tempfile.mkdtemp()
835
836                # Make sure the temp directory isn't setuid, or else adb will complain.
837                os.chmod(host_dir, 0o700)
838
839                # Create an empty directory.
840                empty_dir_path = os.path.join(host_dir, 'empty')
841                os.mkdir(empty_dir_path);
842
843                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
844
845                remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
846                test_empty_cmd = ["[", "-d", remote_path, "]"]
847                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
848
849                self.assertEqual(rc, 0)
850                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
851            finally:
852                if host_dir is not None:
853                    shutil.rmtree(host_dir)
854
855        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
856        def test_push_symlink(self):
857            """Push a symlink.
858
859            Bug: http://b/31491920
860            """
861            try:
862                host_dir = tempfile.mkdtemp()
863
864                # Make sure the temp directory isn't setuid, or else adb will
865                # complain.
866                os.chmod(host_dir, 0o700)
867
868                with open(os.path.join(host_dir, 'foo'), 'w') as f:
869                    f.write('foo')
870
871                symlink_path = os.path.join(host_dir, 'symlink')
872                os.symlink('foo', symlink_path)
873
874                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
875                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
876                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
877                rc, out, _ = self.device.shell_nocheck(
878                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
879                self.assertEqual(0, rc)
880                self.assertEqual(out.strip(), 'foo')
881            finally:
882                if host_dir is not None:
883                    shutil.rmtree(host_dir)
884
885        def test_multiple_push(self):
886            """Push multiple files to the device in one adb push command.
887
888            Bug: http://b/25324823
889            """
890
891            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
892            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
893
894            try:
895                host_dir = tempfile.mkdtemp()
896
897                # Create some random files and a subdirectory containing more files.
898                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
899
900                subdir = os.path.join(host_dir, 'subdir')
901                os.mkdir(subdir)
902                subdir_temp_files = make_random_host_files(in_dir=subdir,
903                                                           num_files=4)
904
905                paths = [x.full_path for x in temp_files]
906                paths.append(subdir)
907                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
908
909                for temp_file in temp_files:
910                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
911                                                 temp_file.base_name)
912                    self._verify_remote(temp_file.checksum, remote_path)
913
914                for subdir_temp_file in subdir_temp_files:
915                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
916                                                 # BROKEN: http://b/25394682
917                                                 # 'subdir';
918                                                 temp_file.base_name)
919                    self._verify_remote(temp_file.checksum, remote_path)
920
921
922                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
923            finally:
924                if host_dir is not None:
925                    shutil.rmtree(host_dir)
926
927        @requires_non_root
928        def test_push_error_reporting(self):
929            """Make sure that errors that occur while pushing a file get reported
930
931            Bug: http://b/26816782
932            """
933            with tempfile.NamedTemporaryFile() as tmp_file:
934                tmp_file.write(b'\0' * 1024 * 1024)
935                tmp_file.flush()
936                try:
937                    self.device.push(local=tmp_file.name, remote='/system/')
938                    self.fail('push should not have succeeded')
939                except subprocess.CalledProcessError as e:
940                    output = e.output
941
942                self.assertTrue(b'Permission denied' in output or
943                                b'Read-only file system' in output)
944
945        @requires_non_root
946        def test_push_directory_creation(self):
947            """Regression test for directory creation.
948
949            Bug: http://b/110953234
950            """
951            with tempfile.NamedTemporaryFile() as tmp_file:
952                tmp_file.write(b'\0' * 1024 * 1024)
953                tmp_file.flush()
954                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
955                self.device.shell(['rm', '-rf', remote_path])
956
957                remote_path += '/filename'
958                self.device.push(local=tmp_file.name, remote=remote_path)
959
960        def disabled_test_push_multiple_slash_root(self):
961            """Regression test for pushing to //data/local/tmp.
962
963            Bug: http://b/141311284
964
965            Disabled because this broken on the adbd side as well: b/141943968
966            """
967            with tempfile.NamedTemporaryFile() as tmp_file:
968                tmp_file.write('\0' * 1024 * 1024)
969                tmp_file.flush()
970                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
971                self.device.shell(['rm', '-rf', remote_path])
972                self.device.push(local=tmp_file.name, remote=remote_path)
973
974        def _test_pull(self, remote_file, checksum):
975            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
976            tmp_write.close()
977            self.device.pull(remote=remote_file, local=tmp_write.name)
978            with open(tmp_write.name, 'rb') as tmp_read:
979                host_contents = tmp_read.read()
980                host_md5 = compute_md5(host_contents)
981            self.assertEqual(checksum, host_md5)
982            os.remove(tmp_write.name)
983
984        @requires_non_root
985        def test_pull_error_reporting(self):
986            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
987            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
988
989            try:
990                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
991            except subprocess.CalledProcessError as e:
992                output = e.output
993
994            self.assertIn(b'Permission denied', output)
995
996            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
997
998        def test_pull(self):
999            """Pull a randomly generated file from specified device."""
1000            kbytes = 512
1001            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
1002            cmd = ['dd', 'if=/dev/urandom',
1003                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
1004                   'count={}'.format(kbytes)]
1005            self.device.shell(cmd)
1006            dev_md5, _ = self.device.shell(
1007                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
1008            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
1009            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
1010
1011        def test_pull_dir(self):
1012            """Pull a randomly generated directory of files from the device."""
1013            try:
1014                host_dir = tempfile.mkdtemp()
1015
1016                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1017                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1018
1019                # Populate device directory with random files.
1020                temp_files = make_random_device_files(
1021                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1022
1023                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1024
1025                for temp_file in temp_files:
1026                    host_path = os.path.join(
1027                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1028                        temp_file.base_name)
1029                    self._verify_local(temp_file.checksum, host_path)
1030
1031                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1032            finally:
1033                if host_dir is not None:
1034                    shutil.rmtree(host_dir)
1035
1036        def test_pull_dir_symlink(self):
1037            """Pull a directory into a symlink to a directory.
1038
1039            Bug: http://b/27362811
1040            """
1041            if os.name != 'posix':
1042                raise unittest.SkipTest('requires POSIX')
1043
1044            try:
1045                host_dir = tempfile.mkdtemp()
1046                real_dir = os.path.join(host_dir, 'dir')
1047                symlink = os.path.join(host_dir, 'symlink')
1048                os.mkdir(real_dir)
1049                os.symlink(real_dir, symlink)
1050
1051                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1052                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1053
1054                # Populate device directory with random files.
1055                temp_files = make_random_device_files(
1056                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1057
1058                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1059
1060                for temp_file in temp_files:
1061                    host_path = os.path.join(
1062                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1063                        temp_file.base_name)
1064                    self._verify_local(temp_file.checksum, host_path)
1065
1066                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1067            finally:
1068                if host_dir is not None:
1069                    shutil.rmtree(host_dir)
1070
1071        def test_pull_dir_symlink_collision(self):
1072            """Pull a directory into a colliding symlink to directory."""
1073            if os.name != 'posix':
1074                raise unittest.SkipTest('requires POSIX')
1075
1076            try:
1077                host_dir = tempfile.mkdtemp()
1078                real_dir = os.path.join(host_dir, 'real')
1079                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1080                symlink = os.path.join(host_dir, tmp_dirname)
1081                os.mkdir(real_dir)
1082                os.symlink(real_dir, symlink)
1083
1084                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1085                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1086
1087                # Populate device directory with random files.
1088                temp_files = make_random_device_files(
1089                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1090
1091                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1092
1093                for temp_file in temp_files:
1094                    host_path = os.path.join(real_dir, temp_file.base_name)
1095                    self._verify_local(temp_file.checksum, host_path)
1096
1097                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1098            finally:
1099                if host_dir is not None:
1100                    shutil.rmtree(host_dir)
1101
1102        def test_pull_dir_nonexistent(self):
1103            """Pull a directory of files from the device to a nonexistent path."""
1104            try:
1105                host_dir = tempfile.mkdtemp()
1106                dest_dir = os.path.join(host_dir, 'dest')
1107
1108                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1109                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1110
1111                # Populate device directory with random files.
1112                temp_files = make_random_device_files(
1113                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1114
1115                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1116
1117                for temp_file in temp_files:
1118                    host_path = os.path.join(dest_dir, temp_file.base_name)
1119                    self._verify_local(temp_file.checksum, host_path)
1120
1121                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1122            finally:
1123                if host_dir is not None:
1124                    shutil.rmtree(host_dir)
1125
1126        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1127        def disabled_test_pull_symlink_dir(self):
1128            """Pull a symlink to a directory of symlinks to files."""
1129            try:
1130                host_dir = tempfile.mkdtemp()
1131
1132                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1133                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1134                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1135
1136                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1137                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1138                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1139
1140                # Populate device directory with random files.
1141                temp_files = make_random_device_files(
1142                    self.device, in_dir=remote_dir, num_files=32)
1143
1144                for temp_file in temp_files:
1145                    self.device.shell(
1146                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1147                         posixpath.join(remote_links, temp_file.base_name)])
1148
1149                self.device.pull(remote=remote_symlink, local=host_dir)
1150
1151                for temp_file in temp_files:
1152                    host_path = os.path.join(
1153                        host_dir, 'symlink', temp_file.base_name)
1154                    self._verify_local(temp_file.checksum, host_path)
1155
1156                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1157            finally:
1158                if host_dir is not None:
1159                    shutil.rmtree(host_dir)
1160
1161        def test_pull_empty(self):
1162            """Pull a directory containing an empty directory from the device."""
1163            try:
1164                host_dir = tempfile.mkdtemp()
1165
1166                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1167                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1168                self.device.shell(['mkdir', '-p', remote_empty_path])
1169
1170                self.device.pull(remote=remote_empty_path, local=host_dir)
1171                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1172            finally:
1173                if host_dir is not None:
1174                    shutil.rmtree(host_dir)
1175
1176        def test_multiple_pull(self):
1177            """Pull a randomly generated directory of files from the device."""
1178
1179            try:
1180                host_dir = tempfile.mkdtemp()
1181
1182                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1183                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1184                self.device.shell(['mkdir', '-p', subdir])
1185
1186                # Create some random files and a subdirectory containing more files.
1187                temp_files = make_random_device_files(
1188                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1189
1190                subdir_temp_files = make_random_device_files(
1191                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1192
1193                paths = [x.full_path for x in temp_files]
1194                paths.append(subdir)
1195                self.device._simple_call(['pull'] + paths + [host_dir])
1196
1197                for temp_file in temp_files:
1198                    local_path = os.path.join(host_dir, temp_file.base_name)
1199                    self._verify_local(temp_file.checksum, local_path)
1200
1201                for subdir_temp_file in subdir_temp_files:
1202                    local_path = os.path.join(host_dir,
1203                                              'subdir',
1204                                              subdir_temp_file.base_name)
1205                    self._verify_local(subdir_temp_file.checksum, local_path)
1206
1207                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1208            finally:
1209                if host_dir is not None:
1210                    shutil.rmtree(host_dir)
1211
1212        def verify_sync(self, device, temp_files, device_dir):
1213            """Verifies that a list of temp files was synced to the device."""
1214            # Confirm that every file on the device mirrors that on the host.
1215            for temp_file in temp_files:
1216                device_full_path = posixpath.join(
1217                    device_dir, temp_file.base_name)
1218                dev_md5, _ = device.shell(
1219                    [get_md5_prog(self.device), device_full_path])[0].split()
1220                self.assertEqual(temp_file.checksum, dev_md5)
1221
1222        def test_sync(self):
1223            """Sync a host directory to the data partition."""
1224
1225            try:
1226                base_dir = tempfile.mkdtemp()
1227
1228                # Create mirror device directory hierarchy within base_dir.
1229                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1230                os.makedirs(full_dir_path)
1231
1232                # Create 32 random files within the host mirror.
1233                temp_files = make_random_host_files(
1234                    in_dir=full_dir_path, num_files=32)
1235
1236                # Clean up any stale files on the device.
1237                device = adb.get_device()  # pylint: disable=no-member
1238                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1239
1240                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1241                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1242                device.sync('data')
1243                if old_product_out is None:
1244                    del os.environ['ANDROID_PRODUCT_OUT']
1245                else:
1246                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1247
1248                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1249
1250                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1251            finally:
1252                if base_dir is not None:
1253                    shutil.rmtree(base_dir)
1254
1255        def test_push_sync(self):
1256            """Sync a host directory to a specific path."""
1257
1258            try:
1259                temp_dir = tempfile.mkdtemp()
1260                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1261
1262                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1263
1264                # Clean up any stale files on the device.
1265                device = adb.get_device()  # pylint: disable=no-member
1266                device.shell(['rm', '-rf', device_dir])
1267
1268                device.push(temp_dir, device_dir, sync=True)
1269
1270                self.verify_sync(device, temp_files, device_dir)
1271
1272                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1273            finally:
1274                if temp_dir is not None:
1275                    shutil.rmtree(temp_dir)
1276
1277        def test_push_sync_multiple(self):
1278            """Sync multiple host directories to a specific path."""
1279
1280            try:
1281                temp_dir = tempfile.mkdtemp()
1282                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1283
1284                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1285
1286                # Clean up any stale files on the device.
1287                device = adb.get_device()  # pylint: disable=no-member
1288                device.shell(['rm', '-rf', device_dir])
1289                device.shell(['mkdir', '-p', device_dir])
1290
1291                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
1292                device.push(host_paths, device_dir, sync=True)
1293
1294                self.verify_sync(device, temp_files, device_dir)
1295
1296                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1297            finally:
1298                if temp_dir is not None:
1299                    shutil.rmtree(temp_dir)
1300
1301
1302        def test_push_dry_run_nonexistent_file(self):
1303            """Push with dry run."""
1304
1305            for file_size in [8, 1024 * 1024]:
1306                try:
1307                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1308                    device_file = posixpath.join(device_dir, 'file')
1309
1310                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1311                    self.device.shell(['mkdir', '-p', device_dir])
1312
1313                    host_dir = tempfile.mkdtemp()
1314                    host_file = posixpath.join(host_dir, 'file')
1315
1316                    with open(host_file, "w") as f:
1317                        f.write('x' * file_size)
1318
1319                    self.device._simple_call(['push', '-n', host_file, device_file])
1320                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1321                    self.assertNotEqual(0, rc)
1322
1323                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1324                finally:
1325                    if host_dir is not None:
1326                        shutil.rmtree(host_dir)
1327
1328        def test_push_dry_run_existent_file(self):
1329            """Push with dry run."""
1330
1331            for file_size in [8, 1024 * 1024]:
1332                try:
1333                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1334                    device_file = posixpath.join(device_dir, 'file')
1335
1336                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1337                    self.device.shell(['mkdir', '-p', device_dir])
1338                    self.device.shell(['echo', 'foo', '>', device_file])
1339
1340                    host_dir = tempfile.mkdtemp()
1341                    host_file = posixpath.join(host_dir, 'file')
1342
1343                    with open(host_file, "w") as f:
1344                        f.write('x' * file_size)
1345
1346                    self.device._simple_call(['push', '-n', host_file, device_file])
1347                    stdout, stderr = self.device.shell(['cat', device_file])
1348                    self.assertEqual(stdout.strip(), "foo")
1349
1350                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1351                finally:
1352                    if host_dir is not None:
1353                        shutil.rmtree(host_dir)
1354
1355        def test_unicode_paths(self):
1356            """Ensure that we can support non-ASCII paths, even on Windows."""
1357            name = u'로보카 폴리'
1358
1359            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1360            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1361
1362            ## push.
1363            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1364            tf.close()
1365            self.device.push(tf.name, remote_path)
1366            os.remove(tf.name)
1367            self.assertFalse(os.path.exists(tf.name))
1368
1369            # Verify that the device ended up with the expected UTF-8 path
1370            output = self.device.shell(
1371                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1372            self.assertEqual(remote_path, output)
1373
1374            # pull.
1375            self.device.pull(remote_path, tf.name)
1376            self.assertTrue(os.path.exists(tf.name))
1377            os.remove(tf.name)
1378            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1379
1380
class FileOperationsTestUncompressed(FileOperationsTest.Base):
    """Run the shared file-operation tests with ADB_COMPRESSION set to "none"."""
    compression = "none"
1383
1384
class FileOperationsTestBrotli(FileOperationsTest.Base):
    """Run the shared file-operation tests with ADB_COMPRESSION set to "brotli"."""
    compression = "brotli"
1387
1388
class FileOperationsTestLZ4(FileOperationsTest.Base):
    """Run the shared file-operation tests with ADB_COMPRESSION set to "lz4"."""
    compression = "lz4"
1391
1392
class FileOperationsTestZstd(FileOperationsTest.Base):
    """Run the shared file-operation tests with ADB_COMPRESSION set to "zstd"."""
    compression = "zstd"
1395
1396
1397class DeviceOfflineTest(DeviceTest):
1398    def _get_device_state(self, serialno):
1399        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1400        for line in output.split('\n'):
1401            m = re.match('(\S+)\s+(\S+)', line)
1402            if m and m.group(1) == serialno:
1403                return m.group(2)
1404        return None
1405
1406    def disabled_test_killed_when_pushing_a_large_file(self):
1407        """
1408           While running adb push with a large file, kill adb server.
1409           Occasionally the device becomes offline. Because the device is still
1410           reading data without realizing that the adb server has been restarted.
1411           Test if we can bring the device online automatically now.
1412           http://b/32952319
1413        """
1414        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1415        # 1. Push a large file
1416        file_path = 'tmp_large_file'
1417        try:
1418            fh = open(file_path, 'w')
1419            fh.write('\0' * (100 * 1024 * 1024))
1420            fh.close()
1421            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1422            time.sleep(0.1)
1423            # 2. Kill the adb server
1424            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1425            subproc.terminate()
1426        finally:
1427            try:
1428                os.unlink(file_path)
1429            except:
1430                pass
1431        # 3. See if the device still exist.
1432        # Sleep to wait for the adb server exit.
1433        time.sleep(0.5)
1434        # 4. The device should be online
1435        self.assertEqual(self._get_device_state(serialno), 'device')
1436
1437    def disabled_test_killed_when_pulling_a_large_file(self):
1438        """
1439           While running adb pull with a large file, kill adb server.
1440           Occasionally the device can't be connected. Because the device is trying to
1441           send a message larger than what is expected by the adb server.
1442           Test if we can bring the device online automatically now.
1443        """
1444        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1445        file_path = 'tmp_large_file'
1446        try:
1447            # 1. Create a large file on device.
1448            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1449                               'bs=1000000', 'count=100'])
1450            # 2. Pull the large file on host.
1451            subproc = subprocess.Popen(self.device.adb_cmd +
1452                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1453            time.sleep(0.1)
1454            # 3. Kill the adb server
1455            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1456            subproc.terminate()
1457        finally:
1458            try:
1459                os.unlink(file_path)
1460            except:
1461                pass
1462        # 4. See if the device still exist.
1463        # Sleep to wait for the adb server exit.
1464        time.sleep(0.5)
1465        self.assertEqual(self._get_device_state(serialno), 'device')
1466
1467
1468    def test_packet_size_regression(self):
1469        """Test for http://b/37783561
1470
1471        Receiving packets of a length divisible by 512 but not 1024 resulted in
1472        the adb client waiting indefinitely for more input.
1473        """
1474        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1475        # Probe some surrounding values as well, for the hell of it.
1476        for base in [512] + list(range(1024, 1024 * 16, 1024)):
1477            for offset in [-6, -5, -4]:
1478                length = base + offset
1479                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1480                       'echo', 'foo']
1481                rc, stdout, _ = self.device.shell_nocheck(cmd)
1482
1483                self.assertEqual(0, rc)
1484
1485                # Output should be '\0' * length, followed by "foo\n"
1486                self.assertEqual(length, len(stdout) - 4)
1487                self.assertEqual(stdout, "\0" * length + "foo\n")
1488
1489    def test_zero_packet(self):
1490        """Test for http://b/113070258
1491
1492        Make sure that we don't blow up when sending USB transfers that line up
1493        exactly with the USB packet size.
1494        """
1495
1496        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1497        try:
1498            for size in [512, 1024]:
1499                def listener():
1500                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1501                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1502
1503                thread = threading.Thread(target=listener)
1504                thread.start()
1505
1506                # Wait a bit to let the shell command start.
1507                time.sleep(0.25)
1508
1509                sock = socket.create_connection(("localhost", local_port))
1510                with contextlib.closing(sock):
1511                    bytesWritten = sock.send(b"a" * size)
1512                    self.assertEqual(size, bytesWritten)
1513                    readBytes = sock.recv(4096)
1514                    self.assertEqual(b"foo\n", readBytes)
1515
1516                thread.join()
1517        finally:
1518            self.device.forward_remove("tcp:{}".format(local_port))
1519
1520
class SocketTest(DeviceTest):
    """Tests that talk to the adb server directly over its TCP socket."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """

        def adb_length_prefixed(payload):
            """Return payload UTF-8 encoded, preceded by its length as 4 hex digits."""
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"

        s = socket.create_connection(("localhost", 5037))
        # Close the socket on every exit path, including failed assertions.
        with contextlib.closing(s):
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            s.sendall(adb_length_prefixed(shell_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            # A bytearray grows in place, so accumulating ~2048 chunks does not
            # re-copy everything received so far on each iteration.
            received = bytearray()
            try:
                time.sleep(1)

                while True:
                    read = s.recv(512)
                    if len(read) == 0:
                        break
                    received += read
            finally:
                # Make sure the spammer cannot stay blocked in sendall(), then
                # always reap it, even if recv() raised.
                try:
                    s.shutdown(socket.SHUT_RDWR)
                except socket.error:
                    # The connection may already be torn down by the peer.
                    pass
                thread.join()

            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
1576
1577
class FramebufferTest(DeviceTest):
    """Checks the adb "framebuffer:" raw service."""

    @requires_root
    def test_framebuffer(self):
        """Test that we get something from the framebuffer service."""
        raw_cmd = self.device.adb_cmd + ["raw", "framebuffer:"]
        output = subprocess.check_output(raw_cmd)
        got_nothing = len(output) == 0
        self.assertFalse(got_nothing)
1584
1585
if sys.platform == "win32":
    # Windows-only ctypes bindings for the kernel32 console API, together with
    # the console helper functions defined later in this same block.
    # From https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets the errcheck hooks below fetch the failure code
    # with ctypes.get_last_error().
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Constants handed to CreateConsoleScreenBuffer and GetStdHandle, and the
    # sentinel value that _check_invalid compares results against.
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)

    def _check_zero(result, func, args):
        """errcheck hook: raise WinError if the call returned a falsy result.

        Otherwise the call's arguments are returned unchanged.
        """
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    def _check_invalid(result, func, args):
        """errcheck hook: raise WinError if the call returned INVALID_HANDLE_VALUE.

        Otherwise the call's arguments are returned unchanged.
        """
        if result == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    # Define the pointer types used in the argtypes below when wintypes does
    # not already provide them.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)

    class COORD(ctypes.Structure):
        """A pair of SHORT coordinates, X and Y."""
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))

    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
        """Screen buffer description exchanged with Get/SetConsoleScreenBufferInfoEx."""
        _fields_ = (('cbSize',               wintypes.ULONG),
                    ('dwSize',               COORD),
                    ('dwCursorPosition',     COORD),
                    ('wAttributes',          wintypes.WORD),
                    ('srWindow',             wintypes.SMALL_RECT),
                    ('dwMaximumWindowSize',  COORD),
                    ('wPopupAttributes',     wintypes.WORD),
                    ('bFullscreenSupported', wintypes.BOOL),
                    ('ColorTable',           wintypes.DWORD * 16))
        def __init__(self, *args, **kwds):
            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
                    *args, **kwds)
            # Every new instance records its own size in cbSize.
            # NOTE(review): presumably required by the Win32 API before the
            # structure is passed to the Get/Set calls -- confirm.
            self.cbSize = ctypes.sizeof(self)

    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # Function prototypes. Each function gets an errcheck hook so a failed call
    # raises instead of returning an error value, and argtypes so ctypes
    # converts the arguments to the declared parameter types.
    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1687
1688    @contextlib.contextmanager
1689    def allocate_console():
1690        allocated = kernel32.AllocConsole()
1691        try:
1692            yield allocated
1693        finally:
1694            if allocated:
1695                kernel32.FreeConsole()
1696
1697    @contextlib.contextmanager
1698    def console_screen(ncols=None, nrows=None):
1699        info = CONSOLE_SCREEN_BUFFER_INFOEX()
1700        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1701        nwritten = (wintypes.DWORD * 1)()
1702        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1703        kernel32.GetConsoleScreenBufferInfoEx(
1704               hStdOut, ctypes.byref(info))
1705        if ncols is None:
1706            ncols = info.dwSize.X
1707        if nrows is None:
1708            nrows = info.dwSize.Y
1709        elif nrows > 9999:
1710            raise ValueError('nrows must be 9999 or less')
1711        fd_screen = None
1712        hScreen = kernel32.CreateConsoleScreenBuffer(
1713                    GENERIC_READ | GENERIC_WRITE,
1714                    FILE_SHARE_READ | FILE_SHARE_WRITE,
1715                    None, CONSOLE_TEXTMODE_BUFFER, None)
1716        try:
1717            fd_screen = msvcrt.open_osfhandle(
1718                            hScreen, os.O_RDWR | os.O_BINARY)
1719            kernel32.GetConsoleScreenBufferInfoEx(
1720                   hScreen, ctypes.byref(new_info))
1721            new_info.dwSize = COORD(ncols, nrows)
1722            new_info.srWindow = wintypes.SMALL_RECT(
1723                    Left=0, Top=0, Right=(ncols - 1),
1724                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1725            kernel32.SetConsoleScreenBufferInfoEx(
1726                    hScreen, ctypes.byref(new_info))
1727            kernel32.SetConsoleWindowInfo(hScreen, True,
1728                    ctypes.byref(new_info.srWindow))
1729            kernel32.FillConsoleOutputCharacterW(
1730                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1731            kernel32.SetConsoleActiveScreenBuffer(hScreen)
1732            try:
1733                yield fd_screen
1734            finally:
1735                kernel32.SetConsoleScreenBufferInfoEx(
1736                    hStdOut, ctypes.byref(info))
1737                kernel32.SetConsoleWindowInfo(hStdOut, True,
1738                        ctypes.byref(info.srWindow))
1739                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1740        finally:
1741            if fd_screen is not None:
1742                os.close(fd_screen)
1743            else:
1744                kernel32.CloseHandle(hScreen)
1745
1746    def read_screen(fd):
1747        hScreen = msvcrt.get_osfhandle(fd)
1748        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1749        kernel32.GetConsoleScreenBufferInfoEx(
1750            hScreen, ctypes.byref(csbi))
1751        ncols = csbi.dwSize.X
1752        pos = csbi.dwCursorPosition
1753        length = ncols * pos.Y + pos.X + 1
1754        buf = (ctypes.c_wchar * length)()
1755        n = (wintypes.DWORD * 1)()
1756        kernel32.ReadConsoleOutputCharacterW(
1757            hScreen, buf, length, COORD(0,0), n)
1758        lines = [buf[i:i+ncols].rstrip(u'\0')
1759                    for i in range(0, n[0], ncols)]
1760        return u'\n'.join(lines)
1761
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    """Console output tests that only run on Windows hosts."""

    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        quoted_string = '"' + unicode_string + '"'

        # allocate_console() provides a console window if we do not have one;
        # usually we are already running inside one. console_screen() then
        # switches to a temporary console buffer. Passing
        # ncols=len(unicode_string) would also work, but resizing makes the
        # window flash and is likely unnecessary at typical console sizes.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # device.shell_popen() does not use a pipe (unlike device.shell()),
            # so adb can detect that stdout is a console.
            process = self.device.shell_popen(['echo', quoted_string])
            process.wait()

            # Whatever adb printed is now in the temporary console buffer.
            console_output = read_screen(screen)
            self.assertEqual(unicode_string, console_output)
1784
1785
def main():
    """Seed the RNG and run this module's tests against the attached devices.

    If adb reports no devices, a notice is printed and nothing is run.
    """
    random.seed(0)

    attached = adb.get_devices()
    if len(attached) == 0:
        print('Test suite must be run with attached devices')
        return

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)


if __name__ == '__main__':
    main()
1797