1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 device = adb.get_device() 81 82 83class AbbTest(DeviceTest): 84 def test_smoke(self): 85 abb = subprocess.run(['adb', 'abb'], capture_output=True) 86 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 87 88 # abb squashes all failures to 1. 89 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 90 self.assertEqual(abb.stdout, cmd.stdout) 91 self.assertEqual(abb.stderr, cmd.stderr) 92 93class ForwardReverseTest(DeviceTest): 94 def _test_no_rebind(self, description, direction_list, direction, 95 direction_no_rebind, direction_remove_all): 96 msg = direction_list() 97 self.assertEqual('', msg.strip(), 98 description + ' list must be empty to run this test.') 99 100 # Use --no-rebind with no existing binding 101 direction_no_rebind('tcp:5566', 'tcp:6655') 102 msg = direction_list() 103 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 104 105 # Use --no-rebind with existing binding 106 with self.assertRaises(subprocess.CalledProcessError): 107 direction_no_rebind('tcp:5566', 'tcp:6677') 108 msg = direction_list() 109 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 110 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 111 112 # Use the absence of --no-rebind with existing binding 113 direction('tcp:5566', 'tcp:6677') 114 msg = direction_list() 115 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 116 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 117 118 direction_remove_all() 119 msg = direction_list() 120 self.assertEqual('', msg.strip()) 121 122 def test_forward_no_rebind(self): 123 self._test_no_rebind('forward', self.device.forward_list, 124 self.device.forward, self.device.forward_no_rebind, 125 self.device.forward_remove_all) 126 127 def test_reverse_no_rebind(self): 128 self._test_no_rebind('reverse', self.device.reverse_list, 129 self.device.reverse, self.device.reverse_no_rebind, 130 self.device.reverse_remove_all) 131 132 def test_forward(self): 133 msg = self.device.forward_list() 134 self.assertEqual('', msg.strip(), 135 'Forwarding list must be empty to run this test.') 136 self.device.forward('tcp:5566', 'tcp:6655') 137 msg = self.device.forward_list() 138 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 139 self.device.forward('tcp:7788', 'tcp:8877') 140 msg = self.device.forward_list() 141 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 142 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 143 self.device.forward_remove('tcp:5566') 144 msg = self.device.forward_list() 145 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 146 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 147 self.device.forward_remove_all() 148 msg = self.device.forward_list() 149 self.assertEqual('', msg.strip()) 150 151 def test_forward_old_protocol(self): 152 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 153 154 msg = self.device.forward_list() 155 self.assertEqual('', msg.strip(), 156 'Forwarding list must be empty to run this test.') 157 158 s = socket.create_connection(("localhost", 5037)) 159 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 160 cmd = b"%04x%s" % (len(service), service) 161 s.sendall(cmd) 162 163 msg = self.device.forward_list() 164 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 165 166 self.device.forward_remove_all() 167 msg = self.device.forward_list() 168 self.assertEqual('', msg.strip()) 169 170 def test_forward_tcp_port_0(self): 171 self.assertEqual('', self.device.forward_list().strip(), 172 'Forwarding list must be empty to run this test.') 173 174 try: 175 # If resolving TCP port 0 is supported, `adb forward` will print 176 # the actual port number. 177 port = self.device.forward('tcp:0', 'tcp:8888').strip() 178 if not port: 179 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 180 181 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 182 self.device.forward_list())) 183 finally: 184 self.device.forward_remove_all() 185 186 def test_reverse(self): 187 msg = self.device.reverse_list() 188 self.assertEqual('', msg.strip(), 189 'Reverse forwarding list must be empty to run this test.') 190 self.device.reverse('tcp:5566', 'tcp:6655') 191 msg = self.device.reverse_list() 192 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 193 self.device.reverse('tcp:7788', 'tcp:8877') 194 msg = self.device.reverse_list() 195 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 196 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 197 self.device.reverse_remove('tcp:5566') 198 msg = self.device.reverse_list() 199 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 200 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 201 self.device.reverse_remove_all() 202 msg = self.device.reverse_list() 203 self.assertEqual('', msg.strip()) 204 205 def test_reverse_tcp_port_0(self): 206 self.assertEqual('', self.device.reverse_list().strip(), 207 'Reverse list must be empty to run this test.') 208 209 try: 210 # If resolving TCP port 0 is supported, `adb reverse` will print 211 # the actual port number. 212 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 213 if not port: 214 raise unittest.SkipTest('Reversing tcp:0 is not available.') 215 216 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 217 self.device.reverse_list())) 218 finally: 219 self.device.reverse_remove_all() 220 221 def test_forward_reverse_echo(self): 222 """Send data through adb forward and read it back via adb reverse""" 223 forward_port = 12345 224 reverse_port = forward_port + 1 225 forward_spec = 'tcp:' + str(forward_port) 226 reverse_spec = 'tcp:' + str(reverse_port) 227 forward_setup = False 228 reverse_setup = False 229 230 try: 231 # listen on localhost:forward_port, connect to remote:forward_port 232 self.device.forward(forward_spec, forward_spec) 233 forward_setup = True 234 # listen on remote:forward_port, connect to localhost:reverse_port 235 self.device.reverse(forward_spec, reverse_spec) 236 reverse_setup = True 237 238 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 239 with contextlib.closing(listener): 240 # Use SO_REUSEADDR so that subsequent runs of the test can grab 241 # the port even if it is in TIME_WAIT. 242 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 243 244 # Listen on localhost:reverse_port before connecting to 245 # localhost:forward_port because that will cause adb to connect 246 # back to localhost:reverse_port. 247 listener.bind(('127.0.0.1', reverse_port)) 248 listener.listen(4) 249 250 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 251 with contextlib.closing(client): 252 # Connect to the listener. 253 client.connect(('127.0.0.1', forward_port)) 254 255 # Accept the client connection. 256 accepted_connection, addr = listener.accept() 257 with contextlib.closing(accepted_connection) as server: 258 data = b'hello' 259 260 # Send data into the port setup by adb forward. 261 client.sendall(data) 262 # Explicitly close() so that server gets EOF. 263 client.close() 264 265 # Verify that the data came back via adb reverse. 266 self.assertEqual(data, server.makefile().read().encode("utf8")) 267 finally: 268 if reverse_setup: 269 self.device.reverse_remove(forward_spec) 270 if forward_setup: 271 self.device.forward_remove(forward_spec) 272 273 274class ShellTest(DeviceTest): 275 def _interactive_shell(self, shell_args, input): 276 """Runs an interactive adb shell. 277 278 Args: 279 shell_args: List of string arguments to `adb shell`. 280 input: bytes input to send to the interactive shell. 281 282 Returns: 283 The remote exit code. 284 285 Raises: 286 unittest.SkipTest: The device doesn't support exit codes. 287 """ 288 if not self.device.has_shell_protocol(): 289 raise unittest.SkipTest('exit codes are unavailable on this device') 290 291 proc = subprocess.Popen( 292 self.device.adb_cmd + ['shell'] + shell_args, 293 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 294 stderr=subprocess.PIPE) 295 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 296 # to explicitly add an exit command to close the session from the device 297 # side, plus the necessary newline to complete the interactive command. 298 proc.communicate(input + b'; exit\n') 299 return proc.returncode 300 301 def test_cat(self): 302 """Check that we can at least cat a file.""" 303 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 304 elements = out.split() 305 self.assertEqual(len(elements), 2) 306 307 uptime, idle = elements 308 self.assertGreater(float(uptime), 0.0) 309 self.assertGreater(float(idle), 0.0) 310 311 def test_throws_on_failure(self): 312 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 313 314 def test_output_not_stripped(self): 315 out = self.device.shell(['echo', 'foo'])[0] 316 self.assertEqual(out, 'foo' + self.device.linesep) 317 318 def test_shell_command_length(self): 319 # Devices that have shell_v2 should be able to handle long commands. 320 if self.device.has_shell_protocol(): 321 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 322 self.assertEqual(rc, 0) 323 self.assertTrue(out == ('x' * 16384 + '\n')) 324 325 def test_shell_nocheck_failure(self): 326 rc, out, _ = self.device.shell_nocheck(['false']) 327 self.assertNotEqual(rc, 0) 328 self.assertEqual(out, '') 329 330 def test_shell_nocheck_output_not_stripped(self): 331 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 332 self.assertEqual(rc, 0) 333 self.assertEqual(out, 'foo' + self.device.linesep) 334 335 def test_can_distinguish_tricky_results(self): 336 # If result checking on ADB shell is naively implemented as 337 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 338 # output from the result for a cmd of `echo -n 1`. 339 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 340 self.assertEqual(rc, 0) 341 self.assertEqual(out, '1') 342 343 def test_line_endings(self): 344 """Ensure that line ending translation is not happening in the pty. 345 346 Bug: http://b/19735063 347 """ 348 output = self.device.shell(['uname'])[0] 349 self.assertEqual(output, 'Linux' + self.device.linesep) 350 351 def test_pty_logic(self): 352 """Tests that a PTY is allocated when it should be. 353 354 PTY allocation behavior should match ssh. 355 """ 356 def check_pty(args): 357 """Checks adb shell PTY allocation. 358 359 Tests |args| for terminal and non-terminal stdin. 360 361 Args: 362 args: -Tt args in a list (e.g. ['-t', '-t']). 363 364 Returns: 365 A tuple (<terminal>, <non-terminal>). True indicates 366 the corresponding shell allocated a remote PTY. 367 """ 368 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 369 370 terminal = subprocess.Popen( 371 test_cmd, stdin=None, 372 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 373 terminal.communicate() 374 375 non_terminal = subprocess.Popen( 376 test_cmd, stdin=subprocess.PIPE, 377 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 378 non_terminal.communicate() 379 380 return (terminal.returncode == 0, non_terminal.returncode == 0) 381 382 # -T: never allocate PTY. 383 self.assertEqual((False, False), check_pty(['-T'])) 384 385 # These tests require a new device. 386 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 387 # No args: PTY only if stdin is a terminal and shell is interactive, 388 # which is difficult to reliably test from a script. 389 self.assertEqual((False, False), check_pty([])) 390 391 # -t: PTY if stdin is a terminal. 392 self.assertEqual((True, False), check_pty(['-t'])) 393 394 # -t -t: always allocate PTY. 395 self.assertEqual((True, True), check_pty(['-t', '-t'])) 396 397 # -tt: always allocate PTY, POSIX style (http://b/32216152). 398 self.assertEqual((True, True), check_pty(['-tt'])) 399 400 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 401 # we follow the man page instead. 402 self.assertEqual((True, True), check_pty(['-ttt'])) 403 404 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 405 self.assertEqual((True, True), check_pty(['-ttx'])) 406 407 # -Ttt: -tt cancels out -T. 408 self.assertEqual((True, True), check_pty(['-Ttt'])) 409 410 # -ttT: -T cancels out -tt. 411 self.assertEqual((False, False), check_pty(['-ttT'])) 412 413 def test_shell_protocol(self): 414 """Tests the shell protocol on the device. 415 416 If the device supports shell protocol, this gives us the ability 417 to separate stdout/stderr and return the exit code directly. 418 419 Bug: http://b/19734861 420 """ 421 if not self.device.has_shell_protocol(): 422 raise unittest.SkipTest('shell protocol unsupported on this device') 423 424 # Shell protocol should be used by default. 425 result = self.device.shell_nocheck( 426 shlex.split('echo foo; echo bar >&2; exit 17')) 427 self.assertEqual(17, result[0]) 428 self.assertEqual('foo' + self.device.linesep, result[1]) 429 self.assertEqual('bar' + self.device.linesep, result[2]) 430 431 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 432 433 # -x flag should disable shell protocol. 434 result = self.device.shell_nocheck( 435 shlex.split('-x echo foo; echo bar >&2; exit 17')) 436 self.assertEqual(0, result[0]) 437 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 438 self.assertEqual('', result[2]) 439 440 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 441 442 def test_non_interactive_sigint(self): 443 """Tests that SIGINT in a non-interactive shell kills the process. 444 445 This requires the shell protocol in order to detect the broken 446 pipe; raw data transfer mode will only see the break once the 447 subprocess tries to read or write. 448 449 Bug: http://b/23825725 450 """ 451 if not self.device.has_shell_protocol(): 452 raise unittest.SkipTest('shell protocol unsupported on this device') 453 454 # Start a long-running process. 455 sleep_proc = subprocess.Popen( 456 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 457 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 458 stderr=subprocess.STDOUT) 459 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 460 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 461 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 462 463 # Verify that the process is running, send signal, verify it stopped. 464 self.device.shell(proc_query) 465 os.kill(sleep_proc.pid, signal.SIGINT) 466 sleep_proc.communicate() 467 468 # It can take some time for the process to receive the signal and die. 469 end_time = time.time() + 3 470 while self.device.shell_nocheck(proc_query)[0] != 1: 471 self.assertFalse(time.time() > end_time, 472 'subprocess failed to terminate in time') 473 474 def test_non_interactive_stdin(self): 475 """Tests that non-interactive shells send stdin.""" 476 if not self.device.has_shell_protocol(): 477 raise unittest.SkipTest('non-interactive stdin unsupported ' 478 'on this device') 479 480 # Test both small and large inputs. 481 small_input = b'foo' 482 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 483 large_input = b'\n'.join(characters) 484 485 486 for input in (small_input, large_input): 487 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 488 stdin=subprocess.PIPE, 489 stdout=subprocess.PIPE, 490 stderr=subprocess.PIPE) 491 stdout, stderr = proc.communicate(input) 492 self.assertEqual(input.splitlines(), stdout.splitlines()) 493 self.assertEqual(b'', stderr) 494 495 def test_sighup(self): 496 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 497 log_path = "/data/local/tmp/adb_signal_test.log" 498 499 # Clear the output file. 500 self.device.shell_nocheck(["echo", ">", log_path]) 501 502 script = """ 503 trap "echo SIGINT > {path}; exit 0" SIGINT 504 trap "echo SIGHUP > {path}; exit 0" SIGHUP 505 echo Waiting 506 read 507 """.format(path=log_path) 508 509 script = ";".join([x.strip() for x in script.strip().splitlines()]) 510 511 process = self.device.shell_popen([script], kill_atexit=False, 512 stdin=subprocess.PIPE, 513 stdout=subprocess.PIPE) 514 515 self.assertEqual(b"Waiting\n", process.stdout.readline()) 516 process.send_signal(signal.SIGINT) 517 process.wait() 518 519 # Waiting for the local adb to finish is insufficient, since it hangs 520 # up immediately. 521 time.sleep(1) 522 523 stdout, _ = self.device.shell(["cat", log_path]) 524 self.assertEqual(stdout.strip(), "SIGHUP") 525 526 def test_exit_stress(self): 527 """Hammer `adb shell exit 42` with multiple threads.""" 528 thread_count = 48 529 result = dict() 530 def hammer(thread_idx, thread_count, result): 531 success = True 532 for i in range(thread_idx, 240, thread_count): 533 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 534 if ret != i % 256: 535 success = False 536 break 537 result[thread_idx] = success 538 539 threads = [] 540 for i in range(thread_count): 541 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 542 thread.start() 543 threads.append(thread) 544 for thread in threads: 545 thread.join() 546 for i, success in result.items(): 547 self.assertTrue(success) 548 549 def disabled_test_parallel(self): 550 """Spawn a bunch of `adb shell` instances in parallel. 551 552 This was broken historically due to the use of select, which only works 553 for fds that are numerically less than 1024. 554 555 Bug: http://b/141955761""" 556 557 n_procs = 2048 558 procs = dict() 559 for i in range(0, n_procs): 560 procs[i] = subprocess.Popen( 561 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 562 stdin=subprocess.PIPE, 563 stdout=subprocess.PIPE 564 ) 565 566 for i in range(0, n_procs): 567 procs[i].stdin.write("%d\n" % i) 568 569 for i in range(0, n_procs): 570 response = procs[i].stdout.readline() 571 assert(response == "%d\n" % i) 572 573 for i in range(0, n_procs): 574 procs[i].stdin.write("%d\n" % (i % 256)) 575 576 for i in range(0, n_procs): 577 assert(procs[i].wait() == i % 256) 578 579 580class ArgumentEscapingTest(DeviceTest): 581 def test_shell_escaping(self): 582 """Make sure that argument escaping is somewhat sane.""" 583 584 # http://b/19734868 585 # Note that this actually matches ssh(1)'s behavior --- it's 586 # converted to `sh -c echo hello; echo world` which sh interprets 587 # as `sh -c echo` (with an argument to that shell of "hello"), 588 # and then `echo world` back in the first shell. 589 result = self.device.shell( 590 shlex.split("sh -c 'echo hello; echo world'"))[0] 591 result = result.splitlines() 592 self.assertEqual(['', 'world'], result) 593 # If you really wanted "hello" and "world", here's what you'd do: 594 result = self.device.shell( 595 shlex.split(r'echo hello\;echo world'))[0].splitlines() 596 self.assertEqual(['hello', 'world'], result) 597 598 # http://b/15479704 599 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 600 self.assertEqual('t', result) 601 result = self.device.shell( 602 shlex.split("sh -c 'true && echo t'"))[0].strip() 603 self.assertEqual('t', result) 604 605 # http://b/20564385 606 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 607 self.assertEqual('t', result) 608 result = self.device.shell( 609 shlex.split(r'echo -n 123\;uname'))[0].strip() 610 self.assertEqual('123Linux', result) 611 612 def test_install_argument_escaping(self): 613 """Make sure that install argument escaping works.""" 614 # http://b/20323053, http://b/3090932. 615 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 616 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 617 delete=False) 618 tf.close() 619 620 # Installing bogus .apks fails if the device supports exit codes. 621 try: 622 output = self.device.install(tf.name.decode("utf8")) 623 except subprocess.CalledProcessError as e: 624 output = e.output 625 626 self.assertIn(file_suffix, output) 627 os.remove(tf.name) 628 629 630@unittest.skip("b/172372960: temporarily disabled due to flakiness") 631class RootUnrootTest(DeviceTest): 632 def _test_root(self): 633 message = self.device.root() 634 if 'adbd cannot run as root in production builds' in message: 635 return 636 self.device.wait() 637 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 638 639 def _test_unroot(self): 640 self.device.unroot() 641 self.device.wait() 642 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 643 644 def test_root_unroot(self): 645 """Make sure that adb root and adb unroot work, using id(1).""" 646 if self.device.get_prop('ro.debuggable') != '1': 647 raise unittest.SkipTest('requires rootable build') 648 649 original_user = self.device.shell(['id', '-un'])[0].strip() 650 try: 651 if original_user == 'root': 652 self._test_unroot() 653 self._test_root() 654 elif original_user == 'shell': 655 self._test_root() 656 self._test_unroot() 657 finally: 658 if original_user == 'root': 659 self.device.root() 660 else: 661 self.device.unroot() 662 self.device.wait() 663 664 665class TcpIpTest(DeviceTest): 666 def test_tcpip_failure_raises(self): 667 """adb tcpip requires a port. 668 669 Bug: http://b/22636927 670 """ 671 self.assertRaises( 672 subprocess.CalledProcessError, self.device.tcpip, '') 673 self.assertRaises( 674 subprocess.CalledProcessError, self.device.tcpip, 'foo') 675 676 677class SystemPropertiesTest(DeviceTest): 678 def test_get_prop(self): 679 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 680 681 @requires_root 682 def test_set_prop(self): 683 prop_name = 'foo.bar' 684 self.device.shell(['setprop', prop_name, '""']) 685 686 self.device.set_prop(prop_name, 'qux') 687 self.assertEqual( 688 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 689 690 691def compute_md5(string): 692 hsh = hashlib.md5() 693 hsh.update(string) 694 return hsh.hexdigest() 695 696 697def get_md5_prog(device): 698 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 699 try: 700 device.shell(['md5sum', '/proc/uptime']) 701 return 'md5sum' 702 except adb.ShellError: 703 return 'md5' 704 705 706class HostFile(object): 707 def __init__(self, handle, checksum): 708 self.handle = handle 709 self.checksum = checksum 710 self.full_path = handle.name 711 self.base_name = os.path.basename(self.full_path) 712 713 714class DeviceFile(object): 715 def __init__(self, checksum, full_path): 716 self.checksum = checksum 717 self.full_path = full_path 718 self.base_name = posixpath.basename(self.full_path) 719 720 721def make_random_host_files(in_dir, num_files): 722 min_size = 1 * (1 << 10) 723 max_size = 16 * (1 << 10) 724 725 files = [] 726 for _ in range(num_files): 727 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 728 729 size = random.randrange(min_size, max_size, 1024) 730 rand_str = os.urandom(size) 731 file_handle.write(rand_str) 732 file_handle.flush() 733 file_handle.close() 734 735 md5 = compute_md5(rand_str) 736 files.append(HostFile(file_handle, md5)) 737 return files 738 739 740def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 741 min_size = 1 * (1 << 10) 742 max_size = 16 * (1 << 10) 743 744 files = [] 745 for file_num in range(num_files): 746 size = random.randrange(min_size, max_size, 1024) 747 748 base_name = prefix + str(file_num) 749 full_path = posixpath.join(in_dir, base_name) 750 751 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 752 'bs={}'.format(size), 'count=1']) 753 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 754 755 files.append(DeviceFile(dev_md5, full_path)) 756 return files 757 758 759class FileOperationsTest: 760 class Base(DeviceTest): 761 SCRATCH_DIR = '/data/local/tmp' 762 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 763 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 764 765 def setUp(self): 766 self.previous_env = os.environ.get("ADB_COMPRESSION") 767 os.environ["ADB_COMPRESSION"] = self.compression 768 769 def tearDown(self): 770 if self.previous_env is None: 771 del os.environ["ADB_COMPRESSION"] 772 else: 773 os.environ["ADB_COMPRESSION"] = self.previous_env 774 775 def _verify_remote(self, checksum, remote_path): 776 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 777 remote_path])[0].split() 778 self.assertEqual(checksum, dev_md5) 779 780 def _verify_local(self, checksum, local_path): 781 with open(local_path, 'rb') as host_file: 782 host_md5 = compute_md5(host_file.read()) 783 self.assertEqual(host_md5, checksum) 784 785 def test_push(self): 786 """Push a randomly generated file to specified device.""" 787 kbytes = 512 788 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 789 rand_str = os.urandom(1024 * kbytes) 790 tmp.write(rand_str) 791 tmp.close() 792 793 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 794 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 795 796 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 797 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 798 799 os.remove(tmp.name) 800 801 def test_push_dir(self): 802 """Push a randomly generated directory of files to the device.""" 803 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 804 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 805 806 try: 807 host_dir = tempfile.mkdtemp() 808 809 # Make sure the temp directory isn't setuid, or else adb will complain. 810 os.chmod(host_dir, 0o700) 811 812 # Create 32 random files. 813 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 814 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 815 816 for temp_file in temp_files: 817 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 818 os.path.basename(host_dir), 819 temp_file.base_name) 820 self._verify_remote(temp_file.checksum, remote_path) 821 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 822 finally: 823 if host_dir is not None: 824 shutil.rmtree(host_dir) 825 826 def disabled_test_push_empty(self): 827 """Push an empty directory to the device.""" 828 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 829 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 830 831 try: 832 host_dir = tempfile.mkdtemp() 833 834 # Make sure the temp directory isn't setuid, or else adb will complain. 835 os.chmod(host_dir, 0o700) 836 837 # Create an empty directory. 838 empty_dir_path = os.path.join(host_dir, 'empty') 839 os.mkdir(empty_dir_path); 840 841 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 842 843 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 844 test_empty_cmd = ["[", "-d", remote_path, "]"] 845 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 846 847 self.assertEqual(rc, 0) 848 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 849 finally: 850 if host_dir is not None: 851 shutil.rmtree(host_dir) 852 853 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 854 def test_push_symlink(self): 855 """Push a symlink. 856 857 Bug: http://b/31491920 858 """ 859 try: 860 host_dir = tempfile.mkdtemp() 861 862 # Make sure the temp directory isn't setuid, or else adb will 863 # complain. 864 os.chmod(host_dir, 0o700) 865 866 with open(os.path.join(host_dir, 'foo'), 'w') as f: 867 f.write('foo') 868 869 symlink_path = os.path.join(host_dir, 'symlink') 870 os.symlink('foo', symlink_path) 871 872 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 873 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 874 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 875 rc, out, _ = self.device.shell_nocheck( 876 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 877 self.assertEqual(0, rc) 878 self.assertEqual(out.strip(), 'foo') 879 finally: 880 if host_dir is not None: 881 shutil.rmtree(host_dir) 882 883 def test_multiple_push(self): 884 """Push multiple files to the device in one adb push command. 885 886 Bug: http://b/25324823 887 """ 888 889 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 890 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 891 892 try: 893 host_dir = tempfile.mkdtemp() 894 895 # Create some random files and a subdirectory containing more files. 896 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 897 898 subdir = os.path.join(host_dir, 'subdir') 899 os.mkdir(subdir) 900 subdir_temp_files = make_random_host_files(in_dir=subdir, 901 num_files=4) 902 903 paths = [x.full_path for x in temp_files] 904 paths.append(subdir) 905 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 906 907 for temp_file in temp_files: 908 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 909 temp_file.base_name) 910 self._verify_remote(temp_file.checksum, remote_path) 911 912 for subdir_temp_file in subdir_temp_files: 913 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 914 # BROKEN: http://b/25394682 915 # 'subdir'; 916 temp_file.base_name) 917 self._verify_remote(temp_file.checksum, remote_path) 918 919 920 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 921 finally: 922 if host_dir is not None: 923 shutil.rmtree(host_dir) 924 925 @requires_non_root 926 def test_push_error_reporting(self): 927 """Make sure that errors that occur while pushing a file get reported 928 929 Bug: http://b/26816782 930 """ 931 with tempfile.NamedTemporaryFile() as tmp_file: 932 tmp_file.write(b'\0' * 1024 * 1024) 933 tmp_file.flush() 934 try: 935 self.device.push(local=tmp_file.name, remote='/system/') 936 self.fail('push should not have succeeded') 937 except subprocess.CalledProcessError as e: 938 output = e.output 939 940 self.assertTrue(b'Permission denied' in output or 941 b'Read-only file system' in output) 942 943 @requires_non_root 944 def test_push_directory_creation(self): 945 """Regression test for directory creation. 946 947 Bug: http://b/110953234 948 """ 949 with tempfile.NamedTemporaryFile() as tmp_file: 950 tmp_file.write(b'\0' * 1024 * 1024) 951 tmp_file.flush() 952 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 953 self.device.shell(['rm', '-rf', remote_path]) 954 955 remote_path += '/filename' 956 self.device.push(local=tmp_file.name, remote=remote_path) 957 958 def disabled_test_push_multiple_slash_root(self): 959 """Regression test for pushing to //data/local/tmp. 960 961 Bug: http://b/141311284 962 963 Disabled because this broken on the adbd side as well: b/141943968 964 """ 965 with tempfile.NamedTemporaryFile() as tmp_file: 966 tmp_file.write('\0' * 1024 * 1024) 967 tmp_file.flush() 968 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 969 self.device.shell(['rm', '-rf', remote_path]) 970 self.device.push(local=tmp_file.name, remote=remote_path) 971 972 def _test_pull(self, remote_file, checksum): 973 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 974 tmp_write.close() 975 self.device.pull(remote=remote_file, local=tmp_write.name) 976 with open(tmp_write.name, 'rb') as tmp_read: 977 host_contents = tmp_read.read() 978 host_md5 = compute_md5(host_contents) 979 self.assertEqual(checksum, host_md5) 980 os.remove(tmp_write.name) 981 982 @requires_non_root 983 def test_pull_error_reporting(self): 984 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 985 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 986 987 try: 988 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 989 except subprocess.CalledProcessError as e: 990 output = e.output 991 992 self.assertIn(b'Permission denied', output) 993 994 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 995 996 def test_pull(self): 997 """Pull a randomly generated file from specified device.""" 998 kbytes = 512 999 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 1000 cmd = ['dd', 'if=/dev/urandom', 1001 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 1002 'count={}'.format(kbytes)] 1003 self.device.shell(cmd) 1004 dev_md5, _ = self.device.shell( 1005 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 1006 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 1007 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 1008 1009 def test_pull_dir(self): 1010 """Pull a randomly generated directory of files from the device.""" 1011 try: 1012 host_dir = tempfile.mkdtemp() 1013 1014 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1015 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1016 1017 # Populate device directory with random files. 1018 temp_files = make_random_device_files( 1019 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1020 1021 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1022 1023 for temp_file in temp_files: 1024 host_path = os.path.join( 1025 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1026 temp_file.base_name) 1027 self._verify_local(temp_file.checksum, host_path) 1028 1029 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1030 finally: 1031 if host_dir is not None: 1032 shutil.rmtree(host_dir) 1033 1034 def test_pull_dir_symlink(self): 1035 """Pull a directory into a symlink to a directory. 1036 1037 Bug: http://b/27362811 1038 """ 1039 if os.name != 'posix': 1040 raise unittest.SkipTest('requires POSIX') 1041 1042 try: 1043 host_dir = tempfile.mkdtemp() 1044 real_dir = os.path.join(host_dir, 'dir') 1045 symlink = os.path.join(host_dir, 'symlink') 1046 os.mkdir(real_dir) 1047 os.symlink(real_dir, symlink) 1048 1049 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1050 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1051 1052 # Populate device directory with random files. 1053 temp_files = make_random_device_files( 1054 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1055 1056 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1057 1058 for temp_file in temp_files: 1059 host_path = os.path.join( 1060 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1061 temp_file.base_name) 1062 self._verify_local(temp_file.checksum, host_path) 1063 1064 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1065 finally: 1066 if host_dir is not None: 1067 shutil.rmtree(host_dir) 1068 1069 def test_pull_dir_symlink_collision(self): 1070 """Pull a directory into a colliding symlink to directory.""" 1071 if os.name != 'posix': 1072 raise unittest.SkipTest('requires POSIX') 1073 1074 try: 1075 host_dir = tempfile.mkdtemp() 1076 real_dir = os.path.join(host_dir, 'real') 1077 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1078 symlink = os.path.join(host_dir, tmp_dirname) 1079 os.mkdir(real_dir) 1080 os.symlink(real_dir, symlink) 1081 1082 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1083 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1084 1085 # Populate device directory with random files. 1086 temp_files = make_random_device_files( 1087 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1088 1089 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1090 1091 for temp_file in temp_files: 1092 host_path = os.path.join(real_dir, temp_file.base_name) 1093 self._verify_local(temp_file.checksum, host_path) 1094 1095 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1096 finally: 1097 if host_dir is not None: 1098 shutil.rmtree(host_dir) 1099 1100 def test_pull_dir_nonexistent(self): 1101 """Pull a directory of files from the device to a nonexistent path.""" 1102 try: 1103 host_dir = tempfile.mkdtemp() 1104 dest_dir = os.path.join(host_dir, 'dest') 1105 1106 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1107 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1108 1109 # Populate device directory with random files. 1110 temp_files = make_random_device_files( 1111 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1112 1113 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1114 1115 for temp_file in temp_files: 1116 host_path = os.path.join(dest_dir, temp_file.base_name) 1117 self._verify_local(temp_file.checksum, host_path) 1118 1119 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1120 finally: 1121 if host_dir is not None: 1122 shutil.rmtree(host_dir) 1123 1124 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1125 def disabled_test_pull_symlink_dir(self): 1126 """Pull a symlink to a directory of symlinks to files.""" 1127 try: 1128 host_dir = tempfile.mkdtemp() 1129 1130 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1131 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1132 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1133 1134 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1135 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1136 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1137 1138 # Populate device directory with random files. 1139 temp_files = make_random_device_files( 1140 self.device, in_dir=remote_dir, num_files=32) 1141 1142 for temp_file in temp_files: 1143 self.device.shell( 1144 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1145 posixpath.join(remote_links, temp_file.base_name)]) 1146 1147 self.device.pull(remote=remote_symlink, local=host_dir) 1148 1149 for temp_file in temp_files: 1150 host_path = os.path.join( 1151 host_dir, 'symlink', temp_file.base_name) 1152 self._verify_local(temp_file.checksum, host_path) 1153 1154 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1155 finally: 1156 if host_dir is not None: 1157 shutil.rmtree(host_dir) 1158 1159 def test_pull_empty(self): 1160 """Pull a directory containing an empty directory from the device.""" 1161 try: 1162 host_dir = tempfile.mkdtemp() 1163 1164 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1165 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1166 self.device.shell(['mkdir', '-p', remote_empty_path]) 1167 1168 self.device.pull(remote=remote_empty_path, local=host_dir) 1169 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1170 finally: 1171 if host_dir is not None: 1172 shutil.rmtree(host_dir) 1173 1174 def test_multiple_pull(self): 1175 """Pull a randomly generated directory of files from the device.""" 1176 1177 try: 1178 host_dir = tempfile.mkdtemp() 1179 1180 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1181 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1182 self.device.shell(['mkdir', '-p', subdir]) 1183 1184 # Create some random files and a subdirectory containing more files. 1185 temp_files = make_random_device_files( 1186 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1187 1188 subdir_temp_files = make_random_device_files( 1189 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1190 1191 paths = [x.full_path for x in temp_files] 1192 paths.append(subdir) 1193 self.device._simple_call(['pull'] + paths + [host_dir]) 1194 1195 for temp_file in temp_files: 1196 local_path = os.path.join(host_dir, temp_file.base_name) 1197 self._verify_local(temp_file.checksum, local_path) 1198 1199 for subdir_temp_file in subdir_temp_files: 1200 local_path = os.path.join(host_dir, 1201 'subdir', 1202 subdir_temp_file.base_name) 1203 self._verify_local(subdir_temp_file.checksum, local_path) 1204 1205 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1206 finally: 1207 if host_dir is not None: 1208 shutil.rmtree(host_dir) 1209 1210 def verify_sync(self, device, temp_files, device_dir): 1211 """Verifies that a list of temp files was synced to the device.""" 1212 # Confirm that every file on the device mirrors that on the host. 1213 for temp_file in temp_files: 1214 device_full_path = posixpath.join( 1215 device_dir, temp_file.base_name) 1216 dev_md5, _ = device.shell( 1217 [get_md5_prog(self.device), device_full_path])[0].split() 1218 self.assertEqual(temp_file.checksum, dev_md5) 1219 1220 def test_sync(self): 1221 """Sync a host directory to the data partition.""" 1222 1223 try: 1224 base_dir = tempfile.mkdtemp() 1225 1226 # Create mirror device directory hierarchy within base_dir. 1227 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1228 os.makedirs(full_dir_path) 1229 1230 # Create 32 random files within the host mirror. 1231 temp_files = make_random_host_files( 1232 in_dir=full_dir_path, num_files=32) 1233 1234 # Clean up any stale files on the device. 1235 device = adb.get_device() # pylint: disable=no-member 1236 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1237 1238 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1239 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1240 device.sync('data') 1241 if old_product_out is None: 1242 del os.environ['ANDROID_PRODUCT_OUT'] 1243 else: 1244 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1245 1246 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1247 1248 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1249 finally: 1250 if base_dir is not None: 1251 shutil.rmtree(base_dir) 1252 1253 def test_push_sync(self): 1254 """Sync a host directory to a specific path.""" 1255 1256 try: 1257 temp_dir = tempfile.mkdtemp() 1258 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1259 1260 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1261 1262 # Clean up any stale files on the device. 1263 device = adb.get_device() # pylint: disable=no-member 1264 device.shell(['rm', '-rf', device_dir]) 1265 1266 device.push(temp_dir, device_dir, sync=True) 1267 1268 self.verify_sync(device, temp_files, device_dir) 1269 1270 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1271 finally: 1272 if temp_dir is not None: 1273 shutil.rmtree(temp_dir) 1274 1275 def test_push_sync_multiple(self): 1276 """Sync multiple host directories to a specific path.""" 1277 1278 try: 1279 temp_dir = tempfile.mkdtemp() 1280 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1281 1282 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1283 1284 # Clean up any stale files on the device. 1285 device = adb.get_device() # pylint: disable=no-member 1286 device.shell(['rm', '-rf', device_dir]) 1287 device.shell(['mkdir', '-p', device_dir]) 1288 1289 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1290 device.push(host_paths, device_dir, sync=True) 1291 1292 self.verify_sync(device, temp_files, device_dir) 1293 1294 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1295 finally: 1296 if temp_dir is not None: 1297 shutil.rmtree(temp_dir) 1298 1299 1300 def test_push_dry_run_nonexistent_file(self): 1301 """Push with dry run.""" 1302 1303 for file_size in [8, 1024 * 1024]: 1304 try: 1305 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1306 device_file = posixpath.join(device_dir, 'file') 1307 1308 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1309 self.device.shell(['mkdir', '-p', device_dir]) 1310 1311 host_dir = tempfile.mkdtemp() 1312 host_file = posixpath.join(host_dir, 'file') 1313 1314 with open(host_file, "w") as f: 1315 f.write('x' * file_size) 1316 1317 self.device._simple_call(['push', '-n', host_file, device_file]) 1318 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1319 self.assertNotEqual(0, rc) 1320 1321 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1322 finally: 1323 if host_dir is not None: 1324 shutil.rmtree(host_dir) 1325 1326 def test_push_dry_run_existent_file(self): 1327 """Push with dry run.""" 1328 1329 for file_size in [8, 1024 * 1024]: 1330 try: 1331 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1332 device_file = posixpath.join(device_dir, 'file') 1333 1334 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1335 self.device.shell(['mkdir', '-p', device_dir]) 1336 self.device.shell(['echo', 'foo', '>', device_file]) 1337 1338 host_dir = tempfile.mkdtemp() 1339 host_file = posixpath.join(host_dir, 'file') 1340 1341 with open(host_file, "w") as f: 1342 f.write('x' * file_size) 1343 1344 self.device._simple_call(['push', '-n', host_file, device_file]) 1345 stdout, stderr = self.device.shell(['cat', device_file]) 1346 self.assertEqual(stdout.strip(), "foo") 1347 1348 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1349 finally: 1350 if host_dir is not None: 1351 shutil.rmtree(host_dir) 1352 1353 def test_unicode_paths(self): 1354 """Ensure that we can support non-ASCII paths, even on Windows.""" 1355 name = u'로보카 폴리' 1356 1357 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1358 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1359 1360 ## push. 1361 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1362 tf.close() 1363 self.device.push(tf.name, remote_path) 1364 os.remove(tf.name) 1365 self.assertFalse(os.path.exists(tf.name)) 1366 1367 # Verify that the device ended up with the expected UTF-8 path 1368 output = self.device.shell( 1369 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1370 self.assertEqual(remote_path, output) 1371 1372 # pull. 1373 self.device.pull(remote_path, tf.name) 1374 self.assertTrue(os.path.exists(tf.name)) 1375 os.remove(tf.name) 1376 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1377 1378 1379class FileOperationsTestUncompressed(FileOperationsTest.Base): 1380 compression = "none" 1381 1382 1383class FileOperationsTestBrotli(FileOperationsTest.Base): 1384 compression = "brotli" 1385 1386 1387class FileOperationsTestLZ4(FileOperationsTest.Base): 1388 compression = "lz4" 1389 1390 1391class FileOperationsTestZstd(FileOperationsTest.Base): 1392 compression = "zstd" 1393 1394 1395class DeviceOfflineTest(DeviceTest): 1396 def _get_device_state(self, serialno): 1397 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1398 for line in output.split('\n'): 1399 m = re.match('(\S+)\s+(\S+)', line) 1400 if m and m.group(1) == serialno: 1401 return m.group(2) 1402 return None 1403 1404 def disabled_test_killed_when_pushing_a_large_file(self): 1405 """ 1406 While running adb push with a large file, kill adb server. 1407 Occasionally the device becomes offline. Because the device is still 1408 reading data without realizing that the adb server has been restarted. 1409 Test if we can bring the device online automatically now. 1410 http://b/32952319 1411 """ 1412 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1413 # 1. Push a large file 1414 file_path = 'tmp_large_file' 1415 try: 1416 fh = open(file_path, 'w') 1417 fh.write('\0' * (100 * 1024 * 1024)) 1418 fh.close() 1419 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1420 time.sleep(0.1) 1421 # 2. Kill the adb server 1422 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1423 subproc.terminate() 1424 finally: 1425 try: 1426 os.unlink(file_path) 1427 except: 1428 pass 1429 # 3. See if the device still exist. 1430 # Sleep to wait for the adb server exit. 1431 time.sleep(0.5) 1432 # 4. The device should be online 1433 self.assertEqual(self._get_device_state(serialno), 'device') 1434 1435 def disabled_test_killed_when_pulling_a_large_file(self): 1436 """ 1437 While running adb pull with a large file, kill adb server. 1438 Occasionally the device can't be connected. Because the device is trying to 1439 send a message larger than what is expected by the adb server. 1440 Test if we can bring the device online automatically now. 1441 """ 1442 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1443 file_path = 'tmp_large_file' 1444 try: 1445 # 1. Create a large file on device. 1446 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1447 'bs=1000000', 'count=100']) 1448 # 2. Pull the large file on host. 1449 subproc = subprocess.Popen(self.device.adb_cmd + 1450 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1451 time.sleep(0.1) 1452 # 3. Kill the adb server 1453 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1454 subproc.terminate() 1455 finally: 1456 try: 1457 os.unlink(file_path) 1458 except: 1459 pass 1460 # 4. See if the device still exist. 1461 # Sleep to wait for the adb server exit. 1462 time.sleep(0.5) 1463 self.assertEqual(self._get_device_state(serialno), 'device') 1464 1465 1466 def test_packet_size_regression(self): 1467 """Test for http://b/37783561 1468 1469 Receiving packets of a length divisible by 512 but not 1024 resulted in 1470 the adb client waiting indefinitely for more input. 1471 """ 1472 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1473 # Probe some surrounding values as well, for the hell of it. 1474 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1475 for offset in [-6, -5, -4]: 1476 length = base + offset 1477 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1478 'echo', 'foo'] 1479 rc, stdout, _ = self.device.shell_nocheck(cmd) 1480 1481 self.assertEqual(0, rc) 1482 1483 # Output should be '\0' * length, followed by "foo\n" 1484 self.assertEqual(length, len(stdout) - 4) 1485 self.assertEqual(stdout, "\0" * length + "foo\n") 1486 1487 def test_zero_packet(self): 1488 """Test for http://b/113070258 1489 1490 Make sure that we don't blow up when sending USB transfers that line up 1491 exactly with the USB packet size. 1492 """ 1493 1494 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1495 try: 1496 for size in [512, 1024]: 1497 def listener(): 1498 cmd = ["echo foo | nc -l -p 12345; echo done"] 1499 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1500 1501 thread = threading.Thread(target=listener) 1502 thread.start() 1503 1504 # Wait a bit to let the shell command start. 1505 time.sleep(0.25) 1506 1507 sock = socket.create_connection(("localhost", local_port)) 1508 with contextlib.closing(sock): 1509 bytesWritten = sock.send(b"a" * size) 1510 self.assertEqual(size, bytesWritten) 1511 readBytes = sock.recv(4096) 1512 self.assertEqual(b"foo\n", readBytes) 1513 1514 thread.join() 1515 finally: 1516 self.device.forward_remove("tcp:{}".format(local_port)) 1517 1518 1519class SocketTest(DeviceTest): 1520 def test_socket_flush(self): 1521 """Test that we handle socket closure properly. 1522 1523 If we're done writing to a socket, closing before the other end has 1524 closed will send a TCP_RST if we have incoming data queued up, which 1525 may result in data that we've written being discarded. 1526 1527 Bug: http://b/74616284 1528 """ 1529 s = socket.create_connection(("localhost", 5037)) 1530 1531 def adb_length_prefixed(string): 1532 encoded = string.encode("utf8") 1533 result = b"%04x%s" % (len(encoded), encoded) 1534 return result 1535 1536 if "ANDROID_SERIAL" in os.environ: 1537 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1538 else: 1539 transport_string = "host:transport-any" 1540 1541 s.sendall(adb_length_prefixed(transport_string)) 1542 response = s.recv(4) 1543 self.assertEqual(b"OKAY", response) 1544 1545 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1546 s.sendall(adb_length_prefixed(shell_string)) 1547 1548 response = s.recv(4) 1549 self.assertEqual(b"OKAY", response) 1550 1551 # Spawn a thread that dumps garbage into the socket until failure. 1552 def spam(): 1553 buf = b"\0" * 16384 1554 try: 1555 while True: 1556 s.sendall(buf) 1557 except Exception as ex: 1558 print(ex) 1559 1560 thread = threading.Thread(target=spam) 1561 thread.start() 1562 1563 time.sleep(1) 1564 1565 received = b"" 1566 while True: 1567 read = s.recv(512) 1568 if len(read) == 0: 1569 break 1570 received += read 1571 1572 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1573 thread.join() 1574 1575 1576if sys.platform == "win32": 1577 # From https://stackoverflow.com/a/38749458 1578 import os 1579 import contextlib 1580 import msvcrt 1581 import ctypes 1582 from ctypes import wintypes 1583 1584 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1585 1586 GENERIC_READ = 0x80000000 1587 GENERIC_WRITE = 0x40000000 1588 FILE_SHARE_READ = 1 1589 FILE_SHARE_WRITE = 2 1590 CONSOLE_TEXTMODE_BUFFER = 1 1591 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1592 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1593 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1594 1595 def _check_zero(result, func, args): 1596 if not result: 1597 raise ctypes.WinError(ctypes.get_last_error()) 1598 return args 1599 1600 def _check_invalid(result, func, args): 1601 if result == INVALID_HANDLE_VALUE: 1602 raise ctypes.WinError(ctypes.get_last_error()) 1603 return args 1604 1605 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1606 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1607 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1608 1609 class COORD(ctypes.Structure): 1610 _fields_ = (('X', wintypes.SHORT), 1611 ('Y', wintypes.SHORT)) 1612 1613 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1614 _fields_ = (('cbSize', wintypes.ULONG), 1615 ('dwSize', COORD), 1616 ('dwCursorPosition', COORD), 1617 ('wAttributes', wintypes.WORD), 1618 ('srWindow', wintypes.SMALL_RECT), 1619 ('dwMaximumWindowSize', COORD), 1620 ('wPopupAttributes', wintypes.WORD), 1621 ('bFullscreenSupported', wintypes.BOOL), 1622 ('ColorTable', wintypes.DWORD * 16)) 1623 def __init__(self, *args, **kwds): 1624 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1625 *args, **kwds) 1626 self.cbSize = ctypes.sizeof(self) 1627 1628 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1629 CONSOLE_SCREEN_BUFFER_INFOEX) 1630 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1631 1632 kernel32.GetStdHandle.errcheck = _check_invalid 1633 kernel32.GetStdHandle.restype = wintypes.HANDLE 1634 kernel32.GetStdHandle.argtypes = ( 1635 wintypes.DWORD,) # _In_ nStdHandle 1636 1637 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1638 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1639 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1640 wintypes.DWORD, # _In_ dwDesiredAccess 1641 wintypes.DWORD, # _In_ dwShareMode 1642 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1643 wintypes.DWORD, # _In_ dwFlags 1644 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1645 1646 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1647 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1648 wintypes.HANDLE, # _In_ hConsoleOutput 1649 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1650 1651 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1652 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1653 wintypes.HANDLE, # _In_ hConsoleOutput 1654 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1655 1656 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1657 kernel32.SetConsoleWindowInfo.argtypes = ( 1658 wintypes.HANDLE, # _In_ hConsoleOutput 1659 wintypes.BOOL, # _In_ bAbsolute 1660 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1661 1662 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1663 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1664 wintypes.HANDLE, # _In_ hConsoleOutput 1665 wintypes.WCHAR, # _In_ cCharacter 1666 wintypes.DWORD, # _In_ nLength 1667 COORD, # _In_ dwWriteCoord 1668 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1669 1670 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1671 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1672 wintypes.HANDLE, # _In_ hConsoleOutput 1673 wintypes.LPWSTR, # _Out_ lpCharacter 1674 wintypes.DWORD, # _In_ nLength 1675 COORD, # _In_ dwReadCoord 1676 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1677 1678 @contextlib.contextmanager 1679 def allocate_console(): 1680 allocated = kernel32.AllocConsole() 1681 try: 1682 yield allocated 1683 finally: 1684 if allocated: 1685 kernel32.FreeConsole() 1686 1687 @contextlib.contextmanager 1688 def console_screen(ncols=None, nrows=None): 1689 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1690 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1691 nwritten = (wintypes.DWORD * 1)() 1692 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1693 kernel32.GetConsoleScreenBufferInfoEx( 1694 hStdOut, ctypes.byref(info)) 1695 if ncols is None: 1696 ncols = info.dwSize.X 1697 if nrows is None: 1698 nrows = info.dwSize.Y 1699 elif nrows > 9999: 1700 raise ValueError('nrows must be 9999 or less') 1701 fd_screen = None 1702 hScreen = kernel32.CreateConsoleScreenBuffer( 1703 GENERIC_READ | GENERIC_WRITE, 1704 FILE_SHARE_READ | FILE_SHARE_WRITE, 1705 None, CONSOLE_TEXTMODE_BUFFER, None) 1706 try: 1707 fd_screen = msvcrt.open_osfhandle( 1708 hScreen, os.O_RDWR | os.O_BINARY) 1709 kernel32.GetConsoleScreenBufferInfoEx( 1710 hScreen, ctypes.byref(new_info)) 1711 new_info.dwSize = COORD(ncols, nrows) 1712 new_info.srWindow = wintypes.SMALL_RECT( 1713 Left=0, Top=0, Right=(ncols - 1), 1714 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1715 kernel32.SetConsoleScreenBufferInfoEx( 1716 hScreen, ctypes.byref(new_info)) 1717 kernel32.SetConsoleWindowInfo(hScreen, True, 1718 ctypes.byref(new_info.srWindow)) 1719 kernel32.FillConsoleOutputCharacterW( 1720 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1721 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1722 try: 1723 yield fd_screen 1724 finally: 1725 kernel32.SetConsoleScreenBufferInfoEx( 1726 hStdOut, ctypes.byref(info)) 1727 kernel32.SetConsoleWindowInfo(hStdOut, True, 1728 ctypes.byref(info.srWindow)) 1729 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1730 finally: 1731 if fd_screen is not None: 1732 os.close(fd_screen) 1733 else: 1734 kernel32.CloseHandle(hScreen) 1735 1736 def read_screen(fd): 1737 hScreen = msvcrt.get_osfhandle(fd) 1738 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1739 kernel32.GetConsoleScreenBufferInfoEx( 1740 hScreen, ctypes.byref(csbi)) 1741 ncols = csbi.dwSize.X 1742 pos = csbi.dwCursorPosition 1743 length = ncols * pos.Y + pos.X + 1 1744 buf = (ctypes.c_wchar * length)() 1745 n = (wintypes.DWORD * 1)() 1746 kernel32.ReadConsoleOutputCharacterW( 1747 hScreen, buf, length, COORD(0,0), n) 1748 lines = [buf[i:i+ncols].rstrip(u'\0') 1749 for i in range(0, n[0], ncols)] 1750 return u'\n'.join(lines) 1751 1752@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1753class WindowsConsoleTest(DeviceTest): 1754 def test_unicode_output(self): 1755 """Test Unicode command line parameters and Unicode console window output. 1756 1757 Bug: https://issuetracker.google.com/issues/111972753 1758 """ 1759 # If we don't have a console window, allocate one. This isn't necessary if we're already 1760 # being run from a console window, which is typical. 1761 with allocate_console() as allocated_console: 1762 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1763 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1764 # likely unnecessary given the typical console window size. 1765 with console_screen(nrows=1000) as screen: 1766 unicode_string = u'로보카 폴리' 1767 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1768 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1769 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1770 process.wait() 1771 # Read what was written by adb to the temporary console buffer. 1772 console_output = read_screen(screen) 1773 self.assertEqual(unicode_string, console_output) 1774 1775 1776def main(): 1777 random.seed(0) 1778 if len(adb.get_devices()) > 0: 1779 suite = unittest.TestLoader().loadTestsFromName(__name__) 1780 unittest.TextTestRunner(verbosity=3).run(suite) 1781 else: 1782 print('Test suite must be run with attached devices') 1783 1784 1785if __name__ == '__main__': 1786 main() 1787