1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import io 23import os 24import posixpath 25import random 26import re 27import shlex 28import shutil 29import signal 30import socket 31import string 32import subprocess 33import sys 34import tempfile 35import threading 36import time 37import unittest 38 39import adb_host_pb2 as adb_host_proto 40 41from datetime import datetime 42 43import adb 44 45def requires_non_root(func): 46 def wrapper(self, *args): 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if was_root: 49 self.device.unroot() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if was_root: 56 self.device.root() 57 self.device.wait() 58 59 return wrapper 60 61 62class DeviceTest(unittest.TestCase): 63 def setUp(self) -> None: 64 self.device = adb.get_device() 65 66 67class AbbTest(DeviceTest): 68 def test_smoke(self): 69 abb = subprocess.run(['adb', 'abb'], capture_output=True) 70 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 71 72 # abb squashes all failures to 1. 73 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 74 self.assertEqual(abb.stdout, cmd.stdout) 75 self.assertEqual(abb.stderr, cmd.stderr) 76 77class ForwardReverseTest(DeviceTest): 78 def _test_no_rebind(self, description, direction_list, direction, 79 direction_no_rebind, direction_remove_all): 80 msg = direction_list() 81 self.assertEqual('', msg.strip(), 82 description + ' list must be empty to run this test.') 83 84 # Use --no-rebind with no existing binding 85 direction_no_rebind('tcp:5566', 'tcp:6655') 86 msg = direction_list() 87 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 88 89 # Use --no-rebind with existing binding 90 with self.assertRaises(subprocess.CalledProcessError): 91 direction_no_rebind('tcp:5566', 'tcp:6677') 92 msg = direction_list() 93 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 94 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 95 96 # Use the absence of --no-rebind with existing binding 97 direction('tcp:5566', 'tcp:6677') 98 msg = direction_list() 99 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 100 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 101 102 direction_remove_all() 103 msg = direction_list() 104 self.assertEqual('', msg.strip()) 105 106 def test_forward_no_rebind(self): 107 self._test_no_rebind('forward', self.device.forward_list, 108 self.device.forward, self.device.forward_no_rebind, 109 self.device.forward_remove_all) 110 111 def test_reverse_no_rebind(self): 112 self._test_no_rebind('reverse', self.device.reverse_list, 113 self.device.reverse, self.device.reverse_no_rebind, 114 self.device.reverse_remove_all) 115 116 def test_forward(self): 117 msg = self.device.forward_list() 118 self.assertEqual('', msg.strip(), 119 'Forwarding list must be empty to run this test.') 120 self.device.forward('tcp:5566', 'tcp:6655') 121 msg = self.device.forward_list() 122 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 123 self.device.forward('tcp:7788', 'tcp:8877') 124 msg = self.device.forward_list() 125 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 126 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 127 self.device.forward_remove('tcp:5566') 128 msg = self.device.forward_list() 129 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 130 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 131 self.device.forward_remove_all() 132 msg = self.device.forward_list() 133 self.assertEqual('', msg.strip()) 134 135 def test_forward_old_protocol(self): 136 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 137 138 msg = self.device.forward_list() 139 self.assertEqual('', msg.strip(), 140 'Forwarding list must be empty to run this test.') 141 142 with socket.create_connection(("localhost", 5037)) as s: 143 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 144 cmd = b"%04x%s" % (len(service), service) 145 s.sendall(cmd) 146 147 msg = self.device.forward_list() 148 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 149 150 self.device.forward_remove_all() 151 msg = self.device.forward_list() 152 self.assertEqual('', msg.strip()) 153 154 def test_forward_tcp_port_0(self): 155 self.assertEqual('', self.device.forward_list().strip(), 156 'Forwarding list must be empty to run this test.') 157 158 try: 159 # If resolving TCP port 0 is supported, `adb forward` will print 160 # the actual port number. 161 port = self.device.forward('tcp:0', 'tcp:8888').strip() 162 if not port: 163 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 164 165 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 166 self.device.forward_list())) 167 finally: 168 self.device.forward_remove_all() 169 170 def test_reverse(self): 171 msg = self.device.reverse_list() 172 self.assertEqual('', msg.strip(), 173 'Reverse forwarding list must be empty to run this test.') 174 self.device.reverse('tcp:5566', 'tcp:6655') 175 msg = self.device.reverse_list() 176 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 177 self.device.reverse('tcp:7788', 'tcp:8877') 178 msg = self.device.reverse_list() 179 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 180 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 181 self.device.reverse_remove('tcp:5566') 182 msg = self.device.reverse_list() 183 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 184 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 185 self.device.reverse_remove_all() 186 msg = self.device.reverse_list() 187 self.assertEqual('', msg.strip()) 188 189 def test_reverse_tcp_port_0(self): 190 self.assertEqual('', self.device.reverse_list().strip(), 191 'Reverse list must be empty to run this test.') 192 193 try: 194 # If resolving TCP port 0 is supported, `adb reverse` will print 195 # the actual port number. 196 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 197 if not port: 198 raise unittest.SkipTest('Reversing tcp:0 is not available.') 199 200 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 201 self.device.reverse_list())) 202 finally: 203 self.device.reverse_remove_all() 204 205 def test_forward_reverse_echo(self): 206 """Send data through adb forward and read it back via adb reverse""" 207 forward_port = 12345 208 reverse_port = forward_port + 1 209 forward_spec = 'tcp:' + str(forward_port) 210 reverse_spec = 'tcp:' + str(reverse_port) 211 forward_setup = False 212 reverse_setup = False 213 214 try: 215 # listen on localhost:forward_port, connect to remote:forward_port 216 self.device.forward(forward_spec, forward_spec) 217 forward_setup = True 218 # listen on remote:forward_port, connect to localhost:reverse_port 219 self.device.reverse(forward_spec, reverse_spec) 220 reverse_setup = True 221 222 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 223 with contextlib.closing(listener): 224 # Use SO_REUSEADDR so that subsequent runs of the test can grab 225 # the port even if it is in TIME_WAIT. 226 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 227 228 # Listen on localhost:reverse_port before connecting to 229 # localhost:forward_port because that will cause adb to connect 230 # back to localhost:reverse_port. 231 listener.bind(('127.0.0.1', reverse_port)) 232 listener.listen(4) 233 234 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 235 with contextlib.closing(client): 236 # Connect to the listener. 237 client.connect(('127.0.0.1', forward_port)) 238 239 # Accept the client connection. 240 accepted_connection, addr = listener.accept() 241 with contextlib.closing(accepted_connection) as server: 242 data = b'hello' 243 244 # Send data into the port setup by adb forward. 245 client.sendall(data) 246 # Explicitly close() so that server gets EOF. 247 client.close() 248 249 # Verify that the data came back via adb reverse. 250 self.assertEqual(data, server.makefile().read().encode("utf8")) 251 finally: 252 if reverse_setup: 253 self.device.reverse_remove(forward_spec) 254 if forward_setup: 255 self.device.forward_remove(forward_spec) 256 257 258class ShellTest(DeviceTest): 259 def _interactive_shell(self, shell_args, input): 260 """Runs an interactive adb shell. 261 262 Args: 263 shell_args: List of string arguments to `adb shell`. 264 input: bytes input to send to the interactive shell. 265 266 Returns: 267 The remote exit code. 268 269 Raises: 270 unittest.SkipTest: The device doesn't support exit codes. 271 """ 272 if not self.device.has_shell_protocol(): 273 raise unittest.SkipTest('exit codes are unavailable on this device') 274 275 proc = subprocess.Popen( 276 self.device.adb_cmd + ['shell'] + shell_args, 277 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 278 stderr=subprocess.PIPE) 279 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 280 # to explicitly add an exit command to close the session from the device 281 # side, plus the necessary newline to complete the interactive command. 282 proc.communicate(input + b'; exit\n') 283 return proc.returncode 284 285 def test_cat(self): 286 """Check that we can at least cat a file.""" 287 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 288 elements = out.split() 289 self.assertEqual(len(elements), 2) 290 291 uptime, idle = elements 292 self.assertGreater(float(uptime), 0.0) 293 self.assertGreater(float(idle), 0.0) 294 295 def test_throws_on_failure(self): 296 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 297 298 def test_output_not_stripped(self): 299 out = self.device.shell(['echo', 'foo'])[0] 300 self.assertEqual(out, 'foo' + self.device.linesep) 301 302 def test_shell_command_length(self): 303 # Devices that have shell_v2 should be able to handle long commands. 304 if self.device.has_shell_protocol(): 305 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 306 self.assertEqual(rc, 0) 307 self.assertTrue(out == ('x' * 16384 + '\n')) 308 309 def test_shell_nocheck_failure(self): 310 rc, out, _ = self.device.shell_nocheck(['false']) 311 self.assertNotEqual(rc, 0) 312 self.assertEqual(out, '') 313 314 def test_shell_nocheck_output_not_stripped(self): 315 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 316 self.assertEqual(rc, 0) 317 self.assertEqual(out, 'foo' + self.device.linesep) 318 319 def test_can_distinguish_tricky_results(self): 320 # If result checking on ADB shell is naively implemented as 321 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 322 # output from the result for a cmd of `echo -n 1`. 323 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 324 self.assertEqual(rc, 0) 325 self.assertEqual(out, '1') 326 327 def test_line_endings(self): 328 """Ensure that line ending translation is not happening in the pty. 329 330 Bug: http://b/19735063 331 """ 332 output = self.device.shell(['uname'])[0] 333 self.assertEqual(output, 'Linux' + self.device.linesep) 334 335 def test_pty_logic(self): 336 """Tests that a PTY is allocated when it should be. 337 338 PTY allocation behavior should match ssh. 339 """ 340 def check_pty(args): 341 """Checks adb shell PTY allocation. 342 343 Tests |args| for terminal and non-terminal stdin. 344 345 Args: 346 args: -Tt args in a list (e.g. ['-t', '-t']). 347 348 Returns: 349 A tuple (<terminal>, <non-terminal>). True indicates 350 the corresponding shell allocated a remote PTY. 351 """ 352 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 353 354 terminal = subprocess.Popen( 355 test_cmd, stdin=None, 356 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 357 terminal.communicate() 358 359 non_terminal = subprocess.Popen( 360 test_cmd, stdin=subprocess.PIPE, 361 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 362 non_terminal.communicate() 363 364 return (terminal.returncode == 0, non_terminal.returncode == 0) 365 366 # -T: never allocate PTY. 367 self.assertEqual((False, False), check_pty(['-T'])) 368 369 # These tests require a new device. 370 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 371 # No args: PTY only if stdin is a terminal and shell is interactive, 372 # which is difficult to reliably test from a script. 373 self.assertEqual((False, False), check_pty([])) 374 375 # -t: PTY if stdin is a terminal. 376 self.assertEqual((True, False), check_pty(['-t'])) 377 378 # -t -t: always allocate PTY. 379 self.assertEqual((True, True), check_pty(['-t', '-t'])) 380 381 # -tt: always allocate PTY, POSIX style (http://b/32216152). 382 self.assertEqual((True, True), check_pty(['-tt'])) 383 384 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 385 # we follow the man page instead. 386 self.assertEqual((True, True), check_pty(['-ttt'])) 387 388 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 389 self.assertEqual((True, True), check_pty(['-ttx'])) 390 391 # -Ttt: -tt cancels out -T. 392 self.assertEqual((True, True), check_pty(['-Ttt'])) 393 394 # -ttT: -T cancels out -tt. 395 self.assertEqual((False, False), check_pty(['-ttT'])) 396 397 def test_shell_protocol(self): 398 """Tests the shell protocol on the device. 399 400 If the device supports shell protocol, this gives us the ability 401 to separate stdout/stderr and return the exit code directly. 402 403 Bug: http://b/19734861 404 """ 405 if not self.device.has_shell_protocol(): 406 raise unittest.SkipTest('shell protocol unsupported on this device') 407 408 # Shell protocol should be used by default. 409 result = self.device.shell_nocheck( 410 shlex.split('echo foo; echo bar >&2; exit 17')) 411 self.assertEqual(17, result[0]) 412 self.assertEqual('foo' + self.device.linesep, result[1]) 413 self.assertEqual('bar' + self.device.linesep, result[2]) 414 415 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 416 417 # -x flag should disable shell protocol. 418 result = self.device.shell_nocheck( 419 shlex.split('-x echo foo; echo bar >&2; exit 17')) 420 self.assertEqual(0, result[0]) 421 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 422 self.assertEqual('', result[2]) 423 424 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 425 426 def test_non_interactive_sigint(self): 427 """Tests that SIGINT in a non-interactive shell kills the process. 428 429 This requires the shell protocol in order to detect the broken 430 pipe; raw data transfer mode will only see the break once the 431 subprocess tries to read or write. 432 433 Bug: http://b/23825725 434 """ 435 if not self.device.has_shell_protocol(): 436 raise unittest.SkipTest('shell protocol unsupported on this device') 437 438 # Start a long-running process. 439 sleep_proc = subprocess.Popen( 440 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 441 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 442 stderr=subprocess.STDOUT) 443 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 444 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 445 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 446 447 # Verify that the process is running, send signal, verify it stopped. 448 self.device.shell(proc_query) 449 os.kill(sleep_proc.pid, signal.SIGINT) 450 sleep_proc.communicate() 451 452 # It can take some time for the process to receive the signal and die. 453 end_time = time.time() + 3 454 while self.device.shell_nocheck(proc_query)[0] != 1: 455 self.assertFalse(time.time() > end_time, 456 'subprocess failed to terminate in time') 457 458 def test_non_interactive_stdin(self): 459 """Tests that non-interactive shells send stdin.""" 460 if not self.device.has_shell_protocol(): 461 raise unittest.SkipTest('non-interactive stdin unsupported ' 462 'on this device') 463 464 # Test both small and large inputs. 465 small_input = b'foo' 466 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 467 large_input = b'\n'.join(characters) 468 469 470 for input in (small_input, large_input): 471 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 472 stdin=subprocess.PIPE, 473 stdout=subprocess.PIPE, 474 stderr=subprocess.PIPE) 475 stdout, stderr = proc.communicate(input) 476 self.assertEqual(input.splitlines(), stdout.splitlines()) 477 self.assertEqual(b'', stderr) 478 479 def test_sighup(self): 480 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 481 log_path = "/data/local/tmp/adb_signal_test.log" 482 483 # Clear the output file. 484 self.device.shell_nocheck(["echo", ">", log_path]) 485 486 script = """ 487 trap "echo SIGINT > {path}; exit 0" SIGINT 488 trap "echo SIGHUP > {path}; exit 0" SIGHUP 489 echo Waiting 490 read 491 """.format(path=log_path) 492 493 script = ";".join([x.strip() for x in script.strip().splitlines()]) 494 495 with self.device.shell_popen([script], kill_atexit=False, 496 stdin=subprocess.PIPE, 497 stdout=subprocess.PIPE) as process: 498 499 self.assertEqual(b"Waiting\n", process.stdout.readline()) 500 process.send_signal(signal.SIGINT) 501 process.wait() 502 503 # Waiting for the local adb to finish is insufficient, since it hangs 504 # up immediately. 505 time.sleep(1) 506 507 stdout, _ = self.device.shell(["cat", log_path]) 508 self.assertEqual(stdout.strip(), "SIGHUP") 509 510 # Temporarily disabled because it seems to cause later instability. 511 # http://b/228114748 512 def disabled_test_exit_stress(self): 513 """Hammer `adb shell exit 42` with multiple threads.""" 514 thread_count = 48 515 result = dict() 516 def hammer(thread_idx, thread_count, result): 517 success = True 518 for i in range(thread_idx, 240, thread_count): 519 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 520 if ret != i % 256: 521 success = False 522 break 523 result[thread_idx] = success 524 525 threads = [] 526 for i in range(thread_count): 527 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 528 thread.start() 529 threads.append(thread) 530 for thread in threads: 531 thread.join() 532 for i, success in result.items(): 533 self.assertTrue(success) 534 535 def disabled_test_parallel(self): 536 """Spawn a bunch of `adb shell` instances in parallel. 537 538 This was broken historically due to the use of select, which only works 539 for fds that are numerically less than 1024. 540 541 Bug: http://b/141955761""" 542 543 n_procs = 2048 544 procs = dict() 545 for i in range(0, n_procs): 546 procs[i] = subprocess.Popen( 547 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 548 stdin=subprocess.PIPE, 549 stdout=subprocess.PIPE 550 ) 551 552 for i in range(0, n_procs): 553 procs[i].stdin.write("%d\n" % i) 554 555 for i in range(0, n_procs): 556 response = procs[i].stdout.readline() 557 assert(response == "%d\n" % i) 558 559 for i in range(0, n_procs): 560 procs[i].stdin.write("%d\n" % (i % 256)) 561 562 for i in range(0, n_procs): 563 assert(procs[i].wait() == i % 256) 564 565 566class ArgumentEscapingTest(DeviceTest): 567 def test_shell_escaping(self): 568 """Make sure that argument escaping is somewhat sane.""" 569 570 # http://b/19734868 571 # Note that this actually matches ssh(1)'s behavior --- it's 572 # converted to `sh -c echo hello; echo world` which sh interprets 573 # as `sh -c echo` (with an argument to that shell of "hello"), 574 # and then `echo world` back in the first shell. 575 result = self.device.shell( 576 shlex.split("sh -c 'echo hello; echo world'"))[0] 577 result = result.splitlines() 578 self.assertEqual(['', 'world'], result) 579 # If you really wanted "hello" and "world", here's what you'd do: 580 result = self.device.shell( 581 shlex.split(r'echo hello\;echo world'))[0].splitlines() 582 self.assertEqual(['hello', 'world'], result) 583 584 # http://b/15479704 585 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 586 self.assertEqual('t', result) 587 result = self.device.shell( 588 shlex.split("sh -c 'true && echo t'"))[0].strip() 589 self.assertEqual('t', result) 590 591 # http://b/20564385 592 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 593 self.assertEqual('t', result) 594 result = self.device.shell( 595 shlex.split(r'echo -n 123\;uname'))[0].strip() 596 self.assertEqual('123Linux', result) 597 598 def test_install_argument_escaping(self): 599 """Make sure that install argument escaping works.""" 600 # http://b/20323053, http://b/3090932. 601 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 602 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 603 delete=False) 604 tf.close() 605 606 # Installing bogus .apks fails if the device supports exit codes. 607 try: 608 output = self.device.install(tf.name.decode("utf8")) 609 except subprocess.CalledProcessError as e: 610 output = e.output 611 612 self.assertIn(file_suffix, output) 613 os.remove(tf.name) 614 615 616class RootUnrootTest(DeviceTest): 617 def _test_root(self): 618 message = self.device.root() 619 if 'adbd cannot run as root in production builds' in message: 620 return 621 self.device.wait() 622 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 623 624 def _test_unroot(self): 625 self.device.unroot() 626 self.device.wait() 627 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 628 629 def test_root_unroot(self): 630 """Make sure that adb root and adb unroot work, using id(1).""" 631 if self.device.get_prop('ro.debuggable') != '1': 632 raise unittest.SkipTest('requires rootable build') 633 634 original_user = self.device.shell(['id', '-un'])[0].strip() 635 try: 636 if original_user == 'root': 637 self._test_unroot() 638 self._test_root() 639 elif original_user == 'shell': 640 self._test_root() 641 self._test_unroot() 642 finally: 643 if original_user == 'root': 644 self.device.root() 645 else: 646 self.device.unroot() 647 self.device.wait() 648 649 650class TcpIpTest(DeviceTest): 651 def test_tcpip_failure_raises(self): 652 """adb tcpip requires a port. 653 654 Bug: http://b/22636927 655 """ 656 self.assertRaises( 657 subprocess.CalledProcessError, self.device.tcpip, '') 658 self.assertRaises( 659 subprocess.CalledProcessError, self.device.tcpip, 'foo') 660 661 662class SystemPropertiesTest(DeviceTest): 663 def test_get_prop(self): 664 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 665 666 def test_set_prop(self): 667 # debug.* prop does not require root privileges 668 prop_name = 'debug.foo' 669 self.device.shell(['setprop', prop_name, '""']) 670 671 val = random.random() 672 self.device.set_prop(prop_name, str(val)) 673 self.assertEqual( 674 self.device.shell(['getprop', prop_name])[0].strip(), str(val)) 675 676 677def compute_md5(string): 678 hsh = hashlib.md5() 679 hsh.update(string) 680 return hsh.hexdigest() 681 682 683class HostFile(object): 684 def __init__(self, handle, checksum): 685 self.handle = handle 686 self.checksum = checksum 687 self.full_path = handle.name 688 self.base_name = os.path.basename(self.full_path) 689 690 691class DeviceFile(object): 692 def __init__(self, checksum, full_path): 693 self.checksum = checksum 694 self.full_path = full_path 695 self.base_name = posixpath.basename(self.full_path) 696 697 698def make_random_host_files(in_dir, num_files): 699 min_size = 1 * (1 << 10) 700 max_size = 16 * (1 << 10) 701 702 files = [] 703 for _ in range(num_files): 704 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 705 706 size = random.randrange(min_size, max_size, 1024) 707 rand_str = os.urandom(size) 708 file_handle.write(rand_str) 709 file_handle.flush() 710 file_handle.close() 711 712 md5 = compute_md5(rand_str) 713 files.append(HostFile(file_handle, md5)) 714 return files 715 716 717def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 718 min_size = 1 * (1 << 10) 719 max_size = 16 * (1 << 10) 720 721 files = [] 722 for file_num in range(num_files): 723 size = random.randrange(min_size, max_size, 1024) 724 725 base_name = prefix + str(file_num) 726 full_path = posixpath.join(in_dir, base_name) 727 728 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 729 'bs={}'.format(size), 'count=1']) 730 dev_md5, _ = device.shell(['md5sum', full_path])[0].split() 731 732 files.append(DeviceFile(dev_md5, full_path)) 733 return files 734 735 736class FileOperationsTest: 737 class Base(DeviceTest): 738 SCRATCH_DIR = '/data/local/tmp' 739 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 740 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 741 742 def setUp(self): 743 super().setUp() 744 self.previous_env = os.environ.get("ADB_COMPRESSION") 745 os.environ["ADB_COMPRESSION"] = self.compression 746 747 def tearDown(self): 748 if self.previous_env is None: 749 del os.environ["ADB_COMPRESSION"] 750 else: 751 os.environ["ADB_COMPRESSION"] = self.previous_env 752 753 def _verify_remote(self, checksum, remote_path): 754 dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split() 755 self.assertEqual(checksum, dev_md5) 756 757 def _verify_local(self, checksum, local_path): 758 with open(local_path, 'rb') as host_file: 759 host_md5 = compute_md5(host_file.read()) 760 self.assertEqual(host_md5, checksum) 761 762 def test_push(self): 763 """Push a randomly generated file to specified device.""" 764 kbytes = 512 765 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 766 rand_str = os.urandom(1024 * kbytes) 767 tmp.write(rand_str) 768 tmp.close() 769 770 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 771 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 772 773 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 774 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 775 776 os.remove(tmp.name) 777 778 def test_push_dir(self): 779 """Push a randomly generated directory of files to the device.""" 780 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 781 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 782 783 try: 784 host_dir = tempfile.mkdtemp() 785 786 # Make sure the temp directory isn't setuid, or else adb will complain. 787 os.chmod(host_dir, 0o700) 788 789 # Create 32 random files. 790 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 791 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 792 793 for temp_file in temp_files: 794 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 795 os.path.basename(host_dir), 796 temp_file.base_name) 797 self._verify_remote(temp_file.checksum, remote_path) 798 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 799 finally: 800 if host_dir is not None: 801 shutil.rmtree(host_dir) 802 803 def disabled_test_push_empty(self): 804 """Push an empty directory to the device.""" 805 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 806 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 807 808 try: 809 host_dir = tempfile.mkdtemp() 810 811 # Make sure the temp directory isn't setuid, or else adb will complain. 812 os.chmod(host_dir, 0o700) 813 814 # Create an empty directory. 815 empty_dir_path = os.path.join(host_dir, 'empty') 816 os.mkdir(empty_dir_path); 817 818 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 819 820 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 821 test_empty_cmd = ["[", "-d", remote_path, "]"] 822 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 823 824 self.assertEqual(rc, 0) 825 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 826 finally: 827 if host_dir is not None: 828 shutil.rmtree(host_dir) 829 830 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 831 def test_push_symlink(self): 832 """Push a symlink. 833 834 Bug: http://b/31491920 835 """ 836 try: 837 host_dir = tempfile.mkdtemp() 838 839 # Make sure the temp directory isn't setuid, or else adb will 840 # complain. 841 os.chmod(host_dir, 0o700) 842 843 with open(os.path.join(host_dir, 'foo'), 'w') as f: 844 f.write('foo') 845 846 symlink_path = os.path.join(host_dir, 'symlink') 847 os.symlink('foo', symlink_path) 848 849 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 850 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 851 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 852 rc, out, _ = self.device.shell_nocheck( 853 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 854 self.assertEqual(0, rc) 855 self.assertEqual(out.strip(), 'foo') 856 finally: 857 if host_dir is not None: 858 shutil.rmtree(host_dir) 859 860 def test_multiple_push(self): 861 """Push multiple files to the device in one adb push command. 862 863 Bug: http://b/25324823 864 """ 865 866 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 867 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 868 869 try: 870 host_dir = tempfile.mkdtemp() 871 872 # Create some random files and a subdirectory containing more files. 873 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 874 875 subdir = os.path.join(host_dir, 'subdir') 876 os.mkdir(subdir) 877 subdir_temp_files = make_random_host_files(in_dir=subdir, 878 num_files=4) 879 880 paths = [x.full_path for x in temp_files] 881 paths.append(subdir) 882 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 883 884 for temp_file in temp_files: 885 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 886 temp_file.base_name) 887 self._verify_remote(temp_file.checksum, remote_path) 888 889 for subdir_temp_file in subdir_temp_files: 890 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 891 # BROKEN: http://b/25394682 892 # 'subdir'; 893 temp_file.base_name) 894 self._verify_remote(temp_file.checksum, remote_path) 895 896 897 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 898 finally: 899 if host_dir is not None: 900 shutil.rmtree(host_dir) 901 902 @requires_non_root 903 def test_push_error_reporting(self): 904 """Make sure that errors that occur while pushing a file get reported 905 906 Bug: http://b/26816782 907 """ 908 with tempfile.NamedTemporaryFile() as tmp_file: 909 tmp_file.write(b'\0' * 1024 * 1024) 910 tmp_file.flush() 911 try: 912 self.device.push(local=tmp_file.name, remote='/system/') 913 self.fail('push should not have succeeded') 914 except subprocess.CalledProcessError as e: 915 output = e.output 916 917 self.assertTrue(b'Permission denied' in output or 918 b'Read-only file system' in output) 919 920 @requires_non_root 921 def test_push_directory_creation(self): 922 """Regression test for directory creation. 923 924 Bug: http://b/110953234 925 """ 926 with tempfile.NamedTemporaryFile() as tmp_file: 927 tmp_file.write(b'\0' * 1024 * 1024) 928 tmp_file.flush() 929 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 930 self.device.shell(['rm', '-rf', remote_path]) 931 932 remote_path += '/filename' 933 self.device.push(local=tmp_file.name, remote=remote_path) 934 935 def disabled_test_push_multiple_slash_root(self): 936 """Regression test for pushing to //data/local/tmp. 937 938 Bug: http://b/141311284 939 940 Disabled because this broken on the adbd side as well: b/141943968 941 """ 942 with tempfile.NamedTemporaryFile() as tmp_file: 943 tmp_file.write(b'\0' * 1024 * 1024) 944 tmp_file.flush() 945 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 946 self.device.shell(['rm', '-rf', remote_path]) 947 self.device.push(local=tmp_file.name, remote=remote_path) 948 949 def _test_pull(self, remote_file, checksum): 950 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 951 tmp_write.close() 952 self.device.pull(remote=remote_file, local=tmp_write.name) 953 with open(tmp_write.name, 'rb') as tmp_read: 954 host_contents = tmp_read.read() 955 host_md5 = compute_md5(host_contents) 956 self.assertEqual(checksum, host_md5) 957 os.remove(tmp_write.name) 958 959 @requires_non_root 960 def test_pull_error_reporting(self): 961 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 962 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 963 964 try: 965 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 966 except subprocess.CalledProcessError as e: 967 output = e.output 968 969 self.assertIn(b'Permission denied', output) 970 971 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 972 973 def test_pull(self): 974 """Pull a randomly generated file from specified device.""" 975 kbytes = 512 976 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 977 cmd = ['dd', 'if=/dev/urandom', 978 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 979 'count={}'.format(kbytes)] 980 self.device.shell(cmd) 981 dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split() 982 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 983 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 984 985 def test_pull_dir(self): 986 """Pull a randomly generated directory of files from the device.""" 987 try: 988 host_dir = tempfile.mkdtemp() 989 990 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 991 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 992 993 # Populate device directory with random files. 994 temp_files = make_random_device_files( 995 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 996 997 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 998 999 for temp_file in temp_files: 1000 host_path = os.path.join( 1001 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1002 temp_file.base_name) 1003 self._verify_local(temp_file.checksum, host_path) 1004 1005 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1006 finally: 1007 if host_dir is not None: 1008 shutil.rmtree(host_dir) 1009 1010 def test_pull_dir_symlink(self): 1011 """Pull a directory into a symlink to a directory. 1012 1013 Bug: http://b/27362811 1014 """ 1015 if os.name != 'posix': 1016 raise unittest.SkipTest('requires POSIX') 1017 1018 try: 1019 host_dir = tempfile.mkdtemp() 1020 real_dir = os.path.join(host_dir, 'dir') 1021 symlink = os.path.join(host_dir, 'symlink') 1022 os.mkdir(real_dir) 1023 os.symlink(real_dir, symlink) 1024 1025 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1026 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1027 1028 # Populate device directory with random files. 1029 temp_files = make_random_device_files( 1030 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1031 1032 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1033 1034 for temp_file in temp_files: 1035 host_path = os.path.join( 1036 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1037 temp_file.base_name) 1038 self._verify_local(temp_file.checksum, host_path) 1039 1040 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1041 finally: 1042 if host_dir is not None: 1043 shutil.rmtree(host_dir) 1044 1045 def test_pull_dir_symlink_collision(self): 1046 """Pull a directory into a colliding symlink to directory.""" 1047 if os.name != 'posix': 1048 raise unittest.SkipTest('requires POSIX') 1049 1050 try: 1051 host_dir = tempfile.mkdtemp() 1052 real_dir = os.path.join(host_dir, 'real') 1053 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1054 symlink = os.path.join(host_dir, tmp_dirname) 1055 os.mkdir(real_dir) 1056 os.symlink(real_dir, symlink) 1057 1058 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1059 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1060 1061 # Populate device directory with random files. 1062 temp_files = make_random_device_files( 1063 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1064 1065 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1066 1067 for temp_file in temp_files: 1068 host_path = os.path.join(real_dir, temp_file.base_name) 1069 self._verify_local(temp_file.checksum, host_path) 1070 1071 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1072 finally: 1073 if host_dir is not None: 1074 shutil.rmtree(host_dir) 1075 1076 def test_pull_dir_nonexistent(self): 1077 """Pull a directory of files from the device to a nonexistent path.""" 1078 try: 1079 host_dir = tempfile.mkdtemp() 1080 dest_dir = os.path.join(host_dir, 'dest') 1081 1082 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1083 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1084 1085 # Populate device directory with random files. 1086 temp_files = make_random_device_files( 1087 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1088 1089 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1090 1091 for temp_file in temp_files: 1092 host_path = os.path.join(dest_dir, temp_file.base_name) 1093 self._verify_local(temp_file.checksum, host_path) 1094 1095 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1096 finally: 1097 if host_dir is not None: 1098 shutil.rmtree(host_dir) 1099 1100 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1101 def disabled_test_pull_symlink_dir(self): 1102 """Pull a symlink to a directory of symlinks to files.""" 1103 try: 1104 host_dir = tempfile.mkdtemp() 1105 1106 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1107 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1108 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1109 1110 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1111 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1112 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1113 1114 # Populate device directory with random files. 1115 temp_files = make_random_device_files( 1116 self.device, in_dir=remote_dir, num_files=32) 1117 1118 for temp_file in temp_files: 1119 self.device.shell( 1120 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1121 posixpath.join(remote_links, temp_file.base_name)]) 1122 1123 self.device.pull(remote=remote_symlink, local=host_dir) 1124 1125 for temp_file in temp_files: 1126 host_path = os.path.join( 1127 host_dir, 'symlink', temp_file.base_name) 1128 self._verify_local(temp_file.checksum, host_path) 1129 1130 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1131 finally: 1132 if host_dir is not None: 1133 shutil.rmtree(host_dir) 1134 1135 def test_pull_empty(self): 1136 """Pull a directory containing an empty directory from the device.""" 1137 try: 1138 host_dir = tempfile.mkdtemp() 1139 1140 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1141 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1142 self.device.shell(['mkdir', '-p', remote_empty_path]) 1143 1144 self.device.pull(remote=remote_empty_path, local=host_dir) 1145 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1146 finally: 1147 if host_dir is not None: 1148 shutil.rmtree(host_dir) 1149 1150 def test_multiple_pull(self): 1151 """Pull a randomly generated directory of files from the device.""" 1152 1153 try: 1154 host_dir = tempfile.mkdtemp() 1155 1156 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1157 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1158 self.device.shell(['mkdir', '-p', subdir]) 1159 1160 # Create some random files and a subdirectory containing more files. 1161 temp_files = make_random_device_files( 1162 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1163 1164 subdir_temp_files = make_random_device_files( 1165 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1166 1167 paths = [x.full_path for x in temp_files] 1168 paths.append(subdir) 1169 self.device._simple_call(['pull'] + paths + [host_dir]) 1170 1171 for temp_file in temp_files: 1172 local_path = os.path.join(host_dir, temp_file.base_name) 1173 self._verify_local(temp_file.checksum, local_path) 1174 1175 for subdir_temp_file in subdir_temp_files: 1176 local_path = os.path.join(host_dir, 1177 'subdir', 1178 subdir_temp_file.base_name) 1179 self._verify_local(subdir_temp_file.checksum, local_path) 1180 1181 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1182 finally: 1183 if host_dir is not None: 1184 shutil.rmtree(host_dir) 1185 1186 def verify_sync(self, device, temp_files, device_dir): 1187 """Verifies that a list of temp files was synced to the device.""" 1188 # Confirm that every file on the device mirrors that on the host. 1189 for temp_file in temp_files: 1190 device_full_path = posixpath.join( 1191 device_dir, temp_file.base_name) 1192 dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split() 1193 self.assertEqual(temp_file.checksum, dev_md5) 1194 1195 def do_test_sync(self, partition): 1196 """Sync a host directory to the given partition.""" 1197 1198 try: 1199 base_dir = tempfile.mkdtemp() 1200 1201 # Create mirror device directory hierarchy within base_dir. 1202 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1203 os.makedirs(full_dir_path) 1204 1205 # Create 32 random files within the host mirror. 1206 temp_files = make_random_host_files( 1207 in_dir=full_dir_path, num_files=32) 1208 1209 # Clean up any stale files on the device. 1210 device = adb.get_device() # pylint: disable=no-member 1211 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1212 1213 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1214 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1215 device.sync(partition) 1216 if old_product_out is None: 1217 del os.environ['ANDROID_PRODUCT_OUT'] 1218 else: 1219 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1220 1221 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1222 1223 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1224 finally: 1225 if base_dir is not None: 1226 shutil.rmtree(base_dir) 1227 1228 def test_sync_data(self): 1229 """Sync a host directory to the data partition.""" 1230 1231 self.do_test_sync('data') 1232 1233 def test_sync_slash_data(self): 1234 """Sync a host directory to the data partition with a leading slash.""" 1235 1236 self.do_test_sync('/data') 1237 1238 def test_push_sync(self): 1239 """Sync a host directory to a specific path.""" 1240 1241 try: 1242 temp_dir = tempfile.mkdtemp() 1243 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1244 1245 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1246 1247 # Clean up any stale files on the device. 1248 device = adb.get_device() # pylint: disable=no-member 1249 device.shell(['rm', '-rf', device_dir]) 1250 1251 device.push(temp_dir, device_dir, sync=True) 1252 1253 self.verify_sync(device, temp_files, device_dir) 1254 1255 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1256 finally: 1257 if temp_dir is not None: 1258 shutil.rmtree(temp_dir) 1259 1260 def test_push_sync_multiple(self): 1261 """Sync multiple host directories to a specific path.""" 1262 1263 try: 1264 temp_dir = tempfile.mkdtemp() 1265 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1266 1267 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1268 1269 # Clean up any stale files on the device. 1270 device = adb.get_device() # pylint: disable=no-member 1271 device.shell(['rm', '-rf', device_dir]) 1272 device.shell(['mkdir', '-p', device_dir]) 1273 1274 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1275 device.push(host_paths, device_dir, sync=True) 1276 1277 self.verify_sync(device, temp_files, device_dir) 1278 1279 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1280 finally: 1281 if temp_dir is not None: 1282 shutil.rmtree(temp_dir) 1283 1284 1285 def test_push_dry_run_nonexistent_file(self): 1286 """Push with dry run (non-existent file).""" 1287 1288 for file_size in [8, 1024 * 1024]: 1289 try: 1290 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1291 device_file = posixpath.join(device_dir, 'file') 1292 1293 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1294 self.device.shell(['mkdir', '-p', device_dir]) 1295 1296 host_dir = tempfile.mkdtemp() 1297 host_file = posixpath.join(host_dir, 'file') 1298 1299 with open(host_file, "w") as f: 1300 f.write('x' * file_size) 1301 1302 self.device._simple_call(['push', '-n', host_file, device_file]) 1303 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1304 self.assertNotEqual(0, rc) 1305 1306 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1307 finally: 1308 if host_dir is not None: 1309 shutil.rmtree(host_dir) 1310 1311 def test_push_dry_run_existent_file(self): 1312 """Push with dry run.""" 1313 1314 for file_size in [8, 1024 * 1024]: 1315 try: 1316 host_dir = tempfile.mkdtemp() 1317 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1318 device_file = posixpath.join(device_dir, 'file') 1319 1320 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1321 self.device.shell(['mkdir', '-p', device_dir]) 1322 self.device.shell(['echo', 'foo', '>', device_file]) 1323 1324 host_file = posixpath.join(host_dir, 'file') 1325 1326 with open(host_file, "w") as f: 1327 f.write('x' * file_size) 1328 1329 self.device._simple_call(['push', '-n', host_file, device_file]) 1330 stdout, stderr = self.device.shell(['cat', device_file]) 1331 self.assertEqual(stdout.strip(), "foo") 1332 1333 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1334 finally: 1335 if host_dir is not None: 1336 shutil.rmtree(host_dir) 1337 1338 def test_unicode_paths(self): 1339 """Ensure that we can support non-ASCII paths, even on Windows.""" 1340 name = u'로보카 폴리' 1341 1342 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1343 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1344 1345 ## push. 1346 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1347 tf.close() 1348 self.device.push(tf.name, remote_path) 1349 os.remove(tf.name) 1350 self.assertFalse(os.path.exists(tf.name)) 1351 1352 # Verify that the device ended up with the expected UTF-8 path 1353 output = self.device.shell( 1354 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1355 self.assertEqual(remote_path, output) 1356 1357 # pull. 1358 self.device.pull(remote_path, tf.name) 1359 self.assertTrue(os.path.exists(tf.name)) 1360 os.remove(tf.name) 1361 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1362 1363 1364class FileOperationsTestUncompressed(FileOperationsTest.Base): 1365 compression = "none" 1366 1367 1368class FileOperationsTestBrotli(FileOperationsTest.Base): 1369 compression = "brotli" 1370 1371 1372class FileOperationsTestLZ4(FileOperationsTest.Base): 1373 compression = "lz4" 1374 1375 1376class FileOperationsTestZstd(FileOperationsTest.Base): 1377 compression = "zstd" 1378 1379 1380class DeviceOfflineTest(DeviceTest): 1381 def _get_device_state(self, serialno): 1382 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1383 for line in output.split('\n'): 1384 m = re.match('(\S+)\s+(\S+)', line) 1385 if m and m.group(1) == serialno: 1386 return m.group(2) 1387 return None 1388 1389 def disabled_test_killed_when_pushing_a_large_file(self): 1390 """ 1391 While running adb push with a large file, kill adb server. 1392 Occasionally the device becomes offline. Because the device is still 1393 reading data without realizing that the adb server has been restarted. 1394 Test if we can bring the device online automatically now. 1395 http://b/32952319 1396 """ 1397 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1398 # 1. Push a large file 1399 file_path = 'tmp_large_file' 1400 try: 1401 fh = open(file_path, 'w') 1402 fh.write('\0' * (100 * 1024 * 1024)) 1403 fh.close() 1404 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1405 time.sleep(0.1) 1406 # 2. Kill the adb server 1407 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1408 subproc.terminate() 1409 finally: 1410 try: 1411 os.unlink(file_path) 1412 except: 1413 pass 1414 # 3. See if the device still exist. 1415 # Sleep to wait for the adb server exit. 1416 time.sleep(0.5) 1417 # 4. The device should be online 1418 self.assertEqual(self._get_device_state(serialno), 'device') 1419 1420 def disabled_test_killed_when_pulling_a_large_file(self): 1421 """ 1422 While running adb pull with a large file, kill adb server. 1423 Occasionally the device can't be connected. Because the device is trying to 1424 send a message larger than what is expected by the adb server. 1425 Test if we can bring the device online automatically now. 1426 """ 1427 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1428 file_path = 'tmp_large_file' 1429 try: 1430 # 1. Create a large file on device. 1431 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1432 'bs=1000000', 'count=100']) 1433 # 2. Pull the large file on host. 1434 subproc = subprocess.Popen(self.device.adb_cmd + 1435 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1436 time.sleep(0.1) 1437 # 3. Kill the adb server 1438 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1439 subproc.terminate() 1440 finally: 1441 try: 1442 os.unlink(file_path) 1443 except: 1444 pass 1445 # 4. See if the device still exist. 1446 # Sleep to wait for the adb server exit. 1447 time.sleep(0.5) 1448 self.assertEqual(self._get_device_state(serialno), 'device') 1449 1450 1451 def test_packet_size_regression(self): 1452 """Test for http://b/37783561 1453 1454 Receiving packets of a length divisible by 512 but not 1024 resulted in 1455 the adb client waiting indefinitely for more input. 1456 """ 1457 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1458 # Probe some surrounding values as well, for the hell of it. 1459 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1460 for offset in [-6, -5, -4]: 1461 length = base + offset 1462 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1463 'echo', 'foo'] 1464 rc, stdout, _ = self.device.shell_nocheck(cmd) 1465 1466 self.assertEqual(0, rc) 1467 1468 # Output should be '\0' * length, followed by "foo\n" 1469 self.assertEqual(length, len(stdout) - 4) 1470 self.assertEqual(stdout, "\0" * length + "foo\n") 1471 1472 def test_zero_packet(self): 1473 """Test for http://b/113070258 1474 1475 Make sure that we don't blow up when sending USB transfers that line up 1476 exactly with the USB packet size. 1477 """ 1478 1479 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1480 try: 1481 for size in [512, 1024]: 1482 def listener(): 1483 cmd = ["echo foo | nc -l -p 12345; echo done"] 1484 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1485 1486 thread = threading.Thread(target=listener) 1487 thread.start() 1488 1489 # Wait a bit to let the shell command start. 1490 time.sleep(0.25) 1491 1492 sock = socket.create_connection(("localhost", local_port)) 1493 with contextlib.closing(sock): 1494 bytesWritten = sock.send(b"a" * size) 1495 self.assertEqual(size, bytesWritten) 1496 readBytes = sock.recv(4096) 1497 self.assertEqual(b"foo\n", readBytes) 1498 1499 thread.join() 1500 finally: 1501 self.device.forward_remove("tcp:{}".format(local_port)) 1502 1503 1504class SocketTest(DeviceTest): 1505 def test_socket_flush(self): 1506 """Test that we handle socket closure properly. 1507 1508 If we're done writing to a socket, closing before the other end has 1509 closed will send a TCP_RST if we have incoming data queued up, which 1510 may result in data that we've written being discarded. 1511 1512 Bug: http://b/74616284 1513 """ 1514 def adb_length_prefixed(string): 1515 encoded = string.encode("utf8") 1516 result = b"%04x%s" % (len(encoded), encoded) 1517 return result 1518 1519 if "ANDROID_SERIAL" in os.environ: 1520 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1521 else: 1522 transport_string = "host:transport-any" 1523 1524 with socket.create_connection(("localhost", 5037)) as s: 1525 1526 s.sendall(adb_length_prefixed(transport_string)) 1527 response = s.recv(4) 1528 self.assertEqual(b"OKAY", response) 1529 1530 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1531 s.sendall(adb_length_prefixed(shell_string)) 1532 1533 response = s.recv(4) 1534 self.assertEqual(b"OKAY", response) 1535 1536 # Spawn a thread that dumps garbage into the socket until failure. 1537 def spam(): 1538 buf = b"\0" * 16384 1539 try: 1540 while True: 1541 s.sendall(buf) 1542 except Exception as ex: 1543 print(ex) 1544 1545 thread = threading.Thread(target=spam) 1546 thread.start() 1547 1548 time.sleep(1) 1549 1550 received = b"" 1551 while True: 1552 read = s.recv(512) 1553 if len(read) == 0: 1554 break 1555 received += read 1556 1557 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1558 thread.join() 1559 1560 1561class FramebufferTest(DeviceTest): 1562 def test_framebuffer(self): 1563 """Test that we get something from the framebuffer service.""" 1564 output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) 1565 self.assertFalse(len(output) == 0) 1566 1567 1568if sys.platform == "win32": 1569 # From https://stackoverflow.com/a/38749458 1570 import os 1571 import contextlib 1572 import msvcrt 1573 import ctypes 1574 from ctypes import wintypes 1575 1576 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1577 1578 GENERIC_READ = 0x80000000 1579 GENERIC_WRITE = 0x40000000 1580 FILE_SHARE_READ = 1 1581 FILE_SHARE_WRITE = 2 1582 CONSOLE_TEXTMODE_BUFFER = 1 1583 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1584 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1585 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1586 1587 def _check_zero(result, func, args): 1588 if not result: 1589 raise ctypes.WinError(ctypes.get_last_error()) 1590 return args 1591 1592 def _check_invalid(result, func, args): 1593 if result == INVALID_HANDLE_VALUE: 1594 raise ctypes.WinError(ctypes.get_last_error()) 1595 return args 1596 1597 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1598 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1599 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1600 1601 class COORD(ctypes.Structure): 1602 _fields_ = (('X', wintypes.SHORT), 1603 ('Y', wintypes.SHORT)) 1604 1605 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1606 _fields_ = (('cbSize', wintypes.ULONG), 1607 ('dwSize', COORD), 1608 ('dwCursorPosition', COORD), 1609 ('wAttributes', wintypes.WORD), 1610 ('srWindow', wintypes.SMALL_RECT), 1611 ('dwMaximumWindowSize', COORD), 1612 ('wPopupAttributes', wintypes.WORD), 1613 ('bFullscreenSupported', wintypes.BOOL), 1614 ('ColorTable', wintypes.DWORD * 16)) 1615 def __init__(self, *args, **kwds): 1616 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1617 *args, **kwds) 1618 self.cbSize = ctypes.sizeof(self) 1619 1620 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1621 CONSOLE_SCREEN_BUFFER_INFOEX) 1622 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1623 1624 kernel32.GetStdHandle.errcheck = _check_invalid 1625 kernel32.GetStdHandle.restype = wintypes.HANDLE 1626 kernel32.GetStdHandle.argtypes = ( 1627 wintypes.DWORD,) # _In_ nStdHandle 1628 1629 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1630 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1631 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1632 wintypes.DWORD, # _In_ dwDesiredAccess 1633 wintypes.DWORD, # _In_ dwShareMode 1634 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1635 wintypes.DWORD, # _In_ dwFlags 1636 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1637 1638 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1639 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1640 wintypes.HANDLE, # _In_ hConsoleOutput 1641 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1642 1643 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1644 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1645 wintypes.HANDLE, # _In_ hConsoleOutput 1646 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1647 1648 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1649 kernel32.SetConsoleWindowInfo.argtypes = ( 1650 wintypes.HANDLE, # _In_ hConsoleOutput 1651 wintypes.BOOL, # _In_ bAbsolute 1652 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1653 1654 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1655 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1656 wintypes.HANDLE, # _In_ hConsoleOutput 1657 wintypes.WCHAR, # _In_ cCharacter 1658 wintypes.DWORD, # _In_ nLength 1659 COORD, # _In_ dwWriteCoord 1660 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1661 1662 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1663 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1664 wintypes.HANDLE, # _In_ hConsoleOutput 1665 wintypes.LPWSTR, # _Out_ lpCharacter 1666 wintypes.DWORD, # _In_ nLength 1667 COORD, # _In_ dwReadCoord 1668 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1669 1670 @contextlib.contextmanager 1671 def allocate_console(): 1672 allocated = kernel32.AllocConsole() 1673 try: 1674 yield allocated 1675 finally: 1676 if allocated: 1677 kernel32.FreeConsole() 1678 1679 @contextlib.contextmanager 1680 def console_screen(ncols=None, nrows=None): 1681 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1682 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1683 nwritten = (wintypes.DWORD * 1)() 1684 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1685 kernel32.GetConsoleScreenBufferInfoEx( 1686 hStdOut, ctypes.byref(info)) 1687 if ncols is None: 1688 ncols = info.dwSize.X 1689 if nrows is None: 1690 nrows = info.dwSize.Y 1691 elif nrows > 9999: 1692 raise ValueError('nrows must be 9999 or less') 1693 fd_screen = None 1694 hScreen = kernel32.CreateConsoleScreenBuffer( 1695 GENERIC_READ | GENERIC_WRITE, 1696 FILE_SHARE_READ | FILE_SHARE_WRITE, 1697 None, CONSOLE_TEXTMODE_BUFFER, None) 1698 try: 1699 fd_screen = msvcrt.open_osfhandle( 1700 hScreen, os.O_RDWR | os.O_BINARY) 1701 kernel32.GetConsoleScreenBufferInfoEx( 1702 hScreen, ctypes.byref(new_info)) 1703 new_info.dwSize = COORD(ncols, nrows) 1704 new_info.srWindow = wintypes.SMALL_RECT( 1705 Left=0, Top=0, Right=(ncols - 1), 1706 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1707 kernel32.SetConsoleScreenBufferInfoEx( 1708 hScreen, ctypes.byref(new_info)) 1709 kernel32.SetConsoleWindowInfo(hScreen, True, 1710 ctypes.byref(new_info.srWindow)) 1711 kernel32.FillConsoleOutputCharacterW( 1712 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1713 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1714 try: 1715 yield fd_screen 1716 finally: 1717 kernel32.SetConsoleScreenBufferInfoEx( 1718 hStdOut, ctypes.byref(info)) 1719 kernel32.SetConsoleWindowInfo(hStdOut, True, 1720 ctypes.byref(info.srWindow)) 1721 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1722 finally: 1723 if fd_screen is not None: 1724 os.close(fd_screen) 1725 else: 1726 kernel32.CloseHandle(hScreen) 1727 1728 def read_screen(fd): 1729 hScreen = msvcrt.get_osfhandle(fd) 1730 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1731 kernel32.GetConsoleScreenBufferInfoEx( 1732 hScreen, ctypes.byref(csbi)) 1733 ncols = csbi.dwSize.X 1734 pos = csbi.dwCursorPosition 1735 length = ncols * pos.Y + pos.X + 1 1736 buf = (ctypes.c_wchar * length)() 1737 n = (wintypes.DWORD * 1)() 1738 kernel32.ReadConsoleOutputCharacterW( 1739 hScreen, buf, length, COORD(0,0), n) 1740 lines = [buf[i:i+ncols].rstrip(u'\0') 1741 for i in range(0, n[0], ncols)] 1742 return u'\n'.join(lines) 1743 1744@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1745class WindowsConsoleTest(DeviceTest): 1746 def test_unicode_output(self): 1747 """Test Unicode command line parameters and Unicode console window output. 1748 1749 Bug: https://issuetracker.google.com/issues/111972753 1750 """ 1751 # If we don't have a console window, allocate one. This isn't necessary if we're already 1752 # being run from a console window, which is typical. 1753 with allocate_console() as allocated_console: 1754 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1755 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1756 # likely unnecessary given the typical console window size. 1757 with console_screen(nrows=1000) as screen: 1758 unicode_string = u'로보카 폴리' 1759 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1760 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1761 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1762 process.wait() 1763 # Read what was written by adb to the temporary console buffer. 1764 console_output = read_screen(screen) 1765 self.assertEqual(unicode_string, console_output) 1766 1767class DevicesListing(DeviceTest): 1768 1769 serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8") 1770 # def get_serial(self): 1771 # return subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip().decode("utf-8") 1772 1773 def test_devices(self): 1774 with subprocess.Popen(['adb', 'devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1775 lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines())) 1776 self.assertEqual(len(lines), 3) 1777 line = lines[1] 1778 self.assertTrue(self.serial in line) 1779 self.assertFalse("{" in line) 1780 self.assertFalse("}" in line) 1781 self.assertTrue("device" in line) 1782 self.assertFalse("product" in line) 1783 self.assertFalse("transport" in line) 1784 1785 def test_devices_l(self): 1786 with subprocess.Popen(['adb', 'devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1787 lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines())) 1788 self.assertEqual(len(lines), 3) 1789 line = lines[1] 1790 self.assertTrue(self.serial in line) 1791 self.assertFalse("{" in line) 1792 self.assertFalse("}" in line) 1793 self.assertTrue("device" in line) 1794 self.assertTrue("product" in line) 1795 self.assertTrue("transport" in line) 1796 1797 def test_track_devices(self): 1798 with subprocess.Popen(['adb', 'track-devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1799 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1800 output_size = int(reader.read(4), 16) 1801 output = reader.read(output_size) 1802 self.assertFalse("{" in output) 1803 self.assertFalse("}" in output) 1804 self.assertTrue(self.serial in output) 1805 self.assertTrue("device" in output) 1806 self.assertFalse("product" in output) 1807 self.assertFalse("transport" in output) 1808 proc.terminate() 1809 1810 def test_track_devices_l(self): 1811 with subprocess.Popen(['adb', 'track-devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1812 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1813 output_size = int(reader.read(4), 16) 1814 output = reader.read(output_size) 1815 self.assertFalse("{" in output) 1816 self.assertFalse("}" in output) 1817 self.assertTrue(self.serial in output) 1818 self.assertTrue("device" in output) 1819 self.assertTrue("product" in output) 1820 self.assertTrue("transport" in output) 1821 proc.terminate() 1822 1823 def test_track_devices_proto_text(self): 1824 with subprocess.Popen(['adb', 'track-devices', '--proto-text'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1825 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1826 output_size = int(reader.read(4), 16) 1827 output = reader.read(output_size) 1828 self.assertTrue("{" in output) 1829 self.assertTrue("}" in output) 1830 self.assertTrue(self.serial in output) 1831 self.assertTrue("device" in output) 1832 self.assertTrue("product" in output) 1833 self.assertTrue("connection_type" in output) 1834 proc.terminate() 1835 1836 def test_track_devices_proto_binary(self): 1837 with subprocess.Popen(['adb', 'track-devices', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1838 1839 output_size = int(proc.stdout.read(4).decode("utf-8"), 16) 1840 proto = proc.stdout.read(output_size) 1841 1842 devices = adb_host_proto.Devices() 1843 devices.ParseFromString(proto) 1844 1845 device = devices.device[0] 1846 self.assertTrue(device.serial == self.serial) 1847 self.assertFalse(device.bus_address == "") 1848 self.assertFalse(device.product == "") 1849 self.assertFalse(device.model == "") 1850 self.assertFalse(device.device == "") 1851 self.assertTrue(device.negotiated_speed == int(device.negotiated_speed)) 1852 self.assertTrue(int(device.negotiated_speed) != 0) 1853 self.assertTrue(device.max_speed == int(device.max_speed)) 1854 self.assertTrue(int(device.max_speed) != 0) 1855 self.assertTrue(device.transport_id == int(device.transport_id)) 1856 1857 proc.terminate() 1858 1859def invoke(*args): 1860 print(args) 1861 try: 1862 output = subprocess.check_output(args, stderr=subprocess.STDOUT).strip().decode("utf-8") 1863 print(output) 1864 return output 1865 except subprocess.CalledProcessError as e: 1866 return "ErrorCode " + str(e.returncode) + ":" + e.output.decode("utf-8") 1867 1868 1869class DevicesListing(DeviceTest): 1870 1871 def test_track_app_appinfo(self): 1872 subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app1.apk']).strip().decode("utf-8") 1873 subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app2.apk']).strip().decode("utf-8") 1874 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.MainActivity']).strip().decode("utf-8") 1875 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app2/.MainActivity']).strip().decode("utf-8") 1876 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.OwnProcessActivity']).strip().decode("utf-8") 1877 with subprocess.Popen(['adb', 'track-app', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1878 output_size = int(proc.stdout.read(4).decode("utf-8"), 16) 1879 proto = proc.stdout.read(output_size) 1880 1881 apps = proto_track_app.AppProcesses() 1882 apps.ParseFromString(proto) 1883 1884 foundAdbAppDefProc = False 1885 foundAdbAppOwnProc = False 1886 for app in apps.process: 1887 if (app.process_name == "adb.test.process.name"): 1888 foundAdbAppDefProc = True 1889 self.assertTrue(app.debuggable) 1890 self.assertTrue("adb.test.app1" in app.package_names) 1891 self.assertTrue("adb.test.app2" in app.package_names) 1892 1893 if (app.process_name == "adb.test.own.process"): 1894 foundAdbAppOwnProc = True 1895 self.assertTrue(app.debuggable) 1896 self.assertTrue("adb.test.app1" in app.package_names) 1897 1898 self.assertTrue(foundAdbAppDefProc) 1899 self.assertTrue(foundAdbAppOwnProc) 1900 proc.terminate() 1901 1902class ServerStatus(unittest.TestCase): 1903 def test_server_status(self): 1904 with subprocess.Popen(['adb', 'server-status'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1905 lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines())) 1906 self.assertTrue("usb_backend" in lines[0]) 1907 self.assertTrue("mdns_backend" in lines[1]) 1908 self.assertTrue("version" in lines[2]) 1909 self.assertTrue("build" in lines[3]) 1910 self.assertTrue("executable_absolute_path" in lines[4]) 1911 self.assertTrue("log_absolute_path" in lines[5]) 1912 self.assertTrue("os" in lines[6]) 1913 self.assertTrue("trace_level" in lines[7]) 1914 self.assertTrue("burst_mode" in lines[8]) 1915 1916 1917class DetachSingleServer(unittest.TestCase): 1918 serial = invoke("adb", "get-serialno") 1919 1920 def wait_for_device(self): 1921 count = 0 1922 while True: 1923 devices = invoke("adb", "devices") 1924 if (self.serial in devices and "attached" in devices): 1925 return 1926 count = count + 1 1927 if count > 10: 1928 return 1929 1930 def test_detach_then_attach(self): 1931 # Check device is there with comm working 1932 who = invoke("adb", "shell", "whoami") 1933 self.assertTrue(who == "shell" or who == "root") 1934 devices = invoke("adb", "devices") 1935 self.assertFalse("detached" in devices, devices) 1936 self.assertTrue(self.serial in devices, devices) 1937 1938 invoke("adb", "detach") 1939 1940 # Verify detach did not remove the device from list 1941 devices = invoke("adb", "devices") 1942 self.assertTrue(self.serial in devices, devices) 1943 self.assertTrue("detached" in devices, devices) 1944 1945 # Verify detach makes device unreachable 1946 who = invoke("adb", "shell", "whoami") 1947 self.assertFalse(who == "shell" or who == "root", who) 1948 1949 # Re-attach 1950 invoke("adb", "attach") 1951 time.sleep(2) 1952 self.wait_for_device() 1953 1954 # Check devices is there 1955 devices = invoke("adb", "devices") 1956 self.assertTrue(self.serial in devices, devices) 1957 self.assertFalse("detached" in devices, devices) 1958 1959 # Check device comm was started 1960 who = invoke("adb", "shell", "whoami") 1961 self.assertTrue(who == "shell" or who == "root", who) 1962 1963 def tearDown(self): 1964 invoke("adb", "kill-server") 1965 1966class DetachMultiServer(unittest.TestCase): 1967 server1_port = "5038" 1968 server2_port = "5039" 1969 env_var_detached = "ADB_LIBUSB_START_DETACHED" 1970 serial = invoke("adb", "get-serialno") 1971 1972 def wait_for_device(self, server_id): 1973 count = 0 1974 while True: 1975 devices = invoke("adb", "-P", server_id, "devices") 1976 if (self.serial in devices and "attached" in devices): 1977 return 1978 count = count + 1 1979 if count > 10: 1980 return 1981 1982 def test_device_exchange(self): 1983 # Enable once we support invoke with env variable 1984 return 1985 # Start two "detached" servers with ADB_LIBUSB_START_DETACHED 1986 # Attach device to server 1, test it. 1987 # Detach device from server 1. 1988 # Attach device to server 2. test it. 1989 1990 # Make sure everything is clean 1991 invoke("adb", "-P", self.server1_port, "start-server") 1992 invoke("adb", "-P", self.server2_port, "start-server") 1993 1994 # Make sure server1 sees device as detached 1995 devices1= invoke("adb", "-P", self.server1_port, "devices") 1996 self.assertTrue("detached" in devices1) 1997 self.assertTrue(self.serial in devices1) 1998 1999 # Make sure server2 sees device as detached 2000 devices2= invoke("adb", "-P", self.server2_port, "devices") 2001 self.assertTrue("detached" in devices2) 2002 self.assertTrue(self.serial in devices2) 2003 2004 # Attach device to server 1. Verify. 2005 invoke("adb", "-P", self.server1_port, "attach") 2006 time.sleep(4) 2007 self.wait_for_device(self.server1_port) 2008 2009 devices1= invoke("adb", "-P", self.server1_port, "devices") 2010 self.assertFalse("detached" in devices1) 2011 self.assertTrue(self.serial in devices1) 2012 2013 # Make sure server 1 can comm with device 2014 who = invoke("adb", "-P", self.server1_port, "shell", "whoami") 2015 self.assertTrue(who == "shell" or who == "root") 2016 2017 # Now detach and make sure device cannot comm 2018 invoke("adb", "-P", self.server1_port, "detach") 2019 who = invoke("adb", "-P", self.server1_port, "shell", "whoami") 2020 self.assertFalse(who == "shell" or who == "root") 2021 devices1= invoke("adb", "-P", self.server1_port, "devices") 2022 self.assertTrue("detached" in devices1) 2023 self.assertTrue(self.serial in devices1) 2024 2025 # Give device to server2 2026 invoke("adb", "-P", self.server2_port, "attach") 2027 time.sleep(2) 2028 self.wait_for_device(self.server2_port) 2029 devices2= invoke("adb", "-P", self.server2_port, "devices") 2030 self.assertFalse("detached" in devices2) 2031 self.assertTrue(self.serial in devices2) 2032 2033 # Test that sever2 can comm with device 2034 who = invoke("adb", "-P", self.server2_port, "shell", "whoami") 2035 self.assertTrue(who == "shell" or who == "root") 2036 2037 # Detach device from server2. Verify. 2038 invoke("adb", "-P", self.server2_port, "detach") 2039 devices2= invoke("adb", "-P", self.server2_port, "devices") 2040 self.assertTrue("detached" in devices2) 2041 self.assertTrue(self.serial in devices2) 2042 2043 # Verify server2 cannot comm with device 2044 who = invoke("adb", "-P", self.server2_port, "shell", "whoami") 2045 self.assertFalse(who == "shell" or who == "root") 2046 2047 def setUp(self): 2048 os.environ[self.env_var_detached] = "1" 2049 invoke("adb", "kill-server") 2050 2051 def tearDown(self): 2052 invoke("adb", "-P", self.server1_port, "kill-server") 2053 invoke("adb", "-P", self.server2_port, "kill-server") 2054 del os.environ[self.env_var_detached] 2055 2056class OneDevice(unittest.TestCase): 2057 2058 serial = invoke("adb", "get-serialno") 2059 owner_server_port = "14424" 2060 2061 def test_one_device(self): 2062 invoke("adb", "kill-server") 2063 invoke("adb", "--one-device", self.serial, "-P", self.owner_server_port, "start-server") 2064 devices = invoke("adb", "devices") 2065 owned_devices = invoke("adb", "-P", "14424", "devices") 2066 self.assertTrue(self.serial in owned_devices) 2067 self.assertFalse(self.serial in devices) 2068 2069 def tearDown(self): 2070 invoke("adb", "-P", self.owner_server_port, "kill-server") 2071 invoke("adb", "kill-server") 2072 2073class Debugger(unittest.TestCase): 2074 2075 PKG_NAME = "adb.test.app1" 2076 PROCESS_NAME = "adb.test.process.name" 2077 APP_PORT = "8000" 2078 HANDSHAKE = "JDWP-Handshake" 2079 2080 def test_denied_debugger_on_frozen_app(self): 2081 # TODO: Enable once we have a test runner that allows to debug tests. 2082 # -> JAVA 2083 2084 # Install app 2085 apk = self.PKG_NAME.replace(".", "_") + ".apk" 2086 invoke('adb', 'install', '-r', '-t', apk) 2087 2088 # Start app 2089 target = self.PKG_NAME + '/.MainActivity' 2090 invoke('adb', 'shell', 'am', 'start', '-W', target) 2091 2092 # Assert that debugger is allowed 2093 pid = invoke("adb", "shell", "pidof", self.PROCESS_NAME) 2094 self.assertTrue(pid.isdigit(), pid) 2095 invoke("adb", "forward", "tcp:" + self.APP_PORT, "jdwp:" + pid) 2096 # Connect to debugger 2097 sock = socket.socket() 2098 sock.connect(("localhost", int(self.APP_PORT))) 2099 sock.send(self.HANDSHAKE.encode('utf-8')) 2100 resp = sock.recv(len(self.HANDSHAKE)) 2101 self.assertTrue(resp.decode("utf-8") == self.HANDSHAKE) 2102 sock.close() 2103 2104 # Freeze app (adb shell am freeze <APK_NAME>) 2105 invoke("adb", "shell", "am", "freeze", self.PROCESS_NAME) 2106 2107 # Asset that debugger is denied 2108 connection_refused = False 2109 try: 2110 sock = socket.socket() 2111 sock.connect(("localhost", int(self.APP_PORT))) 2112 except socket.error as e: 2113 connection_refused = True 2114 self.assertTrue(connection_refused, connection_refused) 2115 2116 # Unfreeze app (adb shell am unfreeze <APK_NAME>) 2117 invoke("adb", "shell", "am", "unfreeze", self.PROCESS_NAME) 2118 2119 # Assert that debugger is allowed 2120 sock = socket.socket() 2121 sock.connect(("localhost", int(self.APP_PORT))) 2122 sock.send(self.HANDSHAKE.encode("utf-8")) 2123 resp = sock.recv(len(self.HANDSHAKE)).decode("utf-8") 2124 self.assertTrue(resp == self.HANDSHAKE, resp) 2125 sock.close() 2126 2127 def tearDown(self): 2128 invoke("adb", "forward", "--remove-all") 2129 2130if __name__ == '__main__': 2131 random.seed(0) 2132 unittest.main() 2133