• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2015 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Tests for the adb program itself.
18
19This differs from things in test_device.py in that there is no API for these
20things. Most of these tests involve specific error messages or the help text.
21"""
22
23import contextlib
24import os
25import random
26import select
27import socket
28import struct
29import subprocess
30import sys
31import threading
32import time
33import unittest
34import warnings
35
36
37@contextlib.contextmanager
38def fake_adbd(protocol=socket.AF_INET, port=0):
39    """Creates a fake ADB daemon that just replies with a CNXN packet."""
40
41    serversock = socket.socket(protocol, socket.SOCK_STREAM)
42    serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43    if protocol == socket.AF_INET:
44        serversock.bind(("127.0.0.1", port))
45    else:
46        serversock.bind(("::1", port))
47    serversock.listen(1)
48
49    # A pipe that is used to signal the thread that it should terminate.
50    readsock, writesock = socket.socketpair()
51
52    def _adb_packet(command: bytes, arg0: int, arg1: int, data: bytes) -> bytes:
53        bin_command = struct.unpack("I", command)[0]
54        buf = struct.pack("IIIIII", bin_command, arg0, arg1, len(data), 0,
55                          bin_command ^ 0xffffffff)
56        buf += data
57        return buf
58
59    def _handle(sock):
60        with contextlib.closing(sock) as serversock:
61            rlist = [readsock, serversock]
62            cnxn_sent = {}
63            while True:
64                read_ready, _, _ = select.select(rlist, [], [])
65                for ready in read_ready:
66                    if ready == readsock:
67                        # Closure pipe
68                        for f in rlist:
69                            f.close()
70                        return
71                    elif ready == serversock:
72                        # Server socket
73                        conn, _ = ready.accept()
74                        rlist.append(conn)
75                    else:
76                        # Client socket
77                        data = ready.recv(1024)
78                        if not data or data.startswith(b"OPEN"):
79                            if ready in cnxn_sent:
80                                del cnxn_sent[ready]
81                            ready.shutdown(socket.SHUT_RDWR)
82                            ready.close()
83                            rlist.remove(ready)
84                            continue
85                        if ready in cnxn_sent:
86                            continue
87                        cnxn_sent[ready] = True
88                        ready.sendall(_adb_packet(b"CNXN", 0x01000001, 1024 * 1024,
89                                                  b"device::ro.product.name=fakeadb"))
90
91    port = serversock.getsockname()[1]
92    server_thread = threading.Thread(target=_handle, args=(serversock,))
93    server_thread.start()
94
95    try:
96        yield port, writesock
97    finally:
98        writesock.close()
99        server_thread.join()
100
101
102@contextlib.contextmanager
103def adb_connect(unittest, serial):
104    """Context manager for an ADB connection.
105
106    This automatically disconnects when done with the connection.
107    """
108
109    output = subprocess.check_output(["adb", "connect", serial])
110    unittest.assertEqual(output.strip(),
111                        "connected to {}".format(serial).encode("utf8"))
112
113    try:
114        yield
115    finally:
116        # Perform best-effort disconnection. Discard the output.
117        subprocess.Popen(["adb", "disconnect", serial],
118                         stdout=subprocess.PIPE,
119                         stderr=subprocess.PIPE).communicate()
120
121
122@contextlib.contextmanager
123def adb_server():
124    """Context manager for an ADB server.
125
126    This creates an ADB server and returns the port it's listening on.
127    """
128
129    port = 5038
130    # Kill any existing server on this non-default port.
131    subprocess.check_output(["adb", "-P", str(port), "kill-server"],
132                            stderr=subprocess.STDOUT)
133    read_pipe, write_pipe = os.pipe()
134
135    if sys.platform == "win32":
136        import msvcrt
137        write_handle = msvcrt.get_osfhandle(write_pipe)
138        os.set_handle_inheritable(write_handle, True)
139        reply_fd = str(write_handle)
140    else:
141        os.set_inheritable(write_pipe, True)
142        reply_fd = str(write_pipe)
143
144    proc = subprocess.Popen(["adb", "-L", "tcp:localhost:{}".format(port),
145                             "fork-server", "server",
146                             "--reply-fd", reply_fd], close_fds=False)
147    try:
148        os.close(write_pipe)
149        greeting = os.read(read_pipe, 1024)
150        assert greeting == b"OK\n", repr(greeting)
151        yield port
152    finally:
153        proc.terminate()
154        proc.wait()
155
156
157class CommandlineTest(unittest.TestCase):
158    """Tests for the ADB commandline."""
159
160    def test_help(self):
161        """Make sure we get _something_ out of help."""
162        out = subprocess.check_output(
163            ["adb", "help"], stderr=subprocess.STDOUT)
164        self.assertGreater(len(out), 0)
165
166    def test_version(self):
167        """Get a version number out of the output of adb."""
168        lines = subprocess.check_output(["adb", "version"]).splitlines()
169        version_line = lines[0]
170        self.assertRegex(
171            version_line, rb"^Android Debug Bridge version \d+\.\d+\.\d+$")
172        if len(lines) == 2:
173            # Newer versions of ADB have a second line of output for the
174            # version that includes a specific revision (git SHA).
175            revision_line = lines[1]
176            self.assertRegex(
177                revision_line, rb"^Revision [0-9a-f]{12}-android$")
178
179    def test_tcpip_error_messages(self):
180        """Make sure 'adb tcpip' parsing is sane."""
181        proc = subprocess.Popen(["adb", "tcpip"],
182                                stdout=subprocess.PIPE,
183                                stderr=subprocess.STDOUT)
184        out, _ = proc.communicate()
185        self.assertEqual(1, proc.returncode)
186        self.assertIn(b"requires an argument", out)
187
188        proc = subprocess.Popen(["adb", "tcpip", "foo"],
189                                stdout=subprocess.PIPE,
190                                stderr=subprocess.STDOUT)
191        out, _ = proc.communicate()
192        self.assertEqual(1, proc.returncode)
193        self.assertIn(b"invalid port", out)
194
195
196class ServerTest(unittest.TestCase):
197    """Tests for the ADB server."""
198
199    @staticmethod
200    def _read_pipe_and_set_event(pipe, event):
201        """Reads a pipe until it is closed, then sets the event."""
202        pipe.read()
203        event.set()
204
205    def test_handle_inheritance(self):
206        """Test that launch_server() does not inherit handles.
207
208        launch_server() should not let the adb server inherit
209        stdin/stdout/stderr handles, which can cause callers of adb.exe to hang.
210        This test also runs fine on unix even though the impetus is an issue
211        unique to Windows.
212        """
213        # This test takes 5 seconds to run on Windows: if there is no adb server
214        # running on the the port used below, adb kill-server tries to make a
215        # TCP connection to a closed port and that takes 1 second on Windows;
216        # adb start-server does the same TCP connection which takes another
217        # second, and it waits 3 seconds after starting the server.
218
219        # Start adb client with redirected stdin/stdout/stderr to check if it
220        # passes those redirections to the adb server that it starts. To do
221        # this, run an instance of the adb server on a non-default port so we
222        # don't conflict with a pre-existing adb server that may already be
223        # setup with adb TCP/emulator connections. If there is a pre-existing
224        # adb server, this also tests whether multiple instances of the adb
225        # server conflict on adb.log.
226
227        port = 5038
228        # Kill any existing server on this non-default port.
229        subprocess.check_output(["adb", "-P", str(port), "kill-server"],
230                                stderr=subprocess.STDOUT)
231
232        try:
233            # We get warnings for unclosed files for the subprocess's pipes,
234            # and it's somewhat cumbersome to close them, so just ignore this.
235            warnings.simplefilter("ignore", ResourceWarning)
236
237            # Run the adb client and have it start the adb server.
238            proc = subprocess.Popen(["adb", "-P", str(port), "start-server"],
239                                    stdin=subprocess.PIPE,
240                                    stdout=subprocess.PIPE,
241                                    stderr=subprocess.PIPE)
242
243            # Start threads that set events when stdout/stderr are closed.
244            stdout_event = threading.Event()
245            stdout_thread = threading.Thread(
246                target=ServerTest._read_pipe_and_set_event,
247                args=(proc.stdout, stdout_event))
248            stdout_thread.start()
249
250            stderr_event = threading.Event()
251            stderr_thread = threading.Thread(
252                target=ServerTest._read_pipe_and_set_event,
253                args=(proc.stderr, stderr_event))
254            stderr_thread.start()
255
256            # Wait for the adb client to finish. Once that has occurred, if
257            # stdin/stderr/stdout are still open, it must be open in the adb
258            # server.
259            proc.wait()
260
261            # Try to write to stdin which we expect is closed. If it isn't
262            # closed, we should get an IOError. If we don't get an IOError,
263            # stdin must still be open in the adb server. The adb client is
264            # probably letting the adb server inherit stdin which would be
265            # wrong.
266            with self.assertRaises(IOError):
267                proc.stdin.write(b"x")
268                proc.stdin.flush()
269
270            # Wait a few seconds for stdout/stderr to be closed (in the success
271            # case, this won't wait at all). If there is a timeout, that means
272            # stdout/stderr were not closed and and they must be open in the adb
273            # server, suggesting that the adb client is letting the adb server
274            # inherit stdout/stderr which would be wrong.
275            self.assertTrue(stdout_event.wait(5), "adb stdout not closed")
276            self.assertTrue(stderr_event.wait(5), "adb stderr not closed")
277            stdout_thread.join()
278            stderr_thread.join()
279        finally:
280            # If we started a server, kill it.
281            subprocess.check_output(["adb", "-P", str(port), "kill-server"],
282                                    stderr=subprocess.STDOUT)
283
284
285class EmulatorTest(unittest.TestCase):
286    """Tests for the emulator connection."""
287
288    def _reset_socket_on_close(self, sock):
289        """Use SO_LINGER to cause TCP RST segment to be sent on socket close."""
290        # The linger structure is two shorts on Windows, but two ints on Unix.
291        linger_format = "hh" if os.name == "nt" else "ii"
292        l_onoff = 1
293        l_linger = 0
294
295        sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
296                        struct.pack(linger_format, l_onoff, l_linger))
297        # Verify that we set the linger structure properly by retrieving it.
298        linger = sock.getsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 16)
299        self.assertEqual((l_onoff, l_linger),
300                         struct.unpack_from(linger_format, linger))
301
302    def test_emu_kill(self):
303        """Ensure that adb emu kill works.
304
305        Bug: https://code.google.com/p/android/issues/detail?id=21021
306        """
307        with contextlib.closing(
308            socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
309            # Use SO_REUSEADDR so subsequent runs of the test can grab the port
310            # even if it is in TIME_WAIT.
311            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
312            listener.bind(("127.0.0.1", 0))
313            listener.listen(4)
314            port = listener.getsockname()[1]
315
316            # Now that listening has started, start adb emu kill, telling it to
317            # connect to our mock emulator.
318            proc = subprocess.Popen(
319                ["adb", "-s", "emulator-" + str(port), "emu", "kill"],
320                stderr=subprocess.STDOUT)
321
322            accepted_connection, addr = listener.accept()
323            with contextlib.closing(accepted_connection) as conn:
324                # If WSAECONNABORTED (10053) is raised by any socket calls,
325                # then adb probably isn't reading the data that we sent it.
326                conn.sendall(("Android Console: type 'help' for a list "
327                             "of commands\r\n").encode("utf8"))
328                conn.sendall(b"OK\r\n")
329
330                with contextlib.closing(conn.makefile()) as connf:
331                    line = connf.readline()
332                    if line.startswith("auth"):
333                        # Ignore the first auth line.
334                        line = connf.readline()
335                    self.assertEqual("kill\n", line)
336                    self.assertEqual("quit\n", connf.readline())
337
338                conn.sendall(b"OK: killing emulator, bye bye\r\n")
339
340                # Use SO_LINGER to send TCP RST segment to test whether adb
341                # ignores WSAECONNRESET on Windows. This happens with the
342                # real emulator because it just calls exit() without closing
343                # the socket or calling shutdown(SD_SEND). At process
344                # termination, Windows sends a TCP RST segment for every
345                # open socket that shutdown(SD_SEND) wasn't used on.
346                self._reset_socket_on_close(conn)
347
348            # Wait for adb to finish, so we can check return code.
349            proc.communicate()
350
351            # If this fails, adb probably isn't ignoring WSAECONNRESET when
352            # reading the response from the adb emu kill command (on Windows).
353            self.assertEqual(0, proc.returncode)
354
355    def test_emulator_connect(self):
356        """Ensure that the emulator can connect.
357
358        Bug: http://b/78991667
359        """
360        with adb_server() as server_port:
361            with fake_adbd() as (port, _):
362                serial = "emulator-{}".format(port - 1)
363                # Ensure that the emulator is not there.
364                try:
365                    subprocess.check_output(["adb", "-P", str(server_port),
366                                             "-s", serial, "get-state"],
367                                            stderr=subprocess.STDOUT)
368                    self.fail("Device should not be available")
369                except subprocess.CalledProcessError as err:
370                    self.assertEqual(
371                        err.output.strip(),
372                        "error: device '{}' not found".format(serial).encode("utf8"))
373
374                # Let the ADB server know that the emulator has started.
375                with contextlib.closing(
376                        socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
377                    sock.connect(("localhost", server_port))
378                    command = "host:emulator:{}".format(port).encode("utf8")
379                    sock.sendall(b"%04x%s" % (len(command), command))
380
381                # Ensure the emulator is there.
382                subprocess.check_call(["adb", "-P", str(server_port),
383                                       "-s", serial, "wait-for-device"])
384                output = subprocess.check_output(["adb", "-P", str(server_port),
385                                                  "-s", serial, "get-state"])
386                self.assertEqual(output.strip(), b"device")
387
388
389class ConnectionTest(unittest.TestCase):
390    """Tests for adb connect."""
391
392    def test_connect_ipv4_ipv6(self):
393        """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6.
394
395        Bug: http://b/30313466
396        """
397        for protocol in (socket.AF_INET, socket.AF_INET6):
398            try:
399                with fake_adbd(protocol=protocol) as (port, _):
400                    serial = "localhost:{}".format(port)
401                    with adb_connect(self, serial):
402                        pass
403            except socket.error:
404                print("IPv6 not available, skipping")
405                continue
406
407    def test_already_connected(self):
408        """Ensure that an already-connected device stays connected."""
409
410        with fake_adbd() as (port, _):
411            serial = "localhost:{}".format(port)
412            with adb_connect(self, serial):
413                # b/31250450: this always returns 0 but probably shouldn't.
414                output = subprocess.check_output(["adb", "connect", serial])
415                self.assertEqual(
416                    output.strip(),
417                    "already connected to {}".format(serial).encode("utf8"))
418
419    @unittest.skip("Currently failing b/123247844")
420    def test_reconnect(self):
421        """Ensure that a disconnected device reconnects."""
422
423        with fake_adbd() as (port, _):
424            serial = "localhost:{}".format(port)
425            with adb_connect(self, serial):
426                # Wait a bit to give adb some time to connect.
427                time.sleep(0.25)
428
429                output = subprocess.check_output(["adb", "-s", serial,
430                                                  "get-state"])
431                self.assertEqual(output.strip(), b"device")
432
433                # This will fail.
434                proc = subprocess.Popen(["adb", "-s", serial, "shell", "true"],
435                                        stdout=subprocess.PIPE,
436                                        stderr=subprocess.STDOUT)
437                output, _ = proc.communicate()
438                self.assertEqual(output.strip(), b"error: closed")
439
440                subprocess.check_call(["adb", "-s", serial, "wait-for-device"])
441
442                output = subprocess.check_output(["adb", "-s", serial,
443                                                  "get-state"])
444                self.assertEqual(output.strip(), b"device")
445
446                # Once we explicitly kick a device, it won't attempt to
447                # reconnect.
448                output = subprocess.check_output(["adb", "disconnect", serial])
449                self.assertEqual(
450                    output.strip(),
451                    "disconnected {}".format(serial).encode("utf8"))
452                try:
453                    subprocess.check_output(["adb", "-s", serial, "get-state"],
454                                            stderr=subprocess.STDOUT)
455                    self.fail("Device should not be available")
456                except subprocess.CalledProcessError as err:
457                    self.assertEqual(
458                        err.output.strip(),
459                        "error: device '{}' not found".format(serial).encode("utf8"))
460
461
462class DisconnectionTest(unittest.TestCase):
463    """Tests for adb disconnect."""
464
465    def test_disconnect(self):
466        """Ensure that `adb disconnect` takes effect immediately."""
467
468        def _devices(port):
469            output = subprocess.check_output(["adb", "-P", str(port), "devices"])
470            return [x.split("\t") for x in output.decode("utf8").strip().splitlines()[1:]]
471
472        with adb_server() as server_port:
473            with fake_adbd() as (port, sock):
474                device_name = "localhost:{}".format(port)
475                output = subprocess.check_output(["adb", "-P", str(server_port),
476                                                  "connect", device_name])
477                self.assertEqual(output.strip(),
478                                  "connected to {}".format(device_name).encode("utf8"))
479
480
481                self.assertEqual(_devices(server_port), [[device_name, "device"]])
482
483                # Send a deliberately malformed packet to make the device go offline.
484                packet = struct.pack("IIIIII", 0, 0, 0, 0, 0, 0)
485                sock.sendall(packet)
486
487                # Wait a bit.
488                time.sleep(0.1)
489
490                self.assertEqual(_devices(server_port), [[device_name, "offline"]])
491
492                # Disconnect the device.
493                output = subprocess.check_output(["adb", "-P", str(server_port),
494                                                  "disconnect", device_name])
495
496                # Wait a bit.
497                time.sleep(0.1)
498
499                self.assertEqual(_devices(server_port), [])
500
501
502@unittest.skipUnless(sys.platform == "win32", "requires Windows")
503class PowerTest(unittest.TestCase):
504    def test_resume_usb_kick(self):
505        """Resuming from sleep/hibernate should kick USB devices."""
506        try:
507            usb_serial = subprocess.check_output(["adb", "-d", "get-serialno"]).strip()
508        except subprocess.CalledProcessError:
509            # If there are multiple USB devices, we don't have a way to check whether the selected
510            # device is USB.
511            raise unittest.SkipTest('requires single USB device')
512
513        try:
514            serial = subprocess.check_output(["adb", "get-serialno"]).strip()
515        except subprocess.CalledProcessError:
516            # Did you forget to select a device with $ANDROID_SERIAL?
517            raise unittest.SkipTest('requires $ANDROID_SERIAL set to a USB device')
518
519        # Test only works with USB devices because adb _power_notification_thread does not kick
520        # non-USB devices on resume event.
521        if serial != usb_serial:
522            raise unittest.SkipTest('requires USB device')
523
524        # Run an adb shell command in the background that takes a while to complete.
525        proc = subprocess.Popen(['adb', 'shell', 'sleep', '5'])
526
527        # Wait for startup of adb server's _power_notification_thread.
528        time.sleep(0.1)
529
530        # Simulate resuming from sleep/hibernation by sending Windows message.
531        import ctypes
532        from ctypes import wintypes
533        HWND_BROADCAST = 0xffff
534        WM_POWERBROADCAST = 0x218
535        PBT_APMRESUMEAUTOMATIC = 0x12
536
537        PostMessageW = ctypes.windll.user32.PostMessageW
538        PostMessageW.restype = wintypes.BOOL
539        PostMessageW.argtypes = (wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM)
540        result = PostMessageW(HWND_BROADCAST, WM_POWERBROADCAST, PBT_APMRESUMEAUTOMATIC, 0)
541        if not result:
542            raise ctypes.WinError()
543
544        # Wait for connection to adb shell to be broken by _power_notification_thread detecting the
545        # Windows message.
546        start = time.time()
547        proc.wait()
548        end = time.time()
549
550        # If the power event was detected, the adb shell command should be broken very quickly.
551        self.assertLess(end - start, 2)
552
553
554def main():
555    """Main entrypoint."""
556    random.seed(0)
557    unittest.main(verbosity=3)
558
559
560if __name__ == "__main__":
561    main()
562