• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
42def requires_non_root(func):
43    def wrapper(self, *args):
44        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
45        if was_root:
46            self.device.unroot()
47            self.device.wait()
48
49        try:
50            func(self, *args)
51        finally:
52            if was_root:
53                self.device.root()
54                self.device.wait()
55
56    return wrapper
57
58
59class DeviceTest(unittest.TestCase):
60    device = adb.get_device()
61
62
63class AbbTest(DeviceTest):
64    def test_smoke(self):
65        abb = subprocess.run(['adb', 'abb'], capture_output=True)
66        cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True)
67
68        # abb squashes all failures to 1.
69        self.assertEqual(abb.returncode == 0, cmd.returncode == 0)
70        self.assertEqual(abb.stdout, cmd.stdout)
71        self.assertEqual(abb.stderr, cmd.stderr)
72
73class ForwardReverseTest(DeviceTest):
74    def _test_no_rebind(self, description, direction_list, direction,
75                       direction_no_rebind, direction_remove_all):
76        msg = direction_list()
77        self.assertEqual('', msg.strip(),
78                         description + ' list must be empty to run this test.')
79
80        # Use --no-rebind with no existing binding
81        direction_no_rebind('tcp:5566', 'tcp:6655')
82        msg = direction_list()
83        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
84
85        # Use --no-rebind with existing binding
86        with self.assertRaises(subprocess.CalledProcessError):
87            direction_no_rebind('tcp:5566', 'tcp:6677')
88        msg = direction_list()
89        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
90        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
91
92        # Use the absence of --no-rebind with existing binding
93        direction('tcp:5566', 'tcp:6677')
94        msg = direction_list()
95        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
96        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
97
98        direction_remove_all()
99        msg = direction_list()
100        self.assertEqual('', msg.strip())
101
102    def test_forward_no_rebind(self):
103        self._test_no_rebind('forward', self.device.forward_list,
104                            self.device.forward, self.device.forward_no_rebind,
105                            self.device.forward_remove_all)
106
107    def test_reverse_no_rebind(self):
108        self._test_no_rebind('reverse', self.device.reverse_list,
109                            self.device.reverse, self.device.reverse_no_rebind,
110                            self.device.reverse_remove_all)
111
112    def test_forward(self):
113        msg = self.device.forward_list()
114        self.assertEqual('', msg.strip(),
115                         'Forwarding list must be empty to run this test.')
116        self.device.forward('tcp:5566', 'tcp:6655')
117        msg = self.device.forward_list()
118        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
119        self.device.forward('tcp:7788', 'tcp:8877')
120        msg = self.device.forward_list()
121        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
122        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
123        self.device.forward_remove('tcp:5566')
124        msg = self.device.forward_list()
125        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
126        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
127        self.device.forward_remove_all()
128        msg = self.device.forward_list()
129        self.assertEqual('', msg.strip())
130
131    def test_forward_old_protocol(self):
132        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
133
134        msg = self.device.forward_list()
135        self.assertEqual('', msg.strip(),
136                         'Forwarding list must be empty to run this test.')
137
138        s = socket.create_connection(("localhost", 5037))
139        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
140        cmd = b"%04x%s" % (len(service), service)
141        s.sendall(cmd)
142
143        msg = self.device.forward_list()
144        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
145
146        self.device.forward_remove_all()
147        msg = self.device.forward_list()
148        self.assertEqual('', msg.strip())
149
150    def test_forward_tcp_port_0(self):
151        self.assertEqual('', self.device.forward_list().strip(),
152                         'Forwarding list must be empty to run this test.')
153
154        try:
155            # If resolving TCP port 0 is supported, `adb forward` will print
156            # the actual port number.
157            port = self.device.forward('tcp:0', 'tcp:8888').strip()
158            if not port:
159                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
160
161            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
162                                      self.device.forward_list()))
163        finally:
164            self.device.forward_remove_all()
165
166    def test_reverse(self):
167        msg = self.device.reverse_list()
168        self.assertEqual('', msg.strip(),
169                         'Reverse forwarding list must be empty to run this test.')
170        self.device.reverse('tcp:5566', 'tcp:6655')
171        msg = self.device.reverse_list()
172        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
173        self.device.reverse('tcp:7788', 'tcp:8877')
174        msg = self.device.reverse_list()
175        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
176        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
177        self.device.reverse_remove('tcp:5566')
178        msg = self.device.reverse_list()
179        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
180        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
181        self.device.reverse_remove_all()
182        msg = self.device.reverse_list()
183        self.assertEqual('', msg.strip())
184
185    def test_reverse_tcp_port_0(self):
186        self.assertEqual('', self.device.reverse_list().strip(),
187                         'Reverse list must be empty to run this test.')
188
189        try:
190            # If resolving TCP port 0 is supported, `adb reverse` will print
191            # the actual port number.
192            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
193            if not port:
194                raise unittest.SkipTest('Reversing tcp:0 is not available.')
195
196            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
197                                      self.device.reverse_list()))
198        finally:
199            self.device.reverse_remove_all()
200
201    def test_forward_reverse_echo(self):
202        """Send data through adb forward and read it back via adb reverse"""
203        forward_port = 12345
204        reverse_port = forward_port + 1
205        forward_spec = 'tcp:' + str(forward_port)
206        reverse_spec = 'tcp:' + str(reverse_port)
207        forward_setup = False
208        reverse_setup = False
209
210        try:
211            # listen on localhost:forward_port, connect to remote:forward_port
212            self.device.forward(forward_spec, forward_spec)
213            forward_setup = True
214            # listen on remote:forward_port, connect to localhost:reverse_port
215            self.device.reverse(forward_spec, reverse_spec)
216            reverse_setup = True
217
218            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
219            with contextlib.closing(listener):
220                # Use SO_REUSEADDR so that subsequent runs of the test can grab
221                # the port even if it is in TIME_WAIT.
222                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223
224                # Listen on localhost:reverse_port before connecting to
225                # localhost:forward_port because that will cause adb to connect
226                # back to localhost:reverse_port.
227                listener.bind(('127.0.0.1', reverse_port))
228                listener.listen(4)
229
230                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
231                with contextlib.closing(client):
232                    # Connect to the listener.
233                    client.connect(('127.0.0.1', forward_port))
234
235                    # Accept the client connection.
236                    accepted_connection, addr = listener.accept()
237                    with contextlib.closing(accepted_connection) as server:
238                        data = b'hello'
239
240                        # Send data into the port setup by adb forward.
241                        client.sendall(data)
242                        # Explicitly close() so that server gets EOF.
243                        client.close()
244
245                        # Verify that the data came back via adb reverse.
246                        self.assertEqual(data, server.makefile().read().encode("utf8"))
247        finally:
248            if reverse_setup:
249                self.device.reverse_remove(forward_spec)
250            if forward_setup:
251                self.device.forward_remove(forward_spec)
252
253
254class ShellTest(DeviceTest):
255    def _interactive_shell(self, shell_args, input):
256        """Runs an interactive adb shell.
257
258        Args:
259          shell_args: List of string arguments to `adb shell`.
260          input: bytes input to send to the interactive shell.
261
262        Returns:
263          The remote exit code.
264
265        Raises:
266          unittest.SkipTest: The device doesn't support exit codes.
267        """
268        if not self.device.has_shell_protocol():
269            raise unittest.SkipTest('exit codes are unavailable on this device')
270
271        proc = subprocess.Popen(
272                self.device.adb_cmd + ['shell'] + shell_args,
273                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
274                stderr=subprocess.PIPE)
275        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
276        # to explicitly add an exit command to close the session from the device
277        # side, plus the necessary newline to complete the interactive command.
278        proc.communicate(input + b'; exit\n')
279        return proc.returncode
280
281    def test_cat(self):
282        """Check that we can at least cat a file."""
283        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
284        elements = out.split()
285        self.assertEqual(len(elements), 2)
286
287        uptime, idle = elements
288        self.assertGreater(float(uptime), 0.0)
289        self.assertGreater(float(idle), 0.0)
290
291    def test_throws_on_failure(self):
292        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
293
294    def test_output_not_stripped(self):
295        out = self.device.shell(['echo', 'foo'])[0]
296        self.assertEqual(out, 'foo' + self.device.linesep)
297
298    def test_shell_command_length(self):
299        # Devices that have shell_v2 should be able to handle long commands.
300        if self.device.has_shell_protocol():
301            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
302            self.assertEqual(rc, 0)
303            self.assertTrue(out == ('x' * 16384 + '\n'))
304
305    def test_shell_nocheck_failure(self):
306        rc, out, _ = self.device.shell_nocheck(['false'])
307        self.assertNotEqual(rc, 0)
308        self.assertEqual(out, '')
309
310    def test_shell_nocheck_output_not_stripped(self):
311        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
312        self.assertEqual(rc, 0)
313        self.assertEqual(out, 'foo' + self.device.linesep)
314
315    def test_can_distinguish_tricky_results(self):
316        # If result checking on ADB shell is naively implemented as
317        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
318        # output from the result for a cmd of `echo -n 1`.
319        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
320        self.assertEqual(rc, 0)
321        self.assertEqual(out, '1')
322
323    def test_line_endings(self):
324        """Ensure that line ending translation is not happening in the pty.
325
326        Bug: http://b/19735063
327        """
328        output = self.device.shell(['uname'])[0]
329        self.assertEqual(output, 'Linux' + self.device.linesep)
330
331    def test_pty_logic(self):
332        """Tests that a PTY is allocated when it should be.
333
334        PTY allocation behavior should match ssh.
335        """
336        def check_pty(args):
337            """Checks adb shell PTY allocation.
338
339            Tests |args| for terminal and non-terminal stdin.
340
341            Args:
342                args: -Tt args in a list (e.g. ['-t', '-t']).
343
344            Returns:
345                A tuple (<terminal>, <non-terminal>). True indicates
346                the corresponding shell allocated a remote PTY.
347            """
348            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
349
350            terminal = subprocess.Popen(
351                    test_cmd, stdin=None,
352                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
353            terminal.communicate()
354
355            non_terminal = subprocess.Popen(
356                    test_cmd, stdin=subprocess.PIPE,
357                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
358            non_terminal.communicate()
359
360            return (terminal.returncode == 0, non_terminal.returncode == 0)
361
362        # -T: never allocate PTY.
363        self.assertEqual((False, False), check_pty(['-T']))
364
365        # These tests require a new device.
366        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
367            # No args: PTY only if stdin is a terminal and shell is interactive,
368            # which is difficult to reliably test from a script.
369            self.assertEqual((False, False), check_pty([]))
370
371            # -t: PTY if stdin is a terminal.
372            self.assertEqual((True, False), check_pty(['-t']))
373
374        # -t -t: always allocate PTY.
375        self.assertEqual((True, True), check_pty(['-t', '-t']))
376
377        # -tt: always allocate PTY, POSIX style (http://b/32216152).
378        self.assertEqual((True, True), check_pty(['-tt']))
379
380        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
381        # we follow the man page instead.
382        self.assertEqual((True, True), check_pty(['-ttt']))
383
384        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
385        self.assertEqual((True, True), check_pty(['-ttx']))
386
387        # -Ttt: -tt cancels out -T.
388        self.assertEqual((True, True), check_pty(['-Ttt']))
389
390        # -ttT: -T cancels out -tt.
391        self.assertEqual((False, False), check_pty(['-ttT']))
392
393    def test_shell_protocol(self):
394        """Tests the shell protocol on the device.
395
396        If the device supports shell protocol, this gives us the ability
397        to separate stdout/stderr and return the exit code directly.
398
399        Bug: http://b/19734861
400        """
401        if not self.device.has_shell_protocol():
402            raise unittest.SkipTest('shell protocol unsupported on this device')
403
404        # Shell protocol should be used by default.
405        result = self.device.shell_nocheck(
406                shlex.split('echo foo; echo bar >&2; exit 17'))
407        self.assertEqual(17, result[0])
408        self.assertEqual('foo' + self.device.linesep, result[1])
409        self.assertEqual('bar' + self.device.linesep, result[2])
410
411        self.assertEqual(17, self._interactive_shell([], b'exit 17'))
412
413        # -x flag should disable shell protocol.
414        result = self.device.shell_nocheck(
415                shlex.split('-x echo foo; echo bar >&2; exit 17'))
416        self.assertEqual(0, result[0])
417        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
418        self.assertEqual('', result[2])
419
420        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
421
422    def test_non_interactive_sigint(self):
423        """Tests that SIGINT in a non-interactive shell kills the process.
424
425        This requires the shell protocol in order to detect the broken
426        pipe; raw data transfer mode will only see the break once the
427        subprocess tries to read or write.
428
429        Bug: http://b/23825725
430        """
431        if not self.device.has_shell_protocol():
432            raise unittest.SkipTest('shell protocol unsupported on this device')
433
434        # Start a long-running process.
435        sleep_proc = subprocess.Popen(
436                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
437                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
438                stderr=subprocess.STDOUT)
439        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
440        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
441        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
442
443        # Verify that the process is running, send signal, verify it stopped.
444        self.device.shell(proc_query)
445        os.kill(sleep_proc.pid, signal.SIGINT)
446        sleep_proc.communicate()
447
448        # It can take some time for the process to receive the signal and die.
449        end_time = time.time() + 3
450        while self.device.shell_nocheck(proc_query)[0] != 1:
451            self.assertFalse(time.time() > end_time,
452                             'subprocess failed to terminate in time')
453
454    def test_non_interactive_stdin(self):
455        """Tests that non-interactive shells send stdin."""
456        if not self.device.has_shell_protocol():
457            raise unittest.SkipTest('non-interactive stdin unsupported '
458                                    'on this device')
459
460        # Test both small and large inputs.
461        small_input = b'foo'
462        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
463        large_input = b'\n'.join(characters)
464
465
466        for input in (small_input, large_input):
467            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
468                                    stdin=subprocess.PIPE,
469                                    stdout=subprocess.PIPE,
470                                    stderr=subprocess.PIPE)
471            stdout, stderr = proc.communicate(input)
472            self.assertEqual(input.splitlines(), stdout.splitlines())
473            self.assertEqual(b'', stderr)
474
475    def test_sighup(self):
476        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
477        log_path = "/data/local/tmp/adb_signal_test.log"
478
479        # Clear the output file.
480        self.device.shell_nocheck(["echo", ">", log_path])
481
482        script = """
483            trap "echo SIGINT > {path}; exit 0" SIGINT
484            trap "echo SIGHUP > {path}; exit 0" SIGHUP
485            echo Waiting
486            read
487        """.format(path=log_path)
488
489        script = ";".join([x.strip() for x in script.strip().splitlines()])
490
491        process = self.device.shell_popen([script], kill_atexit=False,
492                                          stdin=subprocess.PIPE,
493                                          stdout=subprocess.PIPE)
494
495        self.assertEqual(b"Waiting\n", process.stdout.readline())
496        process.send_signal(signal.SIGINT)
497        process.wait()
498
499        # Waiting for the local adb to finish is insufficient, since it hangs
500        # up immediately.
501        time.sleep(1)
502
503        stdout, _ = self.device.shell(["cat", log_path])
504        self.assertEqual(stdout.strip(), "SIGHUP")
505
506    # Temporarily disabled because it seems to cause later instability.
507    # http://b/228114748
508    def disabled_test_exit_stress(self):
509        """Hammer `adb shell exit 42` with multiple threads."""
510        thread_count = 48
511        result = dict()
512        def hammer(thread_idx, thread_count, result):
513            success = True
514            for i in range(thread_idx, 240, thread_count):
515                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
516                if ret != i % 256:
517                    success = False
518                    break
519            result[thread_idx] = success
520
521        threads = []
522        for i in range(thread_count):
523            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
524            thread.start()
525            threads.append(thread)
526        for thread in threads:
527            thread.join()
528        for i, success in result.items():
529            self.assertTrue(success)
530
531    def disabled_test_parallel(self):
532        """Spawn a bunch of `adb shell` instances in parallel.
533
534        This was broken historically due to the use of select, which only works
535        for fds that are numerically less than 1024.
536
537        Bug: http://b/141955761"""
538
539        n_procs = 2048
540        procs = dict()
541        for i in range(0, n_procs):
542            procs[i] = subprocess.Popen(
543                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
544                stdin=subprocess.PIPE,
545                stdout=subprocess.PIPE
546            )
547
548        for i in range(0, n_procs):
549            procs[i].stdin.write("%d\n" % i)
550
551        for i in range(0, n_procs):
552            response = procs[i].stdout.readline()
553            assert(response == "%d\n" % i)
554
555        for i in range(0, n_procs):
556            procs[i].stdin.write("%d\n" % (i % 256))
557
558        for i in range(0, n_procs):
559            assert(procs[i].wait() == i % 256)
560
561
562class ArgumentEscapingTest(DeviceTest):
563    def test_shell_escaping(self):
564        """Make sure that argument escaping is somewhat sane."""
565
566        # http://b/19734868
567        # Note that this actually matches ssh(1)'s behavior --- it's
568        # converted to `sh -c echo hello; echo world` which sh interprets
569        # as `sh -c echo` (with an argument to that shell of "hello"),
570        # and then `echo world` back in the first shell.
571        result = self.device.shell(
572            shlex.split("sh -c 'echo hello; echo world'"))[0]
573        result = result.splitlines()
574        self.assertEqual(['', 'world'], result)
575        # If you really wanted "hello" and "world", here's what you'd do:
576        result = self.device.shell(
577            shlex.split(r'echo hello\;echo world'))[0].splitlines()
578        self.assertEqual(['hello', 'world'], result)
579
580        # http://b/15479704
581        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
582        self.assertEqual('t', result)
583        result = self.device.shell(
584            shlex.split("sh -c 'true && echo t'"))[0].strip()
585        self.assertEqual('t', result)
586
587        # http://b/20564385
588        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
589        self.assertEqual('t', result)
590        result = self.device.shell(
591            shlex.split(r'echo -n 123\;uname'))[0].strip()
592        self.assertEqual('123Linux', result)
593
594    def test_install_argument_escaping(self):
595        """Make sure that install argument escaping works."""
596        # http://b/20323053, http://b/3090932.
597        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
598            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
599                                             delete=False)
600            tf.close()
601
602            # Installing bogus .apks fails if the device supports exit codes.
603            try:
604                output = self.device.install(tf.name.decode("utf8"))
605            except subprocess.CalledProcessError as e:
606                output = e.output
607
608            self.assertIn(file_suffix, output)
609            os.remove(tf.name)
610
611
612class RootUnrootTest(DeviceTest):
613    def _test_root(self):
614        message = self.device.root()
615        if 'adbd cannot run as root in production builds' in message:
616            return
617        self.device.wait()
618        self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
619
620    def _test_unroot(self):
621        self.device.unroot()
622        self.device.wait()
623        self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
624
625    def test_root_unroot(self):
626        """Make sure that adb root and adb unroot work, using id(1)."""
627        if self.device.get_prop('ro.debuggable') != '1':
628            raise unittest.SkipTest('requires rootable build')
629
630        original_user = self.device.shell(['id', '-un'])[0].strip()
631        try:
632            if original_user == 'root':
633                self._test_unroot()
634                self._test_root()
635            elif original_user == 'shell':
636                self._test_root()
637                self._test_unroot()
638        finally:
639            if original_user == 'root':
640                self.device.root()
641            else:
642                self.device.unroot()
643            self.device.wait()
644
645
646class TcpIpTest(DeviceTest):
647    def test_tcpip_failure_raises(self):
648        """adb tcpip requires a port.
649
650        Bug: http://b/22636927
651        """
652        self.assertRaises(
653            subprocess.CalledProcessError, self.device.tcpip, '')
654        self.assertRaises(
655            subprocess.CalledProcessError, self.device.tcpip, 'foo')
656
657
658class SystemPropertiesTest(DeviceTest):
659    def test_get_prop(self):
660        self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
661
662    def test_set_prop(self):
663        # debug.* prop does not require root privileges
664        prop_name = 'debug.foo'
665        self.device.shell(['setprop', prop_name, '""'])
666
667        val = random.random()
668        self.device.set_prop(prop_name, str(val))
669        self.assertEqual(
670            self.device.shell(['getprop', prop_name])[0].strip(), str(val))
671
672
673def compute_md5(string):
674    hsh = hashlib.md5()
675    hsh.update(string)
676    return hsh.hexdigest()
677
678
679def get_md5_prog(device):
680    """Older platforms (pre-L) had the name md5 rather than md5sum."""
681    try:
682        device.shell(['md5sum', '/proc/uptime'])
683        return 'md5sum'
684    except adb.ShellError:
685        return 'md5'
686
687
688class HostFile(object):
689    def __init__(self, handle, checksum):
690        self.handle = handle
691        self.checksum = checksum
692        self.full_path = handle.name
693        self.base_name = os.path.basename(self.full_path)
694
695
696class DeviceFile(object):
697    def __init__(self, checksum, full_path):
698        self.checksum = checksum
699        self.full_path = full_path
700        self.base_name = posixpath.basename(self.full_path)
701
702
703def make_random_host_files(in_dir, num_files):
704    min_size = 1 * (1 << 10)
705    max_size = 16 * (1 << 10)
706
707    files = []
708    for _ in range(num_files):
709        file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
710
711        size = random.randrange(min_size, max_size, 1024)
712        rand_str = os.urandom(size)
713        file_handle.write(rand_str)
714        file_handle.flush()
715        file_handle.close()
716
717        md5 = compute_md5(rand_str)
718        files.append(HostFile(file_handle, md5))
719    return files
720
721
722def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
723    min_size = 1 * (1 << 10)
724    max_size = 16 * (1 << 10)
725
726    files = []
727    for file_num in range(num_files):
728        size = random.randrange(min_size, max_size, 1024)
729
730        base_name = prefix + str(file_num)
731        full_path = posixpath.join(in_dir, base_name)
732
733        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
734                      'bs={}'.format(size), 'count=1'])
735        dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
736
737        files.append(DeviceFile(dev_md5, full_path))
738    return files
739
740
741class FileOperationsTest:
742    class Base(DeviceTest):
743        SCRATCH_DIR = '/data/local/tmp'
744        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
745        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
746
747        def setUp(self):
748            self.previous_env = os.environ.get("ADB_COMPRESSION")
749            os.environ["ADB_COMPRESSION"] = self.compression
750
751        def tearDown(self):
752            if self.previous_env is None:
753                del os.environ["ADB_COMPRESSION"]
754            else:
755                os.environ["ADB_COMPRESSION"] = self.previous_env
756
757        def _verify_remote(self, checksum, remote_path):
758            dev_md5, _ = self.device.shell([get_md5_prog(self.device),
759                                            remote_path])[0].split()
760            self.assertEqual(checksum, dev_md5)
761
762        def _verify_local(self, checksum, local_path):
763            with open(local_path, 'rb') as host_file:
764                host_md5 = compute_md5(host_file.read())
765                self.assertEqual(host_md5, checksum)
766
767        def test_push(self):
768            """Push a randomly generated file to specified device."""
769            kbytes = 512
770            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
771            rand_str = os.urandom(1024 * kbytes)
772            tmp.write(rand_str)
773            tmp.close()
774
775            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
776            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
777
778            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
779            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
780
781            os.remove(tmp.name)
782
783        def test_push_dir(self):
784            """Push a randomly generated directory of files to the device."""
785            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
786            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
787
788            try:
789                host_dir = tempfile.mkdtemp()
790
791                # Make sure the temp directory isn't setuid, or else adb will complain.
792                os.chmod(host_dir, 0o700)
793
794                # Create 32 random files.
795                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
796                self.device.push(host_dir, self.DEVICE_TEMP_DIR)
797
798                for temp_file in temp_files:
799                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
800                                                 os.path.basename(host_dir),
801                                                 temp_file.base_name)
802                    self._verify_remote(temp_file.checksum, remote_path)
803                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
804            finally:
805                if host_dir is not None:
806                    shutil.rmtree(host_dir)
807
808        def disabled_test_push_empty(self):
809            """Push an empty directory to the device."""
810            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
811            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
812
813            try:
814                host_dir = tempfile.mkdtemp()
815
816                # Make sure the temp directory isn't setuid, or else adb will complain.
817                os.chmod(host_dir, 0o700)
818
819                # Create an empty directory.
820                empty_dir_path = os.path.join(host_dir, 'empty')
821                os.mkdir(empty_dir_path);
822
823                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
824
825                remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
826                test_empty_cmd = ["[", "-d", remote_path, "]"]
827                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
828
829                self.assertEqual(rc, 0)
830                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
831            finally:
832                if host_dir is not None:
833                    shutil.rmtree(host_dir)
834
835        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
836        def test_push_symlink(self):
837            """Push a symlink.
838
839            Bug: http://b/31491920
840            """
841            try:
842                host_dir = tempfile.mkdtemp()
843
844                # Make sure the temp directory isn't setuid, or else adb will
845                # complain.
846                os.chmod(host_dir, 0o700)
847
848                with open(os.path.join(host_dir, 'foo'), 'w') as f:
849                    f.write('foo')
850
851                symlink_path = os.path.join(host_dir, 'symlink')
852                os.symlink('foo', symlink_path)
853
854                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
855                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
856                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
857                rc, out, _ = self.device.shell_nocheck(
858                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
859                self.assertEqual(0, rc)
860                self.assertEqual(out.strip(), 'foo')
861            finally:
862                if host_dir is not None:
863                    shutil.rmtree(host_dir)
864
865        def test_multiple_push(self):
866            """Push multiple files to the device in one adb push command.
867
868            Bug: http://b/25324823
869            """
870
871            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
872            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
873
874            try:
875                host_dir = tempfile.mkdtemp()
876
877                # Create some random files and a subdirectory containing more files.
878                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
879
880                subdir = os.path.join(host_dir, 'subdir')
881                os.mkdir(subdir)
882                subdir_temp_files = make_random_host_files(in_dir=subdir,
883                                                           num_files=4)
884
885                paths = [x.full_path for x in temp_files]
886                paths.append(subdir)
887                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
888
889                for temp_file in temp_files:
890                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
891                                                 temp_file.base_name)
892                    self._verify_remote(temp_file.checksum, remote_path)
893
894                for subdir_temp_file in subdir_temp_files:
895                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
896                                                 # BROKEN: http://b/25394682
897                                                 # 'subdir';
898                                                 temp_file.base_name)
899                    self._verify_remote(temp_file.checksum, remote_path)
900
901
902                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
903            finally:
904                if host_dir is not None:
905                    shutil.rmtree(host_dir)
906
907        @requires_non_root
908        def test_push_error_reporting(self):
909            """Make sure that errors that occur while pushing a file get reported
910
911            Bug: http://b/26816782
912            """
913            with tempfile.NamedTemporaryFile() as tmp_file:
914                tmp_file.write(b'\0' * 1024 * 1024)
915                tmp_file.flush()
916                try:
917                    self.device.push(local=tmp_file.name, remote='/system/')
918                    self.fail('push should not have succeeded')
919                except subprocess.CalledProcessError as e:
920                    output = e.output
921
922                self.assertTrue(b'Permission denied' in output or
923                                b'Read-only file system' in output)
924
925        @requires_non_root
926        def test_push_directory_creation(self):
927            """Regression test for directory creation.
928
929            Bug: http://b/110953234
930            """
931            with tempfile.NamedTemporaryFile() as tmp_file:
932                tmp_file.write(b'\0' * 1024 * 1024)
933                tmp_file.flush()
934                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
935                self.device.shell(['rm', '-rf', remote_path])
936
937                remote_path += '/filename'
938                self.device.push(local=tmp_file.name, remote=remote_path)
939
940        def disabled_test_push_multiple_slash_root(self):
941            """Regression test for pushing to //data/local/tmp.
942
943            Bug: http://b/141311284
944
945            Disabled because this broken on the adbd side as well: b/141943968
946            """
947            with tempfile.NamedTemporaryFile() as tmp_file:
948                tmp_file.write(b'\0' * 1024 * 1024)
949                tmp_file.flush()
950                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
951                self.device.shell(['rm', '-rf', remote_path])
952                self.device.push(local=tmp_file.name, remote=remote_path)
953
954        def _test_pull(self, remote_file, checksum):
955            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
956            tmp_write.close()
957            self.device.pull(remote=remote_file, local=tmp_write.name)
958            with open(tmp_write.name, 'rb') as tmp_read:
959                host_contents = tmp_read.read()
960                host_md5 = compute_md5(host_contents)
961            self.assertEqual(checksum, host_md5)
962            os.remove(tmp_write.name)
963
964        @requires_non_root
965        def test_pull_error_reporting(self):
966            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
967            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
968
969            try:
970                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
971            except subprocess.CalledProcessError as e:
972                output = e.output
973
974            self.assertIn(b'Permission denied', output)
975
976            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
977
978        def test_pull(self):
979            """Pull a randomly generated file from specified device."""
980            kbytes = 512
981            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
982            cmd = ['dd', 'if=/dev/urandom',
983                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
984                   'count={}'.format(kbytes)]
985            self.device.shell(cmd)
986            dev_md5, _ = self.device.shell(
987                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
988            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
989            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
990
991        def test_pull_dir(self):
992            """Pull a randomly generated directory of files from the device."""
993            try:
994                host_dir = tempfile.mkdtemp()
995
996                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
997                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
998
999                # Populate device directory with random files.
1000                temp_files = make_random_device_files(
1001                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1002
1003                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1004
1005                for temp_file in temp_files:
1006                    host_path = os.path.join(
1007                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1008                        temp_file.base_name)
1009                    self._verify_local(temp_file.checksum, host_path)
1010
1011                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1012            finally:
1013                if host_dir is not None:
1014                    shutil.rmtree(host_dir)
1015
1016        def test_pull_dir_symlink(self):
1017            """Pull a directory into a symlink to a directory.
1018
1019            Bug: http://b/27362811
1020            """
1021            if os.name != 'posix':
1022                raise unittest.SkipTest('requires POSIX')
1023
1024            try:
1025                host_dir = tempfile.mkdtemp()
1026                real_dir = os.path.join(host_dir, 'dir')
1027                symlink = os.path.join(host_dir, 'symlink')
1028                os.mkdir(real_dir)
1029                os.symlink(real_dir, symlink)
1030
1031                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1032                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1033
1034                # Populate device directory with random files.
1035                temp_files = make_random_device_files(
1036                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1037
1038                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1039
1040                for temp_file in temp_files:
1041                    host_path = os.path.join(
1042                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1043                        temp_file.base_name)
1044                    self._verify_local(temp_file.checksum, host_path)
1045
1046                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1047            finally:
1048                if host_dir is not None:
1049                    shutil.rmtree(host_dir)
1050
1051        def test_pull_dir_symlink_collision(self):
1052            """Pull a directory into a colliding symlink to directory."""
1053            if os.name != 'posix':
1054                raise unittest.SkipTest('requires POSIX')
1055
1056            try:
1057                host_dir = tempfile.mkdtemp()
1058                real_dir = os.path.join(host_dir, 'real')
1059                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1060                symlink = os.path.join(host_dir, tmp_dirname)
1061                os.mkdir(real_dir)
1062                os.symlink(real_dir, symlink)
1063
1064                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1065                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1066
1067                # Populate device directory with random files.
1068                temp_files = make_random_device_files(
1069                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1070
1071                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1072
1073                for temp_file in temp_files:
1074                    host_path = os.path.join(real_dir, temp_file.base_name)
1075                    self._verify_local(temp_file.checksum, host_path)
1076
1077                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1078            finally:
1079                if host_dir is not None:
1080                    shutil.rmtree(host_dir)
1081
1082        def test_pull_dir_nonexistent(self):
1083            """Pull a directory of files from the device to a nonexistent path."""
1084            try:
1085                host_dir = tempfile.mkdtemp()
1086                dest_dir = os.path.join(host_dir, 'dest')
1087
1088                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1089                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1090
1091                # Populate device directory with random files.
1092                temp_files = make_random_device_files(
1093                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1094
1095                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1096
1097                for temp_file in temp_files:
1098                    host_path = os.path.join(dest_dir, temp_file.base_name)
1099                    self._verify_local(temp_file.checksum, host_path)
1100
1101                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1102            finally:
1103                if host_dir is not None:
1104                    shutil.rmtree(host_dir)
1105
1106        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1107        def disabled_test_pull_symlink_dir(self):
1108            """Pull a symlink to a directory of symlinks to files."""
1109            try:
1110                host_dir = tempfile.mkdtemp()
1111
1112                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1113                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1114                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1115
1116                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1117                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1118                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1119
1120                # Populate device directory with random files.
1121                temp_files = make_random_device_files(
1122                    self.device, in_dir=remote_dir, num_files=32)
1123
1124                for temp_file in temp_files:
1125                    self.device.shell(
1126                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1127                         posixpath.join(remote_links, temp_file.base_name)])
1128
1129                self.device.pull(remote=remote_symlink, local=host_dir)
1130
1131                for temp_file in temp_files:
1132                    host_path = os.path.join(
1133                        host_dir, 'symlink', temp_file.base_name)
1134                    self._verify_local(temp_file.checksum, host_path)
1135
1136                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1137            finally:
1138                if host_dir is not None:
1139                    shutil.rmtree(host_dir)
1140
1141        def test_pull_empty(self):
1142            """Pull a directory containing an empty directory from the device."""
1143            try:
1144                host_dir = tempfile.mkdtemp()
1145
1146                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1147                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1148                self.device.shell(['mkdir', '-p', remote_empty_path])
1149
1150                self.device.pull(remote=remote_empty_path, local=host_dir)
1151                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1152            finally:
1153                if host_dir is not None:
1154                    shutil.rmtree(host_dir)
1155
1156        def test_multiple_pull(self):
1157            """Pull a randomly generated directory of files from the device."""
1158
1159            try:
1160                host_dir = tempfile.mkdtemp()
1161
1162                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1163                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1164                self.device.shell(['mkdir', '-p', subdir])
1165
1166                # Create some random files and a subdirectory containing more files.
1167                temp_files = make_random_device_files(
1168                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1169
1170                subdir_temp_files = make_random_device_files(
1171                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1172
1173                paths = [x.full_path for x in temp_files]
1174                paths.append(subdir)
1175                self.device._simple_call(['pull'] + paths + [host_dir])
1176
1177                for temp_file in temp_files:
1178                    local_path = os.path.join(host_dir, temp_file.base_name)
1179                    self._verify_local(temp_file.checksum, local_path)
1180
1181                for subdir_temp_file in subdir_temp_files:
1182                    local_path = os.path.join(host_dir,
1183                                              'subdir',
1184                                              subdir_temp_file.base_name)
1185                    self._verify_local(subdir_temp_file.checksum, local_path)
1186
1187                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1188            finally:
1189                if host_dir is not None:
1190                    shutil.rmtree(host_dir)
1191
1192        def verify_sync(self, device, temp_files, device_dir):
1193            """Verifies that a list of temp files was synced to the device."""
1194            # Confirm that every file on the device mirrors that on the host.
1195            for temp_file in temp_files:
1196                device_full_path = posixpath.join(
1197                    device_dir, temp_file.base_name)
1198                dev_md5, _ = device.shell(
1199                    [get_md5_prog(self.device), device_full_path])[0].split()
1200                self.assertEqual(temp_file.checksum, dev_md5)
1201
1202        def test_sync(self):
1203            """Sync a host directory to the data partition."""
1204
1205            try:
1206                base_dir = tempfile.mkdtemp()
1207
1208                # Create mirror device directory hierarchy within base_dir.
1209                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1210                os.makedirs(full_dir_path)
1211
1212                # Create 32 random files within the host mirror.
1213                temp_files = make_random_host_files(
1214                    in_dir=full_dir_path, num_files=32)
1215
1216                # Clean up any stale files on the device.
1217                device = adb.get_device()  # pylint: disable=no-member
1218                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1219
1220                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1221                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1222                device.sync('data')
1223                if old_product_out is None:
1224                    del os.environ['ANDROID_PRODUCT_OUT']
1225                else:
1226                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1227
1228                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1229
1230                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1231            finally:
1232                if base_dir is not None:
1233                    shutil.rmtree(base_dir)
1234
1235        def test_push_sync(self):
1236            """Sync a host directory to a specific path."""
1237
1238            try:
1239                temp_dir = tempfile.mkdtemp()
1240                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1241
1242                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1243
1244                # Clean up any stale files on the device.
1245                device = adb.get_device()  # pylint: disable=no-member
1246                device.shell(['rm', '-rf', device_dir])
1247
1248                device.push(temp_dir, device_dir, sync=True)
1249
1250                self.verify_sync(device, temp_files, device_dir)
1251
1252                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1253            finally:
1254                if temp_dir is not None:
1255                    shutil.rmtree(temp_dir)
1256
1257        def test_push_sync_multiple(self):
1258            """Sync multiple host directories to a specific path."""
1259
1260            try:
1261                temp_dir = tempfile.mkdtemp()
1262                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1263
1264                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1265
1266                # Clean up any stale files on the device.
1267                device = adb.get_device()  # pylint: disable=no-member
1268                device.shell(['rm', '-rf', device_dir])
1269                device.shell(['mkdir', '-p', device_dir])
1270
1271                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
1272                device.push(host_paths, device_dir, sync=True)
1273
1274                self.verify_sync(device, temp_files, device_dir)
1275
1276                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1277            finally:
1278                if temp_dir is not None:
1279                    shutil.rmtree(temp_dir)
1280
1281
1282        def test_push_dry_run_nonexistent_file(self):
1283            """Push with dry run."""
1284
1285            for file_size in [8, 1024 * 1024]:
1286                try:
1287                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1288                    device_file = posixpath.join(device_dir, 'file')
1289
1290                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1291                    self.device.shell(['mkdir', '-p', device_dir])
1292
1293                    host_dir = tempfile.mkdtemp()
1294                    host_file = posixpath.join(host_dir, 'file')
1295
1296                    with open(host_file, "w") as f:
1297                        f.write('x' * file_size)
1298
1299                    self.device._simple_call(['push', '-n', host_file, device_file])
1300                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1301                    self.assertNotEqual(0, rc)
1302
1303                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1304                finally:
1305                    if host_dir is not None:
1306                        shutil.rmtree(host_dir)
1307
1308        def test_push_dry_run_existent_file(self):
1309            """Push with dry run."""
1310
1311            for file_size in [8, 1024 * 1024]:
1312                try:
1313                    host_dir = tempfile.mkdtemp()
1314                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1315                    device_file = posixpath.join(device_dir, 'file')
1316
1317                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1318                    self.device.shell(['mkdir', '-p', device_dir])
1319                    self.device.shell(['echo', 'foo', '>', device_file])
1320
1321                    host_file = posixpath.join(host_dir, 'file')
1322
1323                    with open(host_file, "w") as f:
1324                        f.write('x' * file_size)
1325
1326                    self.device._simple_call(['push', '-n', host_file, device_file])
1327                    stdout, stderr = self.device.shell(['cat', device_file])
1328                    self.assertEqual(stdout.strip(), "foo")
1329
1330                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1331                finally:
1332                    if host_dir is not None:
1333                        shutil.rmtree(host_dir)
1334
1335        def test_unicode_paths(self):
1336            """Ensure that we can support non-ASCII paths, even on Windows."""
1337            name = u'로보카 폴리'
1338
1339            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1340            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1341
1342            ## push.
1343            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1344            tf.close()
1345            self.device.push(tf.name, remote_path)
1346            os.remove(tf.name)
1347            self.assertFalse(os.path.exists(tf.name))
1348
1349            # Verify that the device ended up with the expected UTF-8 path
1350            output = self.device.shell(
1351                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1352            self.assertEqual(remote_path, output)
1353
1354            # pull.
1355            self.device.pull(remote_path, tf.name)
1356            self.assertTrue(os.path.exists(tf.name))
1357            os.remove(tf.name)
1358            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1359
1360
1361class FileOperationsTestUncompressed(FileOperationsTest.Base):
1362    compression = "none"
1363
1364
1365class FileOperationsTestBrotli(FileOperationsTest.Base):
1366    compression = "brotli"
1367
1368
1369class FileOperationsTestLZ4(FileOperationsTest.Base):
1370    compression = "lz4"
1371
1372
1373class FileOperationsTestZstd(FileOperationsTest.Base):
1374    compression = "zstd"
1375
1376
1377class DeviceOfflineTest(DeviceTest):
1378    def _get_device_state(self, serialno):
1379        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1380        for line in output.split('\n'):
1381            m = re.match('(\S+)\s+(\S+)', line)
1382            if m and m.group(1) == serialno:
1383                return m.group(2)
1384        return None
1385
1386    def disabled_test_killed_when_pushing_a_large_file(self):
1387        """
1388           While running adb push with a large file, kill adb server.
1389           Occasionally the device becomes offline. Because the device is still
1390           reading data without realizing that the adb server has been restarted.
1391           Test if we can bring the device online automatically now.
1392           http://b/32952319
1393        """
1394        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1395        # 1. Push a large file
1396        file_path = 'tmp_large_file'
1397        try:
1398            fh = open(file_path, 'w')
1399            fh.write('\0' * (100 * 1024 * 1024))
1400            fh.close()
1401            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1402            time.sleep(0.1)
1403            # 2. Kill the adb server
1404            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1405            subproc.terminate()
1406        finally:
1407            try:
1408                os.unlink(file_path)
1409            except:
1410                pass
1411        # 3. See if the device still exist.
1412        # Sleep to wait for the adb server exit.
1413        time.sleep(0.5)
1414        # 4. The device should be online
1415        self.assertEqual(self._get_device_state(serialno), 'device')
1416
1417    def disabled_test_killed_when_pulling_a_large_file(self):
1418        """
1419           While running adb pull with a large file, kill adb server.
1420           Occasionally the device can't be connected. Because the device is trying to
1421           send a message larger than what is expected by the adb server.
1422           Test if we can bring the device online automatically now.
1423        """
1424        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1425        file_path = 'tmp_large_file'
1426        try:
1427            # 1. Create a large file on device.
1428            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1429                               'bs=1000000', 'count=100'])
1430            # 2. Pull the large file on host.
1431            subproc = subprocess.Popen(self.device.adb_cmd +
1432                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
1433            time.sleep(0.1)
1434            # 3. Kill the adb server
1435            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1436            subproc.terminate()
1437        finally:
1438            try:
1439                os.unlink(file_path)
1440            except:
1441                pass
1442        # 4. See if the device still exist.
1443        # Sleep to wait for the adb server exit.
1444        time.sleep(0.5)
1445        self.assertEqual(self._get_device_state(serialno), 'device')
1446
1447
1448    def test_packet_size_regression(self):
1449        """Test for http://b/37783561
1450
1451        Receiving packets of a length divisible by 512 but not 1024 resulted in
1452        the adb client waiting indefinitely for more input.
1453        """
1454        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1455        # Probe some surrounding values as well, for the hell of it.
1456        for base in [512] + list(range(1024, 1024 * 16, 1024)):
1457            for offset in [-6, -5, -4]:
1458                length = base + offset
1459                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1460                       'echo', 'foo']
1461                rc, stdout, _ = self.device.shell_nocheck(cmd)
1462
1463                self.assertEqual(0, rc)
1464
1465                # Output should be '\0' * length, followed by "foo\n"
1466                self.assertEqual(length, len(stdout) - 4)
1467                self.assertEqual(stdout, "\0" * length + "foo\n")
1468
1469    def test_zero_packet(self):
1470        """Test for http://b/113070258
1471
1472        Make sure that we don't blow up when sending USB transfers that line up
1473        exactly with the USB packet size.
1474        """
1475
1476        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1477        try:
1478            for size in [512, 1024]:
1479                def listener():
1480                    cmd = ["echo foo | nc -l -p 12345; echo done"]
1481                    rc, stdout, stderr = self.device.shell_nocheck(cmd)
1482
1483                thread = threading.Thread(target=listener)
1484                thread.start()
1485
1486                # Wait a bit to let the shell command start.
1487                time.sleep(0.25)
1488
1489                sock = socket.create_connection(("localhost", local_port))
1490                with contextlib.closing(sock):
1491                    bytesWritten = sock.send(b"a" * size)
1492                    self.assertEqual(size, bytesWritten)
1493                    readBytes = sock.recv(4096)
1494                    self.assertEqual(b"foo\n", readBytes)
1495
1496                thread.join()
1497        finally:
1498            self.device.forward_remove("tcp:{}".format(local_port))
1499
1500
1501class SocketTest(DeviceTest):
1502    def test_socket_flush(self):
1503        """Test that we handle socket closure properly.
1504
1505        If we're done writing to a socket, closing before the other end has
1506        closed will send a TCP_RST if we have incoming data queued up, which
1507        may result in data that we've written being discarded.
1508
1509        Bug: http://b/74616284
1510        """
1511        s = socket.create_connection(("localhost", 5037))
1512
1513        def adb_length_prefixed(string):
1514            encoded = string.encode("utf8")
1515            result = b"%04x%s" % (len(encoded), encoded)
1516            return result
1517
1518        if "ANDROID_SERIAL" in os.environ:
1519            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
1520        else:
1521            transport_string = "host:transport-any"
1522
1523        s.sendall(adb_length_prefixed(transport_string))
1524        response = s.recv(4)
1525        self.assertEqual(b"OKAY", response)
1526
1527        shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
1528        s.sendall(adb_length_prefixed(shell_string))
1529
1530        response = s.recv(4)
1531        self.assertEqual(b"OKAY", response)
1532
1533        # Spawn a thread that dumps garbage into the socket until failure.
1534        def spam():
1535            buf = b"\0" * 16384
1536            try:
1537                while True:
1538                    s.sendall(buf)
1539            except Exception as ex:
1540                print(ex)
1541
1542        thread = threading.Thread(target=spam)
1543        thread.start()
1544
1545        time.sleep(1)
1546
1547        received = b""
1548        while True:
1549            read = s.recv(512)
1550            if len(read) == 0:
1551                break
1552            received += read
1553
1554        self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
1555        thread.join()
1556
1557
1558class FramebufferTest(DeviceTest):
1559    def test_framebuffer(self):
1560        """Test that we get something from the framebuffer service."""
1561        output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"])
1562        self.assertFalse(len(output) == 0)
1563
1564
1565if sys.platform == "win32":
1566    # From https://stackoverflow.com/a/38749458
1567    import os
1568    import contextlib
1569    import msvcrt
1570    import ctypes
1571    from ctypes import wintypes
1572
1573    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
1574
1575    GENERIC_READ  = 0x80000000
1576    GENERIC_WRITE = 0x40000000
1577    FILE_SHARE_READ  = 1
1578    FILE_SHARE_WRITE = 2
1579    CONSOLE_TEXTMODE_BUFFER = 1
1580    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
1581    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
1582    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1583
1584    def _check_zero(result, func, args):
1585        if not result:
1586            raise ctypes.WinError(ctypes.get_last_error())
1587        return args
1588
1589    def _check_invalid(result, func, args):
1590        if result == INVALID_HANDLE_VALUE:
1591            raise ctypes.WinError(ctypes.get_last_error())
1592        return args
1593
1594    if not hasattr(wintypes, 'LPDWORD'): # Python 2
1595        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
1596        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1597
1598    class COORD(ctypes.Structure):
1599        _fields_ = (('X', wintypes.SHORT),
1600                    ('Y', wintypes.SHORT))
1601
1602    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1603        _fields_ = (('cbSize',               wintypes.ULONG),
1604                    ('dwSize',               COORD),
1605                    ('dwCursorPosition',     COORD),
1606                    ('wAttributes',          wintypes.WORD),
1607                    ('srWindow',             wintypes.SMALL_RECT),
1608                    ('dwMaximumWindowSize',  COORD),
1609                    ('wPopupAttributes',     wintypes.WORD),
1610                    ('bFullscreenSupported', wintypes.BOOL),
1611                    ('ColorTable',           wintypes.DWORD * 16))
1612        def __init__(self, *args, **kwds):
1613            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1614                    *args, **kwds)
1615            self.cbSize = ctypes.sizeof(self)
1616
1617    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1618                                        CONSOLE_SCREEN_BUFFER_INFOEX)
1619    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1620
1621    kernel32.GetStdHandle.errcheck = _check_invalid
1622    kernel32.GetStdHandle.restype = wintypes.HANDLE
1623    kernel32.GetStdHandle.argtypes = (
1624        wintypes.DWORD,) # _In_ nStdHandle
1625
1626    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1627    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1628    kernel32.CreateConsoleScreenBuffer.argtypes = (
1629        wintypes.DWORD,        # _In_       dwDesiredAccess
1630        wintypes.DWORD,        # _In_       dwShareMode
1631        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
1632        wintypes.DWORD,        # _In_       dwFlags
1633        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData
1634
1635    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1636    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1637        wintypes.HANDLE,               # _In_  hConsoleOutput
1638        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1639
1640    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1641    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1642        wintypes.HANDLE,               # _In_  hConsoleOutput
1643        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo
1644
1645    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1646    kernel32.SetConsoleWindowInfo.argtypes = (
1647        wintypes.HANDLE,      # _In_ hConsoleOutput
1648        wintypes.BOOL,        # _In_ bAbsolute
1649        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1650
1651    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1652    kernel32.FillConsoleOutputCharacterW.argtypes = (
1653        wintypes.HANDLE,  # _In_  hConsoleOutput
1654        wintypes.WCHAR,   # _In_  cCharacter
1655        wintypes.DWORD,   # _In_  nLength
1656        COORD,            # _In_  dwWriteCoord
1657        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1658
1659    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1660    kernel32.ReadConsoleOutputCharacterW.argtypes = (
1661        wintypes.HANDLE,  # _In_  hConsoleOutput
1662        wintypes.LPWSTR,  # _Out_ lpCharacter
1663        wintypes.DWORD,   # _In_  nLength
1664        COORD,            # _In_  dwReadCoord
1665        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1666
1667    @contextlib.contextmanager
1668    def allocate_console():
1669        allocated = kernel32.AllocConsole()
1670        try:
1671            yield allocated
1672        finally:
1673            if allocated:
1674                kernel32.FreeConsole()
1675
1676    @contextlib.contextmanager
1677    def console_screen(ncols=None, nrows=None):
1678        info = CONSOLE_SCREEN_BUFFER_INFOEX()
1679        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1680        nwritten = (wintypes.DWORD * 1)()
1681        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1682        kernel32.GetConsoleScreenBufferInfoEx(
1683               hStdOut, ctypes.byref(info))
1684        if ncols is None:
1685            ncols = info.dwSize.X
1686        if nrows is None:
1687            nrows = info.dwSize.Y
1688        elif nrows > 9999:
1689            raise ValueError('nrows must be 9999 or less')
1690        fd_screen = None
1691        hScreen = kernel32.CreateConsoleScreenBuffer(
1692                    GENERIC_READ | GENERIC_WRITE,
1693                    FILE_SHARE_READ | FILE_SHARE_WRITE,
1694                    None, CONSOLE_TEXTMODE_BUFFER, None)
1695        try:
1696            fd_screen = msvcrt.open_osfhandle(
1697                            hScreen, os.O_RDWR | os.O_BINARY)
1698            kernel32.GetConsoleScreenBufferInfoEx(
1699                   hScreen, ctypes.byref(new_info))
1700            new_info.dwSize = COORD(ncols, nrows)
1701            new_info.srWindow = wintypes.SMALL_RECT(
1702                    Left=0, Top=0, Right=(ncols - 1),
1703                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1704            kernel32.SetConsoleScreenBufferInfoEx(
1705                    hScreen, ctypes.byref(new_info))
1706            kernel32.SetConsoleWindowInfo(hScreen, True,
1707                    ctypes.byref(new_info.srWindow))
1708            kernel32.FillConsoleOutputCharacterW(
1709                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1710            kernel32.SetConsoleActiveScreenBuffer(hScreen)
1711            try:
1712                yield fd_screen
1713            finally:
1714                kernel32.SetConsoleScreenBufferInfoEx(
1715                    hStdOut, ctypes.byref(info))
1716                kernel32.SetConsoleWindowInfo(hStdOut, True,
1717                        ctypes.byref(info.srWindow))
1718                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1719        finally:
1720            if fd_screen is not None:
1721                os.close(fd_screen)
1722            else:
1723                kernel32.CloseHandle(hScreen)
1724
1725    def read_screen(fd):
1726        hScreen = msvcrt.get_osfhandle(fd)
1727        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1728        kernel32.GetConsoleScreenBufferInfoEx(
1729            hScreen, ctypes.byref(csbi))
1730        ncols = csbi.dwSize.X
1731        pos = csbi.dwCursorPosition
1732        length = ncols * pos.Y + pos.X + 1
1733        buf = (ctypes.c_wchar * length)()
1734        n = (wintypes.DWORD * 1)()
1735        kernel32.ReadConsoleOutputCharacterW(
1736            hScreen, buf, length, COORD(0,0), n)
1737        lines = [buf[i:i+ncols].rstrip(u'\0')
1738                    for i in range(0, n[0], ncols)]
1739        return u'\n'.join(lines)
1740
1741@unittest.skipUnless(sys.platform == "win32", "requires Windows")
1742class WindowsConsoleTest(DeviceTest):
1743    def test_unicode_output(self):
1744        """Test Unicode command line parameters and Unicode console window output.
1745
1746        Bug: https://issuetracker.google.com/issues/111972753
1747        """
1748        # If we don't have a console window, allocate one. This isn't necessary if we're already
1749        # being run from a console window, which is typical.
1750        with allocate_console() as allocated_console:
1751            # Create a temporary console buffer and switch to it. We could also pass a parameter of
1752            # ncols=len(unicode_string), but it causes the window to flash as it is resized and
1753            # likely unnecessary given the typical console window size.
1754            with console_screen(nrows=1000) as screen:
1755                unicode_string = u'로보카 폴리'
1756                # Run adb and allow it to detect that stdout is a console, not a pipe, by using
1757                # device.shell_popen() which does not use a pipe, unlike device.shell().
1758                process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
1759                process.wait()
1760                # Read what was written by adb to the temporary console buffer.
1761                console_output = read_screen(screen)
1762                self.assertEqual(unicode_string, console_output)
1763
1764
1765def main():
1766    random.seed(0)
1767    if len(adb.get_devices()) > 0:
1768        suite = unittest.TestLoader().loadTestsFromName(__name__)
1769        unittest.TextTestRunner(verbosity=3).run(suite)
1770    else:
1771        print('Test suite must be run with attached devices')
1772
1773
1774if __name__ == '__main__':
1775    main()
1776