• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python2
"""Simple conformance test for adb.

This script will use the available adb in path and run simple
tests that attempt to touch all accessible attached devices.
"""
import hashlib
import os
import pipes
import random
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
import unittest
17
18
def trace(cmd):
    """Write *cmd* to stderr when tracing is on; tracing is hard-wired off."""
    tracing_enabled = False
    if not tracing_enabled:
        return
    print >> sys.stderr, cmd
23
24
def call(cmd_str):
    """Execute *cmd_str* and return ``(stdout, stderr, return code)``.

    The command string is tokenised with ``shlex.split`` and the two output
    streams are captured separately.
    """
    trace(cmd_str)
    argv = shlex.split(cmd_str)
    child = subprocess.Popen(argv,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    captured_out, captured_err = child.communicate()
    return captured_out, captured_err, child.returncode
33
34
def call_combined(cmd_str):
    """Execute *cmd_str* and return ``(stdout+stderr, return code)``.

    stderr is redirected into stdout, so both streams arrive interleaved in
    the first element of the returned tuple.
    """
    trace(cmd_str)
    argv = shlex.split(cmd_str)
    child = subprocess.Popen(argv,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
    # The second item of communicate() is unused: stderr has no pipe of its
    # own because it is merged into stdout.
    merged_output = child.communicate()[0]
    return merged_output, child.returncode
43
44
def call_checked(cmd_str):
    """Execute *cmd_str* and return its stdout+stderr as one string.

    The work is delegated to ``subprocess.check_output``, which raises
    ``subprocess.CalledProcessError`` when the command exits non-zero.
    """
    trace(cmd_str)
    argv = shlex.split(cmd_str)
    return subprocess.check_output(argv, stderr=subprocess.STDOUT)
50
51
def call_checked_list(cmd_str):
    """Run *cmd_str* via call_checked() and return its output split on newlines."""
    output = call_checked(cmd_str)
    return output.split('\n')
54
55
def call_checked_list_skip(cmd_str):
    """Return the output lines of *cmd_str* without asterisk-framed notices.

    A line counts as a notice (presumably adb's "daemon started" style
    banner -- confirm against the adb version in use) when it starts with
    ``*`` and, ignoring trailing whitespace, also ends with ``*``.  All other
    lines, including empty ones, are returned unchanged and in order.
    """
    def is_init_line(line):
        # Compare after stripping trailing whitespace so the notice is
        # recognised both when the line still carries a "\r" and when it
        # ends directly in "*".  Checking line[-2] alone only handled the
        # former and let plain "\n"-terminated notices through.
        text = line.rstrip()
        return len(text) >= 2 and text[0] == "*" and text[-1] == "*"

    return [line for line in call_checked_list(cmd_str)
            if not is_init_line(line)]
66
67
def get_device_list():
    """Return the first field of every device row printed by ``adb devices``.

    The first output line is treated as a header and skipped, and blank
    lines are ignored.  Only the first whitespace-separated field of each
    remaining row is kept, so a row whose status consists of several words
    (for example ``no permissions``) no longer aborts the listing with a
    ValueError from tuple unpacking.
    """
    output = call_checked_list_skip("adb devices")
    # line.strip() being non-empty guarantees split() yields at least one item.
    return [line.split()[0] for line in output[1:] if line.strip()]
77
78
def get_attached_device_count():
    """Return how many devices get_device_list() reports."""
    devices = get_device_list()
    return len(devices)
81
82
def compute_md5(string):
    """Return the hexadecimal MD5 digest of *string*."""
    return hashlib.md5(string).hexdigest()
87
88
class HostFile(object):
    """Record describing a file on the host.

    Attributes:
        handle: the object passed in; its ``name`` attribute supplies the path.
        md5: the digest value passed in by the caller.
        full_path: ``handle.name``.
        base_name: last path component of ``full_path``.
    """

    def __init__(self, handle, md5):
        path = handle.name
        self.full_path = path
        self.base_name = os.path.basename(path)
        self.handle = handle
        self.md5 = md5
95
96
class DeviceFile(object):
    """Record describing a file on the device.

    Attributes:
        md5: the digest value passed in by the caller.
        full_path: the path passed in by the caller.
        base_name: last path component of ``full_path``.
    """

    def __init__(self, md5, full_path):
        self.full_path = full_path
        self.base_name = os.path.basename(full_path)
        self.md5 = md5
102
103
def make_random_host_files(in_dir, num_files, rand_size=True):
    """Create *num_files* temporary files of random bytes inside *in_dir*.

    With ``rand_size`` true each file gets a size drawn by
    ``random.randrange(1 KiB, 16 KiB, 1024)``; otherwise every file is 1 KiB.
    The files are ``tempfile.NamedTemporaryFile`` objects and are flushed
    but left open.

    Returns a dict mapping each file's path to a HostFile that carries the
    open handle and the MD5 of the data written.
    """
    kib = 1 << 10
    smallest = 1 * kib
    largest = 16 * kib

    created = {}
    for _ in range(num_files):
        handle = tempfile.NamedTemporaryFile(dir=in_dir)

        num_bytes = smallest
        if rand_size:
            num_bytes = random.randrange(smallest, largest, 1024)

        payload = os.urandom(num_bytes)
        handle.write(payload)
        # Flush so the data is on disk for other processes while the handle
        # stays open.
        handle.flush()

        created[handle.name] = HostFile(handle, compute_md5(payload))
    return created
124
125
def make_random_device_files(adb, in_dir, num_files, rand_size=True):
    """Create *num_files* files of random bytes in *in_dir* on the device.

    Files are named ``device_tmpfile<i>`` and are filled with a single ``dd``
    block read from ``/dev/urandom``.  With ``rand_size`` true the block size
    is drawn by ``random.randrange(1 KiB, 16 KiB, 1024)``; otherwise it is
    1 KiB.  *adb* must provide a ``shell(cmd)`` method returning the command
    output.

    Returns a dict mapping each device path to a DeviceFile holding the MD5
    reported by ``md5sum`` on the device.
    """
    kib = 1 << 10
    smallest = 1 * kib
    largest = 16 * kib

    created = {}
    for index in range(num_files):
        num_bytes = smallest
        if rand_size:
            num_bytes = random.randrange(smallest, largest, 1024)

        device_path = "/".join((in_dir, "device_tmpfile" + str(index)))

        adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(device_path,
                                                                  num_bytes))
        # md5sum output is expected to be exactly "<digest> <path>".
        md5sum_fields = adb.shell("md5sum {}".format(device_path)).split()
        digest, _ = md5sum_fields

        created[device_path] = DeviceFile(digest, device_path)
    return created
147
148
class AdbWrapper(object):
    """Builds ``adb`` command lines and runs them.

    ``adb_cmd`` is the common prefix: ``"adb "``, followed by
    ``-s <device> `` when a device was given and ``-p <out_dir> `` when an
    output directory was given.  Apart from shell_nocheck(), every method
    runs its command through call_checked() and returns that function's
    result.
    """

    def __init__(self, device=None, out_dir=None):
        self.device = device
        self.out_dir = out_dir
        pieces = ["adb "]
        if device:
            pieces.append("-s {} ".format(device))
        if out_dir:
            pieces.append("-p {} ".format(out_dir))
        self.adb_cmd = "".join(pieces)

    def _checked(self, arguments):
        """Run ``adb_cmd + arguments`` through call_checked()."""
        return call_checked(self.adb_cmd + arguments)

    def shell(self, cmd):
        """Run ``adb shell <cmd>``; *cmd* is appended verbatim."""
        return self._checked("shell " + cmd)

    def shell_nocheck(self, cmd):
        """Run ``adb shell <cmd>`` via call_combined(): (output, return code)."""
        return call_combined(self.adb_cmd + "shell " + cmd)

    def install(self, filename):
        """Run ``adb install`` on *filename*, shell-quoting the name."""
        return self._checked("install {}".format(pipes.quote(filename)))

    def push(self, local, remote):
        """Run ``adb push <local> <remote>``."""
        return self._checked("push {} {}".format(local, remote))

    def pull(self, remote, local):
        """Run ``adb pull <remote> <local>``."""
        return self._checked("pull {} {}".format(remote, local))

    def sync(self, directory=""):
        """Run ``adb sync``, optionally followed by *directory*."""
        return self._checked("sync {}".format(directory))

    def forward(self, local, remote):
        """Run ``adb forward <local> <remote>``."""
        return self._checked("forward {} {}".format(local, remote))

    def tcpip(self, port):
        """Run ``adb tcpip <port>``."""
        return self._checked("tcpip {}".format(port))

    def usb(self):
        """Run ``adb usb``."""
        return self._checked("usb")

    def root(self):
        """Run ``adb root``."""
        return self._checked("root")

    def unroot(self):
        """Run ``adb unroot``."""
        return self._checked("unroot")

    def forward_remove(self, local):
        """Run ``adb forward --remove <local>``."""
        return self._checked("forward --remove {}".format(local))

    def forward_remove_all(self):
        """Run ``adb forward --remove-all``."""
        return self._checked("forward --remove-all")

    def connect(self, host):
        """Run ``adb connect <host>``."""
        return self._checked("connect {}".format(host))

    def disconnect(self, host):
        """Run ``adb disconnect <host>``."""
        return self._checked("disconnect {}".format(host))

    def reverse(self, remote, local):
        """Run ``adb reverse <remote> <local>``."""
        return self._checked("reverse {} {}".format(remote, local))

    def reverse_remove_all(self):
        """Run ``adb reverse --remove-all``."""
        return self._checked("reverse --remove-all")

    def reverse_remove(self, remote):
        """Run ``adb reverse --remove <remote>``."""
        return self._checked("reverse --remove {}".format(remote))

    def wait(self):
        """Run ``adb wait-for-device``."""
        return self._checked("wait-for-device")
219
220
class AdbBasic(unittest.TestCase):
    """Smoke tests for basic adb commands.

    Covers shell, help, version, root/unroot and argument quoting.
    """

    def test_shell(self):
        """Check that we can at least cat a file."""
        fields = AdbWrapper().shell("cat /proc/uptime").split()
        self.assertEqual(len(fields), 2)
        # Both uptime figures must parse as positive floats.
        for field in fields:
            self.assertGreater(float(field), 0.0)

    def test_help(self):
        """Make sure we get _something_ out of help."""
        help_text = call_checked("adb help")
        self.assertTrue(len(help_text) > 0)

    def test_version(self):
        """Get a version number out of the output of adb."""
        tokens = call_checked("adb version").split()
        version_pattern = re.compile(r"[\d+\.]*\d")
        self.assertTrue(any(version_pattern.match(tok) for tok in tokens))

    def _test_root(self):
        """Switch adbd to root and verify the shell user is ``root``."""
        device = AdbWrapper()
        device.root()
        device.wait()
        current_user = device.shell("id -un").strip()
        self.assertEqual("root", current_user)

    def _test_unroot(self):
        """Switch adbd to unroot and verify the shell user is ``shell``."""
        device = AdbWrapper()
        device.unroot()
        device.wait()
        current_user = device.shell("id -un").strip()
        self.assertEqual("shell", current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        adb = AdbWrapper()
        started_as = adb.shell("id -un").strip()
        started_as_root = started_as == "root"
        try:
            # Leave the current state first, then come back to it.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif started_as == "shell":
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back into the state it was found in.
            restore = adb.root if started_as_root else adb.unroot
            restore()
            adb.wait()

    def test_argument_escaping(self):
        """Make sure that argument escaping is somewhat sane."""
        adb = AdbWrapper()

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to "sh -c echo hello; echo world" which sh interprets
        # as "sh -c echo" (with an argument to that shell of "hello"),
        # and then "echo world" back in the first shell.
        lines = adb.shell("sh -c 'echo hello; echo world'").splitlines()
        self.assertEqual(["", "world"], lines)
        # If you really wanted "hello" and "world", here's what you'd do:
        lines = adb.shell("echo hello\;echo world").splitlines()
        self.assertEqual(["hello", "world"], lines)

        # http://b/15479704
        quoted_whole = adb.shell("'true && echo t'").strip()
        self.assertEqual('t', quoted_whole)
        quoted_sh_c = adb.shell("sh -c 'true && echo t'").strip()
        self.assertEqual('t', quoted_sh_c)

        # http://b/20564385
        env_prefixed = adb.shell("FOO=a BAR=b echo t").strip()
        self.assertEqual('t', env_prefixed)
        chained = adb.shell("echo -n 123\;uname").strip()
        self.assertEqual('123Linux', chained)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        adb = AdbWrapper()

        awkward_suffixes = (
            "-text;ls;1.apk",       # http://b/20323053
            "-Live Hold'em.apk",    # http://b/3090932
        )
        for suffix in awkward_suffixes:
            apk = tempfile.NamedTemporaryFile("w", suffix=suffix)
            self.assertIn(suffix, adb.install(apk.name))
308
309
class AdbFile(unittest.TestCase):
    """File-transfer tests: push, pull, directory pull and sync.

    All device-side scratch data lives under SCRATCH_DIR and is removed
    again in each test's ``finally`` clause.
    """
    SCRATCH_DIR = "/data/local/tmp"
    DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file"
    DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir"

    def test_push(self):
        """Push a randomly generated file to specified device."""
        kbytes = 512
        adb = AdbWrapper()
        # Binary mode: the payload is random bytes and must reach the disk
        # without any newline translation.
        with tempfile.NamedTemporaryFile(mode="wb") as tmp:
            rand_str = os.urandom(1024 * kbytes)
            tmp.write(rand_str)
            tmp.flush()

            host_md5 = compute_md5(rand_str)
            adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
            try:
                adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE)
                dev_md5, _ = adb.shell(
                    "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()
                self.assertEqual(host_md5, dev_md5)
            finally:
                adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))

    # TODO: write push directory test.

    def test_pull(self):
        """Pull a randomly generated file from specified device."""
        kbytes = 512
        adb = AdbWrapper()
        adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))
        try:
            adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format(
                AdbFile.DEVICE_TEMP_FILE, kbytes))
            dev_md5, _ = adb.shell(
                "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()

            # The temporary file only reserves a host path for adb to
            # write to; its own handle is never written.
            with tempfile.NamedTemporaryFile(mode="wb") as tmp_write:
                adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name)
                # Read back in binary mode so the digest covers the exact
                # bytes that were pulled.
                with open(tmp_write.name, "rb") as tmp_read:
                    host_md5 = compute_md5(tmp_read.read())
                self.assertEqual(dev_md5, host_md5)
        finally:
            adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))

    def test_pull_dir(self):
        """Pull a randomly generated directory of files from the device."""
        adb = AdbWrapper()
        host_dir = None
        try:
            # create temporary host directory
            host_dir = tempfile.mkdtemp()

            # create temporary dir on device
            adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
            adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR))

            # populate device dir with random files
            temp_files = make_random_device_files(
                adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32)

            adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir)

            for device_file in temp_files.values():
                host_path = os.path.join(host_dir, device_file.base_name)
                with open(host_path, "rb") as host_file:
                    host_md5 = compute_md5(host_file.read())
                self.assertEqual(host_md5, device_file.md5)
        finally:
            adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
            if host_dir:
                # Remove the whole scratch directory in one go.  Deleting
                # the expected files one by one raised when a file had never
                # been pulled, hiding the real failure and skipping the rest
                # of the cleanup.
                shutil.rmtree(host_dir, ignore_errors=True)

    def test_sync(self):
        """Sync a randomly generated directory of files to specified device."""
        # Everything the finally clause reads is bound before the try, so a
        # failure while setting up cannot turn into an UnboundLocalError.
        adb = AdbWrapper()
        temp_files = {}
        base_dir = None
        try:
            # create temporary host directory
            base_dir = tempfile.mkdtemp()

            # create mirror device directory hierarchy within base_dir
            full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR
            os.makedirs(full_dir_path)

            # create 32 random files within the host mirror
            temp_files = make_random_host_files(in_dir=full_dir_path,
                                                num_files=32)

            # clean up any trash on the device
            adb = AdbWrapper(out_dir=base_dir)
            adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))

            # issue the sync
            adb.sync("data")

            # confirm that every file on the device mirrors that on the host
            for host_file in temp_files.values():
                # Device paths use "/" regardless of the host's separator.
                device_full_path = (
                    AdbFile.DEVICE_TEMP_DIR + "/" + host_file.base_name)
                dev_md5, _ = adb.shell(
                    "md5sum {}".format(device_full_path)).split()
                self.assertEqual(host_file.md5, dev_md5)

        finally:
            adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))
            # Closing a NamedTemporaryFile handle also deletes the file.
            for host_file in temp_files.values():
                host_file.handle.close()
            if base_dir:
                # Deletes the mirror tree but nothing above base_dir, and
                # does not raise if part of the tree was never created.
                shutil.rmtree(base_dir, ignore_errors=True)
430
431
if __name__ == '__main__':
    # Seed the generator with a fixed value before anything draws from it.
    random.seed(0)
    if not get_attached_device_count():
        print("Test suite must be run with attached devices")
    else:
        suite = unittest.TestLoader().loadTestsFromName(__name__)
        runner = unittest.TextTestRunner(verbosity=3)
        runner.run(suite)
440