1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 def setUp(self): 81 self.device = adb.get_device() 82 83 84class ForwardReverseTest(DeviceTest): 85 def _test_no_rebind(self, description, direction_list, direction, 86 direction_no_rebind, direction_remove_all): 87 msg = direction_list() 88 self.assertEqual('', msg.strip(), 89 description + ' list must be empty to run this test.') 90 91 # Use --no-rebind with no existing binding 92 direction_no_rebind('tcp:5566', 'tcp:6655') 93 msg = direction_list() 94 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 95 96 # Use --no-rebind with existing binding 97 with self.assertRaises(subprocess.CalledProcessError): 98 direction_no_rebind('tcp:5566', 'tcp:6677') 99 msg = direction_list() 100 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 101 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 102 103 # Use the absence of --no-rebind with existing binding 104 direction('tcp:5566', 'tcp:6677') 105 msg = direction_list() 106 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 107 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 108 109 direction_remove_all() 110 msg = direction_list() 111 self.assertEqual('', msg.strip()) 112 113 def test_forward_no_rebind(self): 114 self._test_no_rebind('forward', self.device.forward_list, 115 self.device.forward, self.device.forward_no_rebind, 116 self.device.forward_remove_all) 117 118 def test_reverse_no_rebind(self): 119 self._test_no_rebind('reverse', self.device.reverse_list, 120 self.device.reverse, self.device.reverse_no_rebind, 121 self.device.reverse_remove_all) 122 123 def test_forward(self): 124 msg = self.device.forward_list() 125 self.assertEqual('', msg.strip(), 126 'Forwarding list must be empty to run this test.') 127 self.device.forward('tcp:5566', 'tcp:6655') 128 msg = self.device.forward_list() 129 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 130 self.device.forward('tcp:7788', 'tcp:8877') 131 msg = self.device.forward_list() 132 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 133 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 134 self.device.forward_remove('tcp:5566') 135 msg = self.device.forward_list() 136 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 137 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 138 self.device.forward_remove_all() 139 msg = self.device.forward_list() 140 self.assertEqual('', msg.strip()) 141 142 def test_forward_tcp_port_0(self): 143 self.assertEqual('', self.device.forward_list().strip(), 144 'Forwarding list must be empty to run this test.') 145 146 try: 147 # If resolving TCP port 0 is supported, `adb forward` will print 148 # the actual port number. 149 port = self.device.forward('tcp:0', 'tcp:8888').strip() 150 if not port: 151 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 152 153 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 154 self.device.forward_list())) 155 finally: 156 self.device.forward_remove_all() 157 158 def test_reverse(self): 159 msg = self.device.reverse_list() 160 self.assertEqual('', msg.strip(), 161 'Reverse forwarding list must be empty to run this test.') 162 self.device.reverse('tcp:5566', 'tcp:6655') 163 msg = self.device.reverse_list() 164 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 165 self.device.reverse('tcp:7788', 'tcp:8877') 166 msg = self.device.reverse_list() 167 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 168 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 169 self.device.reverse_remove('tcp:5566') 170 msg = self.device.reverse_list() 171 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 172 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 173 self.device.reverse_remove_all() 174 msg = self.device.reverse_list() 175 self.assertEqual('', msg.strip()) 176 177 def test_reverse_tcp_port_0(self): 178 self.assertEqual('', self.device.reverse_list().strip(), 179 'Reverse list must be empty to run this test.') 180 181 try: 182 # If resolving TCP port 0 is supported, `adb reverse` will print 183 # the actual port number. 184 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 185 if not port: 186 raise unittest.SkipTest('Reversing tcp:0 is not available.') 187 188 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 189 self.device.reverse_list())) 190 finally: 191 self.device.reverse_remove_all() 192 193 def test_forward_reverse_echo(self): 194 """Send data through adb forward and read it back via adb reverse""" 195 forward_port = 12345 196 reverse_port = forward_port + 1 197 forward_spec = 'tcp:' + str(forward_port) 198 reverse_spec = 'tcp:' + str(reverse_port) 199 forward_setup = False 200 reverse_setup = False 201 202 try: 203 # listen on localhost:forward_port, connect to remote:forward_port 204 self.device.forward(forward_spec, forward_spec) 205 forward_setup = True 206 # listen on remote:forward_port, connect to localhost:reverse_port 207 self.device.reverse(forward_spec, reverse_spec) 208 reverse_setup = True 209 210 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 211 with contextlib.closing(listener): 212 # Use SO_REUSEADDR so that subsequent runs of the test can grab 213 # the port even if it is in TIME_WAIT. 214 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 215 216 # Listen on localhost:reverse_port before connecting to 217 # localhost:forward_port because that will cause adb to connect 218 # back to localhost:reverse_port. 219 listener.bind(('127.0.0.1', reverse_port)) 220 listener.listen(4) 221 222 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 223 with contextlib.closing(client): 224 # Connect to the listener. 225 client.connect(('127.0.0.1', forward_port)) 226 227 # Accept the client connection. 228 accepted_connection, addr = listener.accept() 229 with contextlib.closing(accepted_connection) as server: 230 data = 'hello' 231 232 # Send data into the port setup by adb forward. 233 client.sendall(data) 234 # Explicitly close() so that server gets EOF. 235 client.close() 236 237 # Verify that the data came back via adb reverse. 238 self.assertEqual(data, server.makefile().read()) 239 finally: 240 if reverse_setup: 241 self.device.reverse_remove(forward_spec) 242 if forward_setup: 243 self.device.forward_remove(forward_spec) 244 245 246class ShellTest(DeviceTest): 247 def _interactive_shell(self, shell_args, input): 248 """Runs an interactive adb shell. 249 250 Args: 251 shell_args: List of string arguments to `adb shell`. 252 input: String input to send to the interactive shell. 253 254 Returns: 255 The remote exit code. 256 257 Raises: 258 unittest.SkipTest: The device doesn't support exit codes. 259 """ 260 if not self.device.has_shell_protocol(): 261 raise unittest.SkipTest('exit codes are unavailable on this device') 262 263 proc = subprocess.Popen( 264 self.device.adb_cmd + ['shell'] + shell_args, 265 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 266 stderr=subprocess.PIPE) 267 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 268 # to explicitly add an exit command to close the session from the device 269 # side, plus the necessary newline to complete the interactive command. 270 proc.communicate(input + '; exit\n') 271 return proc.returncode 272 273 def test_cat(self): 274 """Check that we can at least cat a file.""" 275 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 276 elements = out.split() 277 self.assertEqual(len(elements), 2) 278 279 uptime, idle = elements 280 self.assertGreater(float(uptime), 0.0) 281 self.assertGreater(float(idle), 0.0) 282 283 def test_throws_on_failure(self): 284 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 285 286 def test_output_not_stripped(self): 287 out = self.device.shell(['echo', 'foo'])[0] 288 self.assertEqual(out, 'foo' + self.device.linesep) 289 290 def test_shell_command_length(self): 291 # Devices that have shell_v2 should be able to handle long commands. 292 if self.device.has_shell_protocol(): 293 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 294 self.assertEqual(rc, 0) 295 self.assertTrue(out == ('x' * 16384 + '\n')) 296 297 def test_shell_nocheck_failure(self): 298 rc, out, _ = self.device.shell_nocheck(['false']) 299 self.assertNotEqual(rc, 0) 300 self.assertEqual(out, '') 301 302 def test_shell_nocheck_output_not_stripped(self): 303 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 304 self.assertEqual(rc, 0) 305 self.assertEqual(out, 'foo' + self.device.linesep) 306 307 def test_can_distinguish_tricky_results(self): 308 # If result checking on ADB shell is naively implemented as 309 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 310 # output from the result for a cmd of `echo -n 1`. 311 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 312 self.assertEqual(rc, 0) 313 self.assertEqual(out, '1') 314 315 def test_line_endings(self): 316 """Ensure that line ending translation is not happening in the pty. 317 318 Bug: http://b/19735063 319 """ 320 output = self.device.shell(['uname'])[0] 321 self.assertEqual(output, 'Linux' + self.device.linesep) 322 323 def test_pty_logic(self): 324 """Tests that a PTY is allocated when it should be. 325 326 PTY allocation behavior should match ssh. 327 """ 328 def check_pty(args): 329 """Checks adb shell PTY allocation. 330 331 Tests |args| for terminal and non-terminal stdin. 332 333 Args: 334 args: -Tt args in a list (e.g. ['-t', '-t']). 335 336 Returns: 337 A tuple (<terminal>, <non-terminal>). True indicates 338 the corresponding shell allocated a remote PTY. 339 """ 340 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 341 342 terminal = subprocess.Popen( 343 test_cmd, stdin=None, 344 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 345 terminal.communicate() 346 347 non_terminal = subprocess.Popen( 348 test_cmd, stdin=subprocess.PIPE, 349 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 350 non_terminal.communicate() 351 352 return (terminal.returncode == 0, non_terminal.returncode == 0) 353 354 # -T: never allocate PTY. 355 self.assertEqual((False, False), check_pty(['-T'])) 356 357 # These tests require a new device. 358 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 359 # No args: PTY only if stdin is a terminal and shell is interactive, 360 # which is difficult to reliably test from a script. 361 self.assertEqual((False, False), check_pty([])) 362 363 # -t: PTY if stdin is a terminal. 364 self.assertEqual((True, False), check_pty(['-t'])) 365 366 # -t -t: always allocate PTY. 367 self.assertEqual((True, True), check_pty(['-t', '-t'])) 368 369 # -tt: always allocate PTY, POSIX style (http://b/32216152). 370 self.assertEqual((True, True), check_pty(['-tt'])) 371 372 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 373 # we follow the man page instead. 374 self.assertEqual((True, True), check_pty(['-ttt'])) 375 376 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 377 self.assertEqual((True, True), check_pty(['-ttx'])) 378 379 # -Ttt: -tt cancels out -T. 380 self.assertEqual((True, True), check_pty(['-Ttt'])) 381 382 # -ttT: -T cancels out -tt. 383 self.assertEqual((False, False), check_pty(['-ttT'])) 384 385 def test_shell_protocol(self): 386 """Tests the shell protocol on the device. 387 388 If the device supports shell protocol, this gives us the ability 389 to separate stdout/stderr and return the exit code directly. 390 391 Bug: http://b/19734861 392 """ 393 if not self.device.has_shell_protocol(): 394 raise unittest.SkipTest('shell protocol unsupported on this device') 395 396 # Shell protocol should be used by default. 397 result = self.device.shell_nocheck( 398 shlex.split('echo foo; echo bar >&2; exit 17')) 399 self.assertEqual(17, result[0]) 400 self.assertEqual('foo' + self.device.linesep, result[1]) 401 self.assertEqual('bar' + self.device.linesep, result[2]) 402 403 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 404 405 # -x flag should disable shell protocol. 406 result = self.device.shell_nocheck( 407 shlex.split('-x echo foo; echo bar >&2; exit 17')) 408 self.assertEqual(0, result[0]) 409 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 410 self.assertEqual('', result[2]) 411 412 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 413 414 def test_non_interactive_sigint(self): 415 """Tests that SIGINT in a non-interactive shell kills the process. 416 417 This requires the shell protocol in order to detect the broken 418 pipe; raw data transfer mode will only see the break once the 419 subprocess tries to read or write. 420 421 Bug: http://b/23825725 422 """ 423 if not self.device.has_shell_protocol(): 424 raise unittest.SkipTest('shell protocol unsupported on this device') 425 426 # Start a long-running process. 427 sleep_proc = subprocess.Popen( 428 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 429 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 430 stderr=subprocess.STDOUT) 431 remote_pid = sleep_proc.stdout.readline().strip() 432 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 433 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 434 435 # Verify that the process is running, send signal, verify it stopped. 436 self.device.shell(proc_query) 437 os.kill(sleep_proc.pid, signal.SIGINT) 438 sleep_proc.communicate() 439 440 # It can take some time for the process to receive the signal and die. 441 end_time = time.time() + 3 442 while self.device.shell_nocheck(proc_query)[0] != 1: 443 self.assertFalse(time.time() > end_time, 444 'subprocess failed to terminate in time') 445 446 def test_non_interactive_stdin(self): 447 """Tests that non-interactive shells send stdin.""" 448 if not self.device.has_shell_protocol(): 449 raise unittest.SkipTest('non-interactive stdin unsupported ' 450 'on this device') 451 452 # Test both small and large inputs. 453 small_input = 'foo' 454 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 455 string.digits)) 456 457 for input in (small_input, large_input): 458 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 459 stdin=subprocess.PIPE, 460 stdout=subprocess.PIPE, 461 stderr=subprocess.PIPE) 462 stdout, stderr = proc.communicate(input) 463 self.assertEqual(input.splitlines(), stdout.splitlines()) 464 self.assertEqual('', stderr) 465 466 def test_sighup(self): 467 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 468 log_path = "/data/local/tmp/adb_signal_test.log" 469 470 # Clear the output file. 471 self.device.shell_nocheck(["echo", ">", log_path]) 472 473 script = """ 474 trap "echo SIGINT > {path}; exit 0" SIGINT 475 trap "echo SIGHUP > {path}; exit 0" SIGHUP 476 echo Waiting 477 read 478 """.format(path=log_path) 479 480 script = ";".join([x.strip() for x in script.strip().splitlines()]) 481 482 process = self.device.shell_popen([script], kill_atexit=False, 483 stdin=subprocess.PIPE, 484 stdout=subprocess.PIPE) 485 486 self.assertEqual("Waiting\n", process.stdout.readline()) 487 process.send_signal(signal.SIGINT) 488 process.wait() 489 490 # Waiting for the local adb to finish is insufficient, since it hangs 491 # up immediately. 492 time.sleep(1) 493 494 stdout, _ = self.device.shell(["cat", log_path]) 495 self.assertEqual(stdout.strip(), "SIGHUP") 496 497 def test_exit_stress(self): 498 """Hammer `adb shell exit 42` with multiple threads.""" 499 thread_count = 48 500 result = dict() 501 def hammer(thread_idx, thread_count, result): 502 success = True 503 for i in range(thread_idx, 240, thread_count): 504 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 505 if ret != i % 256: 506 success = False 507 break 508 result[thread_idx] = success 509 510 threads = [] 511 for i in range(thread_count): 512 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 513 thread.start() 514 threads.append(thread) 515 for thread in threads: 516 thread.join() 517 for i, success in result.iteritems(): 518 self.assertTrue(success) 519 520 521class ArgumentEscapingTest(DeviceTest): 522 def test_shell_escaping(self): 523 """Make sure that argument escaping is somewhat sane.""" 524 525 # http://b/19734868 526 # Note that this actually matches ssh(1)'s behavior --- it's 527 # converted to `sh -c echo hello; echo world` which sh interprets 528 # as `sh -c echo` (with an argument to that shell of "hello"), 529 # and then `echo world` back in the first shell. 530 result = self.device.shell( 531 shlex.split("sh -c 'echo hello; echo world'"))[0] 532 result = result.splitlines() 533 self.assertEqual(['', 'world'], result) 534 # If you really wanted "hello" and "world", here's what you'd do: 535 result = self.device.shell( 536 shlex.split(r'echo hello\;echo world'))[0].splitlines() 537 self.assertEqual(['hello', 'world'], result) 538 539 # http://b/15479704 540 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 541 self.assertEqual('t', result) 542 result = self.device.shell( 543 shlex.split("sh -c 'true && echo t'"))[0].strip() 544 self.assertEqual('t', result) 545 546 # http://b/20564385 547 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 548 self.assertEqual('t', result) 549 result = self.device.shell( 550 shlex.split(r'echo -n 123\;uname'))[0].strip() 551 self.assertEqual('123Linux', result) 552 553 def test_install_argument_escaping(self): 554 """Make sure that install argument escaping works.""" 555 # http://b/20323053, http://b/3090932. 556 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 557 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 558 delete=False) 559 tf.close() 560 561 # Installing bogus .apks fails if the device supports exit codes. 562 try: 563 output = self.device.install(tf.name) 564 except subprocess.CalledProcessError as e: 565 output = e.output 566 567 self.assertIn(file_suffix, output) 568 os.remove(tf.name) 569 570 571class RootUnrootTest(DeviceTest): 572 def _test_root(self): 573 message = self.device.root() 574 if 'adbd cannot run as root in production builds' in message: 575 return 576 self.device.wait() 577 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 578 579 def _test_unroot(self): 580 self.device.unroot() 581 self.device.wait() 582 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 583 584 def test_root_unroot(self): 585 """Make sure that adb root and adb unroot work, using id(1).""" 586 if self.device.get_prop('ro.debuggable') != '1': 587 raise unittest.SkipTest('requires rootable build') 588 589 original_user = self.device.shell(['id', '-un'])[0].strip() 590 try: 591 if original_user == 'root': 592 self._test_unroot() 593 self._test_root() 594 elif original_user == 'shell': 595 self._test_root() 596 self._test_unroot() 597 finally: 598 if original_user == 'root': 599 self.device.root() 600 else: 601 self.device.unroot() 602 self.device.wait() 603 604 605class TcpIpTest(DeviceTest): 606 def test_tcpip_failure_raises(self): 607 """adb tcpip requires a port. 608 609 Bug: http://b/22636927 610 """ 611 self.assertRaises( 612 subprocess.CalledProcessError, self.device.tcpip, '') 613 self.assertRaises( 614 subprocess.CalledProcessError, self.device.tcpip, 'foo') 615 616 617class SystemPropertiesTest(DeviceTest): 618 def test_get_prop(self): 619 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 620 621 @requires_root 622 def test_set_prop(self): 623 prop_name = 'foo.bar' 624 self.device.shell(['setprop', prop_name, '""']) 625 626 self.device.set_prop(prop_name, 'qux') 627 self.assertEqual( 628 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 629 630 631def compute_md5(string): 632 hsh = hashlib.md5() 633 hsh.update(string) 634 return hsh.hexdigest() 635 636 637def get_md5_prog(device): 638 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 639 try: 640 device.shell(['md5sum', '/proc/uptime']) 641 return 'md5sum' 642 except adb.ShellError: 643 return 'md5' 644 645 646class HostFile(object): 647 def __init__(self, handle, checksum): 648 self.handle = handle 649 self.checksum = checksum 650 self.full_path = handle.name 651 self.base_name = os.path.basename(self.full_path) 652 653 654class DeviceFile(object): 655 def __init__(self, checksum, full_path): 656 self.checksum = checksum 657 self.full_path = full_path 658 self.base_name = posixpath.basename(self.full_path) 659 660 661def make_random_host_files(in_dir, num_files): 662 min_size = 1 * (1 << 10) 663 max_size = 16 * (1 << 10) 664 665 files = [] 666 for _ in xrange(num_files): 667 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 668 669 size = random.randrange(min_size, max_size, 1024) 670 rand_str = os.urandom(size) 671 file_handle.write(rand_str) 672 file_handle.flush() 673 file_handle.close() 674 675 md5 = compute_md5(rand_str) 676 files.append(HostFile(file_handle, md5)) 677 return files 678 679 680def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 681 min_size = 1 * (1 << 10) 682 max_size = 16 * (1 << 10) 683 684 files = [] 685 for file_num in xrange(num_files): 686 size = random.randrange(min_size, max_size, 1024) 687 688 base_name = prefix + str(file_num) 689 full_path = posixpath.join(in_dir, base_name) 690 691 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 692 'bs={}'.format(size), 'count=1']) 693 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 694 695 files.append(DeviceFile(dev_md5, full_path)) 696 return files 697 698 699class FileOperationsTest(DeviceTest): 700 SCRATCH_DIR = '/data/local/tmp' 701 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 702 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 703 704 def _verify_remote(self, checksum, remote_path): 705 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 706 remote_path])[0].split() 707 self.assertEqual(checksum, dev_md5) 708 709 def _verify_local(self, checksum, local_path): 710 with open(local_path, 'rb') as host_file: 711 host_md5 = compute_md5(host_file.read()) 712 self.assertEqual(host_md5, checksum) 713 714 def test_push(self): 715 """Push a randomly generated file to specified device.""" 716 kbytes = 512 717 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 718 rand_str = os.urandom(1024 * kbytes) 719 tmp.write(rand_str) 720 tmp.close() 721 722 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 723 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 724 725 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 726 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 727 728 os.remove(tmp.name) 729 730 def test_push_dir(self): 731 """Push a randomly generated directory of files to the device.""" 732 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 733 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 734 735 try: 736 host_dir = tempfile.mkdtemp() 737 738 # Make sure the temp directory isn't setuid, or else adb will complain. 739 os.chmod(host_dir, 0o700) 740 741 # Create 32 random files. 742 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 743 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 744 745 for temp_file in temp_files: 746 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 747 os.path.basename(host_dir), 748 temp_file.base_name) 749 self._verify_remote(temp_file.checksum, remote_path) 750 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 751 finally: 752 if host_dir is not None: 753 shutil.rmtree(host_dir) 754 755 def disabled_test_push_empty(self): 756 """Push an empty directory to the device.""" 757 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 758 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 759 760 try: 761 host_dir = tempfile.mkdtemp() 762 763 # Make sure the temp directory isn't setuid, or else adb will complain. 764 os.chmod(host_dir, 0o700) 765 766 # Create an empty directory. 767 empty_dir_path = os.path.join(host_dir, 'empty') 768 os.mkdir(empty_dir_path); 769 770 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 771 772 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 773 test_empty_cmd = ["[", "-d", remote_path, "]"] 774 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 775 776 self.assertEqual(rc, 0) 777 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 778 finally: 779 if host_dir is not None: 780 shutil.rmtree(host_dir) 781 782 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 783 def test_push_symlink(self): 784 """Push a symlink. 785 786 Bug: http://b/31491920 787 """ 788 try: 789 host_dir = tempfile.mkdtemp() 790 791 # Make sure the temp directory isn't setuid, or else adb will 792 # complain. 793 os.chmod(host_dir, 0o700) 794 795 with open(os.path.join(host_dir, 'foo'), 'w') as f: 796 f.write('foo') 797 798 symlink_path = os.path.join(host_dir, 'symlink') 799 os.symlink('foo', symlink_path) 800 801 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 802 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 803 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 804 rc, out, _ = self.device.shell_nocheck( 805 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 806 self.assertEqual(0, rc) 807 self.assertEqual(out.strip(), 'foo') 808 finally: 809 if host_dir is not None: 810 shutil.rmtree(host_dir) 811 812 def test_multiple_push(self): 813 """Push multiple files to the device in one adb push command. 814 815 Bug: http://b/25324823 816 """ 817 818 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 819 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 820 821 try: 822 host_dir = tempfile.mkdtemp() 823 824 # Create some random files and a subdirectory containing more files. 825 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 826 827 subdir = os.path.join(host_dir, 'subdir') 828 os.mkdir(subdir) 829 subdir_temp_files = make_random_host_files(in_dir=subdir, 830 num_files=4) 831 832 paths = map(lambda temp_file: temp_file.full_path, temp_files) 833 paths.append(subdir) 834 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 835 836 for temp_file in temp_files: 837 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 838 temp_file.base_name) 839 self._verify_remote(temp_file.checksum, remote_path) 840 841 for subdir_temp_file in subdir_temp_files: 842 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 843 # BROKEN: http://b/25394682 844 # 'subdir'; 845 temp_file.base_name) 846 self._verify_remote(temp_file.checksum, remote_path) 847 848 849 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 850 finally: 851 if host_dir is not None: 852 shutil.rmtree(host_dir) 853 854 @requires_non_root 855 def test_push_error_reporting(self): 856 """Make sure that errors that occur while pushing a file get reported 857 858 Bug: http://b/26816782 859 """ 860 with tempfile.NamedTemporaryFile() as tmp_file: 861 tmp_file.write('\0' * 1024 * 1024) 862 tmp_file.flush() 863 try: 864 self.device.push(local=tmp_file.name, remote='/system/') 865 self.fail('push should not have succeeded') 866 except subprocess.CalledProcessError as e: 867 output = e.output 868 869 self.assertTrue('Permission denied' in output or 870 'Read-only file system' in output) 871 872 @requires_non_root 873 def test_push_directory_creation(self): 874 """Regression test for directory creation. 875 876 Bug: http://b/110953234 877 """ 878 with tempfile.NamedTemporaryFile() as tmp_file: 879 tmp_file.write('\0' * 1024 * 1024) 880 tmp_file.flush() 881 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 882 self.device.shell(['rm', '-rf', remote_path]) 883 884 remote_path += '/filename' 885 self.device.push(local=tmp_file.name, remote=remote_path) 886 887 def _test_pull(self, remote_file, checksum): 888 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 889 tmp_write.close() 890 self.device.pull(remote=remote_file, local=tmp_write.name) 891 with open(tmp_write.name, 'rb') as tmp_read: 892 host_contents = tmp_read.read() 893 host_md5 = compute_md5(host_contents) 894 self.assertEqual(checksum, host_md5) 895 os.remove(tmp_write.name) 896 897 @requires_non_root 898 def test_pull_error_reporting(self): 899 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 900 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 901 902 try: 903 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 904 except subprocess.CalledProcessError as e: 905 output = e.output 906 907 self.assertIn('Permission denied', output) 908 909 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 910 911 def test_pull(self): 912 """Pull a randomly generated file from specified device.""" 913 kbytes = 512 914 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 915 cmd = ['dd', 'if=/dev/urandom', 916 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 917 'count={}'.format(kbytes)] 918 self.device.shell(cmd) 919 dev_md5, _ = self.device.shell( 920 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 921 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 922 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 923 924 def test_pull_dir(self): 925 """Pull a randomly generated directory of files from the device.""" 926 try: 927 host_dir = tempfile.mkdtemp() 928 929 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 930 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 931 932 # Populate device directory with random files. 933 temp_files = make_random_device_files( 934 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 935 936 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 937 938 for temp_file in temp_files: 939 host_path = os.path.join( 940 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 941 temp_file.base_name) 942 self._verify_local(temp_file.checksum, host_path) 943 944 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 945 finally: 946 if host_dir is not None: 947 shutil.rmtree(host_dir) 948 949 def test_pull_dir_symlink(self): 950 """Pull a directory into a symlink to a directory. 951 952 Bug: http://b/27362811 953 """ 954 if os.name != 'posix': 955 raise unittest.SkipTest('requires POSIX') 956 957 try: 958 host_dir = tempfile.mkdtemp() 959 real_dir = os.path.join(host_dir, 'dir') 960 symlink = os.path.join(host_dir, 'symlink') 961 os.mkdir(real_dir) 962 os.symlink(real_dir, symlink) 963 964 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 965 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 966 967 # Populate device directory with random files. 968 temp_files = make_random_device_files( 969 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 970 971 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 972 973 for temp_file in temp_files: 974 host_path = os.path.join( 975 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 976 temp_file.base_name) 977 self._verify_local(temp_file.checksum, host_path) 978 979 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 980 finally: 981 if host_dir is not None: 982 shutil.rmtree(host_dir) 983 984 def test_pull_dir_symlink_collision(self): 985 """Pull a directory into a colliding symlink to directory.""" 986 if os.name != 'posix': 987 raise unittest.SkipTest('requires POSIX') 988 989 try: 990 host_dir = tempfile.mkdtemp() 991 real_dir = os.path.join(host_dir, 'real') 992 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 993 symlink = os.path.join(host_dir, tmp_dirname) 994 os.mkdir(real_dir) 995 os.symlink(real_dir, symlink) 996 997 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 998 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 999 1000 # Populate device directory with random files. 1001 temp_files = make_random_device_files( 1002 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1003 1004 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1005 1006 for temp_file in temp_files: 1007 host_path = os.path.join(real_dir, temp_file.base_name) 1008 self._verify_local(temp_file.checksum, host_path) 1009 1010 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1011 finally: 1012 if host_dir is not None: 1013 shutil.rmtree(host_dir) 1014 1015 def test_pull_dir_nonexistent(self): 1016 """Pull a directory of files from the device to a nonexistent path.""" 1017 try: 1018 host_dir = tempfile.mkdtemp() 1019 dest_dir = os.path.join(host_dir, 'dest') 1020 1021 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1022 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1023 1024 # Populate device directory with random files. 1025 temp_files = make_random_device_files( 1026 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1027 1028 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1029 1030 for temp_file in temp_files: 1031 host_path = os.path.join(dest_dir, temp_file.base_name) 1032 self._verify_local(temp_file.checksum, host_path) 1033 1034 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1035 finally: 1036 if host_dir is not None: 1037 shutil.rmtree(host_dir) 1038 1039 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1040 def disabled_test_pull_symlink_dir(self): 1041 """Pull a symlink to a directory of symlinks to files.""" 1042 try: 1043 host_dir = tempfile.mkdtemp() 1044 1045 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1046 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1047 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1048 1049 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1050 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1051 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1052 1053 # Populate device directory with random files. 1054 temp_files = make_random_device_files( 1055 self.device, in_dir=remote_dir, num_files=32) 1056 1057 for temp_file in temp_files: 1058 self.device.shell( 1059 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1060 posixpath.join(remote_links, temp_file.base_name)]) 1061 1062 self.device.pull(remote=remote_symlink, local=host_dir) 1063 1064 for temp_file in temp_files: 1065 host_path = os.path.join( 1066 host_dir, 'symlink', temp_file.base_name) 1067 self._verify_local(temp_file.checksum, host_path) 1068 1069 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1070 finally: 1071 if host_dir is not None: 1072 shutil.rmtree(host_dir) 1073 1074 def test_pull_empty(self): 1075 """Pull a directory containing an empty directory from the device.""" 1076 try: 1077 host_dir = tempfile.mkdtemp() 1078 1079 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1080 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1081 self.device.shell(['mkdir', '-p', remote_empty_path]) 1082 1083 self.device.pull(remote=remote_empty_path, local=host_dir) 1084 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1085 finally: 1086 if host_dir is not None: 1087 shutil.rmtree(host_dir) 1088 1089 def test_multiple_pull(self): 1090 """Pull a randomly generated directory of files from the device.""" 1091 1092 try: 1093 host_dir = tempfile.mkdtemp() 1094 1095 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1096 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1097 self.device.shell(['mkdir', '-p', subdir]) 1098 1099 # Create some random files and a subdirectory containing more files. 1100 temp_files = make_random_device_files( 1101 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1102 1103 subdir_temp_files = make_random_device_files( 1104 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1105 1106 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1107 paths.append(subdir) 1108 self.device._simple_call(['pull'] + paths + [host_dir]) 1109 1110 for temp_file in temp_files: 1111 local_path = os.path.join(host_dir, temp_file.base_name) 1112 self._verify_local(temp_file.checksum, local_path) 1113 1114 for subdir_temp_file in subdir_temp_files: 1115 local_path = os.path.join(host_dir, 1116 'subdir', 1117 subdir_temp_file.base_name) 1118 self._verify_local(subdir_temp_file.checksum, local_path) 1119 1120 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1121 finally: 1122 if host_dir is not None: 1123 shutil.rmtree(host_dir) 1124 1125 def verify_sync(self, device, temp_files, device_dir): 1126 """Verifies that a list of temp files was synced to the device.""" 1127 # Confirm that every file on the device mirrors that on the host. 1128 for temp_file in temp_files: 1129 device_full_path = posixpath.join( 1130 device_dir, temp_file.base_name) 1131 dev_md5, _ = device.shell( 1132 [get_md5_prog(self.device), device_full_path])[0].split() 1133 self.assertEqual(temp_file.checksum, dev_md5) 1134 1135 def test_sync(self): 1136 """Sync a host directory to the data partition.""" 1137 1138 try: 1139 base_dir = tempfile.mkdtemp() 1140 1141 # Create mirror device directory hierarchy within base_dir. 1142 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1143 os.makedirs(full_dir_path) 1144 1145 # Create 32 random files within the host mirror. 1146 temp_files = make_random_host_files( 1147 in_dir=full_dir_path, num_files=32) 1148 1149 # Clean up any stale files on the device. 1150 device = adb.get_device() # pylint: disable=no-member 1151 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1152 1153 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1154 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1155 device.sync('data') 1156 if old_product_out is None: 1157 del os.environ['ANDROID_PRODUCT_OUT'] 1158 else: 1159 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1160 1161 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1162 1163 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1164 finally: 1165 if base_dir is not None: 1166 shutil.rmtree(base_dir) 1167 1168 def test_push_sync(self): 1169 """Sync a host directory to a specific path.""" 1170 1171 try: 1172 temp_dir = tempfile.mkdtemp() 1173 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1174 1175 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1176 1177 # Clean up any stale files on the device. 1178 device = adb.get_device() # pylint: disable=no-member 1179 device.shell(['rm', '-rf', device_dir]) 1180 1181 device.push(temp_dir, device_dir, sync=True) 1182 1183 self.verify_sync(device, temp_files, device_dir) 1184 1185 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1186 finally: 1187 if temp_dir is not None: 1188 shutil.rmtree(temp_dir) 1189 1190 def test_unicode_paths(self): 1191 """Ensure that we can support non-ASCII paths, even on Windows.""" 1192 name = u'로보카 폴리' 1193 1194 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1195 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1196 1197 ## push. 1198 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1199 tf.close() 1200 self.device.push(tf.name, remote_path) 1201 os.remove(tf.name) 1202 self.assertFalse(os.path.exists(tf.name)) 1203 1204 # Verify that the device ended up with the expected UTF-8 path 1205 output = self.device.shell( 1206 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1207 self.assertEqual(remote_path, output) 1208 1209 # pull. 1210 self.device.pull(remote_path, tf.name) 1211 self.assertTrue(os.path.exists(tf.name)) 1212 os.remove(tf.name) 1213 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1214 1215 1216class DeviceOfflineTest(DeviceTest): 1217 def _get_device_state(self, serialno): 1218 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1219 for line in output.split('\n'): 1220 m = re.match('(\S+)\s+(\S+)', line) 1221 if m and m.group(1) == serialno: 1222 return m.group(2) 1223 return None 1224 1225 def disabled_test_killed_when_pushing_a_large_file(self): 1226 """ 1227 While running adb push with a large file, kill adb server. 1228 Occasionally the device becomes offline. Because the device is still 1229 reading data without realizing that the adb server has been restarted. 1230 Test if we can bring the device online automatically now. 1231 http://b/32952319 1232 """ 1233 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1234 # 1. Push a large file 1235 file_path = 'tmp_large_file' 1236 try: 1237 fh = open(file_path, 'w') 1238 fh.write('\0' * (100 * 1024 * 1024)) 1239 fh.close() 1240 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1241 time.sleep(0.1) 1242 # 2. Kill the adb server 1243 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1244 subproc.terminate() 1245 finally: 1246 try: 1247 os.unlink(file_path) 1248 except: 1249 pass 1250 # 3. See if the device still exist. 1251 # Sleep to wait for the adb server exit. 1252 time.sleep(0.5) 1253 # 4. The device should be online 1254 self.assertEqual(self._get_device_state(serialno), 'device') 1255 1256 def disabled_test_killed_when_pulling_a_large_file(self): 1257 """ 1258 While running adb pull with a large file, kill adb server. 1259 Occasionally the device can't be connected. Because the device is trying to 1260 send a message larger than what is expected by the adb server. 1261 Test if we can bring the device online automatically now. 1262 """ 1263 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1264 file_path = 'tmp_large_file' 1265 try: 1266 # 1. Create a large file on device. 1267 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1268 'bs=1000000', 'count=100']) 1269 # 2. Pull the large file on host. 1270 subproc = subprocess.Popen(self.device.adb_cmd + 1271 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1272 time.sleep(0.1) 1273 # 3. Kill the adb server 1274 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1275 subproc.terminate() 1276 finally: 1277 try: 1278 os.unlink(file_path) 1279 except: 1280 pass 1281 # 4. See if the device still exist. 1282 # Sleep to wait for the adb server exit. 1283 time.sleep(0.5) 1284 self.assertEqual(self._get_device_state(serialno), 'device') 1285 1286 1287 def test_packet_size_regression(self): 1288 """Test for http://b/37783561 1289 1290 Receiving packets of a length divisible by 512 but not 1024 resulted in 1291 the adb client waiting indefinitely for more input. 1292 """ 1293 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1294 # Probe some surrounding values as well, for the hell of it. 1295 for base in [512] + range(1024, 1024 * 16, 1024): 1296 for offset in [-6, -5, -4]: 1297 length = base + offset 1298 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1299 'echo', 'foo'] 1300 rc, stdout, _ = self.device.shell_nocheck(cmd) 1301 1302 self.assertEqual(0, rc) 1303 1304 # Output should be '\0' * length, followed by "foo\n" 1305 self.assertEqual(length, len(stdout) - 4) 1306 self.assertEqual(stdout, "\0" * length + "foo\n") 1307 1308 def test_zero_packet(self): 1309 """Test for http://b/113070258 1310 1311 Make sure that we don't blow up when sending USB transfers that line up 1312 exactly with the USB packet size. 1313 """ 1314 1315 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1316 try: 1317 for size in [512, 1024]: 1318 def listener(): 1319 cmd = ["echo foo | nc -l -p 12345; echo done"] 1320 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1321 1322 thread = threading.Thread(target=listener) 1323 thread.start() 1324 1325 # Wait a bit to let the shell command start. 1326 time.sleep(0.25) 1327 1328 sock = socket.create_connection(("localhost", local_port)) 1329 with contextlib.closing(sock): 1330 bytesWritten = sock.send("a" * size) 1331 self.assertEqual(size, bytesWritten) 1332 readBytes = sock.recv(4096) 1333 self.assertEqual("foo\n", readBytes) 1334 1335 thread.join() 1336 finally: 1337 self.device.forward_remove("tcp:{}".format(local_port)) 1338 1339 1340class SocketTest(DeviceTest): 1341 def test_socket_flush(self): 1342 """Test that we handle socket closure properly. 1343 1344 If we're done writing to a socket, closing before the other end has 1345 closed will send a TCP_RST if we have incoming data queued up, which 1346 may result in data that we've written being discarded. 1347 1348 Bug: http://b/74616284 1349 """ 1350 s = socket.create_connection(("localhost", 5037)) 1351 1352 def adb_length_prefixed(string): 1353 encoded = string.encode("utf8") 1354 result = b"%04x%s" % (len(encoded), encoded) 1355 return result 1356 1357 if "ANDROID_SERIAL" in os.environ: 1358 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1359 else: 1360 transport_string = "host:transport-any" 1361 1362 s.sendall(adb_length_prefixed(transport_string)) 1363 response = s.recv(4) 1364 self.assertEquals(b"OKAY", response) 1365 1366 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1367 s.sendall(adb_length_prefixed(shell_string)) 1368 1369 response = s.recv(4) 1370 self.assertEquals(b"OKAY", response) 1371 1372 # Spawn a thread that dumps garbage into the socket until failure. 1373 def spam(): 1374 buf = b"\0" * 16384 1375 try: 1376 while True: 1377 s.sendall(buf) 1378 except Exception as ex: 1379 print(ex) 1380 1381 thread = threading.Thread(target=spam) 1382 thread.start() 1383 1384 time.sleep(1) 1385 1386 received = b"" 1387 while True: 1388 read = s.recv(512) 1389 if len(read) == 0: 1390 break 1391 received += read 1392 1393 self.assertEquals(1024 * 1024 + len("foo\n"), len(received)) 1394 thread.join() 1395 1396 1397if sys.platform == "win32": 1398 # From https://stackoverflow.com/a/38749458 1399 import os 1400 import contextlib 1401 import msvcrt 1402 import ctypes 1403 from ctypes import wintypes 1404 1405 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1406 1407 GENERIC_READ = 0x80000000 1408 GENERIC_WRITE = 0x40000000 1409 FILE_SHARE_READ = 1 1410 FILE_SHARE_WRITE = 2 1411 CONSOLE_TEXTMODE_BUFFER = 1 1412 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1413 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1414 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1415 1416 def _check_zero(result, func, args): 1417 if not result: 1418 raise ctypes.WinError(ctypes.get_last_error()) 1419 return args 1420 1421 def _check_invalid(result, func, args): 1422 if result == INVALID_HANDLE_VALUE: 1423 raise ctypes.WinError(ctypes.get_last_error()) 1424 return args 1425 1426 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1427 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1428 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1429 1430 class COORD(ctypes.Structure): 1431 _fields_ = (('X', wintypes.SHORT), 1432 ('Y', wintypes.SHORT)) 1433 1434 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1435 _fields_ = (('cbSize', wintypes.ULONG), 1436 ('dwSize', COORD), 1437 ('dwCursorPosition', COORD), 1438 ('wAttributes', wintypes.WORD), 1439 ('srWindow', wintypes.SMALL_RECT), 1440 ('dwMaximumWindowSize', COORD), 1441 ('wPopupAttributes', wintypes.WORD), 1442 ('bFullscreenSupported', wintypes.BOOL), 1443 ('ColorTable', wintypes.DWORD * 16)) 1444 def __init__(self, *args, **kwds): 1445 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1446 *args, **kwds) 1447 self.cbSize = ctypes.sizeof(self) 1448 1449 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1450 CONSOLE_SCREEN_BUFFER_INFOEX) 1451 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1452 1453 kernel32.GetStdHandle.errcheck = _check_invalid 1454 kernel32.GetStdHandle.restype = wintypes.HANDLE 1455 kernel32.GetStdHandle.argtypes = ( 1456 wintypes.DWORD,) # _In_ nStdHandle 1457 1458 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1459 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1460 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1461 wintypes.DWORD, # _In_ dwDesiredAccess 1462 wintypes.DWORD, # _In_ dwShareMode 1463 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1464 wintypes.DWORD, # _In_ dwFlags 1465 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1466 1467 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1468 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1469 wintypes.HANDLE, # _In_ hConsoleOutput 1470 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1471 1472 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1473 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1474 wintypes.HANDLE, # _In_ hConsoleOutput 1475 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1476 1477 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1478 kernel32.SetConsoleWindowInfo.argtypes = ( 1479 wintypes.HANDLE, # _In_ hConsoleOutput 1480 wintypes.BOOL, # _In_ bAbsolute 1481 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1482 1483 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1484 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1485 wintypes.HANDLE, # _In_ hConsoleOutput 1486 wintypes.WCHAR, # _In_ cCharacter 1487 wintypes.DWORD, # _In_ nLength 1488 COORD, # _In_ dwWriteCoord 1489 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1490 1491 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1492 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1493 wintypes.HANDLE, # _In_ hConsoleOutput 1494 wintypes.LPWSTR, # _Out_ lpCharacter 1495 wintypes.DWORD, # _In_ nLength 1496 COORD, # _In_ dwReadCoord 1497 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1498 1499 @contextlib.contextmanager 1500 def allocate_console(): 1501 allocated = kernel32.AllocConsole() 1502 try: 1503 yield allocated 1504 finally: 1505 if allocated: 1506 kernel32.FreeConsole() 1507 1508 @contextlib.contextmanager 1509 def console_screen(ncols=None, nrows=None): 1510 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1511 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1512 nwritten = (wintypes.DWORD * 1)() 1513 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1514 kernel32.GetConsoleScreenBufferInfoEx( 1515 hStdOut, ctypes.byref(info)) 1516 if ncols is None: 1517 ncols = info.dwSize.X 1518 if nrows is None: 1519 nrows = info.dwSize.Y 1520 elif nrows > 9999: 1521 raise ValueError('nrows must be 9999 or less') 1522 fd_screen = None 1523 hScreen = kernel32.CreateConsoleScreenBuffer( 1524 GENERIC_READ | GENERIC_WRITE, 1525 FILE_SHARE_READ | FILE_SHARE_WRITE, 1526 None, CONSOLE_TEXTMODE_BUFFER, None) 1527 try: 1528 fd_screen = msvcrt.open_osfhandle( 1529 hScreen, os.O_RDWR | os.O_BINARY) 1530 kernel32.GetConsoleScreenBufferInfoEx( 1531 hScreen, ctypes.byref(new_info)) 1532 new_info.dwSize = COORD(ncols, nrows) 1533 new_info.srWindow = wintypes.SMALL_RECT( 1534 Left=0, Top=0, Right=(ncols - 1), 1535 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1536 kernel32.SetConsoleScreenBufferInfoEx( 1537 hScreen, ctypes.byref(new_info)) 1538 kernel32.SetConsoleWindowInfo(hScreen, True, 1539 ctypes.byref(new_info.srWindow)) 1540 kernel32.FillConsoleOutputCharacterW( 1541 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1542 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1543 try: 1544 yield fd_screen 1545 finally: 1546 kernel32.SetConsoleScreenBufferInfoEx( 1547 hStdOut, ctypes.byref(info)) 1548 kernel32.SetConsoleWindowInfo(hStdOut, True, 1549 ctypes.byref(info.srWindow)) 1550 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1551 finally: 1552 if fd_screen is not None: 1553 os.close(fd_screen) 1554 else: 1555 kernel32.CloseHandle(hScreen) 1556 1557 def read_screen(fd): 1558 hScreen = msvcrt.get_osfhandle(fd) 1559 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1560 kernel32.GetConsoleScreenBufferInfoEx( 1561 hScreen, ctypes.byref(csbi)) 1562 ncols = csbi.dwSize.X 1563 pos = csbi.dwCursorPosition 1564 length = ncols * pos.Y + pos.X + 1 1565 buf = (ctypes.c_wchar * length)() 1566 n = (wintypes.DWORD * 1)() 1567 kernel32.ReadConsoleOutputCharacterW( 1568 hScreen, buf, length, COORD(0,0), n) 1569 lines = [buf[i:i+ncols].rstrip(u'\0') 1570 for i in range(0, n[0], ncols)] 1571 return u'\n'.join(lines) 1572 1573@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1574class WindowsConsoleTest(DeviceTest): 1575 def test_unicode_output(self): 1576 """Test Unicode command line parameters and Unicode console window output. 1577 1578 Bug: https://issuetracker.google.com/issues/111972753 1579 """ 1580 # If we don't have a console window, allocate one. This isn't necessary if we're already 1581 # being run from a console window, which is typical. 1582 with allocate_console() as allocated_console: 1583 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1584 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1585 # likely unnecessary given the typical console window size. 1586 with console_screen(nrows=1000) as screen: 1587 unicode_string = u'로보카 폴리' 1588 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1589 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1590 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1591 process.wait() 1592 # Read what was written by adb to the temporary console buffer. 1593 console_output = read_screen(screen) 1594 self.assertEqual(unicode_string, console_output) 1595 1596 1597def main(): 1598 random.seed(0) 1599 if len(adb.get_devices()) > 0: 1600 suite = unittest.TestLoader().loadTestsFromName(__name__) 1601 unittest.TextTestRunner(verbosity=3).run(suite) 1602 else: 1603 print('Test suite must be run with attached devices') 1604 1605 1606if __name__ == '__main__': 1607 main() 1608