1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 device = adb.get_device() 81 82 83class AbbTest(DeviceTest): 84 def test_smoke(self): 85 abb = subprocess.run(['adb', 'abb'], capture_output=True) 86 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 87 88 # abb squashes all failures to 1. 89 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 90 self.assertEqual(abb.stdout, cmd.stdout) 91 self.assertEqual(abb.stderr, cmd.stderr) 92 93class ForwardReverseTest(DeviceTest): 94 def _test_no_rebind(self, description, direction_list, direction, 95 direction_no_rebind, direction_remove_all): 96 msg = direction_list() 97 self.assertEqual('', msg.strip(), 98 description + ' list must be empty to run this test.') 99 100 # Use --no-rebind with no existing binding 101 direction_no_rebind('tcp:5566', 'tcp:6655') 102 msg = direction_list() 103 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 104 105 # Use --no-rebind with existing binding 106 with self.assertRaises(subprocess.CalledProcessError): 107 direction_no_rebind('tcp:5566', 'tcp:6677') 108 msg = direction_list() 109 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 110 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 111 112 # Use the absence of --no-rebind with existing binding 113 direction('tcp:5566', 'tcp:6677') 114 msg = direction_list() 115 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 116 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 117 118 direction_remove_all() 119 msg = direction_list() 120 self.assertEqual('', msg.strip()) 121 122 def test_forward_no_rebind(self): 123 self._test_no_rebind('forward', self.device.forward_list, 124 self.device.forward, self.device.forward_no_rebind, 125 self.device.forward_remove_all) 126 127 def test_reverse_no_rebind(self): 128 self._test_no_rebind('reverse', self.device.reverse_list, 129 self.device.reverse, self.device.reverse_no_rebind, 130 self.device.reverse_remove_all) 131 132 def test_forward(self): 133 msg = self.device.forward_list() 134 self.assertEqual('', msg.strip(), 135 'Forwarding list must be empty to run this test.') 136 self.device.forward('tcp:5566', 'tcp:6655') 137 msg = self.device.forward_list() 138 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 139 self.device.forward('tcp:7788', 'tcp:8877') 140 msg = self.device.forward_list() 141 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 142 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 143 self.device.forward_remove('tcp:5566') 144 msg = self.device.forward_list() 145 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 146 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 147 self.device.forward_remove_all() 148 msg = self.device.forward_list() 149 self.assertEqual('', msg.strip()) 150 151 def test_forward_old_protocol(self): 152 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 153 154 msg = self.device.forward_list() 155 self.assertEqual('', msg.strip(), 156 'Forwarding list must be empty to run this test.') 157 158 s = socket.create_connection(("localhost", 5037)) 159 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 160 cmd = b"%04x%s" % (len(service), service) 161 s.sendall(cmd) 162 163 msg = self.device.forward_list() 164 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 165 166 self.device.forward_remove_all() 167 msg = self.device.forward_list() 168 self.assertEqual('', msg.strip()) 169 170 def test_forward_tcp_port_0(self): 171 self.assertEqual('', self.device.forward_list().strip(), 172 'Forwarding list must be empty to run this test.') 173 174 try: 175 # If resolving TCP port 0 is supported, `adb forward` will print 176 # the actual port number. 177 port = self.device.forward('tcp:0', 'tcp:8888').strip() 178 if not port: 179 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 180 181 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 182 self.device.forward_list())) 183 finally: 184 self.device.forward_remove_all() 185 186 def test_reverse(self): 187 msg = self.device.reverse_list() 188 self.assertEqual('', msg.strip(), 189 'Reverse forwarding list must be empty to run this test.') 190 self.device.reverse('tcp:5566', 'tcp:6655') 191 msg = self.device.reverse_list() 192 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 193 self.device.reverse('tcp:7788', 'tcp:8877') 194 msg = self.device.reverse_list() 195 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 196 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 197 self.device.reverse_remove('tcp:5566') 198 msg = self.device.reverse_list() 199 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 200 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 201 self.device.reverse_remove_all() 202 msg = self.device.reverse_list() 203 self.assertEqual('', msg.strip()) 204 205 def test_reverse_tcp_port_0(self): 206 self.assertEqual('', self.device.reverse_list().strip(), 207 'Reverse list must be empty to run this test.') 208 209 try: 210 # If resolving TCP port 0 is supported, `adb reverse` will print 211 # the actual port number. 212 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 213 if not port: 214 raise unittest.SkipTest('Reversing tcp:0 is not available.') 215 216 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 217 self.device.reverse_list())) 218 finally: 219 self.device.reverse_remove_all() 220 221 def test_forward_reverse_echo(self): 222 """Send data through adb forward and read it back via adb reverse""" 223 forward_port = 12345 224 reverse_port = forward_port + 1 225 forward_spec = 'tcp:' + str(forward_port) 226 reverse_spec = 'tcp:' + str(reverse_port) 227 forward_setup = False 228 reverse_setup = False 229 230 try: 231 # listen on localhost:forward_port, connect to remote:forward_port 232 self.device.forward(forward_spec, forward_spec) 233 forward_setup = True 234 # listen on remote:forward_port, connect to localhost:reverse_port 235 self.device.reverse(forward_spec, reverse_spec) 236 reverse_setup = True 237 238 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 239 with contextlib.closing(listener): 240 # Use SO_REUSEADDR so that subsequent runs of the test can grab 241 # the port even if it is in TIME_WAIT. 242 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 243 244 # Listen on localhost:reverse_port before connecting to 245 # localhost:forward_port because that will cause adb to connect 246 # back to localhost:reverse_port. 247 listener.bind(('127.0.0.1', reverse_port)) 248 listener.listen(4) 249 250 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 251 with contextlib.closing(client): 252 # Connect to the listener. 253 client.connect(('127.0.0.1', forward_port)) 254 255 # Accept the client connection. 256 accepted_connection, addr = listener.accept() 257 with contextlib.closing(accepted_connection) as server: 258 data = b'hello' 259 260 # Send data into the port setup by adb forward. 261 client.sendall(data) 262 # Explicitly close() so that server gets EOF. 263 client.close() 264 265 # Verify that the data came back via adb reverse. 266 self.assertEqual(data, server.makefile().read().encode("utf8")) 267 finally: 268 if reverse_setup: 269 self.device.reverse_remove(forward_spec) 270 if forward_setup: 271 self.device.forward_remove(forward_spec) 272 273 274class ShellTest(DeviceTest): 275 def _interactive_shell(self, shell_args, input): 276 """Runs an interactive adb shell. 277 278 Args: 279 shell_args: List of string arguments to `adb shell`. 280 input: bytes input to send to the interactive shell. 281 282 Returns: 283 The remote exit code. 284 285 Raises: 286 unittest.SkipTest: The device doesn't support exit codes. 287 """ 288 if not self.device.has_shell_protocol(): 289 raise unittest.SkipTest('exit codes are unavailable on this device') 290 291 proc = subprocess.Popen( 292 self.device.adb_cmd + ['shell'] + shell_args, 293 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 294 stderr=subprocess.PIPE) 295 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 296 # to explicitly add an exit command to close the session from the device 297 # side, plus the necessary newline to complete the interactive command. 298 proc.communicate(input + b'; exit\n') 299 return proc.returncode 300 301 def test_cat(self): 302 """Check that we can at least cat a file.""" 303 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 304 elements = out.split() 305 self.assertEqual(len(elements), 2) 306 307 uptime, idle = elements 308 self.assertGreater(float(uptime), 0.0) 309 self.assertGreater(float(idle), 0.0) 310 311 def test_throws_on_failure(self): 312 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 313 314 def test_output_not_stripped(self): 315 out = self.device.shell(['echo', 'foo'])[0] 316 self.assertEqual(out, 'foo' + self.device.linesep) 317 318 def test_shell_command_length(self): 319 # Devices that have shell_v2 should be able to handle long commands. 320 if self.device.has_shell_protocol(): 321 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 322 self.assertEqual(rc, 0) 323 self.assertTrue(out == ('x' * 16384 + '\n')) 324 325 def test_shell_nocheck_failure(self): 326 rc, out, _ = self.device.shell_nocheck(['false']) 327 self.assertNotEqual(rc, 0) 328 self.assertEqual(out, '') 329 330 def test_shell_nocheck_output_not_stripped(self): 331 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 332 self.assertEqual(rc, 0) 333 self.assertEqual(out, 'foo' + self.device.linesep) 334 335 def test_can_distinguish_tricky_results(self): 336 # If result checking on ADB shell is naively implemented as 337 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 338 # output from the result for a cmd of `echo -n 1`. 339 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 340 self.assertEqual(rc, 0) 341 self.assertEqual(out, '1') 342 343 def test_line_endings(self): 344 """Ensure that line ending translation is not happening in the pty. 345 346 Bug: http://b/19735063 347 """ 348 output = self.device.shell(['uname'])[0] 349 self.assertEqual(output, 'Linux' + self.device.linesep) 350 351 def test_pty_logic(self): 352 """Tests that a PTY is allocated when it should be. 353 354 PTY allocation behavior should match ssh. 355 """ 356 def check_pty(args): 357 """Checks adb shell PTY allocation. 358 359 Tests |args| for terminal and non-terminal stdin. 360 361 Args: 362 args: -Tt args in a list (e.g. ['-t', '-t']). 363 364 Returns: 365 A tuple (<terminal>, <non-terminal>). True indicates 366 the corresponding shell allocated a remote PTY. 367 """ 368 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 369 370 terminal = subprocess.Popen( 371 test_cmd, stdin=None, 372 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 373 terminal.communicate() 374 375 non_terminal = subprocess.Popen( 376 test_cmd, stdin=subprocess.PIPE, 377 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 378 non_terminal.communicate() 379 380 return (terminal.returncode == 0, non_terminal.returncode == 0) 381 382 # -T: never allocate PTY. 383 self.assertEqual((False, False), check_pty(['-T'])) 384 385 # These tests require a new device. 386 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 387 # No args: PTY only if stdin is a terminal and shell is interactive, 388 # which is difficult to reliably test from a script. 389 self.assertEqual((False, False), check_pty([])) 390 391 # -t: PTY if stdin is a terminal. 392 self.assertEqual((True, False), check_pty(['-t'])) 393 394 # -t -t: always allocate PTY. 395 self.assertEqual((True, True), check_pty(['-t', '-t'])) 396 397 # -tt: always allocate PTY, POSIX style (http://b/32216152). 398 self.assertEqual((True, True), check_pty(['-tt'])) 399 400 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 401 # we follow the man page instead. 402 self.assertEqual((True, True), check_pty(['-ttt'])) 403 404 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 405 self.assertEqual((True, True), check_pty(['-ttx'])) 406 407 # -Ttt: -tt cancels out -T. 408 self.assertEqual((True, True), check_pty(['-Ttt'])) 409 410 # -ttT: -T cancels out -tt. 411 self.assertEqual((False, False), check_pty(['-ttT'])) 412 413 def test_shell_protocol(self): 414 """Tests the shell protocol on the device. 415 416 If the device supports shell protocol, this gives us the ability 417 to separate stdout/stderr and return the exit code directly. 418 419 Bug: http://b/19734861 420 """ 421 if not self.device.has_shell_protocol(): 422 raise unittest.SkipTest('shell protocol unsupported on this device') 423 424 # Shell protocol should be used by default. 425 result = self.device.shell_nocheck( 426 shlex.split('echo foo; echo bar >&2; exit 17')) 427 self.assertEqual(17, result[0]) 428 self.assertEqual('foo' + self.device.linesep, result[1]) 429 self.assertEqual('bar' + self.device.linesep, result[2]) 430 431 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 432 433 # -x flag should disable shell protocol. 434 result = self.device.shell_nocheck( 435 shlex.split('-x echo foo; echo bar >&2; exit 17')) 436 self.assertEqual(0, result[0]) 437 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 438 self.assertEqual('', result[2]) 439 440 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 441 442 def test_non_interactive_sigint(self): 443 """Tests that SIGINT in a non-interactive shell kills the process. 444 445 This requires the shell protocol in order to detect the broken 446 pipe; raw data transfer mode will only see the break once the 447 subprocess tries to read or write. 448 449 Bug: http://b/23825725 450 """ 451 if not self.device.has_shell_protocol(): 452 raise unittest.SkipTest('shell protocol unsupported on this device') 453 454 # Start a long-running process. 455 sleep_proc = subprocess.Popen( 456 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 457 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 458 stderr=subprocess.STDOUT) 459 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 460 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 461 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 462 463 # Verify that the process is running, send signal, verify it stopped. 464 self.device.shell(proc_query) 465 os.kill(sleep_proc.pid, signal.SIGINT) 466 sleep_proc.communicate() 467 468 # It can take some time for the process to receive the signal and die. 469 end_time = time.time() + 3 470 while self.device.shell_nocheck(proc_query)[0] != 1: 471 self.assertFalse(time.time() > end_time, 472 'subprocess failed to terminate in time') 473 474 def test_non_interactive_stdin(self): 475 """Tests that non-interactive shells send stdin.""" 476 if not self.device.has_shell_protocol(): 477 raise unittest.SkipTest('non-interactive stdin unsupported ' 478 'on this device') 479 480 # Test both small and large inputs. 481 small_input = b'foo' 482 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 483 large_input = b'\n'.join(characters) 484 485 486 for input in (small_input, large_input): 487 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 488 stdin=subprocess.PIPE, 489 stdout=subprocess.PIPE, 490 stderr=subprocess.PIPE) 491 stdout, stderr = proc.communicate(input) 492 self.assertEqual(input.splitlines(), stdout.splitlines()) 493 self.assertEqual(b'', stderr) 494 495 def test_sighup(self): 496 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 497 log_path = "/data/local/tmp/adb_signal_test.log" 498 499 # Clear the output file. 500 self.device.shell_nocheck(["echo", ">", log_path]) 501 502 script = """ 503 trap "echo SIGINT > {path}; exit 0" SIGINT 504 trap "echo SIGHUP > {path}; exit 0" SIGHUP 505 echo Waiting 506 read 507 """.format(path=log_path) 508 509 script = ";".join([x.strip() for x in script.strip().splitlines()]) 510 511 process = self.device.shell_popen([script], kill_atexit=False, 512 stdin=subprocess.PIPE, 513 stdout=subprocess.PIPE) 514 515 self.assertEqual(b"Waiting\n", process.stdout.readline()) 516 process.send_signal(signal.SIGINT) 517 process.wait() 518 519 # Waiting for the local adb to finish is insufficient, since it hangs 520 # up immediately. 521 time.sleep(1) 522 523 stdout, _ = self.device.shell(["cat", log_path]) 524 self.assertEqual(stdout.strip(), "SIGHUP") 525 526 # Temporarily disabled because it seems to cause later instability. 527 # http://b/228114748 528 def disabled_test_exit_stress(self): 529 """Hammer `adb shell exit 42` with multiple threads.""" 530 thread_count = 48 531 result = dict() 532 def hammer(thread_idx, thread_count, result): 533 success = True 534 for i in range(thread_idx, 240, thread_count): 535 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 536 if ret != i % 256: 537 success = False 538 break 539 result[thread_idx] = success 540 541 threads = [] 542 for i in range(thread_count): 543 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 544 thread.start() 545 threads.append(thread) 546 for thread in threads: 547 thread.join() 548 for i, success in result.items(): 549 self.assertTrue(success) 550 551 def disabled_test_parallel(self): 552 """Spawn a bunch of `adb shell` instances in parallel. 553 554 This was broken historically due to the use of select, which only works 555 for fds that are numerically less than 1024. 556 557 Bug: http://b/141955761""" 558 559 n_procs = 2048 560 procs = dict() 561 for i in range(0, n_procs): 562 procs[i] = subprocess.Popen( 563 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 564 stdin=subprocess.PIPE, 565 stdout=subprocess.PIPE 566 ) 567 568 for i in range(0, n_procs): 569 procs[i].stdin.write("%d\n" % i) 570 571 for i in range(0, n_procs): 572 response = procs[i].stdout.readline() 573 assert(response == "%d\n" % i) 574 575 for i in range(0, n_procs): 576 procs[i].stdin.write("%d\n" % (i % 256)) 577 578 for i in range(0, n_procs): 579 assert(procs[i].wait() == i % 256) 580 581 582class ArgumentEscapingTest(DeviceTest): 583 def test_shell_escaping(self): 584 """Make sure that argument escaping is somewhat sane.""" 585 586 # http://b/19734868 587 # Note that this actually matches ssh(1)'s behavior --- it's 588 # converted to `sh -c echo hello; echo world` which sh interprets 589 # as `sh -c echo` (with an argument to that shell of "hello"), 590 # and then `echo world` back in the first shell. 591 result = self.device.shell( 592 shlex.split("sh -c 'echo hello; echo world'"))[0] 593 result = result.splitlines() 594 self.assertEqual(['', 'world'], result) 595 # If you really wanted "hello" and "world", here's what you'd do: 596 result = self.device.shell( 597 shlex.split(r'echo hello\;echo world'))[0].splitlines() 598 self.assertEqual(['hello', 'world'], result) 599 600 # http://b/15479704 601 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 602 self.assertEqual('t', result) 603 result = self.device.shell( 604 shlex.split("sh -c 'true && echo t'"))[0].strip() 605 self.assertEqual('t', result) 606 607 # http://b/20564385 608 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 609 self.assertEqual('t', result) 610 result = self.device.shell( 611 shlex.split(r'echo -n 123\;uname'))[0].strip() 612 self.assertEqual('123Linux', result) 613 614 def test_install_argument_escaping(self): 615 """Make sure that install argument escaping works.""" 616 # http://b/20323053, http://b/3090932. 617 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 618 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 619 delete=False) 620 tf.close() 621 622 # Installing bogus .apks fails if the device supports exit codes. 623 try: 624 output = self.device.install(tf.name.decode("utf8")) 625 except subprocess.CalledProcessError as e: 626 output = e.output 627 628 self.assertIn(file_suffix, output) 629 os.remove(tf.name) 630 631 632@unittest.skip("b/172372960: temporarily disabled due to flakiness") 633class RootUnrootTest(DeviceTest): 634 def _test_root(self): 635 message = self.device.root() 636 if 'adbd cannot run as root in production builds' in message: 637 return 638 self.device.wait() 639 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 640 641 def _test_unroot(self): 642 self.device.unroot() 643 self.device.wait() 644 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 645 646 def test_root_unroot(self): 647 """Make sure that adb root and adb unroot work, using id(1).""" 648 if self.device.get_prop('ro.debuggable') != '1': 649 raise unittest.SkipTest('requires rootable build') 650 651 original_user = self.device.shell(['id', '-un'])[0].strip() 652 try: 653 if original_user == 'root': 654 self._test_unroot() 655 self._test_root() 656 elif original_user == 'shell': 657 self._test_root() 658 self._test_unroot() 659 finally: 660 if original_user == 'root': 661 self.device.root() 662 else: 663 self.device.unroot() 664 self.device.wait() 665 666 667class TcpIpTest(DeviceTest): 668 def test_tcpip_failure_raises(self): 669 """adb tcpip requires a port. 670 671 Bug: http://b/22636927 672 """ 673 self.assertRaises( 674 subprocess.CalledProcessError, self.device.tcpip, '') 675 self.assertRaises( 676 subprocess.CalledProcessError, self.device.tcpip, 'foo') 677 678 679class SystemPropertiesTest(DeviceTest): 680 def test_get_prop(self): 681 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 682 683 @requires_root 684 def test_set_prop(self): 685 prop_name = 'foo.bar' 686 self.device.shell(['setprop', prop_name, '""']) 687 688 self.device.set_prop(prop_name, 'qux') 689 self.assertEqual( 690 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 691 692 693def compute_md5(string): 694 hsh = hashlib.md5() 695 hsh.update(string) 696 return hsh.hexdigest() 697 698 699def get_md5_prog(device): 700 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 701 try: 702 device.shell(['md5sum', '/proc/uptime']) 703 return 'md5sum' 704 except adb.ShellError: 705 return 'md5' 706 707 708class HostFile(object): 709 def __init__(self, handle, checksum): 710 self.handle = handle 711 self.checksum = checksum 712 self.full_path = handle.name 713 self.base_name = os.path.basename(self.full_path) 714 715 716class DeviceFile(object): 717 def __init__(self, checksum, full_path): 718 self.checksum = checksum 719 self.full_path = full_path 720 self.base_name = posixpath.basename(self.full_path) 721 722 723def make_random_host_files(in_dir, num_files): 724 min_size = 1 * (1 << 10) 725 max_size = 16 * (1 << 10) 726 727 files = [] 728 for _ in range(num_files): 729 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 730 731 size = random.randrange(min_size, max_size, 1024) 732 rand_str = os.urandom(size) 733 file_handle.write(rand_str) 734 file_handle.flush() 735 file_handle.close() 736 737 md5 = compute_md5(rand_str) 738 files.append(HostFile(file_handle, md5)) 739 return files 740 741 742def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 743 min_size = 1 * (1 << 10) 744 max_size = 16 * (1 << 10) 745 746 files = [] 747 for file_num in range(num_files): 748 size = random.randrange(min_size, max_size, 1024) 749 750 base_name = prefix + str(file_num) 751 full_path = posixpath.join(in_dir, base_name) 752 753 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 754 'bs={}'.format(size), 'count=1']) 755 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 756 757 files.append(DeviceFile(dev_md5, full_path)) 758 return files 759 760 761class FileOperationsTest: 762 class Base(DeviceTest): 763 SCRATCH_DIR = '/data/local/tmp' 764 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 765 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 766 767 def setUp(self): 768 self.previous_env = os.environ.get("ADB_COMPRESSION") 769 os.environ["ADB_COMPRESSION"] = self.compression 770 771 def tearDown(self): 772 if self.previous_env is None: 773 del os.environ["ADB_COMPRESSION"] 774 else: 775 os.environ["ADB_COMPRESSION"] = self.previous_env 776 777 def _verify_remote(self, checksum, remote_path): 778 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 779 remote_path])[0].split() 780 self.assertEqual(checksum, dev_md5) 781 782 def _verify_local(self, checksum, local_path): 783 with open(local_path, 'rb') as host_file: 784 host_md5 = compute_md5(host_file.read()) 785 self.assertEqual(host_md5, checksum) 786 787 def test_push(self): 788 """Push a randomly generated file to specified device.""" 789 kbytes = 512 790 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 791 rand_str = os.urandom(1024 * kbytes) 792 tmp.write(rand_str) 793 tmp.close() 794 795 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 796 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 797 798 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 799 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 800 801 os.remove(tmp.name) 802 803 def test_push_dir(self): 804 """Push a randomly generated directory of files to the device.""" 805 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 806 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 807 808 try: 809 host_dir = tempfile.mkdtemp() 810 811 # Make sure the temp directory isn't setuid, or else adb will complain. 812 os.chmod(host_dir, 0o700) 813 814 # Create 32 random files. 815 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 816 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 817 818 for temp_file in temp_files: 819 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 820 os.path.basename(host_dir), 821 temp_file.base_name) 822 self._verify_remote(temp_file.checksum, remote_path) 823 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 824 finally: 825 if host_dir is not None: 826 shutil.rmtree(host_dir) 827 828 def disabled_test_push_empty(self): 829 """Push an empty directory to the device.""" 830 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 831 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 832 833 try: 834 host_dir = tempfile.mkdtemp() 835 836 # Make sure the temp directory isn't setuid, or else adb will complain. 837 os.chmod(host_dir, 0o700) 838 839 # Create an empty directory. 840 empty_dir_path = os.path.join(host_dir, 'empty') 841 os.mkdir(empty_dir_path); 842 843 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 844 845 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 846 test_empty_cmd = ["[", "-d", remote_path, "]"] 847 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 848 849 self.assertEqual(rc, 0) 850 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 851 finally: 852 if host_dir is not None: 853 shutil.rmtree(host_dir) 854 855 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 856 def test_push_symlink(self): 857 """Push a symlink. 858 859 Bug: http://b/31491920 860 """ 861 try: 862 host_dir = tempfile.mkdtemp() 863 864 # Make sure the temp directory isn't setuid, or else adb will 865 # complain. 866 os.chmod(host_dir, 0o700) 867 868 with open(os.path.join(host_dir, 'foo'), 'w') as f: 869 f.write('foo') 870 871 symlink_path = os.path.join(host_dir, 'symlink') 872 os.symlink('foo', symlink_path) 873 874 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 875 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 876 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 877 rc, out, _ = self.device.shell_nocheck( 878 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 879 self.assertEqual(0, rc) 880 self.assertEqual(out.strip(), 'foo') 881 finally: 882 if host_dir is not None: 883 shutil.rmtree(host_dir) 884 885 def test_multiple_push(self): 886 """Push multiple files to the device in one adb push command. 887 888 Bug: http://b/25324823 889 """ 890 891 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 892 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 893 894 try: 895 host_dir = tempfile.mkdtemp() 896 897 # Create some random files and a subdirectory containing more files. 898 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 899 900 subdir = os.path.join(host_dir, 'subdir') 901 os.mkdir(subdir) 902 subdir_temp_files = make_random_host_files(in_dir=subdir, 903 num_files=4) 904 905 paths = [x.full_path for x in temp_files] 906 paths.append(subdir) 907 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 908 909 for temp_file in temp_files: 910 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 911 temp_file.base_name) 912 self._verify_remote(temp_file.checksum, remote_path) 913 914 for subdir_temp_file in subdir_temp_files: 915 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 916 # BROKEN: http://b/25394682 917 # 'subdir'; 918 temp_file.base_name) 919 self._verify_remote(temp_file.checksum, remote_path) 920 921 922 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 923 finally: 924 if host_dir is not None: 925 shutil.rmtree(host_dir) 926 927 @requires_non_root 928 def test_push_error_reporting(self): 929 """Make sure that errors that occur while pushing a file get reported 930 931 Bug: http://b/26816782 932 """ 933 with tempfile.NamedTemporaryFile() as tmp_file: 934 tmp_file.write(b'\0' * 1024 * 1024) 935 tmp_file.flush() 936 try: 937 self.device.push(local=tmp_file.name, remote='/system/') 938 self.fail('push should not have succeeded') 939 except subprocess.CalledProcessError as e: 940 output = e.output 941 942 self.assertTrue(b'Permission denied' in output or 943 b'Read-only file system' in output) 944 945 @requires_non_root 946 def test_push_directory_creation(self): 947 """Regression test for directory creation. 948 949 Bug: http://b/110953234 950 """ 951 with tempfile.NamedTemporaryFile() as tmp_file: 952 tmp_file.write(b'\0' * 1024 * 1024) 953 tmp_file.flush() 954 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 955 self.device.shell(['rm', '-rf', remote_path]) 956 957 remote_path += '/filename' 958 self.device.push(local=tmp_file.name, remote=remote_path) 959 960 def disabled_test_push_multiple_slash_root(self): 961 """Regression test for pushing to //data/local/tmp. 962 963 Bug: http://b/141311284 964 965 Disabled because this broken on the adbd side as well: b/141943968 966 """ 967 with tempfile.NamedTemporaryFile() as tmp_file: 968 tmp_file.write('\0' * 1024 * 1024) 969 tmp_file.flush() 970 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 971 self.device.shell(['rm', '-rf', remote_path]) 972 self.device.push(local=tmp_file.name, remote=remote_path) 973 974 def _test_pull(self, remote_file, checksum): 975 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 976 tmp_write.close() 977 self.device.pull(remote=remote_file, local=tmp_write.name) 978 with open(tmp_write.name, 'rb') as tmp_read: 979 host_contents = tmp_read.read() 980 host_md5 = compute_md5(host_contents) 981 self.assertEqual(checksum, host_md5) 982 os.remove(tmp_write.name) 983 984 @requires_non_root 985 def test_pull_error_reporting(self): 986 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 987 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 988 989 try: 990 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 991 except subprocess.CalledProcessError as e: 992 output = e.output 993 994 self.assertIn(b'Permission denied', output) 995 996 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 997 998 def test_pull(self): 999 """Pull a randomly generated file from specified device.""" 1000 kbytes = 512 1001 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 1002 cmd = ['dd', 'if=/dev/urandom', 1003 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 1004 'count={}'.format(kbytes)] 1005 self.device.shell(cmd) 1006 dev_md5, _ = self.device.shell( 1007 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 1008 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 1009 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 1010 1011 def test_pull_dir(self): 1012 """Pull a randomly generated directory of files from the device.""" 1013 try: 1014 host_dir = tempfile.mkdtemp() 1015 1016 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1017 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1018 1019 # Populate device directory with random files. 1020 temp_files = make_random_device_files( 1021 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1022 1023 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1024 1025 for temp_file in temp_files: 1026 host_path = os.path.join( 1027 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1028 temp_file.base_name) 1029 self._verify_local(temp_file.checksum, host_path) 1030 1031 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1032 finally: 1033 if host_dir is not None: 1034 shutil.rmtree(host_dir) 1035 1036 def test_pull_dir_symlink(self): 1037 """Pull a directory into a symlink to a directory. 1038 1039 Bug: http://b/27362811 1040 """ 1041 if os.name != 'posix': 1042 raise unittest.SkipTest('requires POSIX') 1043 1044 try: 1045 host_dir = tempfile.mkdtemp() 1046 real_dir = os.path.join(host_dir, 'dir') 1047 symlink = os.path.join(host_dir, 'symlink') 1048 os.mkdir(real_dir) 1049 os.symlink(real_dir, symlink) 1050 1051 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1052 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1053 1054 # Populate device directory with random files. 1055 temp_files = make_random_device_files( 1056 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1057 1058 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1059 1060 for temp_file in temp_files: 1061 host_path = os.path.join( 1062 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1063 temp_file.base_name) 1064 self._verify_local(temp_file.checksum, host_path) 1065 1066 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1067 finally: 1068 if host_dir is not None: 1069 shutil.rmtree(host_dir) 1070 1071 def test_pull_dir_symlink_collision(self): 1072 """Pull a directory into a colliding symlink to directory.""" 1073 if os.name != 'posix': 1074 raise unittest.SkipTest('requires POSIX') 1075 1076 try: 1077 host_dir = tempfile.mkdtemp() 1078 real_dir = os.path.join(host_dir, 'real') 1079 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1080 symlink = os.path.join(host_dir, tmp_dirname) 1081 os.mkdir(real_dir) 1082 os.symlink(real_dir, symlink) 1083 1084 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1085 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1086 1087 # Populate device directory with random files. 1088 temp_files = make_random_device_files( 1089 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1090 1091 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1092 1093 for temp_file in temp_files: 1094 host_path = os.path.join(real_dir, temp_file.base_name) 1095 self._verify_local(temp_file.checksum, host_path) 1096 1097 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1098 finally: 1099 if host_dir is not None: 1100 shutil.rmtree(host_dir) 1101 1102 def test_pull_dir_nonexistent(self): 1103 """Pull a directory of files from the device to a nonexistent path.""" 1104 try: 1105 host_dir = tempfile.mkdtemp() 1106 dest_dir = os.path.join(host_dir, 'dest') 1107 1108 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1109 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1110 1111 # Populate device directory with random files. 1112 temp_files = make_random_device_files( 1113 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1114 1115 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1116 1117 for temp_file in temp_files: 1118 host_path = os.path.join(dest_dir, temp_file.base_name) 1119 self._verify_local(temp_file.checksum, host_path) 1120 1121 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1122 finally: 1123 if host_dir is not None: 1124 shutil.rmtree(host_dir) 1125 1126 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1127 def disabled_test_pull_symlink_dir(self): 1128 """Pull a symlink to a directory of symlinks to files.""" 1129 try: 1130 host_dir = tempfile.mkdtemp() 1131 1132 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1133 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1134 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1135 1136 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1137 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1138 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1139 1140 # Populate device directory with random files. 1141 temp_files = make_random_device_files( 1142 self.device, in_dir=remote_dir, num_files=32) 1143 1144 for temp_file in temp_files: 1145 self.device.shell( 1146 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1147 posixpath.join(remote_links, temp_file.base_name)]) 1148 1149 self.device.pull(remote=remote_symlink, local=host_dir) 1150 1151 for temp_file in temp_files: 1152 host_path = os.path.join( 1153 host_dir, 'symlink', temp_file.base_name) 1154 self._verify_local(temp_file.checksum, host_path) 1155 1156 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1157 finally: 1158 if host_dir is not None: 1159 shutil.rmtree(host_dir) 1160 1161 def test_pull_empty(self): 1162 """Pull a directory containing an empty directory from the device.""" 1163 try: 1164 host_dir = tempfile.mkdtemp() 1165 1166 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1167 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1168 self.device.shell(['mkdir', '-p', remote_empty_path]) 1169 1170 self.device.pull(remote=remote_empty_path, local=host_dir) 1171 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1172 finally: 1173 if host_dir is not None: 1174 shutil.rmtree(host_dir) 1175 1176 def test_multiple_pull(self): 1177 """Pull a randomly generated directory of files from the device.""" 1178 1179 try: 1180 host_dir = tempfile.mkdtemp() 1181 1182 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1183 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1184 self.device.shell(['mkdir', '-p', subdir]) 1185 1186 # Create some random files and a subdirectory containing more files. 1187 temp_files = make_random_device_files( 1188 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1189 1190 subdir_temp_files = make_random_device_files( 1191 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1192 1193 paths = [x.full_path for x in temp_files] 1194 paths.append(subdir) 1195 self.device._simple_call(['pull'] + paths + [host_dir]) 1196 1197 for temp_file in temp_files: 1198 local_path = os.path.join(host_dir, temp_file.base_name) 1199 self._verify_local(temp_file.checksum, local_path) 1200 1201 for subdir_temp_file in subdir_temp_files: 1202 local_path = os.path.join(host_dir, 1203 'subdir', 1204 subdir_temp_file.base_name) 1205 self._verify_local(subdir_temp_file.checksum, local_path) 1206 1207 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1208 finally: 1209 if host_dir is not None: 1210 shutil.rmtree(host_dir) 1211 1212 def verify_sync(self, device, temp_files, device_dir): 1213 """Verifies that a list of temp files was synced to the device.""" 1214 # Confirm that every file on the device mirrors that on the host. 1215 for temp_file in temp_files: 1216 device_full_path = posixpath.join( 1217 device_dir, temp_file.base_name) 1218 dev_md5, _ = device.shell( 1219 [get_md5_prog(self.device), device_full_path])[0].split() 1220 self.assertEqual(temp_file.checksum, dev_md5) 1221 1222 def test_sync(self): 1223 """Sync a host directory to the data partition.""" 1224 1225 try: 1226 base_dir = tempfile.mkdtemp() 1227 1228 # Create mirror device directory hierarchy within base_dir. 1229 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1230 os.makedirs(full_dir_path) 1231 1232 # Create 32 random files within the host mirror. 1233 temp_files = make_random_host_files( 1234 in_dir=full_dir_path, num_files=32) 1235 1236 # Clean up any stale files on the device. 1237 device = adb.get_device() # pylint: disable=no-member 1238 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1239 1240 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1241 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1242 device.sync('data') 1243 if old_product_out is None: 1244 del os.environ['ANDROID_PRODUCT_OUT'] 1245 else: 1246 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1247 1248 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1249 1250 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1251 finally: 1252 if base_dir is not None: 1253 shutil.rmtree(base_dir) 1254 1255 def test_push_sync(self): 1256 """Sync a host directory to a specific path.""" 1257 1258 try: 1259 temp_dir = tempfile.mkdtemp() 1260 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1261 1262 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1263 1264 # Clean up any stale files on the device. 1265 device = adb.get_device() # pylint: disable=no-member 1266 device.shell(['rm', '-rf', device_dir]) 1267 1268 device.push(temp_dir, device_dir, sync=True) 1269 1270 self.verify_sync(device, temp_files, device_dir) 1271 1272 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1273 finally: 1274 if temp_dir is not None: 1275 shutil.rmtree(temp_dir) 1276 1277 def test_push_sync_multiple(self): 1278 """Sync multiple host directories to a specific path.""" 1279 1280 try: 1281 temp_dir = tempfile.mkdtemp() 1282 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1283 1284 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1285 1286 # Clean up any stale files on the device. 1287 device = adb.get_device() # pylint: disable=no-member 1288 device.shell(['rm', '-rf', device_dir]) 1289 device.shell(['mkdir', '-p', device_dir]) 1290 1291 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1292 device.push(host_paths, device_dir, sync=True) 1293 1294 self.verify_sync(device, temp_files, device_dir) 1295 1296 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1297 finally: 1298 if temp_dir is not None: 1299 shutil.rmtree(temp_dir) 1300 1301 1302 def test_push_dry_run_nonexistent_file(self): 1303 """Push with dry run.""" 1304 1305 for file_size in [8, 1024 * 1024]: 1306 try: 1307 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1308 device_file = posixpath.join(device_dir, 'file') 1309 1310 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1311 self.device.shell(['mkdir', '-p', device_dir]) 1312 1313 host_dir = tempfile.mkdtemp() 1314 host_file = posixpath.join(host_dir, 'file') 1315 1316 with open(host_file, "w") as f: 1317 f.write('x' * file_size) 1318 1319 self.device._simple_call(['push', '-n', host_file, device_file]) 1320 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1321 self.assertNotEqual(0, rc) 1322 1323 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1324 finally: 1325 if host_dir is not None: 1326 shutil.rmtree(host_dir) 1327 1328 def test_push_dry_run_existent_file(self): 1329 """Push with dry run.""" 1330 1331 for file_size in [8, 1024 * 1024]: 1332 try: 1333 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1334 device_file = posixpath.join(device_dir, 'file') 1335 1336 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1337 self.device.shell(['mkdir', '-p', device_dir]) 1338 self.device.shell(['echo', 'foo', '>', device_file]) 1339 1340 host_dir = tempfile.mkdtemp() 1341 host_file = posixpath.join(host_dir, 'file') 1342 1343 with open(host_file, "w") as f: 1344 f.write('x' * file_size) 1345 1346 self.device._simple_call(['push', '-n', host_file, device_file]) 1347 stdout, stderr = self.device.shell(['cat', device_file]) 1348 self.assertEqual(stdout.strip(), "foo") 1349 1350 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1351 finally: 1352 if host_dir is not None: 1353 shutil.rmtree(host_dir) 1354 1355 def test_unicode_paths(self): 1356 """Ensure that we can support non-ASCII paths, even on Windows.""" 1357 name = u'로보카 폴리' 1358 1359 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1360 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1361 1362 ## push. 1363 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1364 tf.close() 1365 self.device.push(tf.name, remote_path) 1366 os.remove(tf.name) 1367 self.assertFalse(os.path.exists(tf.name)) 1368 1369 # Verify that the device ended up with the expected UTF-8 path 1370 output = self.device.shell( 1371 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1372 self.assertEqual(remote_path, output) 1373 1374 # pull. 1375 self.device.pull(remote_path, tf.name) 1376 self.assertTrue(os.path.exists(tf.name)) 1377 os.remove(tf.name) 1378 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1379 1380 1381class FileOperationsTestUncompressed(FileOperationsTest.Base): 1382 compression = "none" 1383 1384 1385class FileOperationsTestBrotli(FileOperationsTest.Base): 1386 compression = "brotli" 1387 1388 1389class FileOperationsTestLZ4(FileOperationsTest.Base): 1390 compression = "lz4" 1391 1392 1393class FileOperationsTestZstd(FileOperationsTest.Base): 1394 compression = "zstd" 1395 1396 1397class DeviceOfflineTest(DeviceTest): 1398 def _get_device_state(self, serialno): 1399 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1400 for line in output.split('\n'): 1401 m = re.match('(\S+)\s+(\S+)', line) 1402 if m and m.group(1) == serialno: 1403 return m.group(2) 1404 return None 1405 1406 def disabled_test_killed_when_pushing_a_large_file(self): 1407 """ 1408 While running adb push with a large file, kill adb server. 1409 Occasionally the device becomes offline. Because the device is still 1410 reading data without realizing that the adb server has been restarted. 1411 Test if we can bring the device online automatically now. 1412 http://b/32952319 1413 """ 1414 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1415 # 1. Push a large file 1416 file_path = 'tmp_large_file' 1417 try: 1418 fh = open(file_path, 'w') 1419 fh.write('\0' * (100 * 1024 * 1024)) 1420 fh.close() 1421 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1422 time.sleep(0.1) 1423 # 2. Kill the adb server 1424 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1425 subproc.terminate() 1426 finally: 1427 try: 1428 os.unlink(file_path) 1429 except: 1430 pass 1431 # 3. See if the device still exist. 1432 # Sleep to wait for the adb server exit. 1433 time.sleep(0.5) 1434 # 4. The device should be online 1435 self.assertEqual(self._get_device_state(serialno), 'device') 1436 1437 def disabled_test_killed_when_pulling_a_large_file(self): 1438 """ 1439 While running adb pull with a large file, kill adb server. 1440 Occasionally the device can't be connected. Because the device is trying to 1441 send a message larger than what is expected by the adb server. 1442 Test if we can bring the device online automatically now. 1443 """ 1444 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1445 file_path = 'tmp_large_file' 1446 try: 1447 # 1. Create a large file on device. 1448 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1449 'bs=1000000', 'count=100']) 1450 # 2. Pull the large file on host. 1451 subproc = subprocess.Popen(self.device.adb_cmd + 1452 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1453 time.sleep(0.1) 1454 # 3. Kill the adb server 1455 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1456 subproc.terminate() 1457 finally: 1458 try: 1459 os.unlink(file_path) 1460 except: 1461 pass 1462 # 4. See if the device still exist. 1463 # Sleep to wait for the adb server exit. 1464 time.sleep(0.5) 1465 self.assertEqual(self._get_device_state(serialno), 'device') 1466 1467 1468 def test_packet_size_regression(self): 1469 """Test for http://b/37783561 1470 1471 Receiving packets of a length divisible by 512 but not 1024 resulted in 1472 the adb client waiting indefinitely for more input. 1473 """ 1474 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1475 # Probe some surrounding values as well, for the hell of it. 1476 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1477 for offset in [-6, -5, -4]: 1478 length = base + offset 1479 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1480 'echo', 'foo'] 1481 rc, stdout, _ = self.device.shell_nocheck(cmd) 1482 1483 self.assertEqual(0, rc) 1484 1485 # Output should be '\0' * length, followed by "foo\n" 1486 self.assertEqual(length, len(stdout) - 4) 1487 self.assertEqual(stdout, "\0" * length + "foo\n") 1488 1489 def test_zero_packet(self): 1490 """Test for http://b/113070258 1491 1492 Make sure that we don't blow up when sending USB transfers that line up 1493 exactly with the USB packet size. 1494 """ 1495 1496 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1497 try: 1498 for size in [512, 1024]: 1499 def listener(): 1500 cmd = ["echo foo | nc -l -p 12345; echo done"] 1501 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1502 1503 thread = threading.Thread(target=listener) 1504 thread.start() 1505 1506 # Wait a bit to let the shell command start. 1507 time.sleep(0.25) 1508 1509 sock = socket.create_connection(("localhost", local_port)) 1510 with contextlib.closing(sock): 1511 bytesWritten = sock.send(b"a" * size) 1512 self.assertEqual(size, bytesWritten) 1513 readBytes = sock.recv(4096) 1514 self.assertEqual(b"foo\n", readBytes) 1515 1516 thread.join() 1517 finally: 1518 self.device.forward_remove("tcp:{}".format(local_port)) 1519 1520 1521class SocketTest(DeviceTest): 1522 def test_socket_flush(self): 1523 """Test that we handle socket closure properly. 1524 1525 If we're done writing to a socket, closing before the other end has 1526 closed will send a TCP_RST if we have incoming data queued up, which 1527 may result in data that we've written being discarded. 1528 1529 Bug: http://b/74616284 1530 """ 1531 s = socket.create_connection(("localhost", 5037)) 1532 1533 def adb_length_prefixed(string): 1534 encoded = string.encode("utf8") 1535 result = b"%04x%s" % (len(encoded), encoded) 1536 return result 1537 1538 if "ANDROID_SERIAL" in os.environ: 1539 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1540 else: 1541 transport_string = "host:transport-any" 1542 1543 s.sendall(adb_length_prefixed(transport_string)) 1544 response = s.recv(4) 1545 self.assertEqual(b"OKAY", response) 1546 1547 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1548 s.sendall(adb_length_prefixed(shell_string)) 1549 1550 response = s.recv(4) 1551 self.assertEqual(b"OKAY", response) 1552 1553 # Spawn a thread that dumps garbage into the socket until failure. 1554 def spam(): 1555 buf = b"\0" * 16384 1556 try: 1557 while True: 1558 s.sendall(buf) 1559 except Exception as ex: 1560 print(ex) 1561 1562 thread = threading.Thread(target=spam) 1563 thread.start() 1564 1565 time.sleep(1) 1566 1567 received = b"" 1568 while True: 1569 read = s.recv(512) 1570 if len(read) == 0: 1571 break 1572 received += read 1573 1574 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1575 thread.join() 1576 1577 1578class FramebufferTest(DeviceTest): 1579 @requires_root 1580 def test_framebuffer(self): 1581 """Test that we get something from the framebuffer service.""" 1582 output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) 1583 self.assertFalse(len(output) == 0) 1584 1585 1586if sys.platform == "win32": 1587 # From https://stackoverflow.com/a/38749458 1588 import os 1589 import contextlib 1590 import msvcrt 1591 import ctypes 1592 from ctypes import wintypes 1593 1594 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1595 1596 GENERIC_READ = 0x80000000 1597 GENERIC_WRITE = 0x40000000 1598 FILE_SHARE_READ = 1 1599 FILE_SHARE_WRITE = 2 1600 CONSOLE_TEXTMODE_BUFFER = 1 1601 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1602 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1603 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1604 1605 def _check_zero(result, func, args): 1606 if not result: 1607 raise ctypes.WinError(ctypes.get_last_error()) 1608 return args 1609 1610 def _check_invalid(result, func, args): 1611 if result == INVALID_HANDLE_VALUE: 1612 raise ctypes.WinError(ctypes.get_last_error()) 1613 return args 1614 1615 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1616 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1617 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1618 1619 class COORD(ctypes.Structure): 1620 _fields_ = (('X', wintypes.SHORT), 1621 ('Y', wintypes.SHORT)) 1622 1623 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1624 _fields_ = (('cbSize', wintypes.ULONG), 1625 ('dwSize', COORD), 1626 ('dwCursorPosition', COORD), 1627 ('wAttributes', wintypes.WORD), 1628 ('srWindow', wintypes.SMALL_RECT), 1629 ('dwMaximumWindowSize', COORD), 1630 ('wPopupAttributes', wintypes.WORD), 1631 ('bFullscreenSupported', wintypes.BOOL), 1632 ('ColorTable', wintypes.DWORD * 16)) 1633 def __init__(self, *args, **kwds): 1634 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1635 *args, **kwds) 1636 self.cbSize = ctypes.sizeof(self) 1637 1638 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1639 CONSOLE_SCREEN_BUFFER_INFOEX) 1640 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1641 1642 kernel32.GetStdHandle.errcheck = _check_invalid 1643 kernel32.GetStdHandle.restype = wintypes.HANDLE 1644 kernel32.GetStdHandle.argtypes = ( 1645 wintypes.DWORD,) # _In_ nStdHandle 1646 1647 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1648 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1649 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1650 wintypes.DWORD, # _In_ dwDesiredAccess 1651 wintypes.DWORD, # _In_ dwShareMode 1652 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1653 wintypes.DWORD, # _In_ dwFlags 1654 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1655 1656 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1657 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1658 wintypes.HANDLE, # _In_ hConsoleOutput 1659 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1660 1661 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1662 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1663 wintypes.HANDLE, # _In_ hConsoleOutput 1664 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1665 1666 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1667 kernel32.SetConsoleWindowInfo.argtypes = ( 1668 wintypes.HANDLE, # _In_ hConsoleOutput 1669 wintypes.BOOL, # _In_ bAbsolute 1670 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1671 1672 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1673 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1674 wintypes.HANDLE, # _In_ hConsoleOutput 1675 wintypes.WCHAR, # _In_ cCharacter 1676 wintypes.DWORD, # _In_ nLength 1677 COORD, # _In_ dwWriteCoord 1678 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1679 1680 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1681 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1682 wintypes.HANDLE, # _In_ hConsoleOutput 1683 wintypes.LPWSTR, # _Out_ lpCharacter 1684 wintypes.DWORD, # _In_ nLength 1685 COORD, # _In_ dwReadCoord 1686 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1687 1688 @contextlib.contextmanager 1689 def allocate_console(): 1690 allocated = kernel32.AllocConsole() 1691 try: 1692 yield allocated 1693 finally: 1694 if allocated: 1695 kernel32.FreeConsole() 1696 1697 @contextlib.contextmanager 1698 def console_screen(ncols=None, nrows=None): 1699 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1700 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1701 nwritten = (wintypes.DWORD * 1)() 1702 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1703 kernel32.GetConsoleScreenBufferInfoEx( 1704 hStdOut, ctypes.byref(info)) 1705 if ncols is None: 1706 ncols = info.dwSize.X 1707 if nrows is None: 1708 nrows = info.dwSize.Y 1709 elif nrows > 9999: 1710 raise ValueError('nrows must be 9999 or less') 1711 fd_screen = None 1712 hScreen = kernel32.CreateConsoleScreenBuffer( 1713 GENERIC_READ | GENERIC_WRITE, 1714 FILE_SHARE_READ | FILE_SHARE_WRITE, 1715 None, CONSOLE_TEXTMODE_BUFFER, None) 1716 try: 1717 fd_screen = msvcrt.open_osfhandle( 1718 hScreen, os.O_RDWR | os.O_BINARY) 1719 kernel32.GetConsoleScreenBufferInfoEx( 1720 hScreen, ctypes.byref(new_info)) 1721 new_info.dwSize = COORD(ncols, nrows) 1722 new_info.srWindow = wintypes.SMALL_RECT( 1723 Left=0, Top=0, Right=(ncols - 1), 1724 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1725 kernel32.SetConsoleScreenBufferInfoEx( 1726 hScreen, ctypes.byref(new_info)) 1727 kernel32.SetConsoleWindowInfo(hScreen, True, 1728 ctypes.byref(new_info.srWindow)) 1729 kernel32.FillConsoleOutputCharacterW( 1730 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1731 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1732 try: 1733 yield fd_screen 1734 finally: 1735 kernel32.SetConsoleScreenBufferInfoEx( 1736 hStdOut, ctypes.byref(info)) 1737 kernel32.SetConsoleWindowInfo(hStdOut, True, 1738 ctypes.byref(info.srWindow)) 1739 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1740 finally: 1741 if fd_screen is not None: 1742 os.close(fd_screen) 1743 else: 1744 kernel32.CloseHandle(hScreen) 1745 1746 def read_screen(fd): 1747 hScreen = msvcrt.get_osfhandle(fd) 1748 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1749 kernel32.GetConsoleScreenBufferInfoEx( 1750 hScreen, ctypes.byref(csbi)) 1751 ncols = csbi.dwSize.X 1752 pos = csbi.dwCursorPosition 1753 length = ncols * pos.Y + pos.X + 1 1754 buf = (ctypes.c_wchar * length)() 1755 n = (wintypes.DWORD * 1)() 1756 kernel32.ReadConsoleOutputCharacterW( 1757 hScreen, buf, length, COORD(0,0), n) 1758 lines = [buf[i:i+ncols].rstrip(u'\0') 1759 for i in range(0, n[0], ncols)] 1760 return u'\n'.join(lines) 1761 1762@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1763class WindowsConsoleTest(DeviceTest): 1764 def test_unicode_output(self): 1765 """Test Unicode command line parameters and Unicode console window output. 1766 1767 Bug: https://issuetracker.google.com/issues/111972753 1768 """ 1769 # If we don't have a console window, allocate one. This isn't necessary if we're already 1770 # being run from a console window, which is typical. 1771 with allocate_console() as allocated_console: 1772 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1773 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1774 # likely unnecessary given the typical console window size. 1775 with console_screen(nrows=1000) as screen: 1776 unicode_string = u'로보카 폴리' 1777 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1778 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1779 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1780 process.wait() 1781 # Read what was written by adb to the temporary console buffer. 1782 console_output = read_screen(screen) 1783 self.assertEqual(unicode_string, console_output) 1784 1785 1786def main(): 1787 random.seed(0) 1788 if len(adb.get_devices()) > 0: 1789 suite = unittest.TestLoader().loadTestsFromName(__name__) 1790 unittest.TextTestRunner(verbosity=3).run(suite) 1791 else: 1792 print('Test suite must be run with attached devices') 1793 1794 1795if __name__ == '__main__': 1796 main() 1797