• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
def requires_root(func):
    """Decorate a test method so that it runs with the device shell as root.

    The wrapped test is skipped when the device's ``ro.debuggable`` property
    is not ``'1'``. If ``id -un`` on the device does not report ``root``, the
    device is switched to root before the test runs and switched back
    afterwards, even when the test raises.

    Args:
        func: Test method taking ``self`` (an object exposing a ``device``
            attribute) followed by any further arguments.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns whatever ``func`` returns.

    Raises:
        unittest.SkipTest: Raised by the wrapper when the build is not
            debuggable.
    """
    # Imported locally so the decorator does not depend on the module-level
    # import block.
    import functools

    # functools.wraps keeps the test's own name and docstring visible in
    # test reports instead of the generic "wrapper".
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs with a non-root device shell.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test runs and re-rooted afterwards, even when the test raises.

    Args:
        func: Test method taking ``self`` (an object exposing a ``device``
            attribute) followed by any further arguments.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns whatever ``func`` returns.
    """
    # Imported locally so the decorator does not depend on the module-level
    # import block.
    import functools

    # functools.wraps keeps the test's own name and docstring visible in
    # test reports instead of the generic "wrapper".
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Base test case that exposes the device under test as `self.device`."""

    def setUp(self):
        """Fetch the device from the adb module before each test."""
        device = adb.get_device()
        self.device = device
82
83
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` and `adb reverse` port bindings."""

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Checks --no-rebind behaviour for one binding direction.

        Args:
          description: Name of the direction, used in the failure message.
          direction_list: Callable returning the current bindings as text.
          direction: Callable(local, remote) creating a binding; the test
            expects it to replace an existing binding for the same local
            spec.
          direction_no_rebind: Callable(local, remote) creating a binding;
            the test expects subprocess.CalledProcessError when the local
            spec is already bound.
          direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Always drop the bindings created above; otherwise one failed
            # assertion leaves them behind and every later test that requires
            # an empty list fails too.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Runs the --no-rebind checks against `adb forward`."""
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Runs the --no-rebind checks against `adb reverse`."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        """Adds, lists and removes `adb forward` bindings."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Clean up even when an assertion above fails.
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Checks that forwarding tcp:0 reports the port actually bound."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Adds, lists and removes `adb reverse` bindings."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Clean up even when an assertion above fails.
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Checks that reversing tcp:0 reports the port actually bound."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, _ = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Sockets carry bytes: a bytes literal and a binary
                        # read behave the same on Python 2 and Python 3.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            # Only tear down what was actually set up.
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
244
245
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command must make device.shell raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """device.shell must return stdout with its trailing line ending."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """A 16384-character argument must be echoed back intact."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            self.assertTrue(out == ('x' * 16384 + '\n'))

    def test_shell_nocheck_failure(self):
        """shell_nocheck must report a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck must return stdout with its trailing line ending."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Command output must not be confused with the exit status."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None inherits this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    def test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()
        # Use the same adb command line as the rest of this class so that the
        # stress test targets the device under test rather than whatever a
        # bare `adb` would pick.
        shell_cmd = self.device.adb_cmd + ['shell']

        def hammer(thread_idx, thread_count, result):
            """Runs `exit i` for every i owned by this thread."""
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(shell_cmd + ['exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

        # A thread that died with an exception never records a result; without
        # this check the loop below would pass vacuously.
        self.assertEqual(thread_count, len(result),
                         'some hammer threads did not report a result')
        for success in result.values():
            self.assertTrue(success)
519
520
class ArgumentEscapingTest(DeviceTest):
    """Tests for how adb passes command-line arguments through to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            # delete=False: the file has to outlive close() so adb can read it.
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
569
570
class RootUnrootTest(DeviceTest):
    """Tests for `adb root` and `adb unroot`."""

    def _whoami(self):
        """Returns the user name printed by `id -un` on the device."""
        stdout = self.device.shell(['id', '-un'])[0]
        return stdout.strip()

    def _test_root(self):
        """Switches to root and checks the shell user.

        Returns without checking when the device reports that adbd cannot
        run as root in production builds.
        """
        reply = self.device.root()
        production_build = (
            'adbd cannot run as root in production builds' in reply)
        if not production_build:
            self.device.wait()
            self.assertEqual('root', self._whoami())

    def _test_unroot(self):
        """Switches to non-root and checks that the shell user is `shell`."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self._whoami())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        debuggable = self.device.get_prop('ro.debuggable')
        if debuggable != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self._whoami()
        started_as_root = original_user == 'root'
        try:
            # Leave the starting state first, then come back to it.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back into the state the test found it in.
            if started_as_root:
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
603
604
class TcpIpTest(DeviceTest):
    """Tests for `adb tcpip` argument handling."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Both an empty argument and 'foo' must raise CalledProcessError.

        Bug: http://b/22636927
        """
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
615
616
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing device system properties."""

    def test_get_prop(self):
        """get_prop must report the adbd service as running."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """set_prop must store a value that `getprop` reads back."""
        name = 'foo.bar'
        # Reset the property first.
        reset_cmd = ['setprop', name, '""']
        self.device.shell(reset_cmd)

        self.device.set_prop(name, 'qux')

        read_back = self.device.shell(['getprop', name])[0]
        self.assertEqual(read_back.strip(), 'qux')
629
630
def compute_md5(string):
    """Return the hexadecimal MD5 digest of `string`.

    Args:
        string: Data accepted by `hashlib.md5`.

    Returns:
        The digest as a hexadecimal string.
    """
    return hashlib.md5(string).hexdigest()
635
636
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Probes the device by running `md5sum /proc/uptime`.

    Args:
        device: Object whose `shell` method runs a command on the device.

    Returns:
        'md5sum' when the probe succeeds, 'md5' when it raises
        adb.ShellError.
    """
    probe = ['md5sum', '/proc/uptime']
    try:
        device.shell(probe)
    except adb.ShellError:
        return 'md5'
    return 'md5sum'
644
645
class HostFile(object):
    """Record describing a file on the host.

    Keeps the file handle, the checksum supplied by the caller, the path
    taken from `handle.name`, and that path's final component.
    """

    def __init__(self, handle, checksum):
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
652
653
class DeviceFile(object):
    """Record describing a file on the device.

    Keeps the checksum and POSIX-style path supplied by the caller, plus the
    path's final component.
    """

    def __init__(self, checksum, full_path):
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.basename(full_path)
659
660
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Each file's size is a multiple of 1 KiB, from 1 KiB up to 15 KiB. The
    files are created with delete=False, so they stay on disk after being
    closed.

    Args:
        in_dir: Host directory in which to create the files.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per file, each carrying the (closed)
        file handle and the MD5 hex digest of the data written.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() instead of xrange() so this runs on Python 2 and Python 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The context manager closes (and thereby flushes) the file even if
        # the write raises.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
678
679
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Each file is written on the device with `dd` from /dev/urandom and its
    size is a multiple of 1 KiB, from 1 KiB up to 15 KiB. Files are named
    `prefix` followed by their index.

    Args:
        device: Object whose `shell` method runs a command on the device.
        in_dir: Device directory in which to create the files.
        num_files: Number of files to create.
        prefix: File name prefix.

    Returns:
        A list of DeviceFile objects, one per file, each carrying the
        checksum printed by the device's MD5 tool and the file's full path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probing for the MD5 tool costs a device round trip; do it once instead
    # of once per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() instead of xrange() so this runs on Python 2 and Python 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The tool's output is split into two fields; the first is the digest.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
697
698
699class FileOperationsTest(DeviceTest):
700    SCRATCH_DIR = '/data/local/tmp'
701    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
702    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
703
704    def _verify_remote(self, checksum, remote_path):
705        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
706                                        remote_path])[0].split()
707        self.assertEqual(checksum, dev_md5)
708
709    def _verify_local(self, checksum, local_path):
710        with open(local_path, 'rb') as host_file:
711            host_md5 = compute_md5(host_file.read())
712            self.assertEqual(host_md5, checksum)
713
714    def test_push(self):
715        """Push a randomly generated file to specified device."""
716        kbytes = 512
717        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
718        rand_str = os.urandom(1024 * kbytes)
719        tmp.write(rand_str)
720        tmp.close()
721
722        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
723        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
724
725        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
726        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
727
728        os.remove(tmp.name)
729
730    def test_push_dir(self):
731        """Push a randomly generated directory of files to the device."""
732        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
733        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
734
735        try:
736            host_dir = tempfile.mkdtemp()
737
738            # Make sure the temp directory isn't setuid, or else adb will complain.
739            os.chmod(host_dir, 0o700)
740
741            # Create 32 random files.
742            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
743            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
744
745            for temp_file in temp_files:
746                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
747                                             os.path.basename(host_dir),
748                                             temp_file.base_name)
749                self._verify_remote(temp_file.checksum, remote_path)
750            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
751        finally:
752            if host_dir is not None:
753                shutil.rmtree(host_dir)
754
755    def disabled_test_push_empty(self):
756        """Push an empty directory to the device."""
757        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
758        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
759
760        try:
761            host_dir = tempfile.mkdtemp()
762
763            # Make sure the temp directory isn't setuid, or else adb will complain.
764            os.chmod(host_dir, 0o700)
765
766            # Create an empty directory.
767            empty_dir_path = os.path.join(host_dir, 'empty')
768            os.mkdir(empty_dir_path);
769
770            self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
771
772            remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
773            test_empty_cmd = ["[", "-d", remote_path, "]"]
774            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
775
776            self.assertEqual(rc, 0)
777            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
778        finally:
779            if host_dir is not None:
780                shutil.rmtree(host_dir)
781
782    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
783    def test_push_symlink(self):
784        """Push a symlink.
785
786        Bug: http://b/31491920
787        """
788        try:
789            host_dir = tempfile.mkdtemp()
790
791            # Make sure the temp directory isn't setuid, or else adb will
792            # complain.
793            os.chmod(host_dir, 0o700)
794
795            with open(os.path.join(host_dir, 'foo'), 'w') as f:
796                f.write('foo')
797
798            symlink_path = os.path.join(host_dir, 'symlink')
799            os.symlink('foo', symlink_path)
800
801            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
802            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
803            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
804            rc, out, _ = self.device.shell_nocheck(
805                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
806            self.assertEqual(0, rc)
807            self.assertEqual(out.strip(), 'foo')
808        finally:
809            if host_dir is not None:
810                shutil.rmtree(host_dir)
811
812    def test_multiple_push(self):
813        """Push multiple files to the device in one adb push command.
814
815        Bug: http://b/25324823
816        """
817
818        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
819        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
820
821        try:
822            host_dir = tempfile.mkdtemp()
823
824            # Create some random files and a subdirectory containing more files.
825            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
826
827            subdir = os.path.join(host_dir, 'subdir')
828            os.mkdir(subdir)
829            subdir_temp_files = make_random_host_files(in_dir=subdir,
830                                                       num_files=4)
831
832            paths = map(lambda temp_file: temp_file.full_path, temp_files)
833            paths.append(subdir)
834            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
835
836            for temp_file in temp_files:
837                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
838                                             temp_file.base_name)
839                self._verify_remote(temp_file.checksum, remote_path)
840
841            for subdir_temp_file in subdir_temp_files:
842                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
843                                             # BROKEN: http://b/25394682
844                                             # 'subdir';
845                                             temp_file.base_name)
846                self._verify_remote(temp_file.checksum, remote_path)
847
848
849            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
850        finally:
851            if host_dir is not None:
852                shutil.rmtree(host_dir)
853
854    @requires_non_root
855    def test_push_error_reporting(self):
856        """Make sure that errors that occur while pushing a file get reported
857
858        Bug: http://b/26816782
859        """
860        with tempfile.NamedTemporaryFile() as tmp_file:
861            tmp_file.write('\0' * 1024 * 1024)
862            tmp_file.flush()
863            try:
864                self.device.push(local=tmp_file.name, remote='/system/')
865                self.fail('push should not have succeeded')
866            except subprocess.CalledProcessError as e:
867                output = e.output
868
869            self.assertTrue('Permission denied' in output or
870                            'Read-only file system' in output)
871
872    @requires_non_root
873    def test_push_directory_creation(self):
874        """Regression test for directory creation.
875
876        Bug: http://b/110953234
877        """
878        with tempfile.NamedTemporaryFile() as tmp_file:
879            tmp_file.write('\0' * 1024 * 1024)
880            tmp_file.flush()
881            remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
882            self.device.shell(['rm', '-rf', remote_path])
883
884            remote_path += '/filename'
885            self.device.push(local=tmp_file.name, remote=remote_path)
886
887    def _test_pull(self, remote_file, checksum):
888        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
889        tmp_write.close()
890        self.device.pull(remote=remote_file, local=tmp_write.name)
891        with open(tmp_write.name, 'rb') as tmp_read:
892            host_contents = tmp_read.read()
893            host_md5 = compute_md5(host_contents)
894        self.assertEqual(checksum, host_md5)
895        os.remove(tmp_write.name)
896
897    @requires_non_root
898    def test_pull_error_reporting(self):
899        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
900        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
901
902        try:
903            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
904        except subprocess.CalledProcessError as e:
905            output = e.output
906
907        self.assertIn('Permission denied', output)
908
909        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
910
911    def test_pull(self):
912        """Pull a randomly generated file from specified device."""
913        kbytes = 512
914        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
915        cmd = ['dd', 'if=/dev/urandom',
916               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
917               'count={}'.format(kbytes)]
918        self.device.shell(cmd)
919        dev_md5, _ = self.device.shell(
920            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
921        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
922        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
923
924    def test_pull_dir(self):
925        """Pull a randomly generated directory of files from the device."""
926        try:
927            host_dir = tempfile.mkdtemp()
928
929            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
930            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
931
932            # Populate device directory with random files.
933            temp_files = make_random_device_files(
934                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
935
936            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
937
938            for temp_file in temp_files:
939                host_path = os.path.join(
940                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
941                    temp_file.base_name)
942                self._verify_local(temp_file.checksum, host_path)
943
944            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
945        finally:
946            if host_dir is not None:
947                shutil.rmtree(host_dir)
948
949    def test_pull_dir_symlink(self):
950        """Pull a directory into a symlink to a directory.
951
952        Bug: http://b/27362811
953        """
954        if os.name != 'posix':
955            raise unittest.SkipTest('requires POSIX')
956
957        try:
958            host_dir = tempfile.mkdtemp()
959            real_dir = os.path.join(host_dir, 'dir')
960            symlink = os.path.join(host_dir, 'symlink')
961            os.mkdir(real_dir)
962            os.symlink(real_dir, symlink)
963
964            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
965            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
966
967            # Populate device directory with random files.
968            temp_files = make_random_device_files(
969                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
970
971            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
972
973            for temp_file in temp_files:
974                host_path = os.path.join(
975                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
976                    temp_file.base_name)
977                self._verify_local(temp_file.checksum, host_path)
978
979            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
980        finally:
981            if host_dir is not None:
982                shutil.rmtree(host_dir)
983
984    def test_pull_dir_symlink_collision(self):
985        """Pull a directory into a colliding symlink to directory."""
986        if os.name != 'posix':
987            raise unittest.SkipTest('requires POSIX')
988
989        try:
990            host_dir = tempfile.mkdtemp()
991            real_dir = os.path.join(host_dir, 'real')
992            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
993            symlink = os.path.join(host_dir, tmp_dirname)
994            os.mkdir(real_dir)
995            os.symlink(real_dir, symlink)
996
997            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
998            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
999
1000            # Populate device directory with random files.
1001            temp_files = make_random_device_files(
1002                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1003
1004            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1005
1006            for temp_file in temp_files:
1007                host_path = os.path.join(real_dir, temp_file.base_name)
1008                self._verify_local(temp_file.checksum, host_path)
1009
1010            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1011        finally:
1012            if host_dir is not None:
1013                shutil.rmtree(host_dir)
1014
1015    def test_pull_dir_nonexistent(self):
1016        """Pull a directory of files from the device to a nonexistent path."""
1017        try:
1018            host_dir = tempfile.mkdtemp()
1019            dest_dir = os.path.join(host_dir, 'dest')
1020
1021            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1022            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1023
1024            # Populate device directory with random files.
1025            temp_files = make_random_device_files(
1026                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1027
1028            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1029
1030            for temp_file in temp_files:
1031                host_path = os.path.join(dest_dir, temp_file.base_name)
1032                self._verify_local(temp_file.checksum, host_path)
1033
1034            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1035        finally:
1036            if host_dir is not None:
1037                shutil.rmtree(host_dir)
1038
1039    # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1040    def disabled_test_pull_symlink_dir(self):
1041        """Pull a symlink to a directory of symlinks to files."""
1042        try:
1043            host_dir = tempfile.mkdtemp()
1044
1045            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1046            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1047            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1048
1049            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1050            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1051            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1052
1053            # Populate device directory with random files.
1054            temp_files = make_random_device_files(
1055                self.device, in_dir=remote_dir, num_files=32)
1056
1057            for temp_file in temp_files:
1058                self.device.shell(
1059                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1060                     posixpath.join(remote_links, temp_file.base_name)])
1061
1062            self.device.pull(remote=remote_symlink, local=host_dir)
1063
1064            for temp_file in temp_files:
1065                host_path = os.path.join(
1066                    host_dir, 'symlink', temp_file.base_name)
1067                self._verify_local(temp_file.checksum, host_path)
1068
1069            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1070        finally:
1071            if host_dir is not None:
1072                shutil.rmtree(host_dir)
1073
1074    def test_pull_empty(self):
1075        """Pull a directory containing an empty directory from the device."""
1076        try:
1077            host_dir = tempfile.mkdtemp()
1078
1079            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1080            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1081            self.device.shell(['mkdir', '-p', remote_empty_path])
1082
1083            self.device.pull(remote=remote_empty_path, local=host_dir)
1084            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1085        finally:
1086            if host_dir is not None:
1087                shutil.rmtree(host_dir)
1088
1089    def test_multiple_pull(self):
1090        """Pull a randomly generated directory of files from the device."""
1091
1092        try:
1093            host_dir = tempfile.mkdtemp()
1094
1095            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1096            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1097            self.device.shell(['mkdir', '-p', subdir])
1098
1099            # Create some random files and a subdirectory containing more files.
1100            temp_files = make_random_device_files(
1101                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1102
1103            subdir_temp_files = make_random_device_files(
1104                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1105
1106            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1107            paths.append(subdir)
1108            self.device._simple_call(['pull'] + paths + [host_dir])
1109
1110            for temp_file in temp_files:
1111                local_path = os.path.join(host_dir, temp_file.base_name)
1112                self._verify_local(temp_file.checksum, local_path)
1113
1114            for subdir_temp_file in subdir_temp_files:
1115                local_path = os.path.join(host_dir,
1116                                          'subdir',
1117                                          subdir_temp_file.base_name)
1118                self._verify_local(subdir_temp_file.checksum, local_path)
1119
1120            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1121        finally:
1122            if host_dir is not None:
1123                shutil.rmtree(host_dir)
1124
1125    def verify_sync(self, device, temp_files, device_dir):
1126        """Verifies that a list of temp files was synced to the device."""
1127        # Confirm that every file on the device mirrors that on the host.
1128        for temp_file in temp_files:
1129            device_full_path = posixpath.join(
1130                device_dir, temp_file.base_name)
1131            dev_md5, _ = device.shell(
1132                [get_md5_prog(self.device), device_full_path])[0].split()
1133            self.assertEqual(temp_file.checksum, dev_md5)
1134
1135    def test_sync(self):
1136        """Sync a host directory to the data partition."""
1137
1138        try:
1139            base_dir = tempfile.mkdtemp()
1140
1141            # Create mirror device directory hierarchy within base_dir.
1142            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1143            os.makedirs(full_dir_path)
1144
1145            # Create 32 random files within the host mirror.
1146            temp_files = make_random_host_files(
1147                in_dir=full_dir_path, num_files=32)
1148
1149            # Clean up any stale files on the device.
1150            device = adb.get_device()  # pylint: disable=no-member
1151            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1152
1153            old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1154            os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1155            device.sync('data')
1156            if old_product_out is None:
1157                del os.environ['ANDROID_PRODUCT_OUT']
1158            else:
1159                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1160
1161            self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1162
1163            #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1164        finally:
1165            if base_dir is not None:
1166                shutil.rmtree(base_dir)
1167
1168    def test_push_sync(self):
1169        """Sync a host directory to a specific path."""
1170
1171        try:
1172            temp_dir = tempfile.mkdtemp()
1173            temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1174
1175            device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1176
1177            # Clean up any stale files on the device.
1178            device = adb.get_device()  # pylint: disable=no-member
1179            device.shell(['rm', '-rf', device_dir])
1180
1181            device.push(temp_dir, device_dir, sync=True)
1182
1183            self.verify_sync(device, temp_files, device_dir)
1184
1185            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1186        finally:
1187            if temp_dir is not None:
1188                shutil.rmtree(temp_dir)
1189
1190    def test_unicode_paths(self):
1191        """Ensure that we can support non-ASCII paths, even on Windows."""
1192        name = u'로보카 폴리'
1193
1194        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1195        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1196
1197        ## push.
1198        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1199        tf.close()
1200        self.device.push(tf.name, remote_path)
1201        os.remove(tf.name)
1202        self.assertFalse(os.path.exists(tf.name))
1203
1204        # Verify that the device ended up with the expected UTF-8 path
1205        output = self.device.shell(
1206                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1207        self.assertEqual(remote_path, output)
1208
1209        # pull.
1210        self.device.pull(remote_path, tf.name)
1211        self.assertTrue(os.path.exists(tf.name))
1212        os.remove(tf.name)
1213        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1214
1215
class DeviceOfflineTest(DeviceTest):
    """Regression tests around adb server kills and packet-size edge cases."""

    def _get_device_state(self, serialno):
        """Return the state `adb devices` reports for serialno, or None.

        Each output line is matched as two whitespace-separated fields
        (serial, state); the state of the first line whose serial equals
        serialno is returned.
        """
        # universal_newlines=True makes check_output return text on both
        # Python 2 and Python 3, so it can be split and compared as str.
        output = subprocess.check_output(self.device.adb_cmd + ['devices'],
                                         universal_newlines=True)
        for line in output.split('\n'):
            # Raw string: '\S' and '\s' are regex escapes, not string escapes.
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno'],
            universal_newlines=True).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            time.sleep(0.1)
            # 2. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the file may not exist if creation failed.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno'],
            universal_newlines=True).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
            time.sleep(0.1)
            # 3. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the pull may never have created the file.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')


    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        # list(range(...)) keeps the concatenation valid on Python 3.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' is a single argument: the original
                # source produced it through implicit concatenation of two
                # adjacent literals, and it is kept byte-for-byte.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Runs on a worker thread: start a one-shot netcat listener on
            # the device and block until it exits.
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    # Sockets carry bytes; bytes literals behave the same on
                    # Python 2 and are required on Python 3.
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1338
1339
class SocketTest(DeviceTest):
    """Tests that talk to the adb server directly over its TCP socket."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        The test opens a shell service that emits 1 MiB of zeros followed by
        "foo", floods the socket with garbage from a second thread, and
        checks that every byte of the shell output is still received.

        Bug: http://b/74616284
        """
        s = socket.create_connection(("localhost", 5037))

        def adb_length_prefixed(payload):
            # adb host requests are framed as 4 hex digits of length + body.
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        # Close the socket on every exit path; on a failed assertion this
        # also makes the spam thread's sendall() fail so it terminates.
        with contextlib.closing(s):
            if "ANDROID_SERIAL" in os.environ:
                transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
            else:
                transport_string = "host:transport-any"

            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Read until the peer closes the connection.
            received = b""
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received += read

            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
            thread.join()
1395
1396
# Windows-only helpers: ctypes bindings for the Win32 console API plus
# context managers to allocate a console, switch to a scratch screen buffer,
# and read back the text written to it.
if sys.platform == "win32":
    # From https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets ctypes.get_last_error() report the failure
    # code of the most recent call made through this handle.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)

    def _check_zero(result, func, args):
        """ctypes errcheck hook: raise WinError when the result is falsy."""
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    def _check_invalid(result, func, args):
        """ctypes errcheck hook: raise WinError on INVALID_HANDLE_VALUE."""
        if result == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    # Define the pointer types when this Python's wintypes lacks them.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)

    class COORD(ctypes.Structure):
        """Pair of SHORT coordinates (X, Y)."""
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))

    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
        """Screen-buffer info structure; cbSize is filled in on construction."""
        _fields_ = (('cbSize',               wintypes.ULONG),
                    ('dwSize',               COORD),
                    ('dwCursorPosition',     COORD),
                    ('wAttributes',          wintypes.WORD),
                    ('srWindow',             wintypes.SMALL_RECT),
                    ('dwMaximumWindowSize',  COORD),
                    ('wPopupAttributes',     wintypes.WORD),
                    ('bFullscreenSupported', wintypes.BOOL),
                    ('ColorTable',           wintypes.DWORD * 16))
        def __init__(self, *args, **kwds):
            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
                    *args, **kwds)
            # Record the structure's own size in its cbSize field.
            self.cbSize = ctypes.sizeof(self)

    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # Prototypes: argument types plus an errcheck hook per function, so a
    # failing call raises WinError instead of returning an error value.
    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead

    @contextlib.contextmanager
    def allocate_console():
        """Context manager that calls AllocConsole and yields its result.

        FreeConsole is called on exit only when the AllocConsole result was
        truthy.
        """
        allocated = kernel32.AllocConsole()
        try:
            yield allocated
        finally:
            if allocated:
                kernel32.FreeConsole()

    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Context manager that activates a scratch console screen buffer.

        Creates a new screen buffer of ncols x nrows (each defaulting to the
        current stdout buffer's size), fills it with NUL characters, makes it
        the active buffer, and yields a C runtime file descriptor for it. On
        exit the stdout buffer's saved info, window and active state are
        restored and the descriptor (or the raw handle, if no descriptor was
        created) is closed.

        Raises:
            ValueError: if nrows is given and exceeds 9999.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        # Snapshot the current stdout buffer so it can be restored on exit.
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            new_info.dwSize = COORD(ncols, nrows)
            # Window spans the requested width and keeps the original
            # window's height.
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # NUL-fill the buffer; read_screen() strips trailing NULs.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                # Put the original stdout buffer back in place.
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # Close via the descriptor when one was created; otherwise close
            # the raw handle directly.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)

    def read_screen(fd):
        """Return the text of the screen buffer behind fd, up to the cursor.

        Reads characters from the buffer origin through the cursor position,
        splits them into rows of the buffer width, strips trailing NULs from
        each row, and joins the rows with newlines.
        """
        hScreen = msvcrt.get_osfhandle(fd)
        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
        kernel32.GetConsoleScreenBufferInfoEx(
            hScreen, ctypes.byref(csbi))
        ncols = csbi.dwSize.X
        pos = csbi.dwCursorPosition
        # Number of cells from (0, 0) through the cursor cell, inclusive.
        length = ncols * pos.Y + pos.X + 1
        buf = (ctypes.c_wchar * length)()
        n = (wintypes.DWORD * 1)()
        kernel32.ReadConsoleOutputCharacterW(
            hScreen, buf, length, COORD(0,0), n)
        lines = [buf[i:i+ncols].rstrip(u'\0')
                    for i in range(0, n[0], ncols)]
        return u'\n'.join(lines)
1572
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    """Windows-only checks of adb's output to a real console window."""

    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        expected = u'로보카 폴리'
        quoted = '"' + expected + '"'

        # allocate_console() provides a console window when the test is not
        # already running inside one (the usual case is that it is).
        #
        # console_screen() then swaps in a temporary screen buffer. Passing
        # ncols=len(expected) would also work, but resizing makes the window
        # flash and is likely unnecessary given the typical console window size.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # device.shell_popen() does not use a pipe (unlike device.shell()),
            # so adb can detect that its stdout is a console.
            adb_process = self.device.shell_popen(['echo', quoted])
            adb_process.wait()

            # Whatever adb wrote is now in the temporary buffer; compare it
            # with the string that was echoed.
            actual = read_screen(screen)
            self.assertEqual(expected, actual)
1595
1596
def main():
    """Run every test in this module and report the outcome as an exit status.

    The random module is seeded with 0 before anything runs. If
    adb.get_devices() reports nothing (presumably it returns the attached
    devices), no tests are run and a message is printed instead.

    Returns:
        0 if the suite ran and was successful; 1 if any test failed or
        errored, or if the suite was not run because no device was found.
    """
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    # Previously the result was dropped, so failures still exited with 0.
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so callers (shells, CI) can detect failures.
    sys.exit(main())
1608