1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_non_root(func): 43 def wrapper(self, *args): 44 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 45 if was_root: 46 self.device.unroot() 47 self.device.wait() 48 49 try: 50 func(self, *args) 51 finally: 52 if was_root: 53 self.device.root() 54 self.device.wait() 55 56 return wrapper 57 58 59class DeviceTest(unittest.TestCase): 60 device = adb.get_device() 61 62 63class AbbTest(DeviceTest): 64 def test_smoke(self): 65 abb = subprocess.run(['adb', 'abb'], capture_output=True) 66 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 67 68 # abb squashes all failures to 1. 69 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 70 self.assertEqual(abb.stdout, cmd.stdout) 71 self.assertEqual(abb.stderr, cmd.stderr) 72 73class ForwardReverseTest(DeviceTest): 74 def _test_no_rebind(self, description, direction_list, direction, 75 direction_no_rebind, direction_remove_all): 76 msg = direction_list() 77 self.assertEqual('', msg.strip(), 78 description + ' list must be empty to run this test.') 79 80 # Use --no-rebind with no existing binding 81 direction_no_rebind('tcp:5566', 'tcp:6655') 82 msg = direction_list() 83 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 84 85 # Use --no-rebind with existing binding 86 with self.assertRaises(subprocess.CalledProcessError): 87 direction_no_rebind('tcp:5566', 'tcp:6677') 88 msg = direction_list() 89 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 90 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 91 92 # Use the absence of --no-rebind with existing binding 93 direction('tcp:5566', 'tcp:6677') 94 msg = direction_list() 95 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 96 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 97 98 direction_remove_all() 99 msg = direction_list() 100 self.assertEqual('', msg.strip()) 101 102 def test_forward_no_rebind(self): 103 self._test_no_rebind('forward', self.device.forward_list, 104 self.device.forward, self.device.forward_no_rebind, 105 self.device.forward_remove_all) 106 107 def test_reverse_no_rebind(self): 108 self._test_no_rebind('reverse', self.device.reverse_list, 109 self.device.reverse, self.device.reverse_no_rebind, 110 self.device.reverse_remove_all) 111 112 def test_forward(self): 113 msg = self.device.forward_list() 114 self.assertEqual('', msg.strip(), 115 'Forwarding list must be empty to run this test.') 116 self.device.forward('tcp:5566', 'tcp:6655') 117 msg = self.device.forward_list() 118 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 119 self.device.forward('tcp:7788', 'tcp:8877') 120 msg = self.device.forward_list() 121 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 122 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 123 self.device.forward_remove('tcp:5566') 124 msg = self.device.forward_list() 125 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 126 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 127 self.device.forward_remove_all() 128 msg = self.device.forward_list() 129 self.assertEqual('', msg.strip()) 130 131 def test_forward_old_protocol(self): 132 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 133 134 msg = self.device.forward_list() 135 self.assertEqual('', msg.strip(), 136 'Forwarding list must be empty to run this test.') 137 138 s = socket.create_connection(("localhost", 5037)) 139 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 140 cmd = b"%04x%s" % (len(service), service) 141 s.sendall(cmd) 142 143 msg = self.device.forward_list() 144 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 145 146 self.device.forward_remove_all() 147 msg = self.device.forward_list() 148 self.assertEqual('', msg.strip()) 149 150 def test_forward_tcp_port_0(self): 151 self.assertEqual('', self.device.forward_list().strip(), 152 'Forwarding list must be empty to run this test.') 153 154 try: 155 # If resolving TCP port 0 is supported, `adb forward` will print 156 # the actual port number. 157 port = self.device.forward('tcp:0', 'tcp:8888').strip() 158 if not port: 159 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 160 161 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 162 self.device.forward_list())) 163 finally: 164 self.device.forward_remove_all() 165 166 def test_reverse(self): 167 msg = self.device.reverse_list() 168 self.assertEqual('', msg.strip(), 169 'Reverse forwarding list must be empty to run this test.') 170 self.device.reverse('tcp:5566', 'tcp:6655') 171 msg = self.device.reverse_list() 172 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 173 self.device.reverse('tcp:7788', 'tcp:8877') 174 msg = self.device.reverse_list() 175 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 176 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 177 self.device.reverse_remove('tcp:5566') 178 msg = self.device.reverse_list() 179 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 180 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 181 self.device.reverse_remove_all() 182 msg = self.device.reverse_list() 183 self.assertEqual('', msg.strip()) 184 185 def test_reverse_tcp_port_0(self): 186 self.assertEqual('', self.device.reverse_list().strip(), 187 'Reverse list must be empty to run this test.') 188 189 try: 190 # If resolving TCP port 0 is supported, `adb reverse` will print 191 # the actual port number. 192 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 193 if not port: 194 raise unittest.SkipTest('Reversing tcp:0 is not available.') 195 196 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 197 self.device.reverse_list())) 198 finally: 199 self.device.reverse_remove_all() 200 201 def test_forward_reverse_echo(self): 202 """Send data through adb forward and read it back via adb reverse""" 203 forward_port = 12345 204 reverse_port = forward_port + 1 205 forward_spec = 'tcp:' + str(forward_port) 206 reverse_spec = 'tcp:' + str(reverse_port) 207 forward_setup = False 208 reverse_setup = False 209 210 try: 211 # listen on localhost:forward_port, connect to remote:forward_port 212 self.device.forward(forward_spec, forward_spec) 213 forward_setup = True 214 # listen on remote:forward_port, connect to localhost:reverse_port 215 self.device.reverse(forward_spec, reverse_spec) 216 reverse_setup = True 217 218 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 219 with contextlib.closing(listener): 220 # Use SO_REUSEADDR so that subsequent runs of the test can grab 221 # the port even if it is in TIME_WAIT. 222 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 223 224 # Listen on localhost:reverse_port before connecting to 225 # localhost:forward_port because that will cause adb to connect 226 # back to localhost:reverse_port. 227 listener.bind(('127.0.0.1', reverse_port)) 228 listener.listen(4) 229 230 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 231 with contextlib.closing(client): 232 # Connect to the listener. 233 client.connect(('127.0.0.1', forward_port)) 234 235 # Accept the client connection. 236 accepted_connection, addr = listener.accept() 237 with contextlib.closing(accepted_connection) as server: 238 data = b'hello' 239 240 # Send data into the port setup by adb forward. 241 client.sendall(data) 242 # Explicitly close() so that server gets EOF. 243 client.close() 244 245 # Verify that the data came back via adb reverse. 246 self.assertEqual(data, server.makefile().read().encode("utf8")) 247 finally: 248 if reverse_setup: 249 self.device.reverse_remove(forward_spec) 250 if forward_setup: 251 self.device.forward_remove(forward_spec) 252 253 254class ShellTest(DeviceTest): 255 def _interactive_shell(self, shell_args, input): 256 """Runs an interactive adb shell. 257 258 Args: 259 shell_args: List of string arguments to `adb shell`. 260 input: bytes input to send to the interactive shell. 261 262 Returns: 263 The remote exit code. 264 265 Raises: 266 unittest.SkipTest: The device doesn't support exit codes. 267 """ 268 if not self.device.has_shell_protocol(): 269 raise unittest.SkipTest('exit codes are unavailable on this device') 270 271 proc = subprocess.Popen( 272 self.device.adb_cmd + ['shell'] + shell_args, 273 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 274 stderr=subprocess.PIPE) 275 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 276 # to explicitly add an exit command to close the session from the device 277 # side, plus the necessary newline to complete the interactive command. 278 proc.communicate(input + b'; exit\n') 279 return proc.returncode 280 281 def test_cat(self): 282 """Check that we can at least cat a file.""" 283 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 284 elements = out.split() 285 self.assertEqual(len(elements), 2) 286 287 uptime, idle = elements 288 self.assertGreater(float(uptime), 0.0) 289 self.assertGreater(float(idle), 0.0) 290 291 def test_throws_on_failure(self): 292 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 293 294 def test_output_not_stripped(self): 295 out = self.device.shell(['echo', 'foo'])[0] 296 self.assertEqual(out, 'foo' + self.device.linesep) 297 298 def test_shell_command_length(self): 299 # Devices that have shell_v2 should be able to handle long commands. 300 if self.device.has_shell_protocol(): 301 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 302 self.assertEqual(rc, 0) 303 self.assertTrue(out == ('x' * 16384 + '\n')) 304 305 def test_shell_nocheck_failure(self): 306 rc, out, _ = self.device.shell_nocheck(['false']) 307 self.assertNotEqual(rc, 0) 308 self.assertEqual(out, '') 309 310 def test_shell_nocheck_output_not_stripped(self): 311 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 312 self.assertEqual(rc, 0) 313 self.assertEqual(out, 'foo' + self.device.linesep) 314 315 def test_can_distinguish_tricky_results(self): 316 # If result checking on ADB shell is naively implemented as 317 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 318 # output from the result for a cmd of `echo -n 1`. 319 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 320 self.assertEqual(rc, 0) 321 self.assertEqual(out, '1') 322 323 def test_line_endings(self): 324 """Ensure that line ending translation is not happening in the pty. 325 326 Bug: http://b/19735063 327 """ 328 output = self.device.shell(['uname'])[0] 329 self.assertEqual(output, 'Linux' + self.device.linesep) 330 331 def test_pty_logic(self): 332 """Tests that a PTY is allocated when it should be. 333 334 PTY allocation behavior should match ssh. 335 """ 336 def check_pty(args): 337 """Checks adb shell PTY allocation. 338 339 Tests |args| for terminal and non-terminal stdin. 340 341 Args: 342 args: -Tt args in a list (e.g. ['-t', '-t']). 343 344 Returns: 345 A tuple (<terminal>, <non-terminal>). True indicates 346 the corresponding shell allocated a remote PTY. 347 """ 348 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 349 350 terminal = subprocess.Popen( 351 test_cmd, stdin=None, 352 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 353 terminal.communicate() 354 355 non_terminal = subprocess.Popen( 356 test_cmd, stdin=subprocess.PIPE, 357 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 358 non_terminal.communicate() 359 360 return (terminal.returncode == 0, non_terminal.returncode == 0) 361 362 # -T: never allocate PTY. 363 self.assertEqual((False, False), check_pty(['-T'])) 364 365 # These tests require a new device. 366 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 367 # No args: PTY only if stdin is a terminal and shell is interactive, 368 # which is difficult to reliably test from a script. 369 self.assertEqual((False, False), check_pty([])) 370 371 # -t: PTY if stdin is a terminal. 372 self.assertEqual((True, False), check_pty(['-t'])) 373 374 # -t -t: always allocate PTY. 375 self.assertEqual((True, True), check_pty(['-t', '-t'])) 376 377 # -tt: always allocate PTY, POSIX style (http://b/32216152). 378 self.assertEqual((True, True), check_pty(['-tt'])) 379 380 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 381 # we follow the man page instead. 382 self.assertEqual((True, True), check_pty(['-ttt'])) 383 384 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 385 self.assertEqual((True, True), check_pty(['-ttx'])) 386 387 # -Ttt: -tt cancels out -T. 388 self.assertEqual((True, True), check_pty(['-Ttt'])) 389 390 # -ttT: -T cancels out -tt. 391 self.assertEqual((False, False), check_pty(['-ttT'])) 392 393 def test_shell_protocol(self): 394 """Tests the shell protocol on the device. 395 396 If the device supports shell protocol, this gives us the ability 397 to separate stdout/stderr and return the exit code directly. 398 399 Bug: http://b/19734861 400 """ 401 if not self.device.has_shell_protocol(): 402 raise unittest.SkipTest('shell protocol unsupported on this device') 403 404 # Shell protocol should be used by default. 405 result = self.device.shell_nocheck( 406 shlex.split('echo foo; echo bar >&2; exit 17')) 407 self.assertEqual(17, result[0]) 408 self.assertEqual('foo' + self.device.linesep, result[1]) 409 self.assertEqual('bar' + self.device.linesep, result[2]) 410 411 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 412 413 # -x flag should disable shell protocol. 414 result = self.device.shell_nocheck( 415 shlex.split('-x echo foo; echo bar >&2; exit 17')) 416 self.assertEqual(0, result[0]) 417 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 418 self.assertEqual('', result[2]) 419 420 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 421 422 def test_non_interactive_sigint(self): 423 """Tests that SIGINT in a non-interactive shell kills the process. 424 425 This requires the shell protocol in order to detect the broken 426 pipe; raw data transfer mode will only see the break once the 427 subprocess tries to read or write. 428 429 Bug: http://b/23825725 430 """ 431 if not self.device.has_shell_protocol(): 432 raise unittest.SkipTest('shell protocol unsupported on this device') 433 434 # Start a long-running process. 435 sleep_proc = subprocess.Popen( 436 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 437 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 438 stderr=subprocess.STDOUT) 439 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 440 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 441 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 442 443 # Verify that the process is running, send signal, verify it stopped. 444 self.device.shell(proc_query) 445 os.kill(sleep_proc.pid, signal.SIGINT) 446 sleep_proc.communicate() 447 448 # It can take some time for the process to receive the signal and die. 449 end_time = time.time() + 3 450 while self.device.shell_nocheck(proc_query)[0] != 1: 451 self.assertFalse(time.time() > end_time, 452 'subprocess failed to terminate in time') 453 454 def test_non_interactive_stdin(self): 455 """Tests that non-interactive shells send stdin.""" 456 if not self.device.has_shell_protocol(): 457 raise unittest.SkipTest('non-interactive stdin unsupported ' 458 'on this device') 459 460 # Test both small and large inputs. 461 small_input = b'foo' 462 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 463 large_input = b'\n'.join(characters) 464 465 466 for input in (small_input, large_input): 467 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 468 stdin=subprocess.PIPE, 469 stdout=subprocess.PIPE, 470 stderr=subprocess.PIPE) 471 stdout, stderr = proc.communicate(input) 472 self.assertEqual(input.splitlines(), stdout.splitlines()) 473 self.assertEqual(b'', stderr) 474 475 def test_sighup(self): 476 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 477 log_path = "/data/local/tmp/adb_signal_test.log" 478 479 # Clear the output file. 480 self.device.shell_nocheck(["echo", ">", log_path]) 481 482 script = """ 483 trap "echo SIGINT > {path}; exit 0" SIGINT 484 trap "echo SIGHUP > {path}; exit 0" SIGHUP 485 echo Waiting 486 read 487 """.format(path=log_path) 488 489 script = ";".join([x.strip() for x in script.strip().splitlines()]) 490 491 process = self.device.shell_popen([script], kill_atexit=False, 492 stdin=subprocess.PIPE, 493 stdout=subprocess.PIPE) 494 495 self.assertEqual(b"Waiting\n", process.stdout.readline()) 496 process.send_signal(signal.SIGINT) 497 process.wait() 498 499 # Waiting for the local adb to finish is insufficient, since it hangs 500 # up immediately. 501 time.sleep(1) 502 503 stdout, _ = self.device.shell(["cat", log_path]) 504 self.assertEqual(stdout.strip(), "SIGHUP") 505 506 # Temporarily disabled because it seems to cause later instability. 507 # http://b/228114748 508 def disabled_test_exit_stress(self): 509 """Hammer `adb shell exit 42` with multiple threads.""" 510 thread_count = 48 511 result = dict() 512 def hammer(thread_idx, thread_count, result): 513 success = True 514 for i in range(thread_idx, 240, thread_count): 515 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 516 if ret != i % 256: 517 success = False 518 break 519 result[thread_idx] = success 520 521 threads = [] 522 for i in range(thread_count): 523 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 524 thread.start() 525 threads.append(thread) 526 for thread in threads: 527 thread.join() 528 for i, success in result.items(): 529 self.assertTrue(success) 530 531 def disabled_test_parallel(self): 532 """Spawn a bunch of `adb shell` instances in parallel. 533 534 This was broken historically due to the use of select, which only works 535 for fds that are numerically less than 1024. 536 537 Bug: http://b/141955761""" 538 539 n_procs = 2048 540 procs = dict() 541 for i in range(0, n_procs): 542 procs[i] = subprocess.Popen( 543 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 544 stdin=subprocess.PIPE, 545 stdout=subprocess.PIPE 546 ) 547 548 for i in range(0, n_procs): 549 procs[i].stdin.write("%d\n" % i) 550 551 for i in range(0, n_procs): 552 response = procs[i].stdout.readline() 553 assert(response == "%d\n" % i) 554 555 for i in range(0, n_procs): 556 procs[i].stdin.write("%d\n" % (i % 256)) 557 558 for i in range(0, n_procs): 559 assert(procs[i].wait() == i % 256) 560 561 562class ArgumentEscapingTest(DeviceTest): 563 def test_shell_escaping(self): 564 """Make sure that argument escaping is somewhat sane.""" 565 566 # http://b/19734868 567 # Note that this actually matches ssh(1)'s behavior --- it's 568 # converted to `sh -c echo hello; echo world` which sh interprets 569 # as `sh -c echo` (with an argument to that shell of "hello"), 570 # and then `echo world` back in the first shell. 571 result = self.device.shell( 572 shlex.split("sh -c 'echo hello; echo world'"))[0] 573 result = result.splitlines() 574 self.assertEqual(['', 'world'], result) 575 # If you really wanted "hello" and "world", here's what you'd do: 576 result = self.device.shell( 577 shlex.split(r'echo hello\;echo world'))[0].splitlines() 578 self.assertEqual(['hello', 'world'], result) 579 580 # http://b/15479704 581 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 582 self.assertEqual('t', result) 583 result = self.device.shell( 584 shlex.split("sh -c 'true && echo t'"))[0].strip() 585 self.assertEqual('t', result) 586 587 # http://b/20564385 588 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 589 self.assertEqual('t', result) 590 result = self.device.shell( 591 shlex.split(r'echo -n 123\;uname'))[0].strip() 592 self.assertEqual('123Linux', result) 593 594 def test_install_argument_escaping(self): 595 """Make sure that install argument escaping works.""" 596 # http://b/20323053, http://b/3090932. 597 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 598 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 599 delete=False) 600 tf.close() 601 602 # Installing bogus .apks fails if the device supports exit codes. 603 try: 604 output = self.device.install(tf.name.decode("utf8")) 605 except subprocess.CalledProcessError as e: 606 output = e.output 607 608 self.assertIn(file_suffix, output) 609 os.remove(tf.name) 610 611 612class RootUnrootTest(DeviceTest): 613 def _test_root(self): 614 message = self.device.root() 615 if 'adbd cannot run as root in production builds' in message: 616 return 617 self.device.wait() 618 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 619 620 def _test_unroot(self): 621 self.device.unroot() 622 self.device.wait() 623 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 624 625 def test_root_unroot(self): 626 """Make sure that adb root and adb unroot work, using id(1).""" 627 if self.device.get_prop('ro.debuggable') != '1': 628 raise unittest.SkipTest('requires rootable build') 629 630 original_user = self.device.shell(['id', '-un'])[0].strip() 631 try: 632 if original_user == 'root': 633 self._test_unroot() 634 self._test_root() 635 elif original_user == 'shell': 636 self._test_root() 637 self._test_unroot() 638 finally: 639 if original_user == 'root': 640 self.device.root() 641 else: 642 self.device.unroot() 643 self.device.wait() 644 645 646class TcpIpTest(DeviceTest): 647 def test_tcpip_failure_raises(self): 648 """adb tcpip requires a port. 649 650 Bug: http://b/22636927 651 """ 652 self.assertRaises( 653 subprocess.CalledProcessError, self.device.tcpip, '') 654 self.assertRaises( 655 subprocess.CalledProcessError, self.device.tcpip, 'foo') 656 657 658class SystemPropertiesTest(DeviceTest): 659 def test_get_prop(self): 660 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 661 662 def test_set_prop(self): 663 # debug.* prop does not require root privileges 664 prop_name = 'debug.foo' 665 self.device.shell(['setprop', prop_name, '""']) 666 667 val = random.random() 668 self.device.set_prop(prop_name, str(val)) 669 self.assertEqual( 670 self.device.shell(['getprop', prop_name])[0].strip(), str(val)) 671 672 673def compute_md5(string): 674 hsh = hashlib.md5() 675 hsh.update(string) 676 return hsh.hexdigest() 677 678 679def get_md5_prog(device): 680 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 681 try: 682 device.shell(['md5sum', '/proc/uptime']) 683 return 'md5sum' 684 except adb.ShellError: 685 return 'md5' 686 687 688class HostFile(object): 689 def __init__(self, handle, checksum): 690 self.handle = handle 691 self.checksum = checksum 692 self.full_path = handle.name 693 self.base_name = os.path.basename(self.full_path) 694 695 696class DeviceFile(object): 697 def __init__(self, checksum, full_path): 698 self.checksum = checksum 699 self.full_path = full_path 700 self.base_name = posixpath.basename(self.full_path) 701 702 703def make_random_host_files(in_dir, num_files): 704 min_size = 1 * (1 << 10) 705 max_size = 16 * (1 << 10) 706 707 files = [] 708 for _ in range(num_files): 709 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 710 711 size = random.randrange(min_size, max_size, 1024) 712 rand_str = os.urandom(size) 713 file_handle.write(rand_str) 714 file_handle.flush() 715 file_handle.close() 716 717 md5 = compute_md5(rand_str) 718 files.append(HostFile(file_handle, md5)) 719 return files 720 721 722def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 723 min_size = 1 * (1 << 10) 724 max_size = 16 * (1 << 10) 725 726 files = [] 727 for file_num in range(num_files): 728 size = random.randrange(min_size, max_size, 1024) 729 730 base_name = prefix + str(file_num) 731 full_path = posixpath.join(in_dir, base_name) 732 733 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 734 'bs={}'.format(size), 'count=1']) 735 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 736 737 files.append(DeviceFile(dev_md5, full_path)) 738 return files 739 740 741class FileOperationsTest: 742 class Base(DeviceTest): 743 SCRATCH_DIR = '/data/local/tmp' 744 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 745 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 746 747 def setUp(self): 748 self.previous_env = os.environ.get("ADB_COMPRESSION") 749 os.environ["ADB_COMPRESSION"] = self.compression 750 751 def tearDown(self): 752 if self.previous_env is None: 753 del os.environ["ADB_COMPRESSION"] 754 else: 755 os.environ["ADB_COMPRESSION"] = self.previous_env 756 757 def _verify_remote(self, checksum, remote_path): 758 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 759 remote_path])[0].split() 760 self.assertEqual(checksum, dev_md5) 761 762 def _verify_local(self, checksum, local_path): 763 with open(local_path, 'rb') as host_file: 764 host_md5 = compute_md5(host_file.read()) 765 self.assertEqual(host_md5, checksum) 766 767 def test_push(self): 768 """Push a randomly generated file to specified device.""" 769 kbytes = 512 770 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 771 rand_str = os.urandom(1024 * kbytes) 772 tmp.write(rand_str) 773 tmp.close() 774 775 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 776 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 777 778 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 779 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 780 781 os.remove(tmp.name) 782 783 def test_push_dir(self): 784 """Push a randomly generated directory of files to the device.""" 785 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 786 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 787 788 try: 789 host_dir = tempfile.mkdtemp() 790 791 # Make sure the temp directory isn't setuid, or else adb will complain. 792 os.chmod(host_dir, 0o700) 793 794 # Create 32 random files. 795 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 796 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 797 798 for temp_file in temp_files: 799 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 800 os.path.basename(host_dir), 801 temp_file.base_name) 802 self._verify_remote(temp_file.checksum, remote_path) 803 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 804 finally: 805 if host_dir is not None: 806 shutil.rmtree(host_dir) 807 808 def disabled_test_push_empty(self): 809 """Push an empty directory to the device.""" 810 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 811 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 812 813 try: 814 host_dir = tempfile.mkdtemp() 815 816 # Make sure the temp directory isn't setuid, or else adb will complain. 817 os.chmod(host_dir, 0o700) 818 819 # Create an empty directory. 820 empty_dir_path = os.path.join(host_dir, 'empty') 821 os.mkdir(empty_dir_path); 822 823 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 824 825 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 826 test_empty_cmd = ["[", "-d", remote_path, "]"] 827 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 828 829 self.assertEqual(rc, 0) 830 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 831 finally: 832 if host_dir is not None: 833 shutil.rmtree(host_dir) 834 835 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 836 def test_push_symlink(self): 837 """Push a symlink. 838 839 Bug: http://b/31491920 840 """ 841 try: 842 host_dir = tempfile.mkdtemp() 843 844 # Make sure the temp directory isn't setuid, or else adb will 845 # complain. 846 os.chmod(host_dir, 0o700) 847 848 with open(os.path.join(host_dir, 'foo'), 'w') as f: 849 f.write('foo') 850 851 symlink_path = os.path.join(host_dir, 'symlink') 852 os.symlink('foo', symlink_path) 853 854 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 855 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 856 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 857 rc, out, _ = self.device.shell_nocheck( 858 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 859 self.assertEqual(0, rc) 860 self.assertEqual(out.strip(), 'foo') 861 finally: 862 if host_dir is not None: 863 shutil.rmtree(host_dir) 864 865 def test_multiple_push(self): 866 """Push multiple files to the device in one adb push command. 867 868 Bug: http://b/25324823 869 """ 870 871 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 872 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 873 874 try: 875 host_dir = tempfile.mkdtemp() 876 877 # Create some random files and a subdirectory containing more files. 878 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 879 880 subdir = os.path.join(host_dir, 'subdir') 881 os.mkdir(subdir) 882 subdir_temp_files = make_random_host_files(in_dir=subdir, 883 num_files=4) 884 885 paths = [x.full_path for x in temp_files] 886 paths.append(subdir) 887 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 888 889 for temp_file in temp_files: 890 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 891 temp_file.base_name) 892 self._verify_remote(temp_file.checksum, remote_path) 893 894 for subdir_temp_file in subdir_temp_files: 895 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 896 # BROKEN: http://b/25394682 897 # 'subdir'; 898 temp_file.base_name) 899 self._verify_remote(temp_file.checksum, remote_path) 900 901 902 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 903 finally: 904 if host_dir is not None: 905 shutil.rmtree(host_dir) 906 907 @requires_non_root 908 def test_push_error_reporting(self): 909 """Make sure that errors that occur while pushing a file get reported 910 911 Bug: http://b/26816782 912 """ 913 with tempfile.NamedTemporaryFile() as tmp_file: 914 tmp_file.write(b'\0' * 1024 * 1024) 915 tmp_file.flush() 916 try: 917 self.device.push(local=tmp_file.name, remote='/system/') 918 self.fail('push should not have succeeded') 919 except subprocess.CalledProcessError as e: 920 output = e.output 921 922 self.assertTrue(b'Permission denied' in output or 923 b'Read-only file system' in output) 924 925 @requires_non_root 926 def test_push_directory_creation(self): 927 """Regression test for directory creation. 928 929 Bug: http://b/110953234 930 """ 931 with tempfile.NamedTemporaryFile() as tmp_file: 932 tmp_file.write(b'\0' * 1024 * 1024) 933 tmp_file.flush() 934 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 935 self.device.shell(['rm', '-rf', remote_path]) 936 937 remote_path += '/filename' 938 self.device.push(local=tmp_file.name, remote=remote_path) 939 940 def disabled_test_push_multiple_slash_root(self): 941 """Regression test for pushing to //data/local/tmp. 942 943 Bug: http://b/141311284 944 945 Disabled because this broken on the adbd side as well: b/141943968 946 """ 947 with tempfile.NamedTemporaryFile() as tmp_file: 948 tmp_file.write(b'\0' * 1024 * 1024) 949 tmp_file.flush() 950 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 951 self.device.shell(['rm', '-rf', remote_path]) 952 self.device.push(local=tmp_file.name, remote=remote_path) 953 954 def _test_pull(self, remote_file, checksum): 955 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 956 tmp_write.close() 957 self.device.pull(remote=remote_file, local=tmp_write.name) 958 with open(tmp_write.name, 'rb') as tmp_read: 959 host_contents = tmp_read.read() 960 host_md5 = compute_md5(host_contents) 961 self.assertEqual(checksum, host_md5) 962 os.remove(tmp_write.name) 963 964 @requires_non_root 965 def test_pull_error_reporting(self): 966 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 967 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 968 969 try: 970 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 971 except subprocess.CalledProcessError as e: 972 output = e.output 973 974 self.assertIn(b'Permission denied', output) 975 976 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 977 978 def test_pull(self): 979 """Pull a randomly generated file from specified device.""" 980 kbytes = 512 981 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 982 cmd = ['dd', 'if=/dev/urandom', 983 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 984 'count={}'.format(kbytes)] 985 self.device.shell(cmd) 986 dev_md5, _ = self.device.shell( 987 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 988 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 989 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 990 991 def test_pull_dir(self): 992 """Pull a randomly generated directory of files from the device.""" 993 try: 994 host_dir = tempfile.mkdtemp() 995 996 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 997 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 998 999 # Populate device directory with random files. 1000 temp_files = make_random_device_files( 1001 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1002 1003 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1004 1005 for temp_file in temp_files: 1006 host_path = os.path.join( 1007 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1008 temp_file.base_name) 1009 self._verify_local(temp_file.checksum, host_path) 1010 1011 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1012 finally: 1013 if host_dir is not None: 1014 shutil.rmtree(host_dir) 1015 1016 def test_pull_dir_symlink(self): 1017 """Pull a directory into a symlink to a directory. 1018 1019 Bug: http://b/27362811 1020 """ 1021 if os.name != 'posix': 1022 raise unittest.SkipTest('requires POSIX') 1023 1024 try: 1025 host_dir = tempfile.mkdtemp() 1026 real_dir = os.path.join(host_dir, 'dir') 1027 symlink = os.path.join(host_dir, 'symlink') 1028 os.mkdir(real_dir) 1029 os.symlink(real_dir, symlink) 1030 1031 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1032 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1033 1034 # Populate device directory with random files. 1035 temp_files = make_random_device_files( 1036 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1037 1038 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1039 1040 for temp_file in temp_files: 1041 host_path = os.path.join( 1042 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1043 temp_file.base_name) 1044 self._verify_local(temp_file.checksum, host_path) 1045 1046 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1047 finally: 1048 if host_dir is not None: 1049 shutil.rmtree(host_dir) 1050 1051 def test_pull_dir_symlink_collision(self): 1052 """Pull a directory into a colliding symlink to directory.""" 1053 if os.name != 'posix': 1054 raise unittest.SkipTest('requires POSIX') 1055 1056 try: 1057 host_dir = tempfile.mkdtemp() 1058 real_dir = os.path.join(host_dir, 'real') 1059 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1060 symlink = os.path.join(host_dir, tmp_dirname) 1061 os.mkdir(real_dir) 1062 os.symlink(real_dir, symlink) 1063 1064 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1065 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1066 1067 # Populate device directory with random files. 1068 temp_files = make_random_device_files( 1069 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1070 1071 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1072 1073 for temp_file in temp_files: 1074 host_path = os.path.join(real_dir, temp_file.base_name) 1075 self._verify_local(temp_file.checksum, host_path) 1076 1077 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1078 finally: 1079 if host_dir is not None: 1080 shutil.rmtree(host_dir) 1081 1082 def test_pull_dir_nonexistent(self): 1083 """Pull a directory of files from the device to a nonexistent path.""" 1084 try: 1085 host_dir = tempfile.mkdtemp() 1086 dest_dir = os.path.join(host_dir, 'dest') 1087 1088 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1089 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1090 1091 # Populate device directory with random files. 1092 temp_files = make_random_device_files( 1093 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1094 1095 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1096 1097 for temp_file in temp_files: 1098 host_path = os.path.join(dest_dir, temp_file.base_name) 1099 self._verify_local(temp_file.checksum, host_path) 1100 1101 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1102 finally: 1103 if host_dir is not None: 1104 shutil.rmtree(host_dir) 1105 1106 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1107 def disabled_test_pull_symlink_dir(self): 1108 """Pull a symlink to a directory of symlinks to files.""" 1109 try: 1110 host_dir = tempfile.mkdtemp() 1111 1112 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1113 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1114 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1115 1116 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1117 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1118 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1119 1120 # Populate device directory with random files. 1121 temp_files = make_random_device_files( 1122 self.device, in_dir=remote_dir, num_files=32) 1123 1124 for temp_file in temp_files: 1125 self.device.shell( 1126 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1127 posixpath.join(remote_links, temp_file.base_name)]) 1128 1129 self.device.pull(remote=remote_symlink, local=host_dir) 1130 1131 for temp_file in temp_files: 1132 host_path = os.path.join( 1133 host_dir, 'symlink', temp_file.base_name) 1134 self._verify_local(temp_file.checksum, host_path) 1135 1136 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1137 finally: 1138 if host_dir is not None: 1139 shutil.rmtree(host_dir) 1140 1141 def test_pull_empty(self): 1142 """Pull a directory containing an empty directory from the device.""" 1143 try: 1144 host_dir = tempfile.mkdtemp() 1145 1146 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1147 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1148 self.device.shell(['mkdir', '-p', remote_empty_path]) 1149 1150 self.device.pull(remote=remote_empty_path, local=host_dir) 1151 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1152 finally: 1153 if host_dir is not None: 1154 shutil.rmtree(host_dir) 1155 1156 def test_multiple_pull(self): 1157 """Pull a randomly generated directory of files from the device.""" 1158 1159 try: 1160 host_dir = tempfile.mkdtemp() 1161 1162 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1163 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1164 self.device.shell(['mkdir', '-p', subdir]) 1165 1166 # Create some random files and a subdirectory containing more files. 1167 temp_files = make_random_device_files( 1168 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1169 1170 subdir_temp_files = make_random_device_files( 1171 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1172 1173 paths = [x.full_path for x in temp_files] 1174 paths.append(subdir) 1175 self.device._simple_call(['pull'] + paths + [host_dir]) 1176 1177 for temp_file in temp_files: 1178 local_path = os.path.join(host_dir, temp_file.base_name) 1179 self._verify_local(temp_file.checksum, local_path) 1180 1181 for subdir_temp_file in subdir_temp_files: 1182 local_path = os.path.join(host_dir, 1183 'subdir', 1184 subdir_temp_file.base_name) 1185 self._verify_local(subdir_temp_file.checksum, local_path) 1186 1187 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1188 finally: 1189 if host_dir is not None: 1190 shutil.rmtree(host_dir) 1191 1192 def verify_sync(self, device, temp_files, device_dir): 1193 """Verifies that a list of temp files was synced to the device.""" 1194 # Confirm that every file on the device mirrors that on the host. 1195 for temp_file in temp_files: 1196 device_full_path = posixpath.join( 1197 device_dir, temp_file.base_name) 1198 dev_md5, _ = device.shell( 1199 [get_md5_prog(self.device), device_full_path])[0].split() 1200 self.assertEqual(temp_file.checksum, dev_md5) 1201 1202 def test_sync(self): 1203 """Sync a host directory to the data partition.""" 1204 1205 try: 1206 base_dir = tempfile.mkdtemp() 1207 1208 # Create mirror device directory hierarchy within base_dir. 1209 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1210 os.makedirs(full_dir_path) 1211 1212 # Create 32 random files within the host mirror. 1213 temp_files = make_random_host_files( 1214 in_dir=full_dir_path, num_files=32) 1215 1216 # Clean up any stale files on the device. 1217 device = adb.get_device() # pylint: disable=no-member 1218 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1219 1220 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1221 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1222 device.sync('data') 1223 if old_product_out is None: 1224 del os.environ['ANDROID_PRODUCT_OUT'] 1225 else: 1226 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1227 1228 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1229 1230 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1231 finally: 1232 if base_dir is not None: 1233 shutil.rmtree(base_dir) 1234 1235 def test_push_sync(self): 1236 """Sync a host directory to a specific path.""" 1237 1238 try: 1239 temp_dir = tempfile.mkdtemp() 1240 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1241 1242 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1243 1244 # Clean up any stale files on the device. 1245 device = adb.get_device() # pylint: disable=no-member 1246 device.shell(['rm', '-rf', device_dir]) 1247 1248 device.push(temp_dir, device_dir, sync=True) 1249 1250 self.verify_sync(device, temp_files, device_dir) 1251 1252 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1253 finally: 1254 if temp_dir is not None: 1255 shutil.rmtree(temp_dir) 1256 1257 def test_push_sync_multiple(self): 1258 """Sync multiple host directories to a specific path.""" 1259 1260 try: 1261 temp_dir = tempfile.mkdtemp() 1262 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1263 1264 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1265 1266 # Clean up any stale files on the device. 1267 device = adb.get_device() # pylint: disable=no-member 1268 device.shell(['rm', '-rf', device_dir]) 1269 device.shell(['mkdir', '-p', device_dir]) 1270 1271 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1272 device.push(host_paths, device_dir, sync=True) 1273 1274 self.verify_sync(device, temp_files, device_dir) 1275 1276 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1277 finally: 1278 if temp_dir is not None: 1279 shutil.rmtree(temp_dir) 1280 1281 1282 def test_push_dry_run_nonexistent_file(self): 1283 """Push with dry run.""" 1284 1285 for file_size in [8, 1024 * 1024]: 1286 try: 1287 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1288 device_file = posixpath.join(device_dir, 'file') 1289 1290 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1291 self.device.shell(['mkdir', '-p', device_dir]) 1292 1293 host_dir = tempfile.mkdtemp() 1294 host_file = posixpath.join(host_dir, 'file') 1295 1296 with open(host_file, "w") as f: 1297 f.write('x' * file_size) 1298 1299 self.device._simple_call(['push', '-n', host_file, device_file]) 1300 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1301 self.assertNotEqual(0, rc) 1302 1303 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1304 finally: 1305 if host_dir is not None: 1306 shutil.rmtree(host_dir) 1307 1308 def test_push_dry_run_existent_file(self): 1309 """Push with dry run.""" 1310 1311 for file_size in [8, 1024 * 1024]: 1312 try: 1313 host_dir = tempfile.mkdtemp() 1314 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1315 device_file = posixpath.join(device_dir, 'file') 1316 1317 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1318 self.device.shell(['mkdir', '-p', device_dir]) 1319 self.device.shell(['echo', 'foo', '>', device_file]) 1320 1321 host_file = posixpath.join(host_dir, 'file') 1322 1323 with open(host_file, "w") as f: 1324 f.write('x' * file_size) 1325 1326 self.device._simple_call(['push', '-n', host_file, device_file]) 1327 stdout, stderr = self.device.shell(['cat', device_file]) 1328 self.assertEqual(stdout.strip(), "foo") 1329 1330 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1331 finally: 1332 if host_dir is not None: 1333 shutil.rmtree(host_dir) 1334 1335 def test_unicode_paths(self): 1336 """Ensure that we can support non-ASCII paths, even on Windows.""" 1337 name = u'로보카 폴리' 1338 1339 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1340 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1341 1342 ## push. 1343 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1344 tf.close() 1345 self.device.push(tf.name, remote_path) 1346 os.remove(tf.name) 1347 self.assertFalse(os.path.exists(tf.name)) 1348 1349 # Verify that the device ended up with the expected UTF-8 path 1350 output = self.device.shell( 1351 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1352 self.assertEqual(remote_path, output) 1353 1354 # pull. 1355 self.device.pull(remote_path, tf.name) 1356 self.assertTrue(os.path.exists(tf.name)) 1357 os.remove(tf.name) 1358 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1359 1360 1361class FileOperationsTestUncompressed(FileOperationsTest.Base): 1362 compression = "none" 1363 1364 1365class FileOperationsTestBrotli(FileOperationsTest.Base): 1366 compression = "brotli" 1367 1368 1369class FileOperationsTestLZ4(FileOperationsTest.Base): 1370 compression = "lz4" 1371 1372 1373class FileOperationsTestZstd(FileOperationsTest.Base): 1374 compression = "zstd" 1375 1376 1377class DeviceOfflineTest(DeviceTest): 1378 def _get_device_state(self, serialno): 1379 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1380 for line in output.split('\n'): 1381 m = re.match('(\S+)\s+(\S+)', line) 1382 if m and m.group(1) == serialno: 1383 return m.group(2) 1384 return None 1385 1386 def disabled_test_killed_when_pushing_a_large_file(self): 1387 """ 1388 While running adb push with a large file, kill adb server. 1389 Occasionally the device becomes offline. Because the device is still 1390 reading data without realizing that the adb server has been restarted. 1391 Test if we can bring the device online automatically now. 1392 http://b/32952319 1393 """ 1394 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1395 # 1. Push a large file 1396 file_path = 'tmp_large_file' 1397 try: 1398 fh = open(file_path, 'w') 1399 fh.write('\0' * (100 * 1024 * 1024)) 1400 fh.close() 1401 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1402 time.sleep(0.1) 1403 # 2. Kill the adb server 1404 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1405 subproc.terminate() 1406 finally: 1407 try: 1408 os.unlink(file_path) 1409 except: 1410 pass 1411 # 3. See if the device still exist. 1412 # Sleep to wait for the adb server exit. 1413 time.sleep(0.5) 1414 # 4. The device should be online 1415 self.assertEqual(self._get_device_state(serialno), 'device') 1416 1417 def disabled_test_killed_when_pulling_a_large_file(self): 1418 """ 1419 While running adb pull with a large file, kill adb server. 1420 Occasionally the device can't be connected. Because the device is trying to 1421 send a message larger than what is expected by the adb server. 1422 Test if we can bring the device online automatically now. 1423 """ 1424 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1425 file_path = 'tmp_large_file' 1426 try: 1427 # 1. Create a large file on device. 1428 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1429 'bs=1000000', 'count=100']) 1430 # 2. Pull the large file on host. 1431 subproc = subprocess.Popen(self.device.adb_cmd + 1432 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1433 time.sleep(0.1) 1434 # 3. Kill the adb server 1435 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1436 subproc.terminate() 1437 finally: 1438 try: 1439 os.unlink(file_path) 1440 except: 1441 pass 1442 # 4. See if the device still exist. 1443 # Sleep to wait for the adb server exit. 1444 time.sleep(0.5) 1445 self.assertEqual(self._get_device_state(serialno), 'device') 1446 1447 1448 def test_packet_size_regression(self): 1449 """Test for http://b/37783561 1450 1451 Receiving packets of a length divisible by 512 but not 1024 resulted in 1452 the adb client waiting indefinitely for more input. 1453 """ 1454 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1455 # Probe some surrounding values as well, for the hell of it. 1456 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1457 for offset in [-6, -5, -4]: 1458 length = base + offset 1459 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1460 'echo', 'foo'] 1461 rc, stdout, _ = self.device.shell_nocheck(cmd) 1462 1463 self.assertEqual(0, rc) 1464 1465 # Output should be '\0' * length, followed by "foo\n" 1466 self.assertEqual(length, len(stdout) - 4) 1467 self.assertEqual(stdout, "\0" * length + "foo\n") 1468 1469 def test_zero_packet(self): 1470 """Test for http://b/113070258 1471 1472 Make sure that we don't blow up when sending USB transfers that line up 1473 exactly with the USB packet size. 1474 """ 1475 1476 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1477 try: 1478 for size in [512, 1024]: 1479 def listener(): 1480 cmd = ["echo foo | nc -l -p 12345; echo done"] 1481 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1482 1483 thread = threading.Thread(target=listener) 1484 thread.start() 1485 1486 # Wait a bit to let the shell command start. 1487 time.sleep(0.25) 1488 1489 sock = socket.create_connection(("localhost", local_port)) 1490 with contextlib.closing(sock): 1491 bytesWritten = sock.send(b"a" * size) 1492 self.assertEqual(size, bytesWritten) 1493 readBytes = sock.recv(4096) 1494 self.assertEqual(b"foo\n", readBytes) 1495 1496 thread.join() 1497 finally: 1498 self.device.forward_remove("tcp:{}".format(local_port)) 1499 1500 1501class SocketTest(DeviceTest): 1502 def test_socket_flush(self): 1503 """Test that we handle socket closure properly. 1504 1505 If we're done writing to a socket, closing before the other end has 1506 closed will send a TCP_RST if we have incoming data queued up, which 1507 may result in data that we've written being discarded. 1508 1509 Bug: http://b/74616284 1510 """ 1511 s = socket.create_connection(("localhost", 5037)) 1512 1513 def adb_length_prefixed(string): 1514 encoded = string.encode("utf8") 1515 result = b"%04x%s" % (len(encoded), encoded) 1516 return result 1517 1518 if "ANDROID_SERIAL" in os.environ: 1519 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1520 else: 1521 transport_string = "host:transport-any" 1522 1523 s.sendall(adb_length_prefixed(transport_string)) 1524 response = s.recv(4) 1525 self.assertEqual(b"OKAY", response) 1526 1527 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1528 s.sendall(adb_length_prefixed(shell_string)) 1529 1530 response = s.recv(4) 1531 self.assertEqual(b"OKAY", response) 1532 1533 # Spawn a thread that dumps garbage into the socket until failure. 1534 def spam(): 1535 buf = b"\0" * 16384 1536 try: 1537 while True: 1538 s.sendall(buf) 1539 except Exception as ex: 1540 print(ex) 1541 1542 thread = threading.Thread(target=spam) 1543 thread.start() 1544 1545 time.sleep(1) 1546 1547 received = b"" 1548 while True: 1549 read = s.recv(512) 1550 if len(read) == 0: 1551 break 1552 received += read 1553 1554 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1555 thread.join() 1556 1557 1558class FramebufferTest(DeviceTest): 1559 def test_framebuffer(self): 1560 """Test that we get something from the framebuffer service.""" 1561 output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) 1562 self.assertFalse(len(output) == 0) 1563 1564 1565if sys.platform == "win32": 1566 # From https://stackoverflow.com/a/38749458 1567 import os 1568 import contextlib 1569 import msvcrt 1570 import ctypes 1571 from ctypes import wintypes 1572 1573 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1574 1575 GENERIC_READ = 0x80000000 1576 GENERIC_WRITE = 0x40000000 1577 FILE_SHARE_READ = 1 1578 FILE_SHARE_WRITE = 2 1579 CONSOLE_TEXTMODE_BUFFER = 1 1580 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1581 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1582 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1583 1584 def _check_zero(result, func, args): 1585 if not result: 1586 raise ctypes.WinError(ctypes.get_last_error()) 1587 return args 1588 1589 def _check_invalid(result, func, args): 1590 if result == INVALID_HANDLE_VALUE: 1591 raise ctypes.WinError(ctypes.get_last_error()) 1592 return args 1593 1594 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1595 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1596 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1597 1598 class COORD(ctypes.Structure): 1599 _fields_ = (('X', wintypes.SHORT), 1600 ('Y', wintypes.SHORT)) 1601 1602 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1603 _fields_ = (('cbSize', wintypes.ULONG), 1604 ('dwSize', COORD), 1605 ('dwCursorPosition', COORD), 1606 ('wAttributes', wintypes.WORD), 1607 ('srWindow', wintypes.SMALL_RECT), 1608 ('dwMaximumWindowSize', COORD), 1609 ('wPopupAttributes', wintypes.WORD), 1610 ('bFullscreenSupported', wintypes.BOOL), 1611 ('ColorTable', wintypes.DWORD * 16)) 1612 def __init__(self, *args, **kwds): 1613 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1614 *args, **kwds) 1615 self.cbSize = ctypes.sizeof(self) 1616 1617 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1618 CONSOLE_SCREEN_BUFFER_INFOEX) 1619 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1620 1621 kernel32.GetStdHandle.errcheck = _check_invalid 1622 kernel32.GetStdHandle.restype = wintypes.HANDLE 1623 kernel32.GetStdHandle.argtypes = ( 1624 wintypes.DWORD,) # _In_ nStdHandle 1625 1626 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1627 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1628 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1629 wintypes.DWORD, # _In_ dwDesiredAccess 1630 wintypes.DWORD, # _In_ dwShareMode 1631 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1632 wintypes.DWORD, # _In_ dwFlags 1633 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1634 1635 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1636 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1637 wintypes.HANDLE, # _In_ hConsoleOutput 1638 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1639 1640 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1641 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1642 wintypes.HANDLE, # _In_ hConsoleOutput 1643 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1644 1645 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1646 kernel32.SetConsoleWindowInfo.argtypes = ( 1647 wintypes.HANDLE, # _In_ hConsoleOutput 1648 wintypes.BOOL, # _In_ bAbsolute 1649 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1650 1651 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1652 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1653 wintypes.HANDLE, # _In_ hConsoleOutput 1654 wintypes.WCHAR, # _In_ cCharacter 1655 wintypes.DWORD, # _In_ nLength 1656 COORD, # _In_ dwWriteCoord 1657 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1658 1659 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1660 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1661 wintypes.HANDLE, # _In_ hConsoleOutput 1662 wintypes.LPWSTR, # _Out_ lpCharacter 1663 wintypes.DWORD, # _In_ nLength 1664 COORD, # _In_ dwReadCoord 1665 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1666 1667 @contextlib.contextmanager 1668 def allocate_console(): 1669 allocated = kernel32.AllocConsole() 1670 try: 1671 yield allocated 1672 finally: 1673 if allocated: 1674 kernel32.FreeConsole() 1675 1676 @contextlib.contextmanager 1677 def console_screen(ncols=None, nrows=None): 1678 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1679 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1680 nwritten = (wintypes.DWORD * 1)() 1681 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1682 kernel32.GetConsoleScreenBufferInfoEx( 1683 hStdOut, ctypes.byref(info)) 1684 if ncols is None: 1685 ncols = info.dwSize.X 1686 if nrows is None: 1687 nrows = info.dwSize.Y 1688 elif nrows > 9999: 1689 raise ValueError('nrows must be 9999 or less') 1690 fd_screen = None 1691 hScreen = kernel32.CreateConsoleScreenBuffer( 1692 GENERIC_READ | GENERIC_WRITE, 1693 FILE_SHARE_READ | FILE_SHARE_WRITE, 1694 None, CONSOLE_TEXTMODE_BUFFER, None) 1695 try: 1696 fd_screen = msvcrt.open_osfhandle( 1697 hScreen, os.O_RDWR | os.O_BINARY) 1698 kernel32.GetConsoleScreenBufferInfoEx( 1699 hScreen, ctypes.byref(new_info)) 1700 new_info.dwSize = COORD(ncols, nrows) 1701 new_info.srWindow = wintypes.SMALL_RECT( 1702 Left=0, Top=0, Right=(ncols - 1), 1703 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1704 kernel32.SetConsoleScreenBufferInfoEx( 1705 hScreen, ctypes.byref(new_info)) 1706 kernel32.SetConsoleWindowInfo(hScreen, True, 1707 ctypes.byref(new_info.srWindow)) 1708 kernel32.FillConsoleOutputCharacterW( 1709 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1710 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1711 try: 1712 yield fd_screen 1713 finally: 1714 kernel32.SetConsoleScreenBufferInfoEx( 1715 hStdOut, ctypes.byref(info)) 1716 kernel32.SetConsoleWindowInfo(hStdOut, True, 1717 ctypes.byref(info.srWindow)) 1718 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1719 finally: 1720 if fd_screen is not None: 1721 os.close(fd_screen) 1722 else: 1723 kernel32.CloseHandle(hScreen) 1724 1725 def read_screen(fd): 1726 hScreen = msvcrt.get_osfhandle(fd) 1727 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1728 kernel32.GetConsoleScreenBufferInfoEx( 1729 hScreen, ctypes.byref(csbi)) 1730 ncols = csbi.dwSize.X 1731 pos = csbi.dwCursorPosition 1732 length = ncols * pos.Y + pos.X + 1 1733 buf = (ctypes.c_wchar * length)() 1734 n = (wintypes.DWORD * 1)() 1735 kernel32.ReadConsoleOutputCharacterW( 1736 hScreen, buf, length, COORD(0,0), n) 1737 lines = [buf[i:i+ncols].rstrip(u'\0') 1738 for i in range(0, n[0], ncols)] 1739 return u'\n'.join(lines) 1740 1741@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1742class WindowsConsoleTest(DeviceTest): 1743 def test_unicode_output(self): 1744 """Test Unicode command line parameters and Unicode console window output. 1745 1746 Bug: https://issuetracker.google.com/issues/111972753 1747 """ 1748 # If we don't have a console window, allocate one. This isn't necessary if we're already 1749 # being run from a console window, which is typical. 1750 with allocate_console() as allocated_console: 1751 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1752 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1753 # likely unnecessary given the typical console window size. 1754 with console_screen(nrows=1000) as screen: 1755 unicode_string = u'로보카 폴리' 1756 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1757 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1758 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1759 process.wait() 1760 # Read what was written by adb to the temporary console buffer. 1761 console_output = read_screen(screen) 1762 self.assertEqual(unicode_string, console_output) 1763 1764 1765def main(): 1766 random.seed(0) 1767 if len(adb.get_devices()) > 0: 1768 suite = unittest.TestLoader().loadTestsFromName(__name__) 1769 unittest.TextTestRunner(verbosity=3).run(suite) 1770 else: 1771 print('Test suite must be run with attached devices') 1772 1773 1774if __name__ == '__main__': 1775 main() 1776