• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
def requires_root(func):
    """Decorate a test method so that it runs with the device rooted.

    The wrapped test is skipped unless the device reports
    ``ro.debuggable == '1'``. If ``id -un`` on the device does not report
    ``root``, the device is switched to root before the test and switched
    back afterwards, even when the test raises.

    Args:
        func: The test method to wrap. It is called with the test case
            instance as its first argument.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns whatever ``func`` returns.

    Raises:
        unittest.SkipTest: Raised by the wrapper when the device is not
            debuggable.
    """
    # Local import keeps this decorator self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which the
    # unittest runner uses when reporting results.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch if this wrapper performed it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs with the device unrooted.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test and switched back to root afterwards, even when the test
    raises.

    Args:
        func: The test method to wrap. It is called with the test case
            instance as its first argument.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns whatever ``func`` returns.
    """
    # Local import keeps this decorator self-contained.
    import functools

    # functools.wraps preserves the test's name and docstring, which the
    # unittest runner uses when reporting results.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch if this wrapper performed it.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Base test case that exposes a device handle as ``self.device``."""

    def setUp(self):
        """Store the object returned by ``adb.get_device()`` on the test."""
        device = adb.get_device()
        self.device = device
82
83
class AbbTest(DeviceTest):
    """Smoke test for the `adb abb` command."""

    def test_smoke(self):
        """Run `adb abb` with no service and check exit code and stderr."""
        no_service_error = b"cmd: No service specified; use -l to list all services\n"

        completed = subprocess.run(['adb', 'abb'], capture_output=True)

        self.assertEqual(1, completed.returncode)
        self.assertEqual(no_service_error, completed.stderr)
90
91class ForwardReverseTest(DeviceTest):
92    def _test_no_rebind(self, description, direction_list, direction,
93                       direction_no_rebind, direction_remove_all):
94        msg = direction_list()
95        self.assertEqual('', msg.strip(),
96                         description + ' list must be empty to run this test.')
97
98        # Use --no-rebind with no existing binding
99        direction_no_rebind('tcp:5566', 'tcp:6655')
100        msg = direction_list()
101        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
102
103        # Use --no-rebind with existing binding
104        with self.assertRaises(subprocess.CalledProcessError):
105            direction_no_rebind('tcp:5566', 'tcp:6677')
106        msg = direction_list()
107        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
108        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
109
110        # Use the absence of --no-rebind with existing binding
111        direction('tcp:5566', 'tcp:6677')
112        msg = direction_list()
113        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
114        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
115
116        direction_remove_all()
117        msg = direction_list()
118        self.assertEqual('', msg.strip())
119
120    def test_forward_no_rebind(self):
121        self._test_no_rebind('forward', self.device.forward_list,
122                            self.device.forward, self.device.forward_no_rebind,
123                            self.device.forward_remove_all)
124
125    def test_reverse_no_rebind(self):
126        self._test_no_rebind('reverse', self.device.reverse_list,
127                            self.device.reverse, self.device.reverse_no_rebind,
128                            self.device.reverse_remove_all)
129
130    def test_forward(self):
131        msg = self.device.forward_list()
132        self.assertEqual('', msg.strip(),
133                         'Forwarding list must be empty to run this test.')
134        self.device.forward('tcp:5566', 'tcp:6655')
135        msg = self.device.forward_list()
136        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
137        self.device.forward('tcp:7788', 'tcp:8877')
138        msg = self.device.forward_list()
139        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
140        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
141        self.device.forward_remove('tcp:5566')
142        msg = self.device.forward_list()
143        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
144        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
145        self.device.forward_remove_all()
146        msg = self.device.forward_list()
147        self.assertEqual('', msg.strip())
148
149    def test_forward_old_protocol(self):
150        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
151
152        msg = self.device.forward_list()
153        self.assertEqual('', msg.strip(),
154                         'Forwarding list must be empty to run this test.')
155
156        s = socket.create_connection(("localhost", 5037))
157        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
158        cmd = b"%04x%s" % (len(service), service)
159        s.sendall(cmd)
160
161        msg = self.device.forward_list()
162        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
163
164        self.device.forward_remove_all()
165        msg = self.device.forward_list()
166        self.assertEqual('', msg.strip())
167
168    def test_forward_tcp_port_0(self):
169        self.assertEqual('', self.device.forward_list().strip(),
170                         'Forwarding list must be empty to run this test.')
171
172        try:
173            # If resolving TCP port 0 is supported, `adb forward` will print
174            # the actual port number.
175            port = self.device.forward('tcp:0', 'tcp:8888').strip()
176            if not port:
177                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
178
179            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
180                                      self.device.forward_list()))
181        finally:
182            self.device.forward_remove_all()
183
184    def test_reverse(self):
185        msg = self.device.reverse_list()
186        self.assertEqual('', msg.strip(),
187                         'Reverse forwarding list must be empty to run this test.')
188        self.device.reverse('tcp:5566', 'tcp:6655')
189        msg = self.device.reverse_list()
190        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
191        self.device.reverse('tcp:7788', 'tcp:8877')
192        msg = self.device.reverse_list()
193        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
194        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
195        self.device.reverse_remove('tcp:5566')
196        msg = self.device.reverse_list()
197        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
198        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
199        self.device.reverse_remove_all()
200        msg = self.device.reverse_list()
201        self.assertEqual('', msg.strip())
202
203    def test_reverse_tcp_port_0(self):
204        self.assertEqual('', self.device.reverse_list().strip(),
205                         'Reverse list must be empty to run this test.')
206
207        try:
208            # If resolving TCP port 0 is supported, `adb reverse` will print
209            # the actual port number.
210            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
211            if not port:
212                raise unittest.SkipTest('Reversing tcp:0 is not available.')
213
214            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
215                                      self.device.reverse_list()))
216        finally:
217            self.device.reverse_remove_all()
218
219    def test_forward_reverse_echo(self):
220        """Send data through adb forward and read it back via adb reverse"""
221        forward_port = 12345
222        reverse_port = forward_port + 1
223        forward_spec = 'tcp:' + str(forward_port)
224        reverse_spec = 'tcp:' + str(reverse_port)
225        forward_setup = False
226        reverse_setup = False
227
228        try:
229            # listen on localhost:forward_port, connect to remote:forward_port
230            self.device.forward(forward_spec, forward_spec)
231            forward_setup = True
232            # listen on remote:forward_port, connect to localhost:reverse_port
233            self.device.reverse(forward_spec, reverse_spec)
234            reverse_setup = True
235
236            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
237            with contextlib.closing(listener):
238                # Use SO_REUSEADDR so that subsequent runs of the test can grab
239                # the port even if it is in TIME_WAIT.
240                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
241
242                # Listen on localhost:reverse_port before connecting to
243                # localhost:forward_port because that will cause adb to connect
244                # back to localhost:reverse_port.
245                listener.bind(('127.0.0.1', reverse_port))
246                listener.listen(4)
247
248                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
249                with contextlib.closing(client):
250                    # Connect to the listener.
251                    client.connect(('127.0.0.1', forward_port))
252
253                    # Accept the client connection.
254                    accepted_connection, addr = listener.accept()
255                    with contextlib.closing(accepted_connection) as server:
256                        data = b'hello'
257
258                        # Send data into the port setup by adb forward.
259                        client.sendall(data)
260                        # Explicitly close() so that server gets EOF.
261                        client.close()
262
263                        # Verify that the data came back via adb reverse.
264                        self.assertEqual(data, server.makefile().read().encode("utf8"))
265        finally:
266            if reverse_setup:
267                self.device.reverse_remove(forward_spec)
268            if forward_setup:
269                self.device.forward_remove(forward_spec)
270
271
272class ShellTest(DeviceTest):
273    def _interactive_shell(self, shell_args, input):
274        """Runs an interactive adb shell.
275
276        Args:
277          shell_args: List of string arguments to `adb shell`.
278          input: bytes input to send to the interactive shell.
279
280        Returns:
281          The remote exit code.
282
283        Raises:
284          unittest.SkipTest: The device doesn't support exit codes.
285        """
286        if not self.device.has_shell_protocol():
287            raise unittest.SkipTest('exit codes are unavailable on this device')
288
289        proc = subprocess.Popen(
290                self.device.adb_cmd + ['shell'] + shell_args,
291                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
292                stderr=subprocess.PIPE)
293        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
294        # to explicitly add an exit command to close the session from the device
295        # side, plus the necessary newline to complete the interactive command.
296        proc.communicate(input + b'; exit\n')
297        return proc.returncode
298
299    def test_cat(self):
300        """Check that we can at least cat a file."""
301        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
302        elements = out.split()
303        self.assertEqual(len(elements), 2)
304
305        uptime, idle = elements
306        self.assertGreater(float(uptime), 0.0)
307        self.assertGreater(float(idle), 0.0)
308
309    def test_throws_on_failure(self):
310        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
311
312    def test_output_not_stripped(self):
313        out = self.device.shell(['echo', 'foo'])[0]
314        self.assertEqual(out, 'foo' + self.device.linesep)
315
316    def test_shell_command_length(self):
317        # Devices that have shell_v2 should be able to handle long commands.
318        if self.device.has_shell_protocol():
319            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
320            self.assertEqual(rc, 0)
321            self.assertTrue(out == ('x' * 16384 + '\n'))
322
323    def test_shell_nocheck_failure(self):
324        rc, out, _ = self.device.shell_nocheck(['false'])
325        self.assertNotEqual(rc, 0)
326        self.assertEqual(out, '')
327
328    def test_shell_nocheck_output_not_stripped(self):
329        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
330        self.assertEqual(rc, 0)
331        self.assertEqual(out, 'foo' + self.device.linesep)
332
333    def test_can_distinguish_tricky_results(self):
334        # If result checking on ADB shell is naively implemented as
335        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
336        # output from the result for a cmd of `echo -n 1`.
337        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
338        self.assertEqual(rc, 0)
339        self.assertEqual(out, '1')
340
341    def test_line_endings(self):
342        """Ensure that line ending translation is not happening in the pty.
343
344        Bug: http://b/19735063
345        """
346        output = self.device.shell(['uname'])[0]
347        self.assertEqual(output, 'Linux' + self.device.linesep)
348
349    def test_pty_logic(self):
350        """Tests that a PTY is allocated when it should be.
351
352        PTY allocation behavior should match ssh.
353        """
354        def check_pty(args):
355            """Checks adb shell PTY allocation.
356
357            Tests |args| for terminal and non-terminal stdin.
358
359            Args:
360                args: -Tt args in a list (e.g. ['-t', '-t']).
361
362            Returns:
363                A tuple (<terminal>, <non-terminal>). True indicates
364                the corresponding shell allocated a remote PTY.
365            """
366            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
367
368            terminal = subprocess.Popen(
369                    test_cmd, stdin=None,
370                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
371            terminal.communicate()
372
373            non_terminal = subprocess.Popen(
374                    test_cmd, stdin=subprocess.PIPE,
375                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
376            non_terminal.communicate()
377
378            return (terminal.returncode == 0, non_terminal.returncode == 0)
379
380        # -T: never allocate PTY.
381        self.assertEqual((False, False), check_pty(['-T']))
382
383        # These tests require a new device.
384        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
385            # No args: PTY only if stdin is a terminal and shell is interactive,
386            # which is difficult to reliably test from a script.
387            self.assertEqual((False, False), check_pty([]))
388
389            # -t: PTY if stdin is a terminal.
390            self.assertEqual((True, False), check_pty(['-t']))
391
392        # -t -t: always allocate PTY.
393        self.assertEqual((True, True), check_pty(['-t', '-t']))
394
395        # -tt: always allocate PTY, POSIX style (http://b/32216152).
396        self.assertEqual((True, True), check_pty(['-tt']))
397
398        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
399        # we follow the man page instead.
400        self.assertEqual((True, True), check_pty(['-ttt']))
401
402        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
403        self.assertEqual((True, True), check_pty(['-ttx']))
404
405        # -Ttt: -tt cancels out -T.
406        self.assertEqual((True, True), check_pty(['-Ttt']))
407
408        # -ttT: -T cancels out -tt.
409        self.assertEqual((False, False), check_pty(['-ttT']))
410
411    def test_shell_protocol(self):
412        """Tests the shell protocol on the device.
413
414        If the device supports shell protocol, this gives us the ability
415        to separate stdout/stderr and return the exit code directly.
416
417        Bug: http://b/19734861
418        """
419        if not self.device.has_shell_protocol():
420            raise unittest.SkipTest('shell protocol unsupported on this device')
421
422        # Shell protocol should be used by default.
423        result = self.device.shell_nocheck(
424                shlex.split('echo foo; echo bar >&2; exit 17'))
425        self.assertEqual(17, result[0])
426        self.assertEqual('foo' + self.device.linesep, result[1])
427        self.assertEqual('bar' + self.device.linesep, result[2])
428
429        self.assertEqual(17, self._interactive_shell([], b'exit 17'))
430
431        # -x flag should disable shell protocol.
432        result = self.device.shell_nocheck(
433                shlex.split('-x echo foo; echo bar >&2; exit 17'))
434        self.assertEqual(0, result[0])
435        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
436        self.assertEqual('', result[2])
437
438        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
439
440    def test_non_interactive_sigint(self):
441        """Tests that SIGINT in a non-interactive shell kills the process.
442
443        This requires the shell protocol in order to detect the broken
444        pipe; raw data transfer mode will only see the break once the
445        subprocess tries to read or write.
446
447        Bug: http://b/23825725
448        """
449        if not self.device.has_shell_protocol():
450            raise unittest.SkipTest('shell protocol unsupported on this device')
451
452        # Start a long-running process.
453        sleep_proc = subprocess.Popen(
454                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
455                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
456                stderr=subprocess.STDOUT)
457        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
458        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
459        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
460
461        # Verify that the process is running, send signal, verify it stopped.
462        self.device.shell(proc_query)
463        os.kill(sleep_proc.pid, signal.SIGINT)
464        sleep_proc.communicate()
465
466        # It can take some time for the process to receive the signal and die.
467        end_time = time.time() + 3
468        while self.device.shell_nocheck(proc_query)[0] != 1:
469            self.assertFalse(time.time() > end_time,
470                             'subprocess failed to terminate in time')
471
472    def test_non_interactive_stdin(self):
473        """Tests that non-interactive shells send stdin."""
474        if not self.device.has_shell_protocol():
475            raise unittest.SkipTest('non-interactive stdin unsupported '
476                                    'on this device')
477
478        # Test both small and large inputs.
479        small_input = b'foo'
480        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
481        large_input = b'\n'.join(characters)
482
483
484        for input in (small_input, large_input):
485            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
486                                    stdin=subprocess.PIPE,
487                                    stdout=subprocess.PIPE,
488                                    stderr=subprocess.PIPE)
489            stdout, stderr = proc.communicate(input)
490            self.assertEqual(input.splitlines(), stdout.splitlines())
491            self.assertEqual(b'', stderr)
492
493    def test_sighup(self):
494        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
495        log_path = "/data/local/tmp/adb_signal_test.log"
496
497        # Clear the output file.
498        self.device.shell_nocheck(["echo", ">", log_path])
499
500        script = """
501            trap "echo SIGINT > {path}; exit 0" SIGINT
502            trap "echo SIGHUP > {path}; exit 0" SIGHUP
503            echo Waiting
504            read
505        """.format(path=log_path)
506
507        script = ";".join([x.strip() for x in script.strip().splitlines()])
508
509        process = self.device.shell_popen([script], kill_atexit=False,
510                                          stdin=subprocess.PIPE,
511                                          stdout=subprocess.PIPE)
512
513        self.assertEqual(b"Waiting\n", process.stdout.readline())
514        process.send_signal(signal.SIGINT)
515        process.wait()
516
517        # Waiting for the local adb to finish is insufficient, since it hangs
518        # up immediately.
519        time.sleep(1)
520
521        stdout, _ = self.device.shell(["cat", log_path])
522        self.assertEqual(stdout.strip(), "SIGHUP")
523
524    def test_exit_stress(self):
525        """Hammer `adb shell exit 42` with multiple threads."""
526        thread_count = 48
527        result = dict()
528        def hammer(thread_idx, thread_count, result):
529            success = True
530            for i in range(thread_idx, 240, thread_count):
531                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
532                if ret != i % 256:
533                    success = False
534                    break
535            result[thread_idx] = success
536
537        threads = []
538        for i in range(thread_count):
539            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
540            thread.start()
541            threads.append(thread)
542        for thread in threads:
543            thread.join()
544        for i, success in result.items():
545            self.assertTrue(success)
546
547    def disabled_test_parallel(self):
548        """Spawn a bunch of `adb shell` instances in parallel.
549
550        This was broken historically due to the use of select, which only works
551        for fds that are numerically less than 1024.
552
553        Bug: http://b/141955761"""
554
555        n_procs = 2048
556        procs = dict()
557        for i in range(0, n_procs):
558            procs[i] = subprocess.Popen(
559                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
560                stdin=subprocess.PIPE,
561                stdout=subprocess.PIPE
562            )
563
564        for i in range(0, n_procs):
565            procs[i].stdin.write("%d\n" % i)
566
567        for i in range(0, n_procs):
568            response = procs[i].stdout.readline()
569            assert(response == "%d\n" % i)
570
571        for i in range(0, n_procs):
572            procs[i].stdin.write("%d\n" % (i % 256))
573
574        for i in range(0, n_procs):
575            assert(procs[i].wait() == i % 256)
576
577
578class ArgumentEscapingTest(DeviceTest):
579    def test_shell_escaping(self):
580        """Make sure that argument escaping is somewhat sane."""
581
582        # http://b/19734868
583        # Note that this actually matches ssh(1)'s behavior --- it's
584        # converted to `sh -c echo hello; echo world` which sh interprets
585        # as `sh -c echo` (with an argument to that shell of "hello"),
586        # and then `echo world` back in the first shell.
587        result = self.device.shell(
588            shlex.split("sh -c 'echo hello; echo world'"))[0]
589        result = result.splitlines()
590        self.assertEqual(['', 'world'], result)
591        # If you really wanted "hello" and "world", here's what you'd do:
592        result = self.device.shell(
593            shlex.split(r'echo hello\;echo world'))[0].splitlines()
594        self.assertEqual(['hello', 'world'], result)
595
596        # http://b/15479704
597        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
598        self.assertEqual('t', result)
599        result = self.device.shell(
600            shlex.split("sh -c 'true && echo t'"))[0].strip()
601        self.assertEqual('t', result)
602
603        # http://b/20564385
604        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
605        self.assertEqual('t', result)
606        result = self.device.shell(
607            shlex.split(r'echo -n 123\;uname'))[0].strip()
608        self.assertEqual('123Linux', result)
609
610    def test_install_argument_escaping(self):
611        """Make sure that install argument escaping works."""
612        # http://b/20323053, http://b/3090932.
613        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
614            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
615                                             delete=False)
616            tf.close()
617
618            # Installing bogus .apks fails if the device supports exit codes.
619            try:
620                output = self.device.install(tf.name.decode("utf8"))
621            except subprocess.CalledProcessError as e:
622                output = e.output
623
624            self.assertIn(file_suffix, output)
625            os.remove(tf.name)
626
627
class RootUnrootTest(DeviceTest):
    """Checks device.root() and device.unroot() via `id -un`."""

    def _current_user(self):
        """Return the stripped output of `id -un` on the device."""
        return self.device.shell(['id', '-un'])[0].strip()

    def _test_root(self):
        """Switch to root and check the user, unless the build refuses."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            self.assertEqual('root', self._current_user())

    def _test_unroot(self):
        """Switch away from root and check the user is `shell`."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self._current_user())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self._current_user()

        # Start with whichever transition leaves the current state first;
        # any other starting user runs no steps at all.
        step_orders = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in step_orders.get(original_user, ()):
                step()
        finally:
            # Put the device back the way it was found.
            if original_user == 'root':
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
660
661
class TcpIpTest(DeviceTest):
    """Checks error handling of device.tcpip()."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
672
673
class SystemPropertiesTest(DeviceTest):
    """Checks reading and writing device properties."""

    def test_get_prop(self):
        """get_prop('init.svc.adbd') must report 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop() must be readable via getprop."""
        prop_name = 'foo.bar'
        # Reset the property with setprop before writing the real value.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stored = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(stored, 'qux')
686
687
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes-like ``string``."""
    return hashlib.md5(string).hexdigest()
692
693
def get_md5_prog(device):
    """Return the name of the device's MD5 tool: 'md5sum' or 'md5'.

    Older platforms (pre-L) had the name md5 rather than md5sum, so 'md5sum'
    is tried on /proc/uptime first and 'md5' is the fallback when that shell
    command raises adb.ShellError.
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    return prog
701
702
class HostFile(object):
    """Record describing a file on the host.

    Stores the handle and checksum it is given, together with the handle's
    ``name`` as ``full_path`` and that path's final component as
    ``base_name``.
    """

    def __init__(self, handle, checksum):
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.split(path)[1]
709
710
class DeviceFile(object):
    """Record describing a file on the device.

    Stores the checksum and POSIX-style path it is given, plus the path's
    final component as ``base_name``.
    """

    def __init__(self, checksum, full_path):
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.split(full_path)[1]
716
717
def make_random_host_files(in_dir, num_files):
    """Create ``num_files`` files of random bytes inside ``in_dir``.

    Each file's size is a multiple of 1024 bytes drawn from
    ``random.randrange(1024, 16384, 1024)``. The files are left on disk.

    Returns:
        A list with one HostFile per created file, carrying the (closed)
        file handle and the MD5 hex digest of the file's content.
    """
    kib = 1 << 10
    min_size, max_size = 1 * kib, 16 * kib

    files = []
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        payload = os.urandom(size)

        # Leaving the with-block flushes and closes the handle; delete=False
        # keeps the file on disk afterwards.
        with tempfile.NamedTemporaryFile(dir=in_dir, delete=False) as handle:
            handle.write(payload)

        files.append(HostFile(handle, compute_md5(payload)))
    return files
735
736
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create ``num_files`` files of random bytes on the device.

    Files are named ``prefix`` followed by their index and written with
    ``dd if=/dev/urandom``. Each block size is a multiple of 1024 bytes drawn
    from ``random.randrange(1024, 16384, 1024)``.

    Args:
        device: Device object whose shell() method runs the commands.
        in_dir: Device directory (POSIX path) in which to create the files.
        num_files: Number of files to create.
        prefix: File name prefix.

    Returns:
        A list with one DeviceFile per created file, carrying the checksum
        printed by the device's MD5 tool and the file's full path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The MD5 tool name is the same for every file, so the device is probed
    # once, on first use, instead of once per file.
    md5_prog = None

    files = []
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
754
755
756class FileOperationsTest(DeviceTest):
757    SCRATCH_DIR = '/data/local/tmp'
758    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
759    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
760
761    def _verify_remote(self, checksum, remote_path):
762        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
763                                        remote_path])[0].split()
764        self.assertEqual(checksum, dev_md5)
765
766    def _verify_local(self, checksum, local_path):
767        with open(local_path, 'rb') as host_file:
768            host_md5 = compute_md5(host_file.read())
769            self.assertEqual(host_md5, checksum)
770
771    def test_push(self):
772        """Push a randomly generated file to specified device."""
773        kbytes = 512
774        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
775        rand_str = os.urandom(1024 * kbytes)
776        tmp.write(rand_str)
777        tmp.close()
778
779        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
780        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
781
782        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
783        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
784
785        os.remove(tmp.name)
786
787    def test_push_dir(self):
788        """Push a randomly generated directory of files to the device."""
789        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
790        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
791
792        try:
793            host_dir = tempfile.mkdtemp()
794
795            # Make sure the temp directory isn't setuid, or else adb will complain.
796            os.chmod(host_dir, 0o700)
797
798            # Create 32 random files.
799            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
800            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
801
802            for temp_file in temp_files:
803                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
804                                             os.path.basename(host_dir),
805                                             temp_file.base_name)
806                self._verify_remote(temp_file.checksum, remote_path)
807            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
808        finally:
809            if host_dir is not None:
810                shutil.rmtree(host_dir)
811
812    def disabled_test_push_empty(self):
813        """Push an empty directory to the device."""
814        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
815        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
816
817        try:
818            host_dir = tempfile.mkdtemp()
819
820            # Make sure the temp directory isn't setuid, or else adb will complain.
821            os.chmod(host_dir, 0o700)
822
823            # Create an empty directory.
824            empty_dir_path = os.path.join(host_dir, 'empty')
825            os.mkdir(empty_dir_path);
826
827            self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
828
829            remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
830            test_empty_cmd = ["[", "-d", remote_path, "]"]
831            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
832
833            self.assertEqual(rc, 0)
834            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
835        finally:
836            if host_dir is not None:
837                shutil.rmtree(host_dir)
838
839    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
840    def test_push_symlink(self):
841        """Push a symlink.
842
843        Bug: http://b/31491920
844        """
845        try:
846            host_dir = tempfile.mkdtemp()
847
848            # Make sure the temp directory isn't setuid, or else adb will
849            # complain.
850            os.chmod(host_dir, 0o700)
851
852            with open(os.path.join(host_dir, 'foo'), 'w') as f:
853                f.write('foo')
854
855            symlink_path = os.path.join(host_dir, 'symlink')
856            os.symlink('foo', symlink_path)
857
858            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
859            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
860            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
861            rc, out, _ = self.device.shell_nocheck(
862                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
863            self.assertEqual(0, rc)
864            self.assertEqual(out.strip(), 'foo')
865        finally:
866            if host_dir is not None:
867                shutil.rmtree(host_dir)
868
869    def test_multiple_push(self):
870        """Push multiple files to the device in one adb push command.
871
872        Bug: http://b/25324823
873        """
874
875        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
876        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
877
878        try:
879            host_dir = tempfile.mkdtemp()
880
881            # Create some random files and a subdirectory containing more files.
882            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
883
884            subdir = os.path.join(host_dir, 'subdir')
885            os.mkdir(subdir)
886            subdir_temp_files = make_random_host_files(in_dir=subdir,
887                                                       num_files=4)
888
889            paths = [x.full_path for x in temp_files]
890            paths.append(subdir)
891            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
892
893            for temp_file in temp_files:
894                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
895                                             temp_file.base_name)
896                self._verify_remote(temp_file.checksum, remote_path)
897
898            for subdir_temp_file in subdir_temp_files:
899                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
900                                             # BROKEN: http://b/25394682
901                                             # 'subdir';
902                                             temp_file.base_name)
903                self._verify_remote(temp_file.checksum, remote_path)
904
905
906            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
907        finally:
908            if host_dir is not None:
909                shutil.rmtree(host_dir)
910
911    @requires_non_root
912    def test_push_error_reporting(self):
913        """Make sure that errors that occur while pushing a file get reported
914
915        Bug: http://b/26816782
916        """
917        with tempfile.NamedTemporaryFile() as tmp_file:
918            tmp_file.write(b'\0' * 1024 * 1024)
919            tmp_file.flush()
920            try:
921                self.device.push(local=tmp_file.name, remote='/system/')
922                self.fail('push should not have succeeded')
923            except subprocess.CalledProcessError as e:
924                output = e.output
925
926            self.assertTrue(b'Permission denied' in output or
927                            b'Read-only file system' in output)
928
929    @requires_non_root
930    def test_push_directory_creation(self):
931        """Regression test for directory creation.
932
933        Bug: http://b/110953234
934        """
935        with tempfile.NamedTemporaryFile() as tmp_file:
936            tmp_file.write(b'\0' * 1024 * 1024)
937            tmp_file.flush()
938            remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
939            self.device.shell(['rm', '-rf', remote_path])
940
941            remote_path += '/filename'
942            self.device.push(local=tmp_file.name, remote=remote_path)
943
944    def disabled_test_push_multiple_slash_root(self):
945        """Regression test for pushing to //data/local/tmp.
946
947        Bug: http://b/141311284
948
949        Disabled because this broken on the adbd side as well: b/141943968
950        """
951        with tempfile.NamedTemporaryFile() as tmp_file:
952            tmp_file.write('\0' * 1024 * 1024)
953            tmp_file.flush()
954            remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
955            self.device.shell(['rm', '-rf', remote_path])
956            self.device.push(local=tmp_file.name, remote=remote_path)
957
958    def _test_pull(self, remote_file, checksum):
959        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
960        tmp_write.close()
961        self.device.pull(remote=remote_file, local=tmp_write.name)
962        with open(tmp_write.name, 'rb') as tmp_read:
963            host_contents = tmp_read.read()
964            host_md5 = compute_md5(host_contents)
965        self.assertEqual(checksum, host_md5)
966        os.remove(tmp_write.name)
967
968    @requires_non_root
969    def test_pull_error_reporting(self):
970        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
971        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
972
973        try:
974            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
975        except subprocess.CalledProcessError as e:
976            output = e.output
977
978        self.assertIn(b'Permission denied', output)
979
980        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
981
982    def test_pull(self):
983        """Pull a randomly generated file from specified device."""
984        kbytes = 512
985        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
986        cmd = ['dd', 'if=/dev/urandom',
987               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
988               'count={}'.format(kbytes)]
989        self.device.shell(cmd)
990        dev_md5, _ = self.device.shell(
991            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
992        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
993        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
994
995    def test_pull_dir(self):
996        """Pull a randomly generated directory of files from the device."""
997        try:
998            host_dir = tempfile.mkdtemp()
999
1000            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1001            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1002
1003            # Populate device directory with random files.
1004            temp_files = make_random_device_files(
1005                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1006
1007            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1008
1009            for temp_file in temp_files:
1010                host_path = os.path.join(
1011                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1012                    temp_file.base_name)
1013                self._verify_local(temp_file.checksum, host_path)
1014
1015            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1016        finally:
1017            if host_dir is not None:
1018                shutil.rmtree(host_dir)
1019
1020    def test_pull_dir_symlink(self):
1021        """Pull a directory into a symlink to a directory.
1022
1023        Bug: http://b/27362811
1024        """
1025        if os.name != 'posix':
1026            raise unittest.SkipTest('requires POSIX')
1027
1028        try:
1029            host_dir = tempfile.mkdtemp()
1030            real_dir = os.path.join(host_dir, 'dir')
1031            symlink = os.path.join(host_dir, 'symlink')
1032            os.mkdir(real_dir)
1033            os.symlink(real_dir, symlink)
1034
1035            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1036            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1037
1038            # Populate device directory with random files.
1039            temp_files = make_random_device_files(
1040                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1041
1042            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1043
1044            for temp_file in temp_files:
1045                host_path = os.path.join(
1046                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1047                    temp_file.base_name)
1048                self._verify_local(temp_file.checksum, host_path)
1049
1050            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1051        finally:
1052            if host_dir is not None:
1053                shutil.rmtree(host_dir)
1054
1055    def test_pull_dir_symlink_collision(self):
1056        """Pull a directory into a colliding symlink to directory."""
1057        if os.name != 'posix':
1058            raise unittest.SkipTest('requires POSIX')
1059
1060        try:
1061            host_dir = tempfile.mkdtemp()
1062            real_dir = os.path.join(host_dir, 'real')
1063            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1064            symlink = os.path.join(host_dir, tmp_dirname)
1065            os.mkdir(real_dir)
1066            os.symlink(real_dir, symlink)
1067
1068            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1069            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1070
1071            # Populate device directory with random files.
1072            temp_files = make_random_device_files(
1073                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1074
1075            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1076
1077            for temp_file in temp_files:
1078                host_path = os.path.join(real_dir, temp_file.base_name)
1079                self._verify_local(temp_file.checksum, host_path)
1080
1081            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1082        finally:
1083            if host_dir is not None:
1084                shutil.rmtree(host_dir)
1085
1086    def test_pull_dir_nonexistent(self):
1087        """Pull a directory of files from the device to a nonexistent path."""
1088        try:
1089            host_dir = tempfile.mkdtemp()
1090            dest_dir = os.path.join(host_dir, 'dest')
1091
1092            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1093            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1094
1095            # Populate device directory with random files.
1096            temp_files = make_random_device_files(
1097                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1098
1099            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1100
1101            for temp_file in temp_files:
1102                host_path = os.path.join(dest_dir, temp_file.base_name)
1103                self._verify_local(temp_file.checksum, host_path)
1104
1105            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1106        finally:
1107            if host_dir is not None:
1108                shutil.rmtree(host_dir)
1109
1110    # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1111    def disabled_test_pull_symlink_dir(self):
1112        """Pull a symlink to a directory of symlinks to files."""
1113        try:
1114            host_dir = tempfile.mkdtemp()
1115
1116            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1117            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1118            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1119
1120            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1121            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1122            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1123
1124            # Populate device directory with random files.
1125            temp_files = make_random_device_files(
1126                self.device, in_dir=remote_dir, num_files=32)
1127
1128            for temp_file in temp_files:
1129                self.device.shell(
1130                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1131                     posixpath.join(remote_links, temp_file.base_name)])
1132
1133            self.device.pull(remote=remote_symlink, local=host_dir)
1134
1135            for temp_file in temp_files:
1136                host_path = os.path.join(
1137                    host_dir, 'symlink', temp_file.base_name)
1138                self._verify_local(temp_file.checksum, host_path)
1139
1140            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1141        finally:
1142            if host_dir is not None:
1143                shutil.rmtree(host_dir)
1144
1145    def test_pull_empty(self):
1146        """Pull a directory containing an empty directory from the device."""
1147        try:
1148            host_dir = tempfile.mkdtemp()
1149
1150            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1151            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1152            self.device.shell(['mkdir', '-p', remote_empty_path])
1153
1154            self.device.pull(remote=remote_empty_path, local=host_dir)
1155            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1156        finally:
1157            if host_dir is not None:
1158                shutil.rmtree(host_dir)
1159
1160    def test_multiple_pull(self):
1161        """Pull a randomly generated directory of files from the device."""
1162
1163        try:
1164            host_dir = tempfile.mkdtemp()
1165
1166            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1167            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1168            self.device.shell(['mkdir', '-p', subdir])
1169
1170            # Create some random files and a subdirectory containing more files.
1171            temp_files = make_random_device_files(
1172                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1173
1174            subdir_temp_files = make_random_device_files(
1175                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1176
1177            paths = [x.full_path for x in temp_files]
1178            paths.append(subdir)
1179            self.device._simple_call(['pull'] + paths + [host_dir])
1180
1181            for temp_file in temp_files:
1182                local_path = os.path.join(host_dir, temp_file.base_name)
1183                self._verify_local(temp_file.checksum, local_path)
1184
1185            for subdir_temp_file in subdir_temp_files:
1186                local_path = os.path.join(host_dir,
1187                                          'subdir',
1188                                          subdir_temp_file.base_name)
1189                self._verify_local(subdir_temp_file.checksum, local_path)
1190
1191            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1192        finally:
1193            if host_dir is not None:
1194                shutil.rmtree(host_dir)
1195
1196    def verify_sync(self, device, temp_files, device_dir):
1197        """Verifies that a list of temp files was synced to the device."""
1198        # Confirm that every file on the device mirrors that on the host.
1199        for temp_file in temp_files:
1200            device_full_path = posixpath.join(
1201                device_dir, temp_file.base_name)
1202            dev_md5, _ = device.shell(
1203                [get_md5_prog(self.device), device_full_path])[0].split()
1204            self.assertEqual(temp_file.checksum, dev_md5)
1205
1206    def test_sync(self):
1207        """Sync a host directory to the data partition."""
1208
1209        try:
1210            base_dir = tempfile.mkdtemp()
1211
1212            # Create mirror device directory hierarchy within base_dir.
1213            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1214            os.makedirs(full_dir_path)
1215
1216            # Create 32 random files within the host mirror.
1217            temp_files = make_random_host_files(
1218                in_dir=full_dir_path, num_files=32)
1219
1220            # Clean up any stale files on the device.
1221            device = adb.get_device()  # pylint: disable=no-member
1222            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1223
1224            old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1225            os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1226            device.sync('data')
1227            if old_product_out is None:
1228                del os.environ['ANDROID_PRODUCT_OUT']
1229            else:
1230                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1231
1232            self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1233
1234            #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1235        finally:
1236            if base_dir is not None:
1237                shutil.rmtree(base_dir)
1238
1239    def test_push_sync(self):
1240        """Sync a host directory to a specific path."""
1241
1242        try:
1243            temp_dir = tempfile.mkdtemp()
1244            temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1245
1246            device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1247
1248            # Clean up any stale files on the device.
1249            device = adb.get_device()  # pylint: disable=no-member
1250            device.shell(['rm', '-rf', device_dir])
1251
1252            device.push(temp_dir, device_dir, sync=True)
1253
1254            self.verify_sync(device, temp_files, device_dir)
1255
1256            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1257        finally:
1258            if temp_dir is not None:
1259                shutil.rmtree(temp_dir)
1260
1261    def test_unicode_paths(self):
1262        """Ensure that we can support non-ASCII paths, even on Windows."""
1263        name = u'로보카 폴리'
1264
1265        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1266        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1267
1268        ## push.
1269        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1270        tf.close()
1271        self.device.push(tf.name, remote_path)
1272        os.remove(tf.name)
1273        self.assertFalse(os.path.exists(tf.name))
1274
1275        # Verify that the device ended up with the expected UTF-8 path
1276        output = self.device.shell(
1277                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1278        self.assertEqual(remote_path, output)
1279
1280        # pull.
1281        self.device.pull(remote_path, tf.name)
1282        self.assertTrue(os.path.exists(tf.name))
1283        os.remove(tf.name)
1284        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1285
1286
1287class DeviceOfflineTest(DeviceTest):
1288    def _get_device_state(self, serialno):
1289        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1290        for line in output.split('\n'):
1291            m = re.match('(\S+)\s+(\S+)', line)
1292            if m and m.group(1) == serialno:
1293                return m.group(2)
1294        return None
1295
1296    def disabled_test_killed_when_pushing_a_large_file(self):
1297        """
1298           While running adb push with a large file, kill adb server.
1299           Occasionally the device becomes offline. Because the device is still
1300           reading data without realizing that the adb server has been restarted.
1301           Test if we can bring the device online automatically now.
1302           http://b/32952319
1303        """
1304        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1305        # 1. Push a large file
1306        file_path = 'tmp_large_file'
1307        try:
1308            fh = open(file_path, 'w')
1309            fh.write('\0' * (100 * 1024 * 1024))
1310            fh.close()
1311            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1312            time.sleep(0.1)
1313            # 2. Kill the adb server
1314            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1315            subproc.terminate()
1316        finally:
1317            try:
1318                os.unlink(file_path)
1319            except:
1320                pass
1321        # 3. See if the device still exist.
1322        # Sleep to wait for the adb server exit.
1323        time.sleep(0.5)
1324        # 4. The device should be online
1325        self.assertEqual(self._get_device_state(serialno), 'device')
1326
1327    def disabled_test_killed_when_pulling_a_large_file(self):
1328        """
1329           While running adb pull with a large file, kill adb server.
1330           Occasionally the device can't be connected. Because the device is trying to
1331           send a message larger than what is expected by the adb server.
1332           Test if we can bring the device online automatically now.
1333        """
1334        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1335        file_path = 'tmp_large_file'
1336        try:
1337            # 1. Create a large file on device.
1338            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1339                               'bs=1000000', 'count=100'])
1340            # 2. Pull the large file on host.
1341            subproc = subprocess.Popen(self.device.adb_cmd +
1342                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1343            time.sleep(0.1)
1344            # 3. Kill the adb server
1345            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1346            subproc.terminate()
1347        finally:
1348            try:
1349                os.unlink(file_path)
1350            except:
1351                pass
1352        # 4. See if the device still exist.
1353        # Sleep to wait for the adb server exit.
1354        time.sleep(0.5)
1355        self.assertEqual(self._get_device_state(serialno), 'device')
1356
1357
1358    def test_packet_size_regression(self):
1359        """Test for http://b/37783561
1360
1361        Receiving packets of a length divisible by 512 but not 1024 resulted in
1362        the adb client waiting indefinitely for more input.
1363        """
1364        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1365        # Probe some surrounding values as well, for the hell of it.
1366        for base in [512] + list(range(1024, 1024 * 16, 1024)):
1367            for offset in [-6, -5, -4]:
1368                length = base + offset
1369                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1370                       'echo', 'foo']
1371                rc, stdout, _ = self.device.shell_nocheck(cmd)
1372
1373                self.assertEqual(0, rc)
1374
1375                # Output should be '\0' * length, followed by "foo\n"
1376                self.assertEqual(length, len(stdout) - 4)
1377                self.assertEqual(stdout, "\0" * length + "foo\n")
1378
1379    def test_zero_packet(self):
1380        """Test for http://b/113070258
1381
1382        Make sure that we don't blow up when sending USB transfers that line up
1383        exactly with the USB packet size.
1384        """
1385
1386        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1387        try:
1388            for size in [512, 1024]:
1389                def listener():
1390                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1391                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1392
1393                thread = threading.Thread(target=listener)
1394                thread.start()
1395
1396                # Wait a bit to let the shell command start.
1397                time.sleep(0.25)
1398
1399                sock = socket.create_connection(("localhost", local_port))
1400                with contextlib.closing(sock):
1401                    bytesWritten = sock.send(b"a" * size)
1402                    self.assertEqual(size, bytesWritten)
1403                    readBytes = sock.recv(4096)
1404                    self.assertEqual(b"foo\n", readBytes)
1405
1406                thread.join()
1407        finally:
1408            self.device.forward_remove("tcp:{}".format(local_port))
1409
1410
class SocketTest(DeviceTest):
    """Tests that talk to the adb server over a raw TCP socket."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """

        def adb_length_prefixed(payload):
            """Return payload as UTF-8 prefixed by its length in four hex digits."""
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"

        # closing() guarantees that the socket is released on every exit
        # path, including failed assertions and socket errors.
        with contextlib.closing(socket.create_connection(("localhost", 5037))) as s:
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            s.sendall(adb_length_prefixed(shell_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Only the amount of data matters, so count the bytes instead of
            # concatenating every chunk.
            received_len = 0
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received_len += len(read)

            thread.join()

        self.assertEqual(1024 * 1024 + len("foo\n"), received_len)
1466
1467
1468if sys.platform == "win32":
1469    # From https://stackoverflow.com/a/38749458
1470    import os
1471    import contextlib
1472    import msvcrt
1473    import ctypes
1474    from ctypes import wintypes
1475
    # use_last_error=True lets the errcheck hooks below read the failing
    # call's error code through ctypes.get_last_error().
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Numeric constants for the kernel32 console calls declared below.
    # NOTE(review): values presumably mirror the Win32 headers - confirm.
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    # HANDLE(-1) converted back to a Python value; _check_invalid compares
    # results against it.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1486
1487    def _check_zero(result, func, args):
1488        if not result:
1489            raise ctypes.WinError(ctypes.get_last_error())
1490        return args
1491
1492    def _check_invalid(result, func, args):
1493        if result == INVALID_HANDLE_VALUE:
1494            raise ctypes.WinError(ctypes.get_last_error())
1495        return args
1496
    # Define the pointer aliases used by the prototypes below when this
    # ctypes.wintypes does not provide them.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1500
1501    class COORD(ctypes.Structure):
1502        _fields_ = (('X', wintypes.SHORT),
1503                    ('Y', wintypes.SHORT))
1504
1505    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1506        _fields_ = (('cbSize',               wintypes.ULONG),
1507                    ('dwSize',               COORD),
1508                    ('dwCursorPosition',     COORD),
1509                    ('wAttributes',          wintypes.WORD),
1510                    ('srWindow',             wintypes.SMALL_RECT),
1511                    ('dwMaximumWindowSize',  COORD),
1512                    ('wPopupAttributes',     wintypes.WORD),
1513                    ('bFullscreenSupported', wintypes.BOOL),
1514                    ('ColorTable',           wintypes.DWORD * 16))
1515        def __init__(self, *args, **kwds):
1516            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1517                    *args, **kwds)
1518            self.cbSize = ctypes.sizeof(self)
1519
    # Pointer and alias types used in the prototypes below.
    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # Prototypes for the kernel32 functions used here. Each one gets its
    # argument types and an errcheck hook: _check_invalid for the
    # handle-returning calls, _check_zero for the rest.
    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1569
1570    @contextlib.contextmanager
1571    def allocate_console():
1572        allocated = kernel32.AllocConsole()
1573        try:
1574            yield allocated
1575        finally:
1576            if allocated:
1577                kernel32.FreeConsole()
1578
1579    @contextlib.contextmanager
1580    def console_screen(ncols=None, nrows=None):
1581        info = CONSOLE_SCREEN_BUFFER_INFOEX()
1582        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1583        nwritten = (wintypes.DWORD * 1)()
1584        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1585        kernel32.GetConsoleScreenBufferInfoEx(
1586               hStdOut, ctypes.byref(info))
1587        if ncols is None:
1588            ncols = info.dwSize.X
1589        if nrows is None:
1590            nrows = info.dwSize.Y
1591        elif nrows > 9999:
1592            raise ValueError('nrows must be 9999 or less')
1593        fd_screen = None
1594        hScreen = kernel32.CreateConsoleScreenBuffer(
1595                    GENERIC_READ | GENERIC_WRITE,
1596                    FILE_SHARE_READ | FILE_SHARE_WRITE,
1597                    None, CONSOLE_TEXTMODE_BUFFER, None)
1598        try:
1599            fd_screen = msvcrt.open_osfhandle(
1600                            hScreen, os.O_RDWR | os.O_BINARY)
1601            kernel32.GetConsoleScreenBufferInfoEx(
1602                   hScreen, ctypes.byref(new_info))
1603            new_info.dwSize = COORD(ncols, nrows)
1604            new_info.srWindow = wintypes.SMALL_RECT(
1605                    Left=0, Top=0, Right=(ncols - 1),
1606                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1607            kernel32.SetConsoleScreenBufferInfoEx(
1608                    hScreen, ctypes.byref(new_info))
1609            kernel32.SetConsoleWindowInfo(hScreen, True,
1610                    ctypes.byref(new_info.srWindow))
1611            kernel32.FillConsoleOutputCharacterW(
1612                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1613            kernel32.SetConsoleActiveScreenBuffer(hScreen)
1614            try:
1615                yield fd_screen
1616            finally:
1617                kernel32.SetConsoleScreenBufferInfoEx(
1618                    hStdOut, ctypes.byref(info))
1619                kernel32.SetConsoleWindowInfo(hStdOut, True,
1620                        ctypes.byref(info.srWindow))
1621                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1622        finally:
1623            if fd_screen is not None:
1624                os.close(fd_screen)
1625            else:
1626                kernel32.CloseHandle(hScreen)
1627
1628    def read_screen(fd):
1629        hScreen = msvcrt.get_osfhandle(fd)
1630        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1631        kernel32.GetConsoleScreenBufferInfoEx(
1632            hScreen, ctypes.byref(csbi))
1633        ncols = csbi.dwSize.X
1634        pos = csbi.dwCursorPosition
1635        length = ncols * pos.Y + pos.X + 1
1636        buf = (ctypes.c_wchar * length)()
1637        n = (wintypes.DWORD * 1)()
1638        kernel32.ReadConsoleOutputCharacterW(
1639            hScreen, buf, length, COORD(0,0), n)
1640        lines = [buf[i:i+ncols].rstrip(u'\0')
1641                    for i in range(0, n[0], ncols)]
1642        return u'\n'.join(lines)
1643
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        quoted_string = '"' + unicode_string + '"'

        # allocate_console() gives us a console window when we were not
        # started from one (the usual case is that we already have one).
        # console_screen() then switches to a temporary console buffer. Passing
        # ncols=len(unicode_string) as well would make the window flash while
        # it is resized, and is likely unnecessary for a typical console
        # window size.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # device.shell_popen() does not use a pipe (unlike device.shell()),
            # so adb can detect that its stdout is a console.
            process = self.device.shell_popen(['echo', quoted_string])
            process.wait()
            # Compare against what adb wrote to the temporary console buffer.
            console_output = read_screen(screen)
            self.assertEqual(unicode_string, console_output)
1666
1667
def main():
    """Run every test in this module and report the outcome as a status code.

    Returns:
        0 when the suite ran and was successful; 1 when any test failed or
        errored, or when adb reports no attached devices (in which case no
        tests are run and a message is printed).
    """
    # Fixed seed -- presumably so randomly generated test data is reproducible
    # from run to run.
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1
    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so scripts and CI can detect failures; previously
    # the process exited with status 0 regardless of the test results.
    sys.exit(main())
1679