1#!/usr/bin/env python3 2# 3# Copyright (C) 2015 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Tests for the adb program itself. 18 19This differs from things in test_device.py in that there is no API for these 20things. Most of these tests involve specific error messages or the help text. 21""" 22 23import contextlib 24import os 25import random 26import select 27import socket 28import struct 29import subprocess 30import sys 31import threading 32import time 33import unittest 34import warnings 35 36 37@contextlib.contextmanager 38def fake_adbd(protocol=socket.AF_INET, port=0): 39 """Creates a fake ADB daemon that just replies with a CNXN packet.""" 40 41 serversock = socket.socket(protocol, socket.SOCK_STREAM) 42 serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 43 if protocol == socket.AF_INET: 44 serversock.bind(("127.0.0.1", port)) 45 else: 46 serversock.bind(("::1", port)) 47 serversock.listen(1) 48 49 # A pipe that is used to signal the thread that it should terminate. 50 readsock, writesock = socket.socketpair() 51 52 def _adb_packet(command: bytes, arg0: int, arg1: int, data: bytes) -> bytes: 53 bin_command = struct.unpack("I", command)[0] 54 buf = struct.pack("IIIIII", bin_command, arg0, arg1, len(data), 0, 55 bin_command ^ 0xffffffff) 56 buf += data 57 return buf 58 59 def _handle(sock): 60 with contextlib.closing(sock) as serversock: 61 rlist = [readsock, serversock] 62 cnxn_sent = {} 63 while True: 64 read_ready, _, _ = select.select(rlist, [], []) 65 for ready in read_ready: 66 if ready == readsock: 67 # Closure pipe 68 for f in rlist: 69 f.close() 70 return 71 elif ready == serversock: 72 # Server socket 73 conn, _ = ready.accept() 74 rlist.append(conn) 75 else: 76 # Client socket 77 data = ready.recv(1024) 78 if not data or data.startswith(b"OPEN"): 79 if ready in cnxn_sent: 80 del cnxn_sent[ready] 81 ready.shutdown(socket.SHUT_RDWR) 82 ready.close() 83 rlist.remove(ready) 84 continue 85 if ready in cnxn_sent: 86 continue 87 cnxn_sent[ready] = True 88 ready.sendall(_adb_packet(b"CNXN", 0x01000001, 1024 * 1024, 89 b"device::ro.product.name=fakeadb")) 90 91 port = serversock.getsockname()[1] 92 server_thread = threading.Thread(target=_handle, args=(serversock,)) 93 server_thread.start() 94 95 try: 96 yield port, writesock 97 finally: 98 writesock.close() 99 server_thread.join() 100 101 102@contextlib.contextmanager 103def adb_connect(unittest, serial): 104 """Context manager for an ADB connection. 105 106 This automatically disconnects when done with the connection. 107 """ 108 109 output = subprocess.check_output(["adb", "connect", serial]) 110 unittest.assertEqual(output.strip(), 111 "connected to {}".format(serial).encode("utf8")) 112 113 try: 114 yield 115 finally: 116 # Perform best-effort disconnection. Discard the output. 117 subprocess.Popen(["adb", "disconnect", serial], 118 stdout=subprocess.PIPE, 119 stderr=subprocess.PIPE).communicate() 120 121 122@contextlib.contextmanager 123def adb_server(): 124 """Context manager for an ADB server. 125 126 This creates an ADB server and returns the port it's listening on. 127 """ 128 129 port = 5038 130 # Kill any existing server on this non-default port. 131 subprocess.check_output(["adb", "-P", str(port), "kill-server"], 132 stderr=subprocess.STDOUT) 133 read_pipe, write_pipe = os.pipe() 134 135 if sys.platform == "win32": 136 import msvcrt 137 write_handle = msvcrt.get_osfhandle(write_pipe) 138 os.set_handle_inheritable(write_handle, True) 139 reply_fd = str(write_handle) 140 else: 141 os.set_inheritable(write_pipe, True) 142 reply_fd = str(write_pipe) 143 144 proc = subprocess.Popen(["adb", "-L", "tcp:localhost:{}".format(port), 145 "fork-server", "server", 146 "--reply-fd", reply_fd], close_fds=False) 147 try: 148 os.close(write_pipe) 149 greeting = os.read(read_pipe, 1024) 150 assert greeting == b"OK\n", repr(greeting) 151 yield port 152 finally: 153 proc.terminate() 154 proc.wait() 155 156 157class CommandlineTest(unittest.TestCase): 158 """Tests for the ADB commandline.""" 159 160 def test_help(self): 161 """Make sure we get _something_ out of help.""" 162 out = subprocess.check_output( 163 ["adb", "help"], stderr=subprocess.STDOUT) 164 self.assertGreater(len(out), 0) 165 166 def test_version(self): 167 """Get a version number out of the output of adb.""" 168 lines = subprocess.check_output(["adb", "version"]).splitlines() 169 version_line = lines[0] 170 self.assertRegex( 171 version_line, rb"^Android Debug Bridge version \d+\.\d+\.\d+$") 172 if len(lines) == 2: 173 # Newer versions of ADB have a second line of output for the 174 # version that includes a specific revision (git SHA). 175 revision_line = lines[1] 176 self.assertRegex( 177 revision_line, rb"^Revision [0-9a-f]{12}-android$") 178 179 def test_tcpip_error_messages(self): 180 """Make sure 'adb tcpip' parsing is sane.""" 181 proc = subprocess.Popen(["adb", "tcpip"], 182 stdout=subprocess.PIPE, 183 stderr=subprocess.STDOUT) 184 out, _ = proc.communicate() 185 self.assertEqual(1, proc.returncode) 186 self.assertIn(b"requires an argument", out) 187 188 proc = subprocess.Popen(["adb", "tcpip", "foo"], 189 stdout=subprocess.PIPE, 190 stderr=subprocess.STDOUT) 191 out, _ = proc.communicate() 192 self.assertEqual(1, proc.returncode) 193 self.assertIn(b"invalid port", out) 194 195 196class ServerTest(unittest.TestCase): 197 """Tests for the ADB server.""" 198 199 @staticmethod 200 def _read_pipe_and_set_event(pipe, event): 201 """Reads a pipe until it is closed, then sets the event.""" 202 pipe.read() 203 event.set() 204 205 def test_handle_inheritance(self): 206 """Test that launch_server() does not inherit handles. 207 208 launch_server() should not let the adb server inherit 209 stdin/stdout/stderr handles, which can cause callers of adb.exe to hang. 210 This test also runs fine on unix even though the impetus is an issue 211 unique to Windows. 212 """ 213 # This test takes 5 seconds to run on Windows: if there is no adb server 214 # running on the the port used below, adb kill-server tries to make a 215 # TCP connection to a closed port and that takes 1 second on Windows; 216 # adb start-server does the same TCP connection which takes another 217 # second, and it waits 3 seconds after starting the server. 218 219 # Start adb client with redirected stdin/stdout/stderr to check if it 220 # passes those redirections to the adb server that it starts. To do 221 # this, run an instance of the adb server on a non-default port so we 222 # don't conflict with a pre-existing adb server that may already be 223 # setup with adb TCP/emulator connections. If there is a pre-existing 224 # adb server, this also tests whether multiple instances of the adb 225 # server conflict on adb.log. 226 227 port = 5038 228 # Kill any existing server on this non-default port. 229 subprocess.check_output(["adb", "-P", str(port), "kill-server"], 230 stderr=subprocess.STDOUT) 231 232 try: 233 # We get warnings for unclosed files for the subprocess's pipes, 234 # and it's somewhat cumbersome to close them, so just ignore this. 235 warnings.simplefilter("ignore", ResourceWarning) 236 237 # Run the adb client and have it start the adb server. 238 proc = subprocess.Popen(["adb", "-P", str(port), "start-server"], 239 stdin=subprocess.PIPE, 240 stdout=subprocess.PIPE, 241 stderr=subprocess.PIPE) 242 243 # Start threads that set events when stdout/stderr are closed. 244 stdout_event = threading.Event() 245 stdout_thread = threading.Thread( 246 target=ServerTest._read_pipe_and_set_event, 247 args=(proc.stdout, stdout_event)) 248 stdout_thread.start() 249 250 stderr_event = threading.Event() 251 stderr_thread = threading.Thread( 252 target=ServerTest._read_pipe_and_set_event, 253 args=(proc.stderr, stderr_event)) 254 stderr_thread.start() 255 256 # Wait for the adb client to finish. Once that has occurred, if 257 # stdin/stderr/stdout are still open, it must be open in the adb 258 # server. 259 proc.wait() 260 261 # Try to write to stdin which we expect is closed. If it isn't 262 # closed, we should get an IOError. If we don't get an IOError, 263 # stdin must still be open in the adb server. The adb client is 264 # probably letting the adb server inherit stdin which would be 265 # wrong. 266 with self.assertRaises(IOError): 267 proc.stdin.write(b"x") 268 proc.stdin.flush() 269 270 # Wait a few seconds for stdout/stderr to be closed (in the success 271 # case, this won't wait at all). If there is a timeout, that means 272 # stdout/stderr were not closed and and they must be open in the adb 273 # server, suggesting that the adb client is letting the adb server 274 # inherit stdout/stderr which would be wrong. 275 self.assertTrue(stdout_event.wait(5), "adb stdout not closed") 276 self.assertTrue(stderr_event.wait(5), "adb stderr not closed") 277 stdout_thread.join() 278 stderr_thread.join() 279 finally: 280 # If we started a server, kill it. 281 subprocess.check_output(["adb", "-P", str(port), "kill-server"], 282 stderr=subprocess.STDOUT) 283 284 285class EmulatorTest(unittest.TestCase): 286 """Tests for the emulator connection.""" 287 288 def _reset_socket_on_close(self, sock): 289 """Use SO_LINGER to cause TCP RST segment to be sent on socket close.""" 290 # The linger structure is two shorts on Windows, but two ints on Unix. 291 linger_format = "hh" if os.name == "nt" else "ii" 292 l_onoff = 1 293 l_linger = 0 294 295 sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 296 struct.pack(linger_format, l_onoff, l_linger)) 297 # Verify that we set the linger structure properly by retrieving it. 298 linger = sock.getsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 16) 299 self.assertEqual((l_onoff, l_linger), 300 struct.unpack_from(linger_format, linger)) 301 302 def test_emu_kill(self): 303 """Ensure that adb emu kill works. 304 305 Bug: https://code.google.com/p/android/issues/detail?id=21021 306 """ 307 with contextlib.closing( 308 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: 309 # Use SO_REUSEADDR so subsequent runs of the test can grab the port 310 # even if it is in TIME_WAIT. 311 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 312 listener.bind(("127.0.0.1", 0)) 313 listener.listen(4) 314 port = listener.getsockname()[1] 315 316 # Now that listening has started, start adb emu kill, telling it to 317 # connect to our mock emulator. 318 proc = subprocess.Popen( 319 ["adb", "-s", "emulator-" + str(port), "emu", "kill"], 320 stderr=subprocess.STDOUT) 321 322 accepted_connection, addr = listener.accept() 323 with contextlib.closing(accepted_connection) as conn: 324 # If WSAECONNABORTED (10053) is raised by any socket calls, 325 # then adb probably isn't reading the data that we sent it. 326 conn.sendall(("Android Console: type 'help' for a list " 327 "of commands\r\n").encode("utf8")) 328 conn.sendall(b"OK\r\n") 329 330 with contextlib.closing(conn.makefile()) as connf: 331 line = connf.readline() 332 if line.startswith("auth"): 333 # Ignore the first auth line. 334 line = connf.readline() 335 self.assertEqual("kill\n", line) 336 self.assertEqual("quit\n", connf.readline()) 337 338 conn.sendall(b"OK: killing emulator, bye bye\r\n") 339 340 # Use SO_LINGER to send TCP RST segment to test whether adb 341 # ignores WSAECONNRESET on Windows. This happens with the 342 # real emulator because it just calls exit() without closing 343 # the socket or calling shutdown(SD_SEND). At process 344 # termination, Windows sends a TCP RST segment for every 345 # open socket that shutdown(SD_SEND) wasn't used on. 346 self._reset_socket_on_close(conn) 347 348 # Wait for adb to finish, so we can check return code. 349 proc.communicate() 350 351 # If this fails, adb probably isn't ignoring WSAECONNRESET when 352 # reading the response from the adb emu kill command (on Windows). 353 self.assertEqual(0, proc.returncode) 354 355 def test_emulator_connect(self): 356 """Ensure that the emulator can connect. 357 358 Bug: http://b/78991667 359 """ 360 with adb_server() as server_port: 361 with fake_adbd() as (port, _): 362 serial = "emulator-{}".format(port - 1) 363 # Ensure that the emulator is not there. 364 try: 365 subprocess.check_output(["adb", "-P", str(server_port), 366 "-s", serial, "get-state"], 367 stderr=subprocess.STDOUT) 368 self.fail("Device should not be available") 369 except subprocess.CalledProcessError as err: 370 self.assertEqual( 371 err.output.strip(), 372 "error: device '{}' not found".format(serial).encode("utf8")) 373 374 # Let the ADB server know that the emulator has started. 375 with contextlib.closing( 376 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: 377 sock.connect(("localhost", server_port)) 378 command = "host:emulator:{}".format(port).encode("utf8") 379 sock.sendall(b"%04x%s" % (len(command), command)) 380 381 # Ensure the emulator is there. 382 subprocess.check_call(["adb", "-P", str(server_port), 383 "-s", serial, "wait-for-device"]) 384 output = subprocess.check_output(["adb", "-P", str(server_port), 385 "-s", serial, "get-state"]) 386 self.assertEqual(output.strip(), b"device") 387 388 389class ConnectionTest(unittest.TestCase): 390 """Tests for adb connect.""" 391 392 def test_connect_ipv4_ipv6(self): 393 """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6. 394 395 Bug: http://b/30313466 396 """ 397 for protocol in (socket.AF_INET, socket.AF_INET6): 398 try: 399 with fake_adbd(protocol=protocol) as (port, _): 400 serial = "localhost:{}".format(port) 401 with adb_connect(self, serial): 402 pass 403 except socket.error: 404 print("IPv6 not available, skipping") 405 continue 406 407 def test_already_connected(self): 408 """Ensure that an already-connected device stays connected.""" 409 410 with fake_adbd() as (port, _): 411 serial = "localhost:{}".format(port) 412 with adb_connect(self, serial): 413 # b/31250450: this always returns 0 but probably shouldn't. 414 output = subprocess.check_output(["adb", "connect", serial]) 415 self.assertEqual( 416 output.strip(), 417 "already connected to {}".format(serial).encode("utf8")) 418 419 @unittest.skip("Currently failing b/123247844") 420 def test_reconnect(self): 421 """Ensure that a disconnected device reconnects.""" 422 423 with fake_adbd() as (port, _): 424 serial = "localhost:{}".format(port) 425 with adb_connect(self, serial): 426 # Wait a bit to give adb some time to connect. 427 time.sleep(0.25) 428 429 output = subprocess.check_output(["adb", "-s", serial, 430 "get-state"]) 431 self.assertEqual(output.strip(), b"device") 432 433 # This will fail. 434 proc = subprocess.Popen(["adb", "-s", serial, "shell", "true"], 435 stdout=subprocess.PIPE, 436 stderr=subprocess.STDOUT) 437 output, _ = proc.communicate() 438 self.assertEqual(output.strip(), b"error: closed") 439 440 subprocess.check_call(["adb", "-s", serial, "wait-for-device"]) 441 442 output = subprocess.check_output(["adb", "-s", serial, 443 "get-state"]) 444 self.assertEqual(output.strip(), b"device") 445 446 # Once we explicitly kick a device, it won't attempt to 447 # reconnect. 448 output = subprocess.check_output(["adb", "disconnect", serial]) 449 self.assertEqual( 450 output.strip(), 451 "disconnected {}".format(serial).encode("utf8")) 452 try: 453 subprocess.check_output(["adb", "-s", serial, "get-state"], 454 stderr=subprocess.STDOUT) 455 self.fail("Device should not be available") 456 except subprocess.CalledProcessError as err: 457 self.assertEqual( 458 err.output.strip(), 459 "error: device '{}' not found".format(serial).encode("utf8")) 460 461 462class DisconnectionTest(unittest.TestCase): 463 """Tests for adb disconnect.""" 464 465 def test_disconnect(self): 466 """Ensure that `adb disconnect` takes effect immediately.""" 467 468 def _devices(port): 469 output = subprocess.check_output(["adb", "-P", str(port), "devices"]) 470 return [x.split("\t") for x in output.decode("utf8").strip().splitlines()[1:]] 471 472 with adb_server() as server_port: 473 with fake_adbd() as (port, sock): 474 device_name = "localhost:{}".format(port) 475 output = subprocess.check_output(["adb", "-P", str(server_port), 476 "connect", device_name]) 477 self.assertEqual(output.strip(), 478 "connected to {}".format(device_name).encode("utf8")) 479 480 481 self.assertEqual(_devices(server_port), [[device_name, "device"]]) 482 483 # Send a deliberately malformed packet to make the device go offline. 484 packet = struct.pack("IIIIII", 0, 0, 0, 0, 0, 0) 485 sock.sendall(packet) 486 487 # Wait a bit. 488 time.sleep(0.1) 489 490 self.assertEqual(_devices(server_port), [[device_name, "offline"]]) 491 492 # Disconnect the device. 493 output = subprocess.check_output(["adb", "-P", str(server_port), 494 "disconnect", device_name]) 495 496 # Wait a bit. 497 time.sleep(0.1) 498 499 self.assertEqual(_devices(server_port), []) 500 501 502@unittest.skipUnless(sys.platform == "win32", "requires Windows") 503class PowerTest(unittest.TestCase): 504 def test_resume_usb_kick(self): 505 """Resuming from sleep/hibernate should kick USB devices.""" 506 try: 507 usb_serial = subprocess.check_output(["adb", "-d", "get-serialno"]).strip() 508 except subprocess.CalledProcessError: 509 # If there are multiple USB devices, we don't have a way to check whether the selected 510 # device is USB. 511 raise unittest.SkipTest('requires single USB device') 512 513 try: 514 serial = subprocess.check_output(["adb", "get-serialno"]).strip() 515 except subprocess.CalledProcessError: 516 # Did you forget to select a device with $ANDROID_SERIAL? 517 raise unittest.SkipTest('requires $ANDROID_SERIAL set to a USB device') 518 519 # Test only works with USB devices because adb _power_notification_thread does not kick 520 # non-USB devices on resume event. 521 if serial != usb_serial: 522 raise unittest.SkipTest('requires USB device') 523 524 # Run an adb shell command in the background that takes a while to complete. 525 proc = subprocess.Popen(['adb', 'shell', 'sleep', '5']) 526 527 # Wait for startup of adb server's _power_notification_thread. 528 time.sleep(0.1) 529 530 # Simulate resuming from sleep/hibernation by sending Windows message. 531 import ctypes 532 from ctypes import wintypes 533 HWND_BROADCAST = 0xffff 534 WM_POWERBROADCAST = 0x218 535 PBT_APMRESUMEAUTOMATIC = 0x12 536 537 PostMessageW = ctypes.windll.user32.PostMessageW 538 PostMessageW.restype = wintypes.BOOL 539 PostMessageW.argtypes = (wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM) 540 result = PostMessageW(HWND_BROADCAST, WM_POWERBROADCAST, PBT_APMRESUMEAUTOMATIC, 0) 541 if not result: 542 raise ctypes.WinError() 543 544 # Wait for connection to adb shell to be broken by _power_notification_thread detecting the 545 # Windows message. 546 start = time.time() 547 proc.wait() 548 end = time.time() 549 550 # If the power event was detected, the adb shell command should be broken very quickly. 551 self.assertLess(end - start, 2) 552 553 554def main(): 555 """Main entrypoint.""" 556 random.seed(0) 557 unittest.main(verbosity=3) 558 559 560if __name__ == "__main__": 561 main() 562