#!/usr/bin/python
# pylint: disable=missing-docstring

import logging
import os
import re
import shutil
import StringIO
import sys
import tempfile
import unittest

import common
from autotest_lib.client.bin import job, sysinfo, harness
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_manager, logging_config
from autotest_lib.client.common_lib import base_job_unittest
from autotest_lib.client.common_lib.test_utils import mock


class job_test_case(unittest.TestCase):
    """Generic job TestCase class that defines a standard job setUp and
    tearDown, with some standard stubs."""

    job_class = job.base_client_job

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_with(job.base_client_job, '_get_environ_autodir',
                           classmethod(lambda cls: '/adir'))
        # Build the job without running __init__; individual tests decide
        # if and how the constructor gets called.
        self.job = self.job_class.__new__(self.job_class)
        self.job._job_directory = base_job_unittest.stub_job_directory

        # mkstemp() hands back an *open* descriptor; only the path is needed,
        # so close it right away instead of leaking one fd per test.
        control_fd, self.control_file = tempfile.mkstemp()
        os.close(control_fd)


    def tearDown(self):
        self.god.unstub_all()
        os.remove(self.control_file)


class test_find_base_directories(
        base_job_unittest.test_find_base_directories.generic_tests,
        job_test_case):

    def test_autodir_equals_clientdir(self):
        autodir, clientdir, _ = self.job._find_base_directories()
        self.assertEqual(autodir, '/adir')
        self.assertEqual(clientdir, '/adir')


    def test_serverdir_is_none(self):
        _, _, serverdir = self.job._find_base_directories()
        self.assertIsNone(serverdir)


class abstract_test_init(base_job_unittest.test_init.generic_tests):
    """Generic client job mixin used when defining variations on the
    job.__init__ generic tests."""
    OPTIONAL_ATTRIBUTES = (
        base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
        - set(['control', 'harness']))


class test_init_minimal_options(abstract_test_init, job_test_case):

    def call_init(self):
        # TODO(jadmanski): refactor more of the __init__ code to not need to
        # stub out countless random APIs
        self.god.stub_function_to_return(job.os, 'mkdir', None)
        self.god.stub_function_to_return(job.os.path, 'exists', True)
        self.god.stub_function_to_return(self.job, '_load_state', None)
        self.god.stub_function_to_return(self.job, 'record', None)
        self.god.stub_function_to_return(job.shutil, 'copyfile', None)
        self.god.stub_function_to_return(job.logging_manager,
                                         'configure_logging', None)
        class manager:
            def start_logging(self):
                return None
        self.god.stub_function_to_return(job.logging_manager,
                                         'get_logging_manager', manager())
        class stub_sysinfo:
            def log_per_reboot_data(self):
                return None
        self.god.stub_function_to_return(job.sysinfo, 'sysinfo',
                                         stub_sysinfo())
        class stub_harness:
            run_start = lambda self: None
        self.god.stub_function_to_return(job.harness, 'select', stub_harness())
        # The class object itself (not an instance) is handed to __init__ as
        # the options holder.
        class options:
            tag = ''
            verbose = False
            cont = False
            harness = 'stub'
            harness_args = None
            hostname = None
            user = None
            log = False
            args = ''
            output_dir = ''
        self.god.stub_function_to_return(job.utils, 'drop_caches', None)

        self.job._job_state = base_job_unittest.stub_job_state
        self.job.__init__(self.control_file, options)


class dummy(object):
    """A simple placeholder for attributes"""
    pass


class first_line_comparator(mock.argument_comparator):
    """Argument comparator that matches on the first line of a string."""

    def __init__(self, first_line):
        self.first_line = first_line


    def is_satisfied_by(self, parameter):
        return self.first_line == parameter.splitlines()[0]


class test_base_job(unittest.TestCase):
    """Record/playback tests for job.base_client_job."""

    def setUp(self):
        # make god
        self.god = mock.mock_god(ut=self)

        # need to set some environ variables
        self.autodir = "autodir"
        os.environ['AUTODIR'] = self.autodir

        # set up some variables
        # mkstemp() returns an open descriptor along with the path; close it
        # immediately so each test does not leak a file descriptor.
        control_fd, self.control = tempfile.mkstemp()
        os.close(control_fd)
        self.jobtag = "jobtag"

        # get rid of stdout and logging
        # Remember the stream that was installed so tearDown can put back
        # exactly that one (a runner may have replaced sys.stdout itself).
        self._saved_stdout = sys.stdout
        sys.stdout = StringIO.StringIO()
        logging_manager.configure_logging(logging_config.TestingConfig())
        logging.disable(logging.CRITICAL)
        def dummy_configure_logging(*args, **kwargs):
            pass
        self.god.stub_with(logging_manager, 'configure_logging',
                           dummy_configure_logging)
        real_get_logging_manager = logging_manager.get_logging_manager
        def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
                                       redirect_fds=False):
            # Always pass redirect_fds=False, whatever the caller asked for.
            return real_get_logging_manager(manage_stdout_and_stderr, False)
        self.god.stub_with(logging_manager, 'get_logging_manager',
                           get_logging_manager_no_fds)

        # stub out some stuff
        self.god.stub_function(os.path, 'exists')
        self.god.stub_function(os.path, 'isdir')
        self.god.stub_function(os, 'makedirs')
        self.god.stub_function(os, 'mkdir')
        self.god.stub_function(os, 'remove')
        self.god.stub_function(shutil, 'rmtree')
        self.god.stub_function(shutil, 'copyfile')
        self.god.stub_function(job, 'open')
        self.god.stub_function(utils, 'system')
        self.god.stub_function(utils, 'drop_caches')
        self.god.stub_function(harness, 'select')
        self.god.stub_function(sysinfo, 'log_per_reboot_data')

        self.god.stub_class(job.local_host, 'LocalHost')
        self.god.stub_class(sysinfo, 'sysinfo')

        self.god.stub_class_method(job.base_client_job,
                                   '_cleanup_debugdir_files')
        self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')

        self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
                           lambda *_: None)


    def tearDown(self):
        sys.stdout = self._saved_stdout
        # os.remove is stubbed in setUp, so unstub before deleting the file.
        self.god.unstub_all()
        os.remove(self.control)


    def _make_options(self, cont):
        """Build the options object handed to base_client_job.__init__.

        @param cont: Value stored in options.cont.
        @return: A dummy instance carrying the option attributes.
        """
        options = dummy()
        options.tag = self.jobtag
        options.cont = cont
        options.harness = None
        options.harness_args = None
        options.log = False
        options.verbose = False
        options.hostname = 'localhost'
        options.user = 'my_user'
        options.args = ''
        options.output_dir = ''
        return options


    def _setup_pre_record_init(self, cont):
        """Record the calls expected before the post-record init step.

        @param cont: True when recording for a continued job; the cleanup
                calls are only expected when this is False.
        @return: Tuple (resultdir, my_harness) for use by
                _setup_post_record_init.
        """
        self.god.stub_function(self.job, '_load_state')

        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
        if not cont:
            job.base_client_job._cleanup_debugdir_files.expect_call()
            job.base_client_job._cleanup_results_dir.expect_call()

        self.job._load_state.expect_call()

        my_harness = self.god.create_mock_class(harness.harness,
                                                'my_harness')
        harness.select.expect_call(None,
                                   self.job,
                                   None).and_return(my_harness)

        return resultdir, my_harness


    def _setup_post_record_init(self, cont, resultdir, my_harness):
        """Record the calls expected from the post-record init step.

        @param cont: True when recording for a continued job.
        @param resultdir: Result directory path from _setup_pre_record_init.
        @param my_harness: Mock harness from _setup_pre_record_init.
        """
        # now some specific stubs
        self.god.stub_function(self.job, 'config_get')
        self.god.stub_function(self.job, 'config_set')
        self.god.stub_function(self.job, 'record')

        # other setup
        download = os.path.join(self.autodir, 'tests', 'download')

        utils.drop_caches.expect_call()
        job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
        if not cont:
            os.path.exists.expect_call(download).and_return(False)
            os.mkdir.expect_call(download)
            shutil.copyfile.expect_call(mock.is_string_comparator(),
                                        os.path.join(resultdir, 'control'))

        job.local_host.LocalHost.expect_new(hostname='localhost')
        job_sysinfo.log_per_reboot_data.expect_call()
        if not cont:
            self.job.record.expect_call('START', None, None)

        my_harness.run_start.expect_call()


    def construct_job(self, cont):
        """Create self.job and run its constructor against recorded calls.

        @param cont: Value for options.cont (continuation run or not).
        """
        # will construct class instance using __new__
        self.job = job.base_client_job.__new__(job.base_client_job)

        # record
        resultdir, my_harness = self._setup_pre_record_init(cont)
        self._setup_post_record_init(cont, resultdir, my_harness)

        # finish constructor
        options = self._make_options(cont)
        self.job.__init__(self.control, options)

        # check
        self.god.check_playback()


    def get_partition_mock(self, devname):
        """
        Create a mock of a partition object and return it.
        """
        # Named partition_mock (not "mock") so the imported mock module is
        # not shadowed inside this method.
        class partition_mock(object):
            device = devname
            get_mountpoint = self.god.create_mock_function('get_mountpoint')
        return partition_mock


    def test_constructor_first_run(self):
        self.construct_job(False)


    def test_constructor_continuation(self):
        self.construct_job(True)


    def test_constructor_post_record_failure(self):
        """
        Test post record initialization failure.
        """
        self.job = job.base_client_job.__new__(job.base_client_job)
        options = self._make_options(False)
        # Local name avoids shadowing the imported "error" module.
        init_error = Exception('fail')

        self.god.stub_function(self.job, '_post_record_init')
        self.god.stub_function(self.job, 'record')

        self._setup_pre_record_init(False)
        self.job._post_record_init.expect_call(
                self.control, options, True).and_raises(init_error)
        self.job.record.expect_call(
                'ABORT', None, None,
                'client.bin.job.__init__ failed: %s' % str(init_error))

        self.assertRaises(
                Exception, self.job.__init__, self.control, options,
                drop_caches=True)

        # check
        self.god.check_playback()


    def test_control_functions(self):
        self.construct_job(True)
        control_file = "blah"
        self.job.control_set(control_file)
        self.assertEqual(self.job.control_get(), os.path.abspath(control_file))


    def test_harness_select(self):
        self.construct_job(True)

        # record
        which = "which"
        harness_args = ''
        harness.select.expect_call(which, self.job,
                                   harness_args).and_return(None)

        # run and test
        self.job.harness_select(which, harness_args)
        self.god.check_playback()


    def test_setup_dirs_raise(self):
        self.construct_job(True)

        # setup
        results_dir = 'foo'
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(True)
        os.path.isdir.expect_call(tmp_dir).and_return(False)

        # test
        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
        self.god.check_playback()


    def test_setup_dirs(self):
        self.construct_job(True)

        # setup
        results_dir1 = os.path.join(self.job.resultdir, 'build')
        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(False)
        os.mkdir.expect_call(tmp_dir)
        os.path.isdir.expect_call(tmp_dir).and_return(True)
        os.path.exists.expect_call(results_dir1).and_return(True)
        os.path.exists.expect_call(results_dir2).and_return(True)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.mkdir.expect_call(results_dir3)

        # test
        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
                         (results_dir3, tmp_dir))
        self.god.check_playback()


    def test_run_test_logs_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(error.TestError):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname,
                                    first_line_comparator(str(real_error)))
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_run_test_logs_non_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(Exception):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)
        reason = first_line_comparator("Unhandled MyError: %s" % real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname, reason)
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_report_reboot_failure(self):
        self.construct_job(True)

        # record
        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
                                    "boot failure")
        self.job.record.expect_call("END ABORT", "sub", "reboot",
                                    optional_fields={"kernel": "2.6.15-smp"})

        # playback
        self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
                                        running_id="2.6.15-smp")
        self.god.check_playback()


    def _setup_check_post_reboot(self, mount_info, cpu_count):
        """Record expectations and stored state for _check_post_reboot tests.

        @param mount_info: Value stored as the job's saved 'mount_info'.
        @param cpu_count: Value utils.count_cpus should return, or None when
                no count_cpus call is expected at all.
        """
        # setup
        self.god.stub_function(job.partition_lib, "get_partition_list")
        self.god.stub_function(utils, "count_cpus")

        part_list = [self.get_partition_mock("/dev/hda1"),
                     self.get_partition_mock("/dev/hdb1")]
        mount_list = ["/mnt/hda1", "/mnt/hdb1"]

        # record
        job.partition_lib.get_partition_list.expect_call(
                self.job, exclude_swap=False).and_return(part_list)
        for part, mountpoint in zip(part_list, mount_list):
            part.get_mountpoint.expect_call().and_return(mountpoint)
        if cpu_count is not None:
            utils.count_cpus.expect_call().and_return(cpu_count)
        self.job._state.set('client', 'mount_info', mount_info)
        # The stored CPU count is always 8; cpu_count above is the value
        # reported on the check.
        self.job._state.set('client', 'cpu_count', 8)


    def test_check_post_reboot_success(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 8)

        # playback
        self.job._check_post_reboot("sub")
        self.god.check_playback()


    def test_check_post_reboot_mounts_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1")])
        self._setup_check_post_reboot(mount_info, None)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call("sub",
                "reboot.verify_config", "mounted partitions are different after"
                " reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
                " '/mnt/hdb1')]))", running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_check_post_reboot_cpu_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 4)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call(
            'sub', 'reboot.verify_config',
            'Number of CPUs changed after reboot (old count: 8, new count: 4)',
            running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_parse_args(self):
        # Maps each raw argument string to the list _parse_args should return.
        test_set = {"a='foo bar baz' b='moo apt'":
                    ["a='foo bar baz'", "b='moo apt'"],
                    "a='foo bar baz' only=gah":
                    ["a='foo bar baz'", "only=gah"],
                    "a='b c d' no=argh":
                    ["a='b c d'", "no=argh"]}
        for arg_string, expected_args in test_set.items():
            parsed_args = job.base_client_job._parse_args(arg_string)
            self.assertEqual(parsed_args, expected_args)


    def test_run_test_timeout_parameter_is_propagated(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # set up the recording
        testname = "test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        timeout = 60
        optional_fields = {'timeout': timeout}
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=optional_fields)
        self.job._runtest.expect_call(testname, "", timeout, (), {})
        self.job.record.expect_call("GOOD", testname, testname,
                                    "completed successfully")
        self.job.record.expect_call("END GOOD", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname, timeout=timeout)
        self.god.check_playback()


class test_name_pattern(unittest.TestCase):
    """Tests for _NAME_PATTERN."""

    def _one_name_pattern_test(self, line, want):
        """Parametrized test."""
        match = re.match(job._NAME_PATTERN, line)
        self.assertIsNotNone(match)
        self.assertEqual(match.group(1), want)

    def test_name_pattern_nospace_single(self):
        self._one_name_pattern_test("NAME='some_Test'", 'some_Test')

    def test_name_pattern_nospace_double(self):
        self._one_name_pattern_test('NAME="some_Test"', 'some_Test')

    def test_name_pattern_space_single(self):
        self._one_name_pattern_test("NAME = 'some_Test'", 'some_Test')

    def test_name_pattern_space_double(self):
        self._one_name_pattern_test('NAME = "some_Test"', 'some_Test')


if __name__ == "__main__":
    unittest.main()