1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 def setUp(self): 81 self.device = adb.get_device() 82 83 84class AbbTest(DeviceTest): 85 def test_smoke(self): 86 result = subprocess.run(['adb', 'abb'], capture_output=True) 87 self.assertEqual(1, result.returncode) 88 expected_output = b"cmd: No service specified; use -l to list all services\n" 89 self.assertEqual(expected_output, result.stderr) 90 91class ForwardReverseTest(DeviceTest): 92 def _test_no_rebind(self, description, direction_list, direction, 93 direction_no_rebind, direction_remove_all): 94 msg = direction_list() 95 self.assertEqual('', msg.strip(), 96 description + ' list must be empty to run this test.') 97 98 # Use --no-rebind with no existing binding 99 direction_no_rebind('tcp:5566', 'tcp:6655') 100 msg = direction_list() 101 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 102 103 # Use --no-rebind with existing binding 104 with self.assertRaises(subprocess.CalledProcessError): 105 direction_no_rebind('tcp:5566', 'tcp:6677') 106 msg = direction_list() 107 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 108 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 109 110 # Use the absence of --no-rebind with existing binding 111 direction('tcp:5566', 'tcp:6677') 112 msg = direction_list() 113 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 114 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 115 116 direction_remove_all() 117 msg = direction_list() 118 self.assertEqual('', msg.strip()) 119 120 def test_forward_no_rebind(self): 121 self._test_no_rebind('forward', self.device.forward_list, 122 self.device.forward, self.device.forward_no_rebind, 123 self.device.forward_remove_all) 124 125 def test_reverse_no_rebind(self): 126 self._test_no_rebind('reverse', self.device.reverse_list, 127 self.device.reverse, self.device.reverse_no_rebind, 128 self.device.reverse_remove_all) 129 130 def test_forward(self): 131 msg = self.device.forward_list() 132 self.assertEqual('', msg.strip(), 133 'Forwarding list must be empty to run this test.') 134 self.device.forward('tcp:5566', 'tcp:6655') 135 msg = self.device.forward_list() 136 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 137 self.device.forward('tcp:7788', 'tcp:8877') 138 msg = self.device.forward_list() 139 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 140 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 141 self.device.forward_remove('tcp:5566') 142 msg = self.device.forward_list() 143 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 144 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 145 self.device.forward_remove_all() 146 msg = self.device.forward_list() 147 self.assertEqual('', msg.strip()) 148 149 def test_forward_old_protocol(self): 150 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 151 152 msg = self.device.forward_list() 153 self.assertEqual('', msg.strip(), 154 'Forwarding list must be empty to run this test.') 155 156 s = socket.create_connection(("localhost", 5037)) 157 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 158 cmd = b"%04x%s" % (len(service), service) 159 s.sendall(cmd) 160 161 msg = self.device.forward_list() 162 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 163 164 self.device.forward_remove_all() 165 msg = self.device.forward_list() 166 self.assertEqual('', msg.strip()) 167 168 def test_forward_tcp_port_0(self): 169 self.assertEqual('', self.device.forward_list().strip(), 170 'Forwarding list must be empty to run this test.') 171 172 try: 173 # If resolving TCP port 0 is supported, `adb forward` will print 174 # the actual port number. 175 port = self.device.forward('tcp:0', 'tcp:8888').strip() 176 if not port: 177 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 178 179 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 180 self.device.forward_list())) 181 finally: 182 self.device.forward_remove_all() 183 184 def test_reverse(self): 185 msg = self.device.reverse_list() 186 self.assertEqual('', msg.strip(), 187 'Reverse forwarding list must be empty to run this test.') 188 self.device.reverse('tcp:5566', 'tcp:6655') 189 msg = self.device.reverse_list() 190 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 191 self.device.reverse('tcp:7788', 'tcp:8877') 192 msg = self.device.reverse_list() 193 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 194 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 195 self.device.reverse_remove('tcp:5566') 196 msg = self.device.reverse_list() 197 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 198 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 199 self.device.reverse_remove_all() 200 msg = self.device.reverse_list() 201 self.assertEqual('', msg.strip()) 202 203 def test_reverse_tcp_port_0(self): 204 self.assertEqual('', self.device.reverse_list().strip(), 205 'Reverse list must be empty to run this test.') 206 207 try: 208 # If resolving TCP port 0 is supported, `adb reverse` will print 209 # the actual port number. 210 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 211 if not port: 212 raise unittest.SkipTest('Reversing tcp:0 is not available.') 213 214 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 215 self.device.reverse_list())) 216 finally: 217 self.device.reverse_remove_all() 218 219 def test_forward_reverse_echo(self): 220 """Send data through adb forward and read it back via adb reverse""" 221 forward_port = 12345 222 reverse_port = forward_port + 1 223 forward_spec = 'tcp:' + str(forward_port) 224 reverse_spec = 'tcp:' + str(reverse_port) 225 forward_setup = False 226 reverse_setup = False 227 228 try: 229 # listen on localhost:forward_port, connect to remote:forward_port 230 self.device.forward(forward_spec, forward_spec) 231 forward_setup = True 232 # listen on remote:forward_port, connect to localhost:reverse_port 233 self.device.reverse(forward_spec, reverse_spec) 234 reverse_setup = True 235 236 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 237 with contextlib.closing(listener): 238 # Use SO_REUSEADDR so that subsequent runs of the test can grab 239 # the port even if it is in TIME_WAIT. 240 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 241 242 # Listen on localhost:reverse_port before connecting to 243 # localhost:forward_port because that will cause adb to connect 244 # back to localhost:reverse_port. 245 listener.bind(('127.0.0.1', reverse_port)) 246 listener.listen(4) 247 248 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 249 with contextlib.closing(client): 250 # Connect to the listener. 251 client.connect(('127.0.0.1', forward_port)) 252 253 # Accept the client connection. 254 accepted_connection, addr = listener.accept() 255 with contextlib.closing(accepted_connection) as server: 256 data = b'hello' 257 258 # Send data into the port setup by adb forward. 259 client.sendall(data) 260 # Explicitly close() so that server gets EOF. 261 client.close() 262 263 # Verify that the data came back via adb reverse. 264 self.assertEqual(data, server.makefile().read().encode("utf8")) 265 finally: 266 if reverse_setup: 267 self.device.reverse_remove(forward_spec) 268 if forward_setup: 269 self.device.forward_remove(forward_spec) 270 271 272class ShellTest(DeviceTest): 273 def _interactive_shell(self, shell_args, input): 274 """Runs an interactive adb shell. 275 276 Args: 277 shell_args: List of string arguments to `adb shell`. 278 input: bytes input to send to the interactive shell. 279 280 Returns: 281 The remote exit code. 282 283 Raises: 284 unittest.SkipTest: The device doesn't support exit codes. 285 """ 286 if not self.device.has_shell_protocol(): 287 raise unittest.SkipTest('exit codes are unavailable on this device') 288 289 proc = subprocess.Popen( 290 self.device.adb_cmd + ['shell'] + shell_args, 291 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 292 stderr=subprocess.PIPE) 293 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 294 # to explicitly add an exit command to close the session from the device 295 # side, plus the necessary newline to complete the interactive command. 296 proc.communicate(input + b'; exit\n') 297 return proc.returncode 298 299 def test_cat(self): 300 """Check that we can at least cat a file.""" 301 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 302 elements = out.split() 303 self.assertEqual(len(elements), 2) 304 305 uptime, idle = elements 306 self.assertGreater(float(uptime), 0.0) 307 self.assertGreater(float(idle), 0.0) 308 309 def test_throws_on_failure(self): 310 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 311 312 def test_output_not_stripped(self): 313 out = self.device.shell(['echo', 'foo'])[0] 314 self.assertEqual(out, 'foo' + self.device.linesep) 315 316 def test_shell_command_length(self): 317 # Devices that have shell_v2 should be able to handle long commands. 318 if self.device.has_shell_protocol(): 319 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 320 self.assertEqual(rc, 0) 321 self.assertTrue(out == ('x' * 16384 + '\n')) 322 323 def test_shell_nocheck_failure(self): 324 rc, out, _ = self.device.shell_nocheck(['false']) 325 self.assertNotEqual(rc, 0) 326 self.assertEqual(out, '') 327 328 def test_shell_nocheck_output_not_stripped(self): 329 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 330 self.assertEqual(rc, 0) 331 self.assertEqual(out, 'foo' + self.device.linesep) 332 333 def test_can_distinguish_tricky_results(self): 334 # If result checking on ADB shell is naively implemented as 335 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 336 # output from the result for a cmd of `echo -n 1`. 337 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 338 self.assertEqual(rc, 0) 339 self.assertEqual(out, '1') 340 341 def test_line_endings(self): 342 """Ensure that line ending translation is not happening in the pty. 343 344 Bug: http://b/19735063 345 """ 346 output = self.device.shell(['uname'])[0] 347 self.assertEqual(output, 'Linux' + self.device.linesep) 348 349 def test_pty_logic(self): 350 """Tests that a PTY is allocated when it should be. 351 352 PTY allocation behavior should match ssh. 353 """ 354 def check_pty(args): 355 """Checks adb shell PTY allocation. 356 357 Tests |args| for terminal and non-terminal stdin. 358 359 Args: 360 args: -Tt args in a list (e.g. ['-t', '-t']). 361 362 Returns: 363 A tuple (<terminal>, <non-terminal>). True indicates 364 the corresponding shell allocated a remote PTY. 365 """ 366 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 367 368 terminal = subprocess.Popen( 369 test_cmd, stdin=None, 370 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 371 terminal.communicate() 372 373 non_terminal = subprocess.Popen( 374 test_cmd, stdin=subprocess.PIPE, 375 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 376 non_terminal.communicate() 377 378 return (terminal.returncode == 0, non_terminal.returncode == 0) 379 380 # -T: never allocate PTY. 381 self.assertEqual((False, False), check_pty(['-T'])) 382 383 # These tests require a new device. 384 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 385 # No args: PTY only if stdin is a terminal and shell is interactive, 386 # which is difficult to reliably test from a script. 387 self.assertEqual((False, False), check_pty([])) 388 389 # -t: PTY if stdin is a terminal. 390 self.assertEqual((True, False), check_pty(['-t'])) 391 392 # -t -t: always allocate PTY. 393 self.assertEqual((True, True), check_pty(['-t', '-t'])) 394 395 # -tt: always allocate PTY, POSIX style (http://b/32216152). 396 self.assertEqual((True, True), check_pty(['-tt'])) 397 398 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 399 # we follow the man page instead. 400 self.assertEqual((True, True), check_pty(['-ttt'])) 401 402 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 403 self.assertEqual((True, True), check_pty(['-ttx'])) 404 405 # -Ttt: -tt cancels out -T. 406 self.assertEqual((True, True), check_pty(['-Ttt'])) 407 408 # -ttT: -T cancels out -tt. 409 self.assertEqual((False, False), check_pty(['-ttT'])) 410 411 def test_shell_protocol(self): 412 """Tests the shell protocol on the device. 413 414 If the device supports shell protocol, this gives us the ability 415 to separate stdout/stderr and return the exit code directly. 416 417 Bug: http://b/19734861 418 """ 419 if not self.device.has_shell_protocol(): 420 raise unittest.SkipTest('shell protocol unsupported on this device') 421 422 # Shell protocol should be used by default. 423 result = self.device.shell_nocheck( 424 shlex.split('echo foo; echo bar >&2; exit 17')) 425 self.assertEqual(17, result[0]) 426 self.assertEqual('foo' + self.device.linesep, result[1]) 427 self.assertEqual('bar' + self.device.linesep, result[2]) 428 429 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 430 431 # -x flag should disable shell protocol. 432 result = self.device.shell_nocheck( 433 shlex.split('-x echo foo; echo bar >&2; exit 17')) 434 self.assertEqual(0, result[0]) 435 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 436 self.assertEqual('', result[2]) 437 438 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 439 440 def test_non_interactive_sigint(self): 441 """Tests that SIGINT in a non-interactive shell kills the process. 442 443 This requires the shell protocol in order to detect the broken 444 pipe; raw data transfer mode will only see the break once the 445 subprocess tries to read or write. 446 447 Bug: http://b/23825725 448 """ 449 if not self.device.has_shell_protocol(): 450 raise unittest.SkipTest('shell protocol unsupported on this device') 451 452 # Start a long-running process. 453 sleep_proc = subprocess.Popen( 454 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 455 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 456 stderr=subprocess.STDOUT) 457 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 458 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 459 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 460 461 # Verify that the process is running, send signal, verify it stopped. 462 self.device.shell(proc_query) 463 os.kill(sleep_proc.pid, signal.SIGINT) 464 sleep_proc.communicate() 465 466 # It can take some time for the process to receive the signal and die. 467 end_time = time.time() + 3 468 while self.device.shell_nocheck(proc_query)[0] != 1: 469 self.assertFalse(time.time() > end_time, 470 'subprocess failed to terminate in time') 471 472 def test_non_interactive_stdin(self): 473 """Tests that non-interactive shells send stdin.""" 474 if not self.device.has_shell_protocol(): 475 raise unittest.SkipTest('non-interactive stdin unsupported ' 476 'on this device') 477 478 # Test both small and large inputs. 479 small_input = b'foo' 480 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 481 large_input = b'\n'.join(characters) 482 483 484 for input in (small_input, large_input): 485 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 486 stdin=subprocess.PIPE, 487 stdout=subprocess.PIPE, 488 stderr=subprocess.PIPE) 489 stdout, stderr = proc.communicate(input) 490 self.assertEqual(input.splitlines(), stdout.splitlines()) 491 self.assertEqual(b'', stderr) 492 493 def test_sighup(self): 494 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 495 log_path = "/data/local/tmp/adb_signal_test.log" 496 497 # Clear the output file. 498 self.device.shell_nocheck(["echo", ">", log_path]) 499 500 script = """ 501 trap "echo SIGINT > {path}; exit 0" SIGINT 502 trap "echo SIGHUP > {path}; exit 0" SIGHUP 503 echo Waiting 504 read 505 """.format(path=log_path) 506 507 script = ";".join([x.strip() for x in script.strip().splitlines()]) 508 509 process = self.device.shell_popen([script], kill_atexit=False, 510 stdin=subprocess.PIPE, 511 stdout=subprocess.PIPE) 512 513 self.assertEqual(b"Waiting\n", process.stdout.readline()) 514 process.send_signal(signal.SIGINT) 515 process.wait() 516 517 # Waiting for the local adb to finish is insufficient, since it hangs 518 # up immediately. 519 time.sleep(1) 520 521 stdout, _ = self.device.shell(["cat", log_path]) 522 self.assertEqual(stdout.strip(), "SIGHUP") 523 524 def test_exit_stress(self): 525 """Hammer `adb shell exit 42` with multiple threads.""" 526 thread_count = 48 527 result = dict() 528 def hammer(thread_idx, thread_count, result): 529 success = True 530 for i in range(thread_idx, 240, thread_count): 531 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 532 if ret != i % 256: 533 success = False 534 break 535 result[thread_idx] = success 536 537 threads = [] 538 for i in range(thread_count): 539 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 540 thread.start() 541 threads.append(thread) 542 for thread in threads: 543 thread.join() 544 for i, success in result.items(): 545 self.assertTrue(success) 546 547 def disabled_test_parallel(self): 548 """Spawn a bunch of `adb shell` instances in parallel. 549 550 This was broken historically due to the use of select, which only works 551 for fds that are numerically less than 1024. 552 553 Bug: http://b/141955761""" 554 555 n_procs = 2048 556 procs = dict() 557 for i in range(0, n_procs): 558 procs[i] = subprocess.Popen( 559 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 560 stdin=subprocess.PIPE, 561 stdout=subprocess.PIPE 562 ) 563 564 for i in range(0, n_procs): 565 procs[i].stdin.write("%d\n" % i) 566 567 for i in range(0, n_procs): 568 response = procs[i].stdout.readline() 569 assert(response == "%d\n" % i) 570 571 for i in range(0, n_procs): 572 procs[i].stdin.write("%d\n" % (i % 256)) 573 574 for i in range(0, n_procs): 575 assert(procs[i].wait() == i % 256) 576 577 578class ArgumentEscapingTest(DeviceTest): 579 def test_shell_escaping(self): 580 """Make sure that argument escaping is somewhat sane.""" 581 582 # http://b/19734868 583 # Note that this actually matches ssh(1)'s behavior --- it's 584 # converted to `sh -c echo hello; echo world` which sh interprets 585 # as `sh -c echo` (with an argument to that shell of "hello"), 586 # and then `echo world` back in the first shell. 587 result = self.device.shell( 588 shlex.split("sh -c 'echo hello; echo world'"))[0] 589 result = result.splitlines() 590 self.assertEqual(['', 'world'], result) 591 # If you really wanted "hello" and "world", here's what you'd do: 592 result = self.device.shell( 593 shlex.split(r'echo hello\;echo world'))[0].splitlines() 594 self.assertEqual(['hello', 'world'], result) 595 596 # http://b/15479704 597 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 598 self.assertEqual('t', result) 599 result = self.device.shell( 600 shlex.split("sh -c 'true && echo t'"))[0].strip() 601 self.assertEqual('t', result) 602 603 # http://b/20564385 604 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 605 self.assertEqual('t', result) 606 result = self.device.shell( 607 shlex.split(r'echo -n 123\;uname'))[0].strip() 608 self.assertEqual('123Linux', result) 609 610 def test_install_argument_escaping(self): 611 """Make sure that install argument escaping works.""" 612 # http://b/20323053, http://b/3090932. 613 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 614 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 615 delete=False) 616 tf.close() 617 618 # Installing bogus .apks fails if the device supports exit codes. 619 try: 620 output = self.device.install(tf.name.decode("utf8")) 621 except subprocess.CalledProcessError as e: 622 output = e.output 623 624 self.assertIn(file_suffix, output) 625 os.remove(tf.name) 626 627 628class RootUnrootTest(DeviceTest): 629 def _test_root(self): 630 message = self.device.root() 631 if 'adbd cannot run as root in production builds' in message: 632 return 633 self.device.wait() 634 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 635 636 def _test_unroot(self): 637 self.device.unroot() 638 self.device.wait() 639 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 640 641 def test_root_unroot(self): 642 """Make sure that adb root and adb unroot work, using id(1).""" 643 if self.device.get_prop('ro.debuggable') != '1': 644 raise unittest.SkipTest('requires rootable build') 645 646 original_user = self.device.shell(['id', '-un'])[0].strip() 647 try: 648 if original_user == 'root': 649 self._test_unroot() 650 self._test_root() 651 elif original_user == 'shell': 652 self._test_root() 653 self._test_unroot() 654 finally: 655 if original_user == 'root': 656 self.device.root() 657 else: 658 self.device.unroot() 659 self.device.wait() 660 661 662class TcpIpTest(DeviceTest): 663 def test_tcpip_failure_raises(self): 664 """adb tcpip requires a port. 665 666 Bug: http://b/22636927 667 """ 668 self.assertRaises( 669 subprocess.CalledProcessError, self.device.tcpip, '') 670 self.assertRaises( 671 subprocess.CalledProcessError, self.device.tcpip, 'foo') 672 673 674class SystemPropertiesTest(DeviceTest): 675 def test_get_prop(self): 676 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 677 678 @requires_root 679 def test_set_prop(self): 680 prop_name = 'foo.bar' 681 self.device.shell(['setprop', prop_name, '""']) 682 683 self.device.set_prop(prop_name, 'qux') 684 self.assertEqual( 685 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 686 687 688def compute_md5(string): 689 hsh = hashlib.md5() 690 hsh.update(string) 691 return hsh.hexdigest() 692 693 694def get_md5_prog(device): 695 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 696 try: 697 device.shell(['md5sum', '/proc/uptime']) 698 return 'md5sum' 699 except adb.ShellError: 700 return 'md5' 701 702 703class HostFile(object): 704 def __init__(self, handle, checksum): 705 self.handle = handle 706 self.checksum = checksum 707 self.full_path = handle.name 708 self.base_name = os.path.basename(self.full_path) 709 710 711class DeviceFile(object): 712 def __init__(self, checksum, full_path): 713 self.checksum = checksum 714 self.full_path = full_path 715 self.base_name = posixpath.basename(self.full_path) 716 717 718def make_random_host_files(in_dir, num_files): 719 min_size = 1 * (1 << 10) 720 max_size = 16 * (1 << 10) 721 722 files = [] 723 for _ in range(num_files): 724 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 725 726 size = random.randrange(min_size, max_size, 1024) 727 rand_str = os.urandom(size) 728 file_handle.write(rand_str) 729 file_handle.flush() 730 file_handle.close() 731 732 md5 = compute_md5(rand_str) 733 files.append(HostFile(file_handle, md5)) 734 return files 735 736 737def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 738 min_size = 1 * (1 << 10) 739 max_size = 16 * (1 << 10) 740 741 files = [] 742 for file_num in range(num_files): 743 size = random.randrange(min_size, max_size, 1024) 744 745 base_name = prefix + str(file_num) 746 full_path = posixpath.join(in_dir, base_name) 747 748 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 749 'bs={}'.format(size), 'count=1']) 750 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 751 752 files.append(DeviceFile(dev_md5, full_path)) 753 return files 754 755 756class FileOperationsTest(DeviceTest): 757 SCRATCH_DIR = '/data/local/tmp' 758 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 759 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 760 761 def _verify_remote(self, checksum, remote_path): 762 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 763 remote_path])[0].split() 764 self.assertEqual(checksum, dev_md5) 765 766 def _verify_local(self, checksum, local_path): 767 with open(local_path, 'rb') as host_file: 768 host_md5 = compute_md5(host_file.read()) 769 self.assertEqual(host_md5, checksum) 770 771 def test_push(self): 772 """Push a randomly generated file to specified device.""" 773 kbytes = 512 774 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 775 rand_str = os.urandom(1024 * kbytes) 776 tmp.write(rand_str) 777 tmp.close() 778 779 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 780 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 781 782 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 783 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 784 785 os.remove(tmp.name) 786 787 def test_push_dir(self): 788 """Push a randomly generated directory of files to the device.""" 789 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 790 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 791 792 try: 793 host_dir = tempfile.mkdtemp() 794 795 # Make sure the temp directory isn't setuid, or else adb will complain. 796 os.chmod(host_dir, 0o700) 797 798 # Create 32 random files. 799 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 800 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 801 802 for temp_file in temp_files: 803 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 804 os.path.basename(host_dir), 805 temp_file.base_name) 806 self._verify_remote(temp_file.checksum, remote_path) 807 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 808 finally: 809 if host_dir is not None: 810 shutil.rmtree(host_dir) 811 812 def disabled_test_push_empty(self): 813 """Push an empty directory to the device.""" 814 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 815 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 816 817 try: 818 host_dir = tempfile.mkdtemp() 819 820 # Make sure the temp directory isn't setuid, or else adb will complain. 821 os.chmod(host_dir, 0o700) 822 823 # Create an empty directory. 824 empty_dir_path = os.path.join(host_dir, 'empty') 825 os.mkdir(empty_dir_path); 826 827 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 828 829 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 830 test_empty_cmd = ["[", "-d", remote_path, "]"] 831 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 832 833 self.assertEqual(rc, 0) 834 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 835 finally: 836 if host_dir is not None: 837 shutil.rmtree(host_dir) 838 839 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 840 def test_push_symlink(self): 841 """Push a symlink. 842 843 Bug: http://b/31491920 844 """ 845 try: 846 host_dir = tempfile.mkdtemp() 847 848 # Make sure the temp directory isn't setuid, or else adb will 849 # complain. 850 os.chmod(host_dir, 0o700) 851 852 with open(os.path.join(host_dir, 'foo'), 'w') as f: 853 f.write('foo') 854 855 symlink_path = os.path.join(host_dir, 'symlink') 856 os.symlink('foo', symlink_path) 857 858 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 859 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 860 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 861 rc, out, _ = self.device.shell_nocheck( 862 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 863 self.assertEqual(0, rc) 864 self.assertEqual(out.strip(), 'foo') 865 finally: 866 if host_dir is not None: 867 shutil.rmtree(host_dir) 868 869 def test_multiple_push(self): 870 """Push multiple files to the device in one adb push command. 871 872 Bug: http://b/25324823 873 """ 874 875 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 876 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 877 878 try: 879 host_dir = tempfile.mkdtemp() 880 881 # Create some random files and a subdirectory containing more files. 882 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 883 884 subdir = os.path.join(host_dir, 'subdir') 885 os.mkdir(subdir) 886 subdir_temp_files = make_random_host_files(in_dir=subdir, 887 num_files=4) 888 889 paths = [x.full_path for x in temp_files] 890 paths.append(subdir) 891 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 892 893 for temp_file in temp_files: 894 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 895 temp_file.base_name) 896 self._verify_remote(temp_file.checksum, remote_path) 897 898 for subdir_temp_file in subdir_temp_files: 899 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 900 # BROKEN: http://b/25394682 901 # 'subdir'; 902 temp_file.base_name) 903 self._verify_remote(temp_file.checksum, remote_path) 904 905 906 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 907 finally: 908 if host_dir is not None: 909 shutil.rmtree(host_dir) 910 911 @requires_non_root 912 def test_push_error_reporting(self): 913 """Make sure that errors that occur while pushing a file get reported 914 915 Bug: http://b/26816782 916 """ 917 with tempfile.NamedTemporaryFile() as tmp_file: 918 tmp_file.write(b'\0' * 1024 * 1024) 919 tmp_file.flush() 920 try: 921 self.device.push(local=tmp_file.name, remote='/system/') 922 self.fail('push should not have succeeded') 923 except subprocess.CalledProcessError as e: 924 output = e.output 925 926 self.assertTrue(b'Permission denied' in output or 927 b'Read-only file system' in output) 928 929 @requires_non_root 930 def test_push_directory_creation(self): 931 """Regression test for directory creation. 932 933 Bug: http://b/110953234 934 """ 935 with tempfile.NamedTemporaryFile() as tmp_file: 936 tmp_file.write(b'\0' * 1024 * 1024) 937 tmp_file.flush() 938 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 939 self.device.shell(['rm', '-rf', remote_path]) 940 941 remote_path += '/filename' 942 self.device.push(local=tmp_file.name, remote=remote_path) 943 944 def disabled_test_push_multiple_slash_root(self): 945 """Regression test for pushing to //data/local/tmp. 946 947 Bug: http://b/141311284 948 949 Disabled because this broken on the adbd side as well: b/141943968 950 """ 951 with tempfile.NamedTemporaryFile() as tmp_file: 952 tmp_file.write('\0' * 1024 * 1024) 953 tmp_file.flush() 954 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 955 self.device.shell(['rm', '-rf', remote_path]) 956 self.device.push(local=tmp_file.name, remote=remote_path) 957 958 def _test_pull(self, remote_file, checksum): 959 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 960 tmp_write.close() 961 self.device.pull(remote=remote_file, local=tmp_write.name) 962 with open(tmp_write.name, 'rb') as tmp_read: 963 host_contents = tmp_read.read() 964 host_md5 = compute_md5(host_contents) 965 self.assertEqual(checksum, host_md5) 966 os.remove(tmp_write.name) 967 968 @requires_non_root 969 def test_pull_error_reporting(self): 970 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 971 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 972 973 try: 974 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 975 except subprocess.CalledProcessError as e: 976 output = e.output 977 978 self.assertIn(b'Permission denied', output) 979 980 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 981 982 def test_pull(self): 983 """Pull a randomly generated file from specified device.""" 984 kbytes = 512 985 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 986 cmd = ['dd', 'if=/dev/urandom', 987 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 988 'count={}'.format(kbytes)] 989 self.device.shell(cmd) 990 dev_md5, _ = self.device.shell( 991 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 992 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 993 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 994 995 def test_pull_dir(self): 996 """Pull a randomly generated directory of files from the device.""" 997 try: 998 host_dir = tempfile.mkdtemp() 999 1000 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1001 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1002 1003 # Populate device directory with random files. 1004 temp_files = make_random_device_files( 1005 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1006 1007 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1008 1009 for temp_file in temp_files: 1010 host_path = os.path.join( 1011 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1012 temp_file.base_name) 1013 self._verify_local(temp_file.checksum, host_path) 1014 1015 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1016 finally: 1017 if host_dir is not None: 1018 shutil.rmtree(host_dir) 1019 1020 def test_pull_dir_symlink(self): 1021 """Pull a directory into a symlink to a directory. 1022 1023 Bug: http://b/27362811 1024 """ 1025 if os.name != 'posix': 1026 raise unittest.SkipTest('requires POSIX') 1027 1028 try: 1029 host_dir = tempfile.mkdtemp() 1030 real_dir = os.path.join(host_dir, 'dir') 1031 symlink = os.path.join(host_dir, 'symlink') 1032 os.mkdir(real_dir) 1033 os.symlink(real_dir, symlink) 1034 1035 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1036 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1037 1038 # Populate device directory with random files. 1039 temp_files = make_random_device_files( 1040 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1041 1042 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1043 1044 for temp_file in temp_files: 1045 host_path = os.path.join( 1046 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1047 temp_file.base_name) 1048 self._verify_local(temp_file.checksum, host_path) 1049 1050 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1051 finally: 1052 if host_dir is not None: 1053 shutil.rmtree(host_dir) 1054 1055 def test_pull_dir_symlink_collision(self): 1056 """Pull a directory into a colliding symlink to directory.""" 1057 if os.name != 'posix': 1058 raise unittest.SkipTest('requires POSIX') 1059 1060 try: 1061 host_dir = tempfile.mkdtemp() 1062 real_dir = os.path.join(host_dir, 'real') 1063 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1064 symlink = os.path.join(host_dir, tmp_dirname) 1065 os.mkdir(real_dir) 1066 os.symlink(real_dir, symlink) 1067 1068 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1069 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1070 1071 # Populate device directory with random files. 1072 temp_files = make_random_device_files( 1073 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1074 1075 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1076 1077 for temp_file in temp_files: 1078 host_path = os.path.join(real_dir, temp_file.base_name) 1079 self._verify_local(temp_file.checksum, host_path) 1080 1081 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1082 finally: 1083 if host_dir is not None: 1084 shutil.rmtree(host_dir) 1085 1086 def test_pull_dir_nonexistent(self): 1087 """Pull a directory of files from the device to a nonexistent path.""" 1088 try: 1089 host_dir = tempfile.mkdtemp() 1090 dest_dir = os.path.join(host_dir, 'dest') 1091 1092 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1093 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1094 1095 # Populate device directory with random files. 1096 temp_files = make_random_device_files( 1097 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1098 1099 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1100 1101 for temp_file in temp_files: 1102 host_path = os.path.join(dest_dir, temp_file.base_name) 1103 self._verify_local(temp_file.checksum, host_path) 1104 1105 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1106 finally: 1107 if host_dir is not None: 1108 shutil.rmtree(host_dir) 1109 1110 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1111 def disabled_test_pull_symlink_dir(self): 1112 """Pull a symlink to a directory of symlinks to files.""" 1113 try: 1114 host_dir = tempfile.mkdtemp() 1115 1116 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1117 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1118 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1119 1120 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1121 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1122 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1123 1124 # Populate device directory with random files. 1125 temp_files = make_random_device_files( 1126 self.device, in_dir=remote_dir, num_files=32) 1127 1128 for temp_file in temp_files: 1129 self.device.shell( 1130 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1131 posixpath.join(remote_links, temp_file.base_name)]) 1132 1133 self.device.pull(remote=remote_symlink, local=host_dir) 1134 1135 for temp_file in temp_files: 1136 host_path = os.path.join( 1137 host_dir, 'symlink', temp_file.base_name) 1138 self._verify_local(temp_file.checksum, host_path) 1139 1140 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1141 finally: 1142 if host_dir is not None: 1143 shutil.rmtree(host_dir) 1144 1145 def test_pull_empty(self): 1146 """Pull a directory containing an empty directory from the device.""" 1147 try: 1148 host_dir = tempfile.mkdtemp() 1149 1150 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1151 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1152 self.device.shell(['mkdir', '-p', remote_empty_path]) 1153 1154 self.device.pull(remote=remote_empty_path, local=host_dir) 1155 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1156 finally: 1157 if host_dir is not None: 1158 shutil.rmtree(host_dir) 1159 1160 def test_multiple_pull(self): 1161 """Pull a randomly generated directory of files from the device.""" 1162 1163 try: 1164 host_dir = tempfile.mkdtemp() 1165 1166 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1167 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1168 self.device.shell(['mkdir', '-p', subdir]) 1169 1170 # Create some random files and a subdirectory containing more files. 1171 temp_files = make_random_device_files( 1172 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1173 1174 subdir_temp_files = make_random_device_files( 1175 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1176 1177 paths = [x.full_path for x in temp_files] 1178 paths.append(subdir) 1179 self.device._simple_call(['pull'] + paths + [host_dir]) 1180 1181 for temp_file in temp_files: 1182 local_path = os.path.join(host_dir, temp_file.base_name) 1183 self._verify_local(temp_file.checksum, local_path) 1184 1185 for subdir_temp_file in subdir_temp_files: 1186 local_path = os.path.join(host_dir, 1187 'subdir', 1188 subdir_temp_file.base_name) 1189 self._verify_local(subdir_temp_file.checksum, local_path) 1190 1191 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1192 finally: 1193 if host_dir is not None: 1194 shutil.rmtree(host_dir) 1195 1196 def verify_sync(self, device, temp_files, device_dir): 1197 """Verifies that a list of temp files was synced to the device.""" 1198 # Confirm that every file on the device mirrors that on the host. 1199 for temp_file in temp_files: 1200 device_full_path = posixpath.join( 1201 device_dir, temp_file.base_name) 1202 dev_md5, _ = device.shell( 1203 [get_md5_prog(self.device), device_full_path])[0].split() 1204 self.assertEqual(temp_file.checksum, dev_md5) 1205 1206 def test_sync(self): 1207 """Sync a host directory to the data partition.""" 1208 1209 try: 1210 base_dir = tempfile.mkdtemp() 1211 1212 # Create mirror device directory hierarchy within base_dir. 1213 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1214 os.makedirs(full_dir_path) 1215 1216 # Create 32 random files within the host mirror. 1217 temp_files = make_random_host_files( 1218 in_dir=full_dir_path, num_files=32) 1219 1220 # Clean up any stale files on the device. 1221 device = adb.get_device() # pylint: disable=no-member 1222 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1223 1224 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1225 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1226 device.sync('data') 1227 if old_product_out is None: 1228 del os.environ['ANDROID_PRODUCT_OUT'] 1229 else: 1230 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1231 1232 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1233 1234 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1235 finally: 1236 if base_dir is not None: 1237 shutil.rmtree(base_dir) 1238 1239 def test_push_sync(self): 1240 """Sync a host directory to a specific path.""" 1241 1242 try: 1243 temp_dir = tempfile.mkdtemp() 1244 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1245 1246 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1247 1248 # Clean up any stale files on the device. 1249 device = adb.get_device() # pylint: disable=no-member 1250 device.shell(['rm', '-rf', device_dir]) 1251 1252 device.push(temp_dir, device_dir, sync=True) 1253 1254 self.verify_sync(device, temp_files, device_dir) 1255 1256 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1257 finally: 1258 if temp_dir is not None: 1259 shutil.rmtree(temp_dir) 1260 1261 def test_unicode_paths(self): 1262 """Ensure that we can support non-ASCII paths, even on Windows.""" 1263 name = u'로보카 폴리' 1264 1265 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1266 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1267 1268 ## push. 1269 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1270 tf.close() 1271 self.device.push(tf.name, remote_path) 1272 os.remove(tf.name) 1273 self.assertFalse(os.path.exists(tf.name)) 1274 1275 # Verify that the device ended up with the expected UTF-8 path 1276 output = self.device.shell( 1277 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1278 self.assertEqual(remote_path, output) 1279 1280 # pull. 1281 self.device.pull(remote_path, tf.name) 1282 self.assertTrue(os.path.exists(tf.name)) 1283 os.remove(tf.name) 1284 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1285 1286 1287class DeviceOfflineTest(DeviceTest): 1288 def _get_device_state(self, serialno): 1289 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1290 for line in output.split('\n'): 1291 m = re.match('(\S+)\s+(\S+)', line) 1292 if m and m.group(1) == serialno: 1293 return m.group(2) 1294 return None 1295 1296 def disabled_test_killed_when_pushing_a_large_file(self): 1297 """ 1298 While running adb push with a large file, kill adb server. 1299 Occasionally the device becomes offline. Because the device is still 1300 reading data without realizing that the adb server has been restarted. 1301 Test if we can bring the device online automatically now. 1302 http://b/32952319 1303 """ 1304 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1305 # 1. Push a large file 1306 file_path = 'tmp_large_file' 1307 try: 1308 fh = open(file_path, 'w') 1309 fh.write('\0' * (100 * 1024 * 1024)) 1310 fh.close() 1311 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1312 time.sleep(0.1) 1313 # 2. Kill the adb server 1314 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1315 subproc.terminate() 1316 finally: 1317 try: 1318 os.unlink(file_path) 1319 except: 1320 pass 1321 # 3. See if the device still exist. 1322 # Sleep to wait for the adb server exit. 1323 time.sleep(0.5) 1324 # 4. The device should be online 1325 self.assertEqual(self._get_device_state(serialno), 'device') 1326 1327 def disabled_test_killed_when_pulling_a_large_file(self): 1328 """ 1329 While running adb pull with a large file, kill adb server. 1330 Occasionally the device can't be connected. Because the device is trying to 1331 send a message larger than what is expected by the adb server. 1332 Test if we can bring the device online automatically now. 1333 """ 1334 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1335 file_path = 'tmp_large_file' 1336 try: 1337 # 1. Create a large file on device. 1338 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1339 'bs=1000000', 'count=100']) 1340 # 2. Pull the large file on host. 1341 subproc = subprocess.Popen(self.device.adb_cmd + 1342 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1343 time.sleep(0.1) 1344 # 3. Kill the adb server 1345 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1346 subproc.terminate() 1347 finally: 1348 try: 1349 os.unlink(file_path) 1350 except: 1351 pass 1352 # 4. See if the device still exist. 1353 # Sleep to wait for the adb server exit. 1354 time.sleep(0.5) 1355 self.assertEqual(self._get_device_state(serialno), 'device') 1356 1357 1358 def test_packet_size_regression(self): 1359 """Test for http://b/37783561 1360 1361 Receiving packets of a length divisible by 512 but not 1024 resulted in 1362 the adb client waiting indefinitely for more input. 1363 """ 1364 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1365 # Probe some surrounding values as well, for the hell of it. 1366 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1367 for offset in [-6, -5, -4]: 1368 length = base + offset 1369 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1370 'echo', 'foo'] 1371 rc, stdout, _ = self.device.shell_nocheck(cmd) 1372 1373 self.assertEqual(0, rc) 1374 1375 # Output should be '\0' * length, followed by "foo\n" 1376 self.assertEqual(length, len(stdout) - 4) 1377 self.assertEqual(stdout, "\0" * length + "foo\n") 1378 1379 def test_zero_packet(self): 1380 """Test for http://b/113070258 1381 1382 Make sure that we don't blow up when sending USB transfers that line up 1383 exactly with the USB packet size. 1384 """ 1385 1386 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1387 try: 1388 for size in [512, 1024]: 1389 def listener(): 1390 cmd = ["echo foo | nc -l -p 12345; echo done"] 1391 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1392 1393 thread = threading.Thread(target=listener) 1394 thread.start() 1395 1396 # Wait a bit to let the shell command start. 1397 time.sleep(0.25) 1398 1399 sock = socket.create_connection(("localhost", local_port)) 1400 with contextlib.closing(sock): 1401 bytesWritten = sock.send(b"a" * size) 1402 self.assertEqual(size, bytesWritten) 1403 readBytes = sock.recv(4096) 1404 self.assertEqual(b"foo\n", readBytes) 1405 1406 thread.join() 1407 finally: 1408 self.device.forward_remove("tcp:{}".format(local_port)) 1409 1410 1411class SocketTest(DeviceTest): 1412 def test_socket_flush(self): 1413 """Test that we handle socket closure properly. 1414 1415 If we're done writing to a socket, closing before the other end has 1416 closed will send a TCP_RST if we have incoming data queued up, which 1417 may result in data that we've written being discarded. 1418 1419 Bug: http://b/74616284 1420 """ 1421 s = socket.create_connection(("localhost", 5037)) 1422 1423 def adb_length_prefixed(string): 1424 encoded = string.encode("utf8") 1425 result = b"%04x%s" % (len(encoded), encoded) 1426 return result 1427 1428 if "ANDROID_SERIAL" in os.environ: 1429 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1430 else: 1431 transport_string = "host:transport-any" 1432 1433 s.sendall(adb_length_prefixed(transport_string)) 1434 response = s.recv(4) 1435 self.assertEqual(b"OKAY", response) 1436 1437 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1438 s.sendall(adb_length_prefixed(shell_string)) 1439 1440 response = s.recv(4) 1441 self.assertEqual(b"OKAY", response) 1442 1443 # Spawn a thread that dumps garbage into the socket until failure. 1444 def spam(): 1445 buf = b"\0" * 16384 1446 try: 1447 while True: 1448 s.sendall(buf) 1449 except Exception as ex: 1450 print(ex) 1451 1452 thread = threading.Thread(target=spam) 1453 thread.start() 1454 1455 time.sleep(1) 1456 1457 received = b"" 1458 while True: 1459 read = s.recv(512) 1460 if len(read) == 0: 1461 break 1462 received += read 1463 1464 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1465 thread.join() 1466 1467 1468if sys.platform == "win32": 1469 # From https://stackoverflow.com/a/38749458 1470 import os 1471 import contextlib 1472 import msvcrt 1473 import ctypes 1474 from ctypes import wintypes 1475 1476 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1477 1478 GENERIC_READ = 0x80000000 1479 GENERIC_WRITE = 0x40000000 1480 FILE_SHARE_READ = 1 1481 FILE_SHARE_WRITE = 2 1482 CONSOLE_TEXTMODE_BUFFER = 1 1483 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1484 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1485 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1486 1487 def _check_zero(result, func, args): 1488 if not result: 1489 raise ctypes.WinError(ctypes.get_last_error()) 1490 return args 1491 1492 def _check_invalid(result, func, args): 1493 if result == INVALID_HANDLE_VALUE: 1494 raise ctypes.WinError(ctypes.get_last_error()) 1495 return args 1496 1497 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1498 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1499 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1500 1501 class COORD(ctypes.Structure): 1502 _fields_ = (('X', wintypes.SHORT), 1503 ('Y', wintypes.SHORT)) 1504 1505 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1506 _fields_ = (('cbSize', wintypes.ULONG), 1507 ('dwSize', COORD), 1508 ('dwCursorPosition', COORD), 1509 ('wAttributes', wintypes.WORD), 1510 ('srWindow', wintypes.SMALL_RECT), 1511 ('dwMaximumWindowSize', COORD), 1512 ('wPopupAttributes', wintypes.WORD), 1513 ('bFullscreenSupported', wintypes.BOOL), 1514 ('ColorTable', wintypes.DWORD * 16)) 1515 def __init__(self, *args, **kwds): 1516 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1517 *args, **kwds) 1518 self.cbSize = ctypes.sizeof(self) 1519 1520 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1521 CONSOLE_SCREEN_BUFFER_INFOEX) 1522 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1523 1524 kernel32.GetStdHandle.errcheck = _check_invalid 1525 kernel32.GetStdHandle.restype = wintypes.HANDLE 1526 kernel32.GetStdHandle.argtypes = ( 1527 wintypes.DWORD,) # _In_ nStdHandle 1528 1529 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1530 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1531 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1532 wintypes.DWORD, # _In_ dwDesiredAccess 1533 wintypes.DWORD, # _In_ dwShareMode 1534 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1535 wintypes.DWORD, # _In_ dwFlags 1536 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1537 1538 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1539 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1540 wintypes.HANDLE, # _In_ hConsoleOutput 1541 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1542 1543 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1544 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1545 wintypes.HANDLE, # _In_ hConsoleOutput 1546 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1547 1548 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1549 kernel32.SetConsoleWindowInfo.argtypes = ( 1550 wintypes.HANDLE, # _In_ hConsoleOutput 1551 wintypes.BOOL, # _In_ bAbsolute 1552 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1553 1554 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1555 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1556 wintypes.HANDLE, # _In_ hConsoleOutput 1557 wintypes.WCHAR, # _In_ cCharacter 1558 wintypes.DWORD, # _In_ nLength 1559 COORD, # _In_ dwWriteCoord 1560 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1561 1562 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1563 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1564 wintypes.HANDLE, # _In_ hConsoleOutput 1565 wintypes.LPWSTR, # _Out_ lpCharacter 1566 wintypes.DWORD, # _In_ nLength 1567 COORD, # _In_ dwReadCoord 1568 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1569 1570 @contextlib.contextmanager 1571 def allocate_console(): 1572 allocated = kernel32.AllocConsole() 1573 try: 1574 yield allocated 1575 finally: 1576 if allocated: 1577 kernel32.FreeConsole() 1578 1579 @contextlib.contextmanager 1580 def console_screen(ncols=None, nrows=None): 1581 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1582 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1583 nwritten = (wintypes.DWORD * 1)() 1584 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1585 kernel32.GetConsoleScreenBufferInfoEx( 1586 hStdOut, ctypes.byref(info)) 1587 if ncols is None: 1588 ncols = info.dwSize.X 1589 if nrows is None: 1590 nrows = info.dwSize.Y 1591 elif nrows > 9999: 1592 raise ValueError('nrows must be 9999 or less') 1593 fd_screen = None 1594 hScreen = kernel32.CreateConsoleScreenBuffer( 1595 GENERIC_READ | GENERIC_WRITE, 1596 FILE_SHARE_READ | FILE_SHARE_WRITE, 1597 None, CONSOLE_TEXTMODE_BUFFER, None) 1598 try: 1599 fd_screen = msvcrt.open_osfhandle( 1600 hScreen, os.O_RDWR | os.O_BINARY) 1601 kernel32.GetConsoleScreenBufferInfoEx( 1602 hScreen, ctypes.byref(new_info)) 1603 new_info.dwSize = COORD(ncols, nrows) 1604 new_info.srWindow = wintypes.SMALL_RECT( 1605 Left=0, Top=0, Right=(ncols - 1), 1606 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1607 kernel32.SetConsoleScreenBufferInfoEx( 1608 hScreen, ctypes.byref(new_info)) 1609 kernel32.SetConsoleWindowInfo(hScreen, True, 1610 ctypes.byref(new_info.srWindow)) 1611 kernel32.FillConsoleOutputCharacterW( 1612 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1613 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1614 try: 1615 yield fd_screen 1616 finally: 1617 kernel32.SetConsoleScreenBufferInfoEx( 1618 hStdOut, ctypes.byref(info)) 1619 kernel32.SetConsoleWindowInfo(hStdOut, True, 1620 ctypes.byref(info.srWindow)) 1621 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1622 finally: 1623 if fd_screen is not None: 1624 os.close(fd_screen) 1625 else: 1626 kernel32.CloseHandle(hScreen) 1627 1628 def read_screen(fd): 1629 hScreen = msvcrt.get_osfhandle(fd) 1630 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1631 kernel32.GetConsoleScreenBufferInfoEx( 1632 hScreen, ctypes.byref(csbi)) 1633 ncols = csbi.dwSize.X 1634 pos = csbi.dwCursorPosition 1635 length = ncols * pos.Y + pos.X + 1 1636 buf = (ctypes.c_wchar * length)() 1637 n = (wintypes.DWORD * 1)() 1638 kernel32.ReadConsoleOutputCharacterW( 1639 hScreen, buf, length, COORD(0,0), n) 1640 lines = [buf[i:i+ncols].rstrip(u'\0') 1641 for i in range(0, n[0], ncols)] 1642 return u'\n'.join(lines) 1643 1644@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1645class WindowsConsoleTest(DeviceTest): 1646 def test_unicode_output(self): 1647 """Test Unicode command line parameters and Unicode console window output. 1648 1649 Bug: https://issuetracker.google.com/issues/111972753 1650 """ 1651 # If we don't have a console window, allocate one. This isn't necessary if we're already 1652 # being run from a console window, which is typical. 1653 with allocate_console() as allocated_console: 1654 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1655 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1656 # likely unnecessary given the typical console window size. 1657 with console_screen(nrows=1000) as screen: 1658 unicode_string = u'로보카 폴리' 1659 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1660 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1661 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1662 process.wait() 1663 # Read what was written by adb to the temporary console buffer. 1664 console_output = read_screen(screen) 1665 self.assertEqual(unicode_string, console_output) 1666 1667 1668def main(): 1669 random.seed(0) 1670 if len(adb.get_devices()) > 0: 1671 suite = unittest.TestLoader().loadTestsFromName(__name__) 1672 unittest.TextTestRunner(verbosity=3).run(suite) 1673 else: 1674 print('Test suite must be run with attached devices') 1675 1676 1677if __name__ == '__main__': 1678 main() 1679