1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8# pylint: disable=missing-docstring 9 10import copy 11from datetime import datetime 12import getpass 13import glob 14import logging 15import os 16import re 17import shutil 18import sys 19import time 20import traceback 21import types 22import weakref 23 24import common 25from autotest_lib.client.bin import client_logging_config 26from autotest_lib.client.bin import harness 27from autotest_lib.client.bin import local_host 28from autotest_lib.client.bin import parallel 29from autotest_lib.client.bin import partition as partition_lib 30from autotest_lib.client.bin import profilers 31from autotest_lib.client.bin import sysinfo 32from autotest_lib.client.bin import test 33from autotest_lib.client.bin import utils 34from autotest_lib.client.common_lib import barrier 35from autotest_lib.client.common_lib import base_job 36from autotest_lib.client.common_lib import control_data 37from autotest_lib.client.common_lib import error 38from autotest_lib.client.common_lib import global_config 39from autotest_lib.client.common_lib import logging_manager 40from autotest_lib.client.common_lib import packages 41from autotest_lib.client.cros import cros_logging 42from autotest_lib.client.tools import html_report 43 44GLOBAL_CONFIG = global_config.global_config 45 46LAST_BOOT_TAG = object() 47JOB_PREAMBLE = """ 48from autotest_lib.client.common_lib.error import * 49from autotest_lib.client.bin.utils import * 50""" 51 52 53class StepError(error.AutotestError): 54 pass 55 56class NotAvailableError(error.AutotestError): 57 pass 58 59 60 61def _run_test_complete_on_exit(f): 62 """Decorator for job methods that automatically calls 63 self.harness.run_test_complete when the method exits, if appropriate.""" 64 def wrapped(self, *args, **dargs): 65 try: 66 return f(self, *args, **dargs) 67 finally: 68 if self._logger.global_filename == 'status': 69 
self.harness.run_test_complete() 70 if self.drop_caches: 71 utils.drop_caches() 72 wrapped.__name__ = f.__name__ 73 wrapped.__doc__ = f.__doc__ 74 wrapped.__dict__.update(f.__dict__) 75 return wrapped 76 77 78class status_indenter(base_job.status_indenter): 79 """Provide a status indenter that is backed by job._record_prefix.""" 80 def __init__(self, job_): 81 self._job = weakref.proxy(job_) # avoid a circular reference 82 83 84 @property 85 def indent(self): 86 return self._job._record_indent 87 88 89 def increment(self): 90 self._job._record_indent += 1 91 92 93 def decrement(self): 94 self._job._record_indent -= 1 95 96 97class base_client_job(base_job.base_job): 98 """The client-side concrete implementation of base_job. 99 100 Optional properties provided by this implementation: 101 control 102 harness 103 """ 104 105 _WARNING_DISABLE_DELAY = 5 106 107 # _record_indent is a persistent property, but only on the client 108 _job_state = base_job.base_job._job_state 109 _record_indent = _job_state.property_factory( 110 '_state', '_record_indent', 0, namespace='client') 111 _max_disk_usage_rate = _job_state.property_factory( 112 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 113 114 115 def __init__(self, control, options, drop_caches=True): 116 """ 117 Prepare a client side job object. 118 119 @param control: The control file (pathname of). 120 @param options: an object which includes: 121 jobtag: The job tag string (eg "default"). 122 cont: If this is the continuation of this job. 123 harness_type: An alternative server harness. [None] 124 use_external_logging: If true, the enable_external_logging 125 method will be called during construction. [False] 126 @param drop_caches: If true, utils.drop_caches() is called before and 127 between all tests. 
[True] 128 """ 129 super(base_client_job, self).__init__(options=options) 130 self._pre_record_init(control, options) 131 try: 132 self._post_record_init(control, options, drop_caches) 133 except Exception, err: 134 self.record( 135 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 136 str(err)) 137 raise 138 139 140 @classmethod 141 def _get_environ_autodir(cls): 142 return os.environ['AUTODIR'] 143 144 145 @classmethod 146 def _find_base_directories(cls): 147 """ 148 Determine locations of autodir and clientdir (which are the same) 149 using os.environ. Serverdir does not exist in this context. 150 """ 151 autodir = clientdir = cls._get_environ_autodir() 152 return autodir, clientdir, None 153 154 155 @classmethod 156 def _parse_args(cls, args): 157 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) 158 159 160 def _find_resultdir(self, options): 161 """ 162 Determine the directory for storing results. On a client this is 163 always <autodir>/results/<tag>, where tag is passed in on the command 164 line as an option. 165 """ 166 output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT', 167 'output_dir', 168 default="") 169 if options.output_dir: 170 basedir = options.output_dir 171 elif output_dir_config: 172 basedir = output_dir_config 173 else: 174 basedir = self.autodir 175 176 return os.path.join(basedir, 'results', options.tag) 177 178 179 def _get_status_logger(self): 180 """Return a reference to the status logger.""" 181 return self._logger 182 183 184 def _pre_record_init(self, control, options): 185 """ 186 Initialization function that should peform ONLY the required 187 setup so that the self.record() method works. 188 189 As of now self.record() needs self.resultdir, self._group_level, 190 self.harness and of course self._logger. 
191 """ 192 if not options.cont: 193 self._cleanup_debugdir_files() 194 self._cleanup_results_dir() 195 196 logging_manager.configure_logging( 197 client_logging_config.ClientLoggingConfig(), 198 results_dir=self.resultdir, 199 verbose=options.verbose) 200 logging.info('Writing results to %s', self.resultdir) 201 202 # init_group_level needs the state 203 self.control = os.path.realpath(control) 204 self._is_continuation = options.cont 205 self._current_step_ancestry = [] 206 self._next_step_index = 0 207 self._load_state() 208 209 _harness = self.handle_persistent_option(options, 'harness') 210 _harness_args = self.handle_persistent_option(options, 'harness_args') 211 212 self.harness = harness.select(_harness, self, _harness_args) 213 214 if self.control: 215 parsed_control = control_data.parse_control( 216 self.control, raise_warnings=False) 217 self.fast = parsed_control.fast 218 219 # set up the status logger 220 def client_job_record_hook(entry): 221 msg_tag = '' 222 if '.' in self._logger.global_filename: 223 msg_tag = self._logger.global_filename.split('.', 1)[1] 224 # send the entry to the job harness 225 message = '\n'.join([entry.message] + entry.extra_message_lines) 226 rendered_entry = self._logger.render_entry(entry) 227 self.harness.test_status_detail(entry.status_code, entry.subdir, 228 entry.operation, message, msg_tag, 229 entry.fields) 230 self.harness.test_status(rendered_entry, msg_tag) 231 # send the entry to stdout, if it's enabled 232 logging.info(rendered_entry) 233 self._logger = base_job.status_logger( 234 self, status_indenter(self), record_hook=client_job_record_hook) 235 236 237 def _post_record_init(self, control, options, drop_caches): 238 """ 239 Perform job initialization not required by self.record(). 
240 """ 241 self._init_drop_caches(drop_caches) 242 243 self._init_packages() 244 245 self.sysinfo = sysinfo.sysinfo(self.resultdir) 246 self._load_sysinfo_state() 247 248 if not options.cont: 249 download = os.path.join(self.testdir, 'download') 250 if not os.path.exists(download): 251 os.mkdir(download) 252 253 shutil.copyfile(self.control, 254 os.path.join(self.resultdir, 'control')) 255 256 self.control = control 257 258 self.logging = logging_manager.get_logging_manager( 259 manage_stdout_and_stderr=True, redirect_fds=True) 260 self.logging.start_logging() 261 262 self.profilers = profilers.profilers(self) 263 264 self.machines = [options.hostname] 265 self.machine_dict_list = [{'hostname' : options.hostname}] 266 # Client side tests should always run the same whether or not they are 267 # running in the lab. 268 self.in_lab = False 269 self.hosts = set([local_host.LocalHost(hostname=options.hostname)]) 270 271 self.args = [] 272 if options.args: 273 self.args = self._parse_args(options.args) 274 275 if options.user: 276 self.user = options.user 277 else: 278 self.user = getpass.getuser() 279 280 self.sysinfo.log_per_reboot_data() 281 282 if not options.cont: 283 self.record('START', None, None) 284 285 self.harness.run_start() 286 287 if options.log: 288 self.enable_external_logging() 289 290 self.num_tests_run = None 291 self.num_tests_failed = None 292 293 self.warning_loggers = None 294 self.warning_manager = None 295 296 297 def _init_drop_caches(self, drop_caches): 298 """ 299 Perform the drop caches initialization. 300 """ 301 self.drop_caches_between_iterations = ( 302 GLOBAL_CONFIG.get_config_value('CLIENT', 303 'drop_caches_between_iterations', 304 type=bool, default=True)) 305 self.drop_caches = drop_caches 306 if self.drop_caches: 307 utils.drop_caches() 308 309 310 def _init_packages(self): 311 """ 312 Perform the packages support initialization. 
313 """ 314 self.pkgmgr = packages.PackageManager( 315 self.autodir, run_function_dargs={'timeout':3600}) 316 317 318 def _cleanup_results_dir(self): 319 """Delete everything in resultsdir""" 320 assert os.path.exists(self.resultdir) 321 list_files = glob.glob('%s/*' % self.resultdir) 322 for f in list_files: 323 if os.path.isdir(f): 324 shutil.rmtree(f) 325 elif os.path.isfile(f): 326 os.remove(f) 327 328 329 def _cleanup_debugdir_files(self): 330 """ 331 Delete any leftover debugdir files 332 """ 333 list_files = glob.glob("/tmp/autotest_results_dir.*") 334 for f in list_files: 335 os.remove(f) 336 337 338 def disable_warnings(self, warning_type): 339 self.record("INFO", None, None, 340 "disabling %s warnings" % warning_type, 341 {"warnings.disable": warning_type}) 342 time.sleep(self._WARNING_DISABLE_DELAY) 343 344 345 def enable_warnings(self, warning_type): 346 time.sleep(self._WARNING_DISABLE_DELAY) 347 self.record("INFO", None, None, 348 "enabling %s warnings" % warning_type, 349 {"warnings.enable": warning_type}) 350 351 352 def monitor_disk_usage(self, max_rate): 353 """\ 354 Signal that the job should monitor disk space usage on / 355 and generate a warning if a test uses up disk space at a 356 rate exceeding 'max_rate'. 357 358 Parameters: 359 max_rate - the maximium allowed rate of disk consumption 360 during a test, in MB/hour, or 0 to indicate 361 no limit. 362 """ 363 self._max_disk_usage_rate = max_rate 364 365 366 def control_get(self): 367 return self.control 368 369 370 def control_set(self, control): 371 self.control = os.path.abspath(control) 372 373 374 def harness_select(self, which, harness_args): 375 self.harness = harness.select(which, self, harness_args) 376 377 378 def setup_dirs(self, results_dir, tmp_dir): 379 if not tmp_dir: 380 tmp_dir = os.path.join(self.tmpdir, 'build') 381 if not os.path.exists(tmp_dir): 382 os.mkdir(tmp_dir) 383 if not os.path.isdir(tmp_dir): 384 e_msg = "Temp dir (%s) is not a dir - args backwards?" 
% self.tmpdir 385 raise ValueError(e_msg) 386 387 # We label the first build "build" and then subsequent ones 388 # as "build.2", "build.3", etc. Whilst this is a little bit 389 # inconsistent, 99.9% of jobs will only have one build 390 # (that's not done as kernbench, sparse, or buildtest), 391 # so it works out much cleaner. One of life's compromises. 392 if not results_dir: 393 results_dir = os.path.join(self.resultdir, 'build') 394 i = 2 395 while os.path.exists(results_dir): 396 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 397 i += 1 398 if not os.path.exists(results_dir): 399 os.mkdir(results_dir) 400 401 return (results_dir, tmp_dir) 402 403 404 def barrier(self, *args, **kwds): 405 """Create a barrier object""" 406 return barrier.barrier(*args, **kwds) 407 408 409 def install_pkg(self, name, pkg_type, install_dir): 410 ''' 411 This method is a simple wrapper around the actual package 412 installation method in the Packager class. This is used 413 internally by the profilers, deps and tests code. 414 name : name of the package (ex: sleeptest, dbench etc.) 415 pkg_type : Type of the package (ex: test, dep etc.) 416 install_dir : The directory in which the source is actually 417 untarred into. (ex: client/profilers/<name> for profilers) 418 ''' 419 if self.pkgmgr.repositories: 420 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 421 422 423 def add_repository(self, repo_urls): 424 ''' 425 Adds the repository locations to the job so that packages 426 can be fetched from them when needed. The repository list 427 needs to be a string list 428 Ex: job.add_repository(['http://blah1','http://blah2']) 429 ''' 430 for repo_url in repo_urls: 431 self.pkgmgr.add_repository(repo_url) 432 433 # Fetch the packages' checksum file that contains the checksums 434 # of all the packages if it is not already fetched. The checksum 435 # is always fetched whenever a job is first started. 
This 436 # is not done in the job's constructor as we don't have the list of 437 # the repositories there (and obviously don't care about this file 438 # if we are not using the repos) 439 try: 440 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 441 packages.CHECKSUM_FILE) 442 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, 443 checksum_file_path, use_checksum=False) 444 except error.PackageFetchError: 445 # packaging system might not be working in this case 446 # Silently fall back to the normal case 447 pass 448 449 450 def require_gcc(self): 451 """ 452 Test whether gcc is installed on the machine. 453 """ 454 # check if gcc is installed on the system. 455 try: 456 utils.system('which gcc') 457 except error.CmdError: 458 raise NotAvailableError('gcc is required by this job and is ' 459 'not available on the system') 460 461 462 def setup_dep(self, deps): 463 """Set up the dependencies for this test. 464 deps is a list of libraries required for this test. 465 """ 466 # Fetch the deps from the repositories and set them up. 467 for dep in deps: 468 dep_dir = os.path.join(self.autodir, 'deps', dep) 469 # Search for the dependency in the repositories if specified, 470 # else check locally. 
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # NOTE(review): execfile() always returns None, so this branch is
            # always taken once the dep's build script runs without raising;
            # the real failure signal is an exception from the script.
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfuly built', dep)


    def _runtest(self, url, tag, timeout, args, dargs):
        """Fork a child process and run a single test in it.

        @param url: Url/name identifying the test to run.
        @param tag: Tag appended to the test/subdir name.
        @param timeout: Seconds to wait for the forked child, or falsy for
                no time limit.
        @param args: Positional arguments forwarded to test.runtest.
        @param dargs: Keyword arguments forwarded to test.runtest.

        @raise error.UnhandledTestError: Any exception that is not already a
                TestBaseException or JobError is wrapped so it carries a
                full traceback.
        """
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        # 'timeout' is consumed here so it is not forwarded to the test.
        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # Wrap the test run so disk consumption on / is watched and
        # excessive usage is recorded as a WARN via log_warning.
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            # Remember what was running so that, after an unexpected reboot,
            # the next run can record this group as the failure.
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code. The common case of being called by
                # run_test() will never reach this. If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
640 err_msg = str(e) + '\n' + traceback.format_exc() 641 self.record('END ERROR', subdir, testname, err_msg) 642 raise 643 finally: 644 self._state.discard('client', 'unexpected_reboot') 645 646 647 def run_group(self, function, tag=None, **dargs): 648 """ 649 Run a function nested within a group level. 650 651 function: 652 Callable to run. 653 tag: 654 An optional tag name for the group. If None (default) 655 function.__name__ will be used. 656 **dargs: 657 Named arguments for the function. 658 """ 659 if tag: 660 name = tag 661 else: 662 name = function.__name__ 663 664 try: 665 return self._rungroup(subdir=None, testname=name, 666 function=function, timeout=None, **dargs) 667 except (SystemExit, error.TestBaseException): 668 raise 669 # If there was a different exception, turn it into a TestError. 670 # It will be caught by step_engine or _run_step_fn. 671 except Exception, e: 672 raise error.UnhandledTestError(e) 673 674 675 def cpu_count(self): 676 return utils.count_cpus() # use total system count 677 678 679 def start_reboot(self): 680 self.record('START', None, 'reboot') 681 self.record('GOOD', None, 'reboot.start') 682 683 684 def _record_reboot_failure(self, subdir, operation, status, 685 running_id=None): 686 self.record("ABORT", subdir, operation, status) 687 if not running_id: 688 running_id = utils.running_os_ident() 689 kernel = {"kernel": running_id.split("::")[0]} 690 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 691 692 693 def _check_post_reboot(self, subdir, running_id=None): 694 """ 695 Function to perform post boot checks such as if the system configuration 696 has changed across reboots (specifically, CPUs and partitions). 697 698 @param subdir: The subdir to use in the job.record call. 699 @param running_id: An optional running_id to include in the reboot 700 failure log message 701 702 @raise JobError: Raised if the current configuration does not match the 703 pre-reboot configuration. 
704 """ 705 # check to see if any partitions have changed 706 partition_list = partition_lib.get_partition_list(self, 707 exclude_swap=False) 708 mount_info = partition_lib.get_mount_info(partition_list) 709 old_mount_info = self._state.get('client', 'mount_info') 710 if mount_info != old_mount_info: 711 new_entries = mount_info - old_mount_info 712 old_entries = old_mount_info - mount_info 713 description = ("mounted partitions are different after reboot " 714 "(old entries: %s, new entries: %s)" % 715 (old_entries, new_entries)) 716 self._record_reboot_failure(subdir, "reboot.verify_config", 717 description, running_id=running_id) 718 raise error.JobError("Reboot failed: %s" % description) 719 720 # check to see if any CPUs have changed 721 cpu_count = utils.count_cpus() 722 old_count = self._state.get('client', 'cpu_count') 723 if cpu_count != old_count: 724 description = ('Number of CPUs changed after reboot ' 725 '(old count: %d, new count: %d)' % 726 (old_count, cpu_count)) 727 self._record_reboot_failure(subdir, 'reboot.verify_config', 728 description, running_id=running_id) 729 raise error.JobError('Reboot failed: %s' % description) 730 731 732 def partition(self, device, loop_size=0, mountpoint=None): 733 """ 734 Work with a machine partition 735 736 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 737 @param mountpoint: Specify a directory to mount to. If not specified 738 autotest tmp directory will be used. 739 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
740 741 @return: A L{client.bin.partition.partition} object 742 """ 743 744 if not mountpoint: 745 mountpoint = self.tmpdir 746 return partition_lib.partition(self, device, loop_size, mountpoint) 747 748 @utils.deprecated 749 def filesystem(self, device, mountpoint=None, loop_size=0): 750 """ Same as partition 751 752 @deprecated: Use partition method instead 753 """ 754 return self.partition(device, loop_size, mountpoint) 755 756 757 def enable_external_logging(self): 758 pass 759 760 761 def disable_external_logging(self): 762 pass 763 764 765 def reboot_setup(self): 766 # save the partition list and mount points, as well as the cpu count 767 partition_list = partition_lib.get_partition_list(self, 768 exclude_swap=False) 769 mount_info = partition_lib.get_mount_info(partition_list) 770 self._state.set('client', 'mount_info', mount_info) 771 self._state.set('client', 'cpu_count', utils.count_cpus()) 772 773 774 def reboot(self): 775 self.reboot_setup() 776 self.harness.run_reboot() 777 778 # HACK: using this as a module sometimes hangs shutdown, so if it's 779 # installed unload it first 780 utils.system("modprobe -r netconsole", ignore_status=True) 781 782 # sync first, so that a sync during shutdown doesn't time out 783 utils.system("sync; sync", ignore_status=True) 784 785 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 786 self.quit() 787 788 789 def noop(self, text): 790 logging.info("job: noop: " + text) 791 792 793 @_run_test_complete_on_exit 794 def parallel(self, *tasklist): 795 """Run tasks in parallel""" 796 797 pids = [] 798 old_log_filename = self._logger.global_filename 799 for i, task in enumerate(tasklist): 800 assert isinstance(task, (tuple, list)) 801 self._logger.global_filename = old_log_filename + (".%d" % i) 802 def task_func(): 803 # stub out _record_indent with a process-local one 804 base_record_indent = self._record_indent 805 proc_local = self._job_state.property_factory( 806 '_state', '_record_indent.%d' % os.getpid(), 
807 base_record_indent, namespace='client') 808 self.__class__._record_indent = proc_local 809 task[0](*task[1:]) 810 pids.append(parallel.fork_start(self.resultdir, task_func)) 811 812 old_log_path = os.path.join(self.resultdir, old_log_filename) 813 old_log = open(old_log_path, "a") 814 exceptions = [] 815 for i, pid in enumerate(pids): 816 # wait for the task to finish 817 try: 818 parallel.fork_waitfor(self.resultdir, pid) 819 except Exception, e: 820 exceptions.append(e) 821 # copy the logs from the subtask into the main log 822 new_log_path = old_log_path + (".%d" % i) 823 if os.path.exists(new_log_path): 824 new_log = open(new_log_path) 825 old_log.write(new_log.read()) 826 new_log.close() 827 old_log.flush() 828 os.remove(new_log_path) 829 old_log.close() 830 831 self._logger.global_filename = old_log_filename 832 833 # handle any exceptions raised by the parallel tasks 834 if exceptions: 835 msg = "%d task(s) failed in job.parallel" % len(exceptions) 836 raise error.JobError(msg) 837 838 839 def quit(self): 840 # XXX: should have a better name. 841 self.harness.run_pause() 842 raise error.JobContinue("more to come") 843 844 845 def complete(self, status): 846 """Write pending reports, clean up, and exit""" 847 # write out a job HTML report 848 try: 849 html_report.create_report(self.resultdir) 850 except Exception, e: 851 logging.error("Error writing job HTML report: %s", e) 852 853 # We are about to exit 'complete' so clean up the control file. 
        # Preserve the final state file alongside the results for post-mortem
        # inspection before exiting.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        """Attach persistent job state to $CONTROL.state and sanity-check it.

        @raise RuntimeError: If stored steps exist but this run is not a
                continuation (-c) of a previous job.
        """
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            # Persist the explicit choice so a post-reboot continuation
            # picks it up even without the command-line flag.
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        """Normalize a step into a picklable (ancestry, name, args, dargs)
        tuple for storage in the persistent step list.

        @raise StepError: If fn is neither callable nor a function name.
        """
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name. Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        # Subsequent next_step() calls in this step land after this one.
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        # Steps are stored by name (see __create_step_tuple), so the call
        # is assembled as source and exec'd in the step's frame.
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
961 except error.TestNAError, detail: 962 self.record(detail.exit_status, None, fn, str(detail)) 963 except Exception, detail: 964 raise error.UnhandledJobError(detail) 965 966 967 def _create_frame(self, global_vars, ancestry, fn_name): 968 """Set up the environment like it would have been when this 969 function was first defined. 970 971 Child step engine 'implementations' must have 'return locals()' 972 at end end of their steps. Because of this, we can call the 973 parent function and get back all child functions (i.e. those 974 defined within it). 975 976 Unfortunately, the call stack of the function calling 977 job.next_step might have been deeper than the function it 978 added. In order to make sure that the environment is what it 979 should be, we need to then pop off the frames we built until 980 we find the frame where the function was first defined.""" 981 982 # The copies ensure that the parent frames are not modified 983 # while building child frames. This matters if we then 984 # pop some frames in the next part of this function. 985 current_frame = copy.copy(global_vars) 986 frames = [current_frame] 987 for steps_fn_name in ancestry: 988 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 989 current_frame = copy.copy(ret) 990 frames.append(current_frame) 991 992 # Walk up the stack frames until we find the place fn_name was defined. 993 while len(frames) > 2: 994 if fn_name not in frames[-2]: 995 break 996 if frames[-2][fn_name] != frames[-1][fn_name]: 997 break 998 frames.pop() 999 ancestry.pop() 1000 1001 return (frames[-1], ancestry) 1002 1003 1004 def _add_step_init(self, local_vars, current_function): 1005 """If the function returned a dictionary that includes a 1006 function named 'step_init', prepend it to our list of steps. 
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            # A top-level TestNA is recorded against the control file itself.
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            # Fresh run: queue the control file's step_init (if any) as
            # the very first step.
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            # Pop the head step and persist the remainder immediately, so
            # a reboot mid-step resumes with the correct remaining queue.
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            # Rebuild the (possibly nested) environment the step was
            # defined in, then run the step inside it.
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Log the output of a shell command during sysinfo collection.

        @param command: Shell command whose output should be collected.
        @param logfile: Optional name for the output log file.
        @param on_every_test: If True, collect around every test rather
                than only at boot time.
        """
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Collect a copy of the given file during sysinfo collection.

        @param file: Path of the file to collect.
        @param on_every_test: If True, collect around every test rather
                than only at boot time.
        """
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # Register the loggable in the appropriate set, then persist the
        # sysinfo configuration so it survives job restarts/reboots.
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        # Restore the sysinfo configuration previously written by
        # _save_sysinfo_state, if one exists in the job state.
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        # Serialize the current sysinfo configuration into the job state.
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    # Watches free space on a device between start() and stop(), and calls
    # logging_func with a warning when the usage rate exceeds a maximum
    # MB/hour threshold.
    def __init__(self, logging_func, device,
                 max_mb_per_hour):
        """
        @param logging_func: Callable used to emit the warning message.
        @param device: Device (path) whose free space is monitored.
        @param max_mb_per_hour: Usage rate above which a warning is
                emitted; a false value disables the check entirely.
        """
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Snapshot the free space and wall-clock time at monitor start."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Compare free space against the starting snapshot and warn via
        the logging function if usage exceeded max_mb_per_hour."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            # NOTE(review): watched_func discards func's return value and
            # does not preserve its metadata -- confirm no caller relies
            # on either before changing.
            return watched_func
        return decorator


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
1160 """ 1161 control = os.path.abspath(control) 1162 state = control + '.state' 1163 # Ensure state file is cleaned up before the job starts to run if autotest 1164 # is not running with the --continue flag 1165 if not options.cont and os.path.isfile(state): 1166 logging.debug('Cleaning up previously found state file') 1167 os.remove(state) 1168 1169 # instantiate the job object ready for the control file. 1170 myjob = None 1171 try: 1172 # Check that the control file is valid 1173 if not os.path.exists(control): 1174 raise error.JobError(control + ": control file not found") 1175 1176 # When continuing, the job is complete when there is no 1177 # state file, ensure we don't try and continue. 1178 if options.cont and not os.path.exists(state): 1179 raise error.JobComplete("all done") 1180 1181 myjob = job(control=control, drop_caches=drop_caches, options=options) 1182 1183 # Load in the users control file, may do any one of: 1184 # 1) execute in toto 1185 # 2) define steps, and select the first via next_step() 1186 myjob.step_engine() 1187 1188 except error.JobContinue: 1189 sys.exit(5) 1190 1191 except error.JobComplete: 1192 sys.exit(1) 1193 1194 except error.JobError, instance: 1195 logging.error("JOB ERROR: " + str(instance)) 1196 if myjob: 1197 command = None 1198 if len(instance.args) > 1: 1199 command = instance.args[1] 1200 myjob.record('ABORT', None, command, str(instance)) 1201 myjob.record('END ABORT', None, None, str(instance)) 1202 assert myjob._record_indent == 0 1203 myjob.complete(1) 1204 else: 1205 sys.exit(1) 1206 1207 except Exception, e: 1208 # NOTE: job._run_step_fn and job.step_engine will turn things into 1209 # a JobError for us. If we get here, its likely an autotest bug. 
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


class job(base_client_job):
    """Client job specialization: pauses log rotation around each test,
    snapshots VM state on test failure, and reboots via the harness."""

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        """Run a test with log rotation paused for its duration.

        @param url: The test url, passed through to the base class.
        @return: True if the test passed, False otherwise.
        """
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            # Always resume log rotation, even if the test raised.
            log_pauser.end()
        return passed


    def reboot(self):
        """Reboot the machine via the harness, then quit this job."""
        self.reboot_setup()
        self.harness.run_reboot()

        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)

        # Detach the reboot command so it survives this process ending.
        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        # This job type does not require gcc on the client.
        return False