1#!/usr/bin/python 2 3import StringIO 4import errno 5import logging 6import os 7import select 8import shutil 9import socket 10import subprocess 11import time 12import unittest 13import urllib2 14 15import common 16from autotest_lib.client.common_lib import autotemp 17from autotest_lib.client.common_lib import error 18from autotest_lib.client.common_lib import utils 19from autotest_lib.client.common_lib.test_utils import mock 20 21metrics = utils.metrics_mock 22 23 24class test_read_one_line(unittest.TestCase): 25 def setUp(self): 26 self.god = mock.mock_god(ut=self) 27 self.god.stub_function(utils, "open") 28 29 30 def tearDown(self): 31 self.god.unstub_all() 32 33 34 def test_ip_to_long(self): 35 self.assertEqual(utils.ip_to_long('0.0.0.0'), 0) 36 self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295) 37 self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521) 38 self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320) 39 40 41 def test_long_to_ip(self): 42 self.assertEqual(utils.long_to_ip(0), '0.0.0.0') 43 self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255') 44 self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1') 45 self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8') 46 47 48 def test_create_subnet_mask(self): 49 self.assertEqual(utils.create_subnet_mask(0), 0) 50 self.assertEqual(utils.create_subnet_mask(32), 4294967295) 51 self.assertEqual(utils.create_subnet_mask(25), 4294967168) 52 53 54 def test_format_ip_with_mask(self): 55 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0), 56 '0.0.0.0/0') 57 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32), 58 '192.168.0.1/32') 59 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26), 60 '192.168.0.0/26') 61 self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26), 62 '192.168.0.192/26') 63 64 65 def create_test_file(self, contents): 66 test_file = StringIO.StringIO(contents) 67 utils.open.expect_call("filename", "r").and_return(test_file) 68 69 70 def test_reads_one_line_file(self): 71 self.create_test_file("abc\n") 72 self.assertEqual("abc", utils.read_one_line("filename")) 73 self.god.check_playback() 74 75 76 def test_strips_read_lines(self): 77 self.create_test_file("abc \n") 78 self.assertEqual("abc ", utils.read_one_line("filename")) 79 self.god.check_playback() 80 81 82 def test_drops_extra_lines(self): 83 self.create_test_file("line 1\nline 2\nline 3\n") 84 self.assertEqual("line 1", utils.read_one_line("filename")) 85 self.god.check_playback() 86 87 88 def test_works_on_empty_file(self): 89 self.create_test_file("") 90 self.assertEqual("", utils.read_one_line("filename")) 91 self.god.check_playback() 92 93 94 def test_works_on_file_with_no_newlines(self): 95 self.create_test_file("line but no newline") 96 self.assertEqual("line but no newline", 97 utils.read_one_line("filename")) 98 self.god.check_playback() 99 100 101 def test_preserves_leading_whitespace(self): 102 self.create_test_file(" has leading whitespace") 103 self.assertEqual(" has leading whitespace", 104 utils.read_one_line("filename")) 105 106 107class test_write_one_line(unittest.TestCase): 108 def setUp(self): 109 self.god = mock.mock_god(ut=self) 110 self.god.stub_function(utils, "open") 111 112 113 def tearDown(self): 114 self.god.unstub_all() 115 116 117 def get_write_one_line_output(self, content): 118 test_file = mock.SaveDataAfterCloseStringIO() 119 utils.open.expect_call("filename", "w").and_return(test_file) 120 utils.write_one_line("filename", content) 121 self.god.check_playback() 122 return test_file.final_data 123 124 125 def test_writes_one_line_file(self): 126 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) 127 128 129 def test_preserves_existing_newline(self): 130 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) 131 132 133 def test_preserves_leading_whitespace(self): 134 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) 135 136 137 def test_preserves_trailing_whitespace(self): 138 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) 139 140 141 def test_handles_empty_input(self): 142 self.assertEqual("\n", self.get_write_one_line_output("")) 143 144 145class test_open_write_close(unittest.TestCase): 146 def setUp(self): 147 self.god = mock.mock_god(ut=self) 148 self.god.stub_function(utils, "open") 149 150 151 def tearDown(self): 152 self.god.unstub_all() 153 154 155 def test_simple_functionality(self): 156 data = "\n\nwhee\n" 157 test_file = mock.SaveDataAfterCloseStringIO() 158 utils.open.expect_call("filename", "w").and_return(test_file) 159 utils.open_write_close("filename", data) 160 self.god.check_playback() 161 self.assertEqual(data, test_file.final_data) 162 163 164class test_read_keyval(unittest.TestCase): 165 def setUp(self): 166 self.god = mock.mock_god(ut=self) 167 self.god.stub_function(utils, "open") 168 self.god.stub_function(os.path, "isdir") 169 self.god.stub_function(os.path, "exists") 170 171 172 def tearDown(self): 173 self.god.unstub_all() 174 175 176 def create_test_file(self, filename, contents): 177 test_file = StringIO.StringIO(contents) 178 os.path.exists.expect_call(filename).and_return(True) 179 utils.open.expect_call(filename).and_return(test_file) 180 181 182 def read_keyval(self, contents): 183 os.path.isdir.expect_call("file").and_return(False) 184 self.create_test_file("file", contents) 185 keyval = utils.read_keyval("file") 186 self.god.check_playback() 187 return keyval 188 189 190 def test_returns_empty_when_file_doesnt_exist(self): 191 os.path.isdir.expect_call("file").and_return(False) 192 os.path.exists.expect_call("file").and_return(False) 193 self.assertEqual({}, utils.read_keyval("file")) 194 self.god.check_playback() 195 196 197 def test_accesses_files_directly(self): 198 os.path.isdir.expect_call("file").and_return(False) 199 self.create_test_file("file", "") 200 utils.read_keyval("file") 201 self.god.check_playback() 202 203 204 def test_accesses_directories_through_keyval_file(self): 205 os.path.isdir.expect_call("dir").and_return(True) 206 self.create_test_file("dir/keyval", "") 207 utils.read_keyval("dir") 208 self.god.check_playback() 209 210 211 def test_values_are_rstripped(self): 212 keyval = self.read_keyval("a=b \n") 213 self.assertEquals(keyval, {"a": "b"}) 214 215 216 def test_comments_are_ignored(self): 217 keyval = self.read_keyval("a=b # a comment\n") 218 self.assertEquals(keyval, {"a": "b"}) 219 220 221 def test_integers_become_ints(self): 222 keyval = self.read_keyval("a=1\n") 223 self.assertEquals(keyval, {"a": 1}) 224 self.assertEquals(int, type(keyval["a"])) 225 226 227 def test_float_values_become_floats(self): 228 keyval = self.read_keyval("a=1.5\n") 229 self.assertEquals(keyval, {"a": 1.5}) 230 self.assertEquals(float, type(keyval["a"])) 231 232 233 def test_multiple_lines(self): 234 keyval = self.read_keyval("a=one\nb=two\n") 235 self.assertEquals(keyval, {"a": "one", "b": "two"}) 236 237 238 def test_the_last_duplicate_line_is_used(self): 239 keyval = self.read_keyval("a=one\nb=two\na=three\n") 240 self.assertEquals(keyval, {"a": "three", "b": "two"}) 241 242 243 def test_extra_equals_are_included_in_values(self): 244 keyval = self.read_keyval("a=b=c\n") 245 self.assertEquals(keyval, {"a": "b=c"}) 246 247 248 def test_non_alphanumeric_keynames_are_rejected(self): 249 self.assertRaises(ValueError, self.read_keyval, "a$=one\n") 250 251 252 def test_underscores_are_allowed_in_key_names(self): 253 keyval = self.read_keyval("a_b=value\n") 254 self.assertEquals(keyval, {"a_b": "value"}) 255 256 257 def test_dashes_are_allowed_in_key_names(self): 258 keyval = self.read_keyval("a-b=value\n") 259 self.assertEquals(keyval, {"a-b": "value"}) 260 261 def test_empty_value_is_allowed(self): 262 keyval = self.read_keyval("a=\n") 263 self.assertEquals(keyval, {"a": ""}) 264 265 266class test_write_keyval(unittest.TestCase): 267 def setUp(self): 268 self.god = mock.mock_god(ut=self) 269 self.god.stub_function(utils, "open") 270 self.god.stub_function(os.path, "isdir") 271 272 273 def tearDown(self): 274 self.god.unstub_all() 275 276 277 def assertHasLines(self, value, lines): 278 vlines = value.splitlines() 279 vlines.sort() 280 self.assertEquals(vlines, sorted(lines)) 281 282 283 def write_keyval(self, filename, dictionary, expected_filename=None, 284 type_tag=None): 285 if expected_filename is None: 286 expected_filename = filename 287 test_file = StringIO.StringIO() 288 self.god.stub_function(test_file, "close") 289 utils.open.expect_call(expected_filename, "a").and_return(test_file) 290 test_file.close.expect_call() 291 if type_tag is None: 292 utils.write_keyval(filename, dictionary) 293 else: 294 utils.write_keyval(filename, dictionary, type_tag) 295 return test_file.getvalue() 296 297 298 def write_keyval_file(self, dictionary, type_tag=None): 299 os.path.isdir.expect_call("file").and_return(False) 300 return self.write_keyval("file", dictionary, type_tag=type_tag) 301 302 303 def test_accesses_files_directly(self): 304 os.path.isdir.expect_call("file").and_return(False) 305 result = self.write_keyval("file", {"a": "1"}) 306 self.assertEquals(result, "a=1\n") 307 308 309 def test_accesses_directories_through_keyval_file(self): 310 os.path.isdir.expect_call("dir").and_return(True) 311 result = self.write_keyval("dir", {"b": "2"}, "dir/keyval") 312 self.assertEquals(result, "b=2\n") 313 314 315 def test_numbers_are_stringified(self): 316 result = self.write_keyval_file({"c": 3}) 317 self.assertEquals(result, "c=3\n") 318 319 320 def test_type_tags_are_excluded_by_default(self): 321 result = self.write_keyval_file({"d": "a string"}) 322 self.assertEquals(result, "d=a string\n") 323 self.assertRaises(ValueError, self.write_keyval_file, 324 {"d{perf}": "a string"}) 325 326 327 def test_perf_tags_are_allowed(self): 328 result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2}, 329 type_tag="perf") 330 self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"]) 331 self.assertRaises(ValueError, self.write_keyval_file, 332 {"a": 1, "b": 2}, type_tag="perf") 333 334 335 def test_non_alphanumeric_keynames_are_rejected(self): 336 self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0}) 337 338 339 def test_underscores_are_allowed_in_key_names(self): 340 result = self.write_keyval_file({"a_b": "value"}) 341 self.assertEquals(result, "a_b=value\n") 342 343 344 def test_dashes_are_allowed_in_key_names(self): 345 result = self.write_keyval_file({"a-b": "value"}) 346 self.assertEquals(result, "a-b=value\n") 347 348 349class test_is_url(unittest.TestCase): 350 def test_accepts_http(self): 351 self.assertTrue(utils.is_url("http://example.com")) 352 353 354 def test_accepts_ftp(self): 355 self.assertTrue(utils.is_url("ftp://ftp.example.com")) 356 357 358 def test_rejects_local_path(self): 359 self.assertFalse(utils.is_url("/home/username/file")) 360 361 362 def test_rejects_local_filename(self): 363 self.assertFalse(utils.is_url("filename")) 364 365 366 def test_rejects_relative_local_path(self): 367 self.assertFalse(utils.is_url("somedir/somesubdir/file")) 368 369 370 def test_rejects_local_path_containing_url(self): 371 self.assertFalse(utils.is_url("somedir/http://path/file")) 372 373 374class test_urlopen(unittest.TestCase): 375 def setUp(self): 376 self.god = mock.mock_god(ut=self) 377 378 379 def tearDown(self): 380 self.god.unstub_all() 381 382 383 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, 384 *expected_args): 385 expected_args += (None,) * (2 - len(expected_args)) 386 def urlopen(url, data=None): 387 self.assertEquals(expected_args, (url,data)) 388 test_func(socket.getdefaulttimeout()) 389 return expected_return 390 self.god.stub_with(urllib2, "urlopen", urlopen) 391 392 393 def stub_urlopen_with_timeout_check(self, expected_timeout, 394 expected_return, *expected_args): 395 def test_func(timeout): 396 self.assertEquals(timeout, expected_timeout) 397 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, 398 *expected_args) 399 400 401 def test_timeout_set_during_call(self): 402 self.stub_urlopen_with_timeout_check(30, "retval", "url") 403 retval = utils.urlopen("url", timeout=30) 404 self.assertEquals(retval, "retval") 405 406 407 def test_timeout_reset_after_call(self): 408 old_timeout = socket.getdefaulttimeout() 409 self.stub_urlopen_with_timeout_check(30, None, "url") 410 try: 411 socket.setdefaulttimeout(1234) 412 utils.urlopen("url", timeout=30) 413 self.assertEquals(1234, socket.getdefaulttimeout()) 414 finally: 415 socket.setdefaulttimeout(old_timeout) 416 417 418 def test_timeout_set_by_default(self): 419 def test_func(timeout): 420 self.assertTrue(timeout is not None) 421 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") 422 utils.urlopen("url") 423 424 425 def test_args_are_untouched(self): 426 self.stub_urlopen_with_timeout_check(30, None, "http://url", 427 "POST data") 428 utils.urlopen("http://url", timeout=30, data="POST data") 429 430 431class test_urlretrieve(unittest.TestCase): 432 def setUp(self): 433 self.god = mock.mock_god(ut=self) 434 435 436 def tearDown(self): 437 self.god.unstub_all() 438 439 440 def test_urlopen_passed_arguments(self): 441 self.god.stub_function(utils, "urlopen") 442 self.god.stub_function(utils.shutil, "copyfileobj") 443 self.god.stub_function(utils, "open") 444 445 url = "url" 446 dest = "somefile" 447 data = object() 448 timeout = 10 449 450 src_file = self.god.create_mock_class(file, "file") 451 dest_file = self.god.create_mock_class(file, "file") 452 453 (utils.urlopen.expect_call(url, data=data, timeout=timeout) 454 .and_return(src_file)) 455 utils.open.expect_call(dest, "wb").and_return(dest_file) 456 utils.shutil.copyfileobj.expect_call(src_file, dest_file) 457 dest_file.close.expect_call() 458 src_file.close.expect_call() 459 460 utils.urlretrieve(url, dest, data=data, timeout=timeout) 461 self.god.check_playback() 462 463 464class test_merge_trees(unittest.TestCase): 465 # a some path-handling helper functions 466 def src(self, *path_segments): 467 return os.path.join(self.src_tree.name, *path_segments) 468 469 470 def dest(self, *path_segments): 471 return os.path.join(self.dest_tree.name, *path_segments) 472 473 474 def paths(self, *path_segments): 475 return self.src(*path_segments), self.dest(*path_segments) 476 477 478 def assertFileEqual(self, *path_segments): 479 src, dest = self.paths(*path_segments) 480 self.assertEqual(True, os.path.isfile(src)) 481 self.assertEqual(True, os.path.isfile(dest)) 482 self.assertEqual(os.path.getsize(src), os.path.getsize(dest)) 483 self.assertEqual(open(src).read(), open(dest).read()) 484 485 486 def assertFileContents(self, contents, *path_segments): 487 dest = self.dest(*path_segments) 488 self.assertEqual(True, os.path.isfile(dest)) 489 self.assertEqual(os.path.getsize(dest), len(contents)) 490 self.assertEqual(contents, open(dest).read()) 491 492 493 def setUp(self): 494 self.src_tree = autotemp.tempdir(unique_id='utilsrc') 495 self.dest_tree = autotemp.tempdir(unique_id='utilsdest') 496 497 # empty subdirs 498 os.mkdir(self.src("empty")) 499 os.mkdir(self.dest("empty")) 500 501 502 def tearDown(self): 503 self.src_tree.clean() 504 self.dest_tree.clean() 505 506 507 def test_both_dont_exist(self): 508 utils.merge_trees(*self.paths("empty")) 509 510 511 def test_file_only_at_src(self): 512 print >> open(self.src("src_only"), "w"), "line 1" 513 utils.merge_trees(*self.paths("src_only")) 514 self.assertFileEqual("src_only") 515 516 517 def test_file_only_at_dest(self): 518 print >> open(self.dest("dest_only"), "w"), "line 1" 519 utils.merge_trees(*self.paths("dest_only")) 520 self.assertEqual(False, os.path.exists(self.src("dest_only"))) 521 self.assertFileContents("line 1\n", "dest_only") 522 523 524 def test_file_at_both(self): 525 print >> open(self.dest("in_both"), "w"), "line 1" 526 print >> open(self.src("in_both"), "w"), "line 2" 527 utils.merge_trees(*self.paths("in_both")) 528 self.assertFileContents("line 1\nline 2\n", "in_both") 529 530 531 def test_directory_with_files_in_both(self): 532 print >> open(self.dest("in_both"), "w"), "line 1" 533 print >> open(self.src("in_both"), "w"), "line 3" 534 utils.merge_trees(*self.paths()) 535 self.assertFileContents("line 1\nline 3\n", "in_both") 536 537 538 def test_directory_with_mix_of_files(self): 539 print >> open(self.dest("in_dest"), "w"), "dest line" 540 print >> open(self.src("in_src"), "w"), "src line" 541 utils.merge_trees(*self.paths()) 542 self.assertFileContents("dest line\n", "in_dest") 543 self.assertFileContents("src line\n", "in_src") 544 545 546 def test_directory_with_subdirectories(self): 547 os.mkdir(self.src("src_subdir")) 548 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" 549 os.mkdir(self.src("both_subdir")) 550 os.mkdir(self.dest("both_subdir")) 551 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" 552 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" 553 utils.merge_trees(*self.paths()) 554 self.assertFileContents("subdir line\n", "src_subdir", "subfile") 555 self.assertFileContents("dest line\nsrc line\n", "both_subdir", 556 "subfile") 557 558 559class test_get_relative_path(unittest.TestCase): 560 def test_not_absolute(self): 561 self.assertRaises(AssertionError, utils.get_relative_path, "a", "b") 562 563 def test_same_dir(self): 564 self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c") 565 566 def test_forward_dir(self): 567 self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d") 568 569 def test_previous_dir(self): 570 self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..") 571 572 def test_parallel_dir(self): 573 self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"), 574 "../../../c/d") 575 576 577class test_sh_escape(unittest.TestCase): 578 def _test_in_shell(self, text): 579 escaped_text = utils.sh_escape(text) 580 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, 581 stdin=open(os.devnull, 'r'), 582 stdout=subprocess.PIPE, 583 stderr=open(os.devnull, 'w')) 584 stdout, _ = proc.communicate() 585 self.assertEqual(proc.returncode, 0) 586 self.assertEqual(stdout[:-1], text) 587 588 589 def test_normal_string(self): 590 self._test_in_shell('abcd') 591 592 593 def test_spaced_string(self): 594 self._test_in_shell('abcd efgh') 595 596 597 def test_dollar(self): 598 self._test_in_shell('$') 599 600 601 def test_single_quote(self): 602 self._test_in_shell('\'') 603 604 605 def test_single_quoted_string(self): 606 self._test_in_shell('\'efgh\'') 607 608 609 def test_string_with_single_quote(self): 610 self._test_in_shell("a'b") 611 612 613 def test_string_with_escaped_single_quote(self): 614 self._test_in_shell(r"a\'b") 615 616 617 def test_double_quote(self): 618 self._test_in_shell('"') 619 620 621 def test_double_quoted_string(self): 622 self._test_in_shell('"abcd"') 623 624 625 def test_backtick(self): 626 self._test_in_shell('`') 627 628 629 def test_backticked_string(self): 630 self._test_in_shell('`jklm`') 631 632 633 def test_backslash(self): 634 self._test_in_shell('\\') 635 636 637 def test_backslashed_special_characters(self): 638 self._test_in_shell('\\$') 639 self._test_in_shell('\\"') 640 self._test_in_shell('\\\'') 641 self._test_in_shell('\\`') 642 643 644 def test_backslash_codes(self): 645 self._test_in_shell('\\n') 646 self._test_in_shell('\\r') 647 self._test_in_shell('\\t') 648 self._test_in_shell('\\v') 649 self._test_in_shell('\\b') 650 self._test_in_shell('\\a') 651 self._test_in_shell('\\000') 652 653 def test_real_newline(self): 654 self._test_in_shell('\n') 655 self._test_in_shell('\\\n') 656 657 658class test_sh_quote_word(test_sh_escape): 659 """Run tests on sh_quote_word. 660 661 Inherit from test_sh_escape to get the same tests to run on both. 662 """ 663 664 def _test_in_shell(self, text): 665 quoted_word = utils.sh_quote_word(text) 666 echoed_value = subprocess.check_output('echo %s' % quoted_word, 667 shell=True) 668 self.assertEqual(echoed_value, text + '\n') 669 670 671class test_nested_sh_quote_word(test_sh_quote_word): 672 """Run nested tests on sh_quote_word. 673 674 Inherit from test_sh_quote_word to get the same tests to run on both. 675 """ 676 677 def _test_in_shell(self, text): 678 command = 'echo ' + utils.sh_quote_word(text) 679 nested_command = 'echo ' + utils.sh_quote_word(command) 680 produced_command = subprocess.check_output(nested_command, shell=True) 681 echoed_value = subprocess.check_output(produced_command, shell=True) 682 self.assertEqual(echoed_value, text + '\n') 683 684 685class test_run(unittest.TestCase): 686 """ 687 Test the utils.run() function. 688 689 Note: This test runs simple external commands to test the utils.run() 690 API without assuming implementation details. 691 """ 692 693 # Log levels in ascending severity. 694 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, 695 logging.CRITICAL] 696 697 698 def setUp(self): 699 self.god = mock.mock_god(ut=self) 700 self.god.stub_function(utils.logging, 'warning') 701 self.god.stub_function(utils.logging, 'debug') 702 703 # Log level -> StringIO.StringIO. 704 self.logs = {} 705 for level in self.LOG_LEVELS: 706 self.logs[level] = StringIO.StringIO() 707 708 # Override logging_manager.LoggingFile to return buffers. 709 def logging_file(level=None, prefix=None): 710 return self.logs[level] 711 self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file) 712 713 def tearDown(self): 714 self.god.unstub_all() 715 716 717 def __check_result(self, result, command, exit_status=0, stdout='', 718 stderr=''): 719 self.assertEquals(result.command, command) 720 self.assertEquals(result.exit_status, exit_status) 721 self.assertEquals(result.stdout, stdout) 722 self.assertEquals(result.stderr, stderr) 723 724 725 def __get_logs(self): 726 """Returns contents of log buffers at all levels. 727 728 @return: 5-element list of strings corresponding to logged messages 729 at the levels in self.LOG_LEVELS. 730 """ 731 return [self.logs[v].getvalue() for v in self.LOG_LEVELS] 732 733 734 def test_default_simple(self): 735 cmd = 'echo "hello world"' 736 # expect some king of logging.debug() call but don't care about args 737 utils.logging.debug.expect_any_call() 738 self.__check_result(utils.run(cmd), cmd, stdout='hello world\n') 739 740 741 def test_default_failure(self): 742 cmd = 'exit 11' 743 try: 744 utils.run(cmd, verbose=False) 745 except utils.error.CmdError, err: 746 self.__check_result(err.result_obj, cmd, exit_status=11) 747 748 749 def test_ignore_status(self): 750 cmd = 'echo error >&2 && exit 11' 751 self.__check_result(utils.run(cmd, ignore_status=True, verbose=False), 752 cmd, exit_status=11, stderr='error\n') 753 754 755 def test_timeout(self): 756 # we expect a logging.warning() message, don't care about the contents 757 utils.logging.warning.expect_any_call() 758 try: 759 utils.run('echo -n output && sleep 10', timeout=1, verbose=False) 760 except utils.error.CmdError, err: 761 self.assertEquals(err.result_obj.stdout, 'output') 762 763 764 def test_stdout_stderr_tee(self): 765 cmd = 'echo output && echo error >&2' 766 stdout_tee = StringIO.StringIO() 767 stderr_tee = StringIO.StringIO() 768 769 self.__check_result(utils.run( 770 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, 771 verbose=False), cmd, stdout='output\n', stderr='error\n') 772 self.assertEqual(stdout_tee.getvalue(), 'output\n') 773 self.assertEqual(stderr_tee.getvalue(), 'error\n') 774 775 776 def test_stdin_string(self): 777 cmd = 'cat' 778 self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'), 779 cmd, stdout='hi!\n') 780 781 782 def test_stdout_tee_to_logs_info(self): 783 """Test logging stdout at the info level.""" 784 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 785 stdout_level=logging.INFO, verbose=False) 786 self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', '']) 787 788 789 def test_stdout_tee_to_logs_warning(self): 790 """Test logging stdout at the warning level.""" 791 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 792 stdout_level=logging.WARNING, verbose=False) 793 self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', '']) 794 795 796 def test_stdout_and_stderr_tee_to_logs(self): 797 """Test simultaneous stdout and stderr log levels.""" 798 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 799 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 800 stderr_level=logging.ERROR, verbose=False) 801 self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', '']) 802 803 804 def test_default_expected_stderr_log_level(self): 805 """Test default expected stderr log level. 806 807 stderr should be logged at the same level as stdout when 808 stderr_is_expected is true and stderr_level isn't passed. 809 """ 810 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 811 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 812 stderr_is_expected=True, verbose=False) 813 self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', '']) 814 815 816 def test_safe_args(self): 817 # NOTE: The string in expected_quoted_cmd depends on the internal 818 # implementation of shell quoting which is used by utils.run(), 819 # in this case, sh_quote_word(). 820 expected_quoted_cmd = "echo 'hello \"world' again" 821 self.__check_result(utils.run( 822 'echo', verbose=False, args=('hello "world', 'again')), 823 expected_quoted_cmd, stdout='hello "world again\n') 824 825 826 def test_safe_args_given_string(self): 827 self.assertRaises(TypeError, utils.run, 'echo', args='hello') 828 829 830 def test_wait_interrupt(self): 831 """Test that we actually select twice if the first one returns EINTR.""" 832 utils.logging.debug.expect_any_call() 833 834 bg_job = utils.BgJob('echo "hello world"') 835 bg_job.result.exit_status = 0 836 self.god.stub_function(utils.select, 'select') 837 838 utils.select.select.expect_any_call().and_raises( 839 select.error(errno.EINTR, 'Select interrupted')) 840 utils.logging.warning.expect_any_call() 841 842 utils.select.select.expect_any_call().and_return( 843 ([bg_job.sp.stdout, bg_job.sp.stderr], [], None)) 844 utils.logging.warning.expect_any_call() 845 846 self.assertFalse( 847 utils._wait_for_commands([bg_job], time.time(), None)) 848 849 850class test_compare_versions(unittest.TestCase): 851 def test_zerofill(self): 852 self.assertEqual(utils.compare_versions('1.7', '1.10'), -1) 853 self.assertEqual(utils.compare_versions('1.222', '1.3'), 1) 854 self.assertEqual(utils.compare_versions('1.03', '1.3'), 0) 855 856 857 def test_unequal_len(self): 858 self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1) 859 self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1) 860 861 862 def test_dash_delimited(self): 863 self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1) 864 self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1) 865 self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0) 866 867 868 def test_alphabets(self): 869 self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1) 870 self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1) 871 self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0) 872 873 874 def test_mix_symbols(self): 875 self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1) 876 self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1) 877 self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0) 878 879 self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1) 880 self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1) 881 self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0) 882 883 884class test_args_to_dict(unittest.TestCase): 885 def test_no_args(self): 886 result = utils.args_to_dict([]) 887 self.assertEqual({}, result) 888 889 890 def test_matches(self): 891 result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', 892 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) 893 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', 894 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) 895 896 897 def test_unmatches(self): 898 # Temporarily shut warning messages from args_to_dict() when an argument 899 # doesn't match its pattern. 900 logger = logging.getLogger() 901 saved_level = logger.level 902 logger.setLevel(logging.ERROR) 903 904 try: 905 result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b', 906 ':VAL', '=VVV', 'WORD']) 907 self.assertEqual({}, result) 908 finally: 909 # Restore level. 910 logger.setLevel(saved_level) 911 912 913class test_get_random_port(unittest.TestCase): 914 def do_bind(self, port, socket_type, socket_proto): 915 s = socket.socket(socket.AF_INET, socket_type, socket_proto) 916 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 917 s.bind(('', port)) 918 return s 919 920 921 def test_get_port(self): 922 for _ in xrange(100): 923 p = utils.get_unused_port() 924 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) 925 self.assert_(s.getsockname()) 926 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 927 self.assert_(s.getsockname()) 928 929 930def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 931 """Test global function. 932 """ 933 934 935class TestClass(object): 936 """Test class. 937 """ 938 939 def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 940 """Test instance function. 941 """ 942 943 944 @classmethod 945 def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 946 """Test class function. 947 """ 948 949 950 @staticmethod 951 def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 952 """Test static function. 953 """ 954 955 956class GetFunctionArgUnittest(unittest.TestCase): 957 """Tests for method get_function_arg_value.""" 958 959 def run_test(self, func, insert_arg): 960 """Run test. 961 962 @param func: Function being called with given arguments. 963 @param insert_arg: Set to True to insert an object in the argument list. 964 This is to mock instance/class object. 965 """ 966 if insert_arg: 967 args = (None, 1, 2, 3) 968 else: 969 args = (1, 2, 3) 970 for i in range(1, 7): 971 self.assertEquals(utils.get_function_arg_value( 972 func, 'arg%d'%i, args, {}), i) 973 974 self.assertEquals(utils.get_function_arg_value( 975 func, 'arg7', args, {'arg7': 7}), 7) 976 self.assertRaises( 977 KeyError, utils.get_function_arg_value, 978 func, 'arg3', args[:-1], {}) 979 980 981 def test_global_function(self): 982 """Test global function. 983 """ 984 self.run_test(test_function, False) 985 986 987 def test_instance_function(self): 988 """Test instance function. 989 """ 990 self.run_test(TestClass().test_instance_function, True) 991 992 993 def test_class_function(self): 994 """Test class function. 995 """ 996 self.run_test(TestClass.test_class_function, True) 997 998 999 def test_static_function(self): 1000 """Test static function. 1001 """ 1002 self.run_test(TestClass.test_static_function, False) 1003 1004 1005class VersionMatchUnittest(unittest.TestCase): 1006 """Test version_match function.""" 1007 1008 def test_version_match(self): 1009 """Test version_match function.""" 1010 canary_build = 'lumpy-release/R43-6803.0.0' 1011 canary_release = '6803.0.0' 1012 cq_build = 'lumpy-release/R43-6803.0.0-rc1' 1013 cq_release = '6803.0.0-rc1' 1014 trybot_paladin_build = 'trybot-lumpy-paladin/R43-6803.0.0-b123' 1015 trybot_paladin_release = '6803.0.2015_03_12_2103' 1016 trybot_pre_cq_build = 'trybot-wifi-pre-cq/R43-7000.0.0-b36' 1017 trybot_pre_cq_release = '7000.0.2016_03_12_2103' 1018 trybot_toolchain_build = 'trybot-nyan_big-gcc-toolchain/R56-8936.0.0-b14' 1019 trybot_toolchain_release = '8936.0.2016_10_26_1403' 1020 cros_cheets_build = 'lumpy-release/R49-6899.0.0-cheetsth' 1021 cros_cheets_release = '6899.0.0' 1022 trybot_paladin_cheets_build = 'trybot-lumpy-paladin/R50-6900.0.0-b123-cheetsth' 1023 trybot_paladin_cheets_release = '6900.0.2015_03_12_2103' 1024 1025 builds = [canary_build, cq_build, trybot_paladin_build, 1026 trybot_pre_cq_build, trybot_toolchain_build, 1027 cros_cheets_build, trybot_paladin_cheets_build] 1028 releases = [canary_release, cq_release, trybot_paladin_release, 1029 trybot_pre_cq_release, trybot_toolchain_release, 1030 cros_cheets_release, trybot_paladin_cheets_release] 1031 for i in range(len(builds)): 1032 for j in range(len(releases)): 1033 self.assertEqual( 1034 utils.version_match(builds[i], releases[j]), i==j, 1035 'Build version %s should%s match release version %s.' % 1036 (builds[i], '' if i==j else ' not', releases[j])) 1037 1038 1039class IsInSameSubnetUnittest(unittest.TestCase): 1040 """Test is_in_same_subnet function.""" 1041 1042 def test_is_in_same_subnet(self): 1043 """Test is_in_same_subnet function.""" 1044 self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2', 1045 23)) 1046 self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2', 1047 24)) 1048 self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255', 1049 24)) 1050 self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0', 1051 24)) 1052 1053 1054class GetWirelessSsidUnittest(unittest.TestCase): 1055 """Test get_wireless_ssid function.""" 1056 1057 DEFAULT_SSID = 'default' 1058 SSID_1 = 'ssid_1' 1059 SSID_2 = 'ssid_2' 1060 SSID_3 = 'ssid_3' 1061 1062 def test_get_wireless_ssid(self): 1063 """Test is_in_same_subnet function.""" 1064 god = mock.mock_god() 1065 god.stub_function_to_return(utils.CONFIG, 'get_config_value', 1066 self.DEFAULT_SSID) 1067 god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex', 1068 {'wireless_ssid_1.2.3.4/24': self.SSID_1, 1069 'wireless_ssid_4.3.2.1/16': self.SSID_2, 1070 'wireless_ssid_4.3.2.111/32': self.SSID_3}) 1071 self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100')) 1072 self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100')) 1073 self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111')) 1074 self.assertEqual(self.DEFAULT_SSID, 1075 utils.get_wireless_ssid('100.0.0.100')) 1076 1077 1078class LaunchControlBuildParseUnittest(unittest.TestCase): 1079 """Test various parsing functions related to Launch Control builds and 1080 devices. 1081 """ 1082 1083 def test_parse_launch_control_target(self): 1084 """Test parse_launch_control_target function.""" 1085 target_tests = { 1086 ('shamu', 'userdebug'): 'shamu-userdebug', 1087 ('shamu', 'eng'): 'shamu-eng', 1088 ('shamu-board', 'eng'): 'shamu-board-eng', 1089 (None, None): 'bad_target', 1090 (None, None): 'target'} 1091 for result, target in target_tests.items(): 1092 self.assertEqual(result, utils.parse_launch_control_target(target)) 1093 1094 1095class GetOffloaderUriTest(unittest.TestCase): 1096 """Test get_offload_gsuri function.""" 1097 _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket' 1098 1099 def setUp(self): 1100 self.god = mock.mock_god() 1101 1102 def tearDown(self): 1103 self.god.unstub_all() 1104 1105 def test_get_default_lab_offload_gsuri(self): 1106 """Test default lab offload gsuri .""" 1107 self.god.mock_up(utils.CONFIG, 'CONFIG') 1108 self.god.stub_function_to_return(utils, 'is_moblab', False) 1109 self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI, 1110 utils.get_offload_gsuri()) 1111 1112 self.god.check_playback() 1113 1114 def test_get_default_moblab_offload_gsuri(self): 1115 self.god.mock_up(utils.CONFIG, 'CONFIG') 1116 self.god.stub_function_to_return(utils, 'is_moblab', True) 1117 utils.CONFIG.get_config_value.expect_call( 1118 'CROS', 'image_storage_server').and_return( 1119 self._IMAGE_STORAGE_SERVER) 1120 self.god.stub_function_to_return(utils, 1121 'get_moblab_serial_number', 'test_serial_number') 1122 self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id') 1123 expected_gsuri = '%sresults/%s/%s/' % ( 1124 self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id') 1125 cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI 1126 utils.DEFAULT_OFFLOAD_GSURI = None 1127 gsuri = utils.get_offload_gsuri() 1128 utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri 1129 self.assertEqual(expected_gsuri, gsuri) 1130 1131 self.god.check_playback() 1132 1133 def test_get_moblab_offload_gsuri(self): 1134 """Test default lab offload gsuri .""" 1135 self.god.mock_up(utils.CONFIG, 'CONFIG') 1136 self.god.stub_function_to_return(utils, 'is_moblab', True) 1137 self.god.stub_function_to_return(utils, 1138 'get_moblab_serial_number', 'test_serial_number') 1139 self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id') 1140 gsuri = '%s%s/%s/' % ( 1141 utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id') 1142 self.assertEqual(gsuri, utils.get_offload_gsuri()) 1143 1144 self.god.check_playback() 1145 1146 1147 1148class MockMetricsTest(unittest.TestCase): 1149 """Test metrics mock class can handle various metrics calls.""" 1150 1151 def test_Counter(self): 1152 """Test the mock class can create an instance and call any method. 1153 """ 1154 c = metrics.Counter('counter') 1155 c.increment(fields={'key': 1}) 1156 1157 1158 def test_Context(self): 1159 """Test the mock class can handle context class. 1160 """ 1161 test_value = None 1162 with metrics.SecondsTimer('context') as t: 1163 test_value = 'called_in_context' 1164 t['random_key'] = 'pass' 1165 self.assertEqual('called_in_context', test_value) 1166 1167 1168 def test_decorator(self): 1169 """Test the mock class can handle decorator. 1170 """ 1171 class TestClass(object): 1172 1173 def __init__(self): 1174 self.value = None 1175 1176 test_value = TestClass() 1177 test_value.value = None 1178 @metrics.SecondsTimerDecorator('decorator') 1179 def test(arg): 1180 arg.value = 'called_in_decorator' 1181 1182 test(test_value) 1183 self.assertEqual('called_in_decorator', test_value.value) 1184 1185 1186 def test_setitem(self): 1187 """Test the mock class can handle set item call. 1188 """ 1189 timer = metrics.SecondsTimer('name') 1190 timer['random_key'] = 'pass' 1191 1192 1193 1194if __name__ == "__main__": 1195 unittest.main() 1196