1#!/usr/bin/env python2 2"""Simple conformance test for adb. 3 4This script will use the available adb in path and run simple 5tests that attempt to touch all accessible attached devices. 6""" 7import hashlib 8import os 9import pipes 10import random 11import re 12import shlex 13import subprocess 14import sys 15import tempfile 16import unittest 17 18 19def trace(cmd): 20 """Print debug message if tracing enabled.""" 21 if False: 22 print >> sys.stderr, cmd 23 24 25def call(cmd_str): 26 """Run process and return output tuple (stdout, stderr, ret code).""" 27 trace(cmd_str) 28 process = subprocess.Popen(shlex.split(cmd_str), 29 stdout=subprocess.PIPE, 30 stderr=subprocess.PIPE) 31 stdout, stderr = process.communicate() 32 return stdout, stderr, process.returncode 33 34 35def call_combined(cmd_str): 36 """Run process and return output tuple (stdout+stderr, ret code).""" 37 trace(cmd_str) 38 process = subprocess.Popen(shlex.split(cmd_str), 39 stdout=subprocess.PIPE, 40 stderr=subprocess.STDOUT) 41 stdout, _ = process.communicate() 42 return stdout, process.returncode 43 44 45def call_checked(cmd_str): 46 """Run process and get stdout+stderr, raise an exception on trouble.""" 47 trace(cmd_str) 48 return subprocess.check_output(shlex.split(cmd_str), 49 stderr=subprocess.STDOUT) 50 51 52def call_checked_list(cmd_str): 53 return call_checked(cmd_str).split('\n') 54 55 56def call_checked_list_skip(cmd_str): 57 out_list = call_checked_list(cmd_str) 58 59 def is_init_line(line): 60 if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"): 61 return True 62 else: 63 return False 64 65 return [line for line in out_list if not is_init_line(line)] 66 67 68def get_device_list(): 69 output = call_checked_list_skip("adb devices") 70 dev_list = [] 71 for line in output[1:]: 72 if line.strip() == "": 73 continue 74 device, _ = line.split() 75 dev_list.append(device) 76 return dev_list 77 78 79def get_attached_device_count(): 80 return len(get_device_list()) 81 82 83def compute_md5(string): 84 hsh = hashlib.md5() 85 hsh.update(string) 86 return hsh.hexdigest() 87 88 89class HostFile(object): 90 def __init__(self, handle, md5): 91 self.handle = handle 92 self.md5 = md5 93 self.full_path = handle.name 94 self.base_name = os.path.basename(self.full_path) 95 96 97class DeviceFile(object): 98 def __init__(self, md5, full_path): 99 self.md5 = md5 100 self.full_path = full_path 101 self.base_name = os.path.basename(self.full_path) 102 103 104def make_random_host_files(in_dir, num_files, rand_size=True): 105 files = {} 106 min_size = 1 * (1 << 10) 107 max_size = 16 * (1 << 10) 108 fixed_size = min_size 109 110 for _ in range(num_files): 111 file_handle = tempfile.NamedTemporaryFile(dir=in_dir) 112 113 if rand_size: 114 size = random.randrange(min_size, max_size, 1024) 115 else: 116 size = fixed_size 117 rand_str = os.urandom(size) 118 file_handle.write(rand_str) 119 file_handle.flush() 120 121 md5 = compute_md5(rand_str) 122 files[file_handle.name] = HostFile(file_handle, md5) 123 return files 124 125 126def make_random_device_files(adb, in_dir, num_files, rand_size=True): 127 files = {} 128 min_size = 1 * (1 << 10) 129 max_size = 16 * (1 << 10) 130 fixed_size = min_size 131 132 for i in range(num_files): 133 if rand_size: 134 size = random.randrange(min_size, max_size, 1024) 135 else: 136 size = fixed_size 137 138 base_name = "device_tmpfile" + str(i) 139 full_path = in_dir + "/" + base_name 140 141 adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path, 142 size)) 143 dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split() 144 145 files[full_path] = DeviceFile(dev_md5, full_path) 146 return files 147 148 149class AdbWrapper(object): 150 """Convenience wrapper object for the adb command.""" 151 def __init__(self, device=None, out_dir=None): 152 self.device = device 153 self.out_dir = out_dir 154 self.adb_cmd = "adb " 155 if self.device: 156 self.adb_cmd += "-s {} ".format(device) 157 if self.out_dir: 158 self.adb_cmd += "-p {} ".format(out_dir) 159 160 def shell(self, cmd): 161 return call_checked(self.adb_cmd + "shell " + cmd) 162 163 def shell_nocheck(self, cmd): 164 return call_combined(self.adb_cmd + "shell " + cmd) 165 166 def install(self, filename): 167 return call_checked(self.adb_cmd + "install {}".format(pipes.quote(filename))) 168 169 def push(self, local, remote): 170 return call_checked(self.adb_cmd + "push {} {}".format(local, remote)) 171 172 def pull(self, remote, local): 173 return call_checked(self.adb_cmd + "pull {} {}".format(remote, local)) 174 175 def sync(self, directory=""): 176 return call_checked(self.adb_cmd + "sync {}".format(directory)) 177 178 def forward(self, local, remote): 179 return call_checked(self.adb_cmd + "forward {} {}".format(local, 180 remote)) 181 182 def tcpip(self, port): 183 return call_checked(self.adb_cmd + "tcpip {}".format(port)) 184 185 def usb(self): 186 return call_checked(self.adb_cmd + "usb") 187 188 def root(self): 189 return call_checked(self.adb_cmd + "root") 190 191 def unroot(self): 192 return call_checked(self.adb_cmd + "unroot") 193 194 def forward_remove(self, local): 195 return call_checked(self.adb_cmd + "forward --remove {}".format(local)) 196 197 def forward_remove_all(self): 198 return call_checked(self.adb_cmd + "forward --remove-all") 199 200 def connect(self, host): 201 return call_checked(self.adb_cmd + "connect {}".format(host)) 202 203 def disconnect(self, host): 204 return call_checked(self.adb_cmd + "disconnect {}".format(host)) 205 206 def reverse(self, remote, local): 207 return call_checked(self.adb_cmd + "reverse {} {}".format(remote, 208 local)) 209 210 def reverse_remove_all(self): 211 return call_checked(self.adb_cmd + "reverse --remove-all") 212 213 def reverse_remove(self, remote): 214 return call_checked( 215 self.adb_cmd + "reverse --remove {}".format(remote)) 216 217 def wait(self): 218 return call_checked(self.adb_cmd + "wait-for-device") 219 220 221class AdbBasic(unittest.TestCase): 222 def test_shell(self): 223 """Check that we can at least cat a file.""" 224 adb = AdbWrapper() 225 out = adb.shell("cat /proc/uptime") 226 self.assertEqual(len(out.split()), 2) 227 self.assertGreater(float(out.split()[0]), 0.0) 228 self.assertGreater(float(out.split()[1]), 0.0) 229 230 def test_help(self): 231 """Make sure we get _something_ out of help.""" 232 out = call_checked("adb help") 233 self.assertTrue(len(out) > 0) 234 235 def test_version(self): 236 """Get a version number out of the output of adb.""" 237 out = call_checked("adb version").split() 238 version_num = False 239 for item in out: 240 if re.match(r"[\d+\.]*\d", item): 241 version_num = True 242 self.assertTrue(version_num) 243 244 def _test_root(self): 245 adb = AdbWrapper() 246 adb.root() 247 adb.wait() 248 self.assertEqual("root", adb.shell("id -un").strip()) 249 250 def _test_unroot(self): 251 adb = AdbWrapper() 252 adb.unroot() 253 adb.wait() 254 self.assertEqual("shell", adb.shell("id -un").strip()) 255 256 def test_root_unroot(self): 257 """Make sure that adb root and adb unroot work, using id(1).""" 258 adb = AdbWrapper() 259 original_user = adb.shell("id -un").strip() 260 try: 261 if original_user == "root": 262 self._test_unroot() 263 self._test_root() 264 elif original_user == "shell": 265 self._test_root() 266 self._test_unroot() 267 finally: 268 if original_user == "root": 269 adb.root() 270 else: 271 adb.unroot() 272 adb.wait() 273 274 def test_argument_escaping(self): 275 """Make sure that argument escaping is somewhat sane.""" 276 adb = AdbWrapper() 277 278 # http://b/19734868 279 # Note that this actually matches ssh(1)'s behavior --- it's 280 # converted to "sh -c echo hello; echo world" which sh interprets 281 # as "sh -c echo" (with an argument to that shell of "hello"), 282 # and then "echo world" back in the first shell. 283 result = adb.shell("sh -c 'echo hello; echo world'").splitlines() 284 self.assertEqual(["", "world"], result) 285 # If you really wanted "hello" and "world", here's what you'd do: 286 result = adb.shell("echo hello\;echo world").splitlines() 287 self.assertEqual(["hello", "world"], result) 288 289 # http://b/15479704 290 self.assertEqual('t', adb.shell("'true && echo t'").strip()) 291 self.assertEqual('t', adb.shell("sh -c 'true && echo t'").strip()) 292 293 # http://b/20564385 294 self.assertEqual('t', adb.shell("FOO=a BAR=b echo t").strip()) 295 self.assertEqual('123Linux', adb.shell("echo -n 123\;uname").strip()) 296 297 def test_install_argument_escaping(self): 298 """Make sure that install argument escaping works.""" 299 adb = AdbWrapper() 300 301 # http://b/20323053 302 tf = tempfile.NamedTemporaryFile("w", suffix="-text;ls;1.apk") 303 self.assertIn("-text;ls;1.apk", adb.install(tf.name)) 304 305 # http://b/3090932 306 tf = tempfile.NamedTemporaryFile("w", suffix="-Live Hold'em.apk") 307 self.assertIn("-Live Hold'em.apk", adb.install(tf.name)) 308 309 310class AdbFile(unittest.TestCase): 311 SCRATCH_DIR = "/data/local/tmp" 312 DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file" 313 DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir" 314 315 def test_push(self): 316 """Push a randomly generated file to specified device.""" 317 kbytes = 512 318 adb = AdbWrapper() 319 with tempfile.NamedTemporaryFile(mode="w") as tmp: 320 rand_str = os.urandom(1024 * kbytes) 321 tmp.write(rand_str) 322 tmp.flush() 323 324 host_md5 = compute_md5(rand_str) 325 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) 326 try: 327 adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE) 328 dev_md5, _ = adb.shell( 329 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() 330 self.assertEqual(host_md5, dev_md5) 331 finally: 332 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) 333 334 # TODO: write push directory test. 335 336 def test_pull(self): 337 """Pull a randomly generated file from specified device.""" 338 kbytes = 512 339 adb = AdbWrapper() 340 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) 341 try: 342 adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format( 343 AdbFile.DEVICE_TEMP_FILE, kbytes)) 344 dev_md5, _ = adb.shell( 345 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() 346 347 with tempfile.NamedTemporaryFile(mode="w") as tmp_write: 348 adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name) 349 with open(tmp_write.name) as tmp_read: 350 host_contents = tmp_read.read() 351 host_md5 = compute_md5(host_contents) 352 self.assertEqual(dev_md5, host_md5) 353 finally: 354 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) 355 356 def test_pull_dir(self): 357 """Pull a randomly generated directory of files from the device.""" 358 adb = AdbWrapper() 359 temp_files = {} 360 host_dir = None 361 try: 362 # create temporary host directory 363 host_dir = tempfile.mkdtemp() 364 365 # create temporary dir on device 366 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 367 adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR)) 368 369 # populate device dir with random files 370 temp_files = make_random_device_files( 371 adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32) 372 373 adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir) 374 375 for device_full_path in temp_files: 376 host_path = os.path.join( 377 host_dir, temp_files[device_full_path].base_name) 378 with open(host_path) as host_file: 379 host_md5 = compute_md5(host_file.read()) 380 self.assertEqual(host_md5, 381 temp_files[device_full_path].md5) 382 finally: 383 for dev_file in temp_files.values(): 384 host_path = os.path.join(host_dir, dev_file.base_name) 385 os.remove(host_path) 386 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 387 if host_dir: 388 os.removedirs(host_dir) 389 390 def test_sync(self): 391 """Sync a randomly generated directory of files to specified device.""" 392 try: 393 adb = AdbWrapper() 394 temp_files = {} 395 396 # create temporary host directory 397 base_dir = tempfile.mkdtemp() 398 399 # create mirror device directory hierarchy within base_dir 400 full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR 401 os.makedirs(full_dir_path) 402 403 # create 32 random files within the host mirror 404 temp_files = make_random_host_files(in_dir=full_dir_path, 405 num_files=32) 406 407 # clean up any trash on the device 408 adb = AdbWrapper(out_dir=base_dir) 409 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 410 411 # issue the sync 412 adb.sync("data") 413 414 # confirm that every file on the device mirrors that on the host 415 for host_full_path in temp_files.keys(): 416 device_full_path = os.path.join( 417 AdbFile.DEVICE_TEMP_DIR, 418 temp_files[host_full_path].base_name) 419 dev_md5, _ = adb.shell( 420 "md5sum {}".format(device_full_path)).split() 421 self.assertEqual(temp_files[host_full_path].md5, dev_md5) 422 423 finally: 424 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) 425 if temp_files: 426 for tf in temp_files.values(): 427 tf.handle.close() 428 if base_dir: 429 os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR) 430 431 432if __name__ == '__main__': 433 random.seed(0) 434 dev_count = get_attached_device_count() 435 if dev_count: 436 suite = unittest.TestLoader().loadTestsFromName(__name__) 437 unittest.TextTestRunner(verbosity=3).run(suite) 438 else: 439 print "Test suite must be run with attached devices" 440