1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8# pylint: disable=missing-docstring 9 10import copy 11from datetime import datetime 12import getpass 13import glob 14import logging 15import os 16import re 17import shutil 18import sys 19import time 20import traceback 21import types 22import weakref 23 24import common 25from autotest_lib.client.bin import client_logging_config 26from autotest_lib.client.bin import harness 27from autotest_lib.client.bin import local_host 28from autotest_lib.client.bin import parallel 29from autotest_lib.client.bin import partition as partition_lib 30from autotest_lib.client.bin import profilers 31from autotest_lib.client.bin import sysinfo 32from autotest_lib.client.bin import test 33from autotest_lib.client.bin import utils 34from autotest_lib.client.common_lib import barrier 35from autotest_lib.client.common_lib import base_job 36from autotest_lib.client.common_lib import packages 37from autotest_lib.client.common_lib import error 38from autotest_lib.client.common_lib import global_config 39from autotest_lib.client.common_lib import logging_manager 40from autotest_lib.client.common_lib import packages 41from autotest_lib.client.cros import cros_logging 42from autotest_lib.client.tools import html_report 43 44GLOBAL_CONFIG = global_config.global_config 45 46LAST_BOOT_TAG = object() 47JOB_PREAMBLE = """ 48from autotest_lib.client.common_lib.error import * 49from autotest_lib.client.bin.utils import * 50""" 51 52 53class StepError(error.AutotestError): 54 pass 55 56class NotAvailableError(error.AutotestError): 57 pass 58 59 60 61def _run_test_complete_on_exit(f): 62 """Decorator for job methods that automatically calls 63 self.harness.run_test_complete when the method exits, if appropriate.""" 64 def wrapped(self, *args, **dargs): 65 try: 66 return f(self, *args, **dargs) 67 finally: 68 if self._logger.global_filename == 'status': 69 self.harness.run_test_complete() 70 if self.drop_caches: 71 utils.drop_caches() 72 wrapped.__name__ = f.__name__ 73 wrapped.__doc__ = f.__doc__ 74 wrapped.__dict__.update(f.__dict__) 75 return wrapped 76 77 78class status_indenter(base_job.status_indenter): 79 """Provide a status indenter that is backed by job._record_prefix.""" 80 def __init__(self, job_): 81 self._job = weakref.proxy(job_) # avoid a circular reference 82 83 84 @property 85 def indent(self): 86 return self._job._record_indent 87 88 89 def increment(self): 90 self._job._record_indent += 1 91 92 93 def decrement(self): 94 self._job._record_indent -= 1 95 96 97class base_client_job(base_job.base_job): 98 """The client-side concrete implementation of base_job. 99 100 Optional properties provided by this implementation: 101 control 102 harness 103 """ 104 105 _WARNING_DISABLE_DELAY = 5 106 107 # _record_indent is a persistent property, but only on the client 108 _job_state = base_job.base_job._job_state 109 _record_indent = _job_state.property_factory( 110 '_state', '_record_indent', 0, namespace='client') 111 _max_disk_usage_rate = _job_state.property_factory( 112 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 113 114 115 def __init__(self, control, options, drop_caches=True): 116 """ 117 Prepare a client side job object. 118 119 @param control: The control file (pathname of). 120 @param options: an object which includes: 121 jobtag: The job tag string (eg "default"). 122 cont: If this is the continuation of this job. 123 harness_type: An alternative server harness. 
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
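    #
    # Illustrative sketch, not part of the original module: _parse_args()
    # splits a raw --args string on whitespace while keeping quoted groups
    # (quotes included) as single tokens, e.g.:
    #
    #   base_client_job._parse_args('first "second third" fourth')
    #   # -> ['first', '"second third"', 'fourth']
    #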
    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
                client_logging_config.ClientLoggingConfig(),
                results_dir=self.resultdir,
                verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)


    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                GLOBAL_CONFIG.get_config_value('CLIENT',
                                               'drop_caches_between_iterations',
                                               type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
308 """ 309 self.pkgmgr = packages.PackageManager( 310 self.autodir, run_function_dargs={'timeout':3600}) 311 312 313 def _cleanup_results_dir(self): 314 """Delete everything in resultsdir""" 315 assert os.path.exists(self.resultdir) 316 list_files = glob.glob('%s/*' % self.resultdir) 317 for f in list_files: 318 if os.path.isdir(f): 319 shutil.rmtree(f) 320 elif os.path.isfile(f): 321 os.remove(f) 322 323 324 def _cleanup_debugdir_files(self): 325 """ 326 Delete any leftover debugdir files 327 """ 328 list_files = glob.glob("/tmp/autotest_results_dir.*") 329 for f in list_files: 330 os.remove(f) 331 332 333 def disable_warnings(self, warning_type): 334 self.record("INFO", None, None, 335 "disabling %s warnings" % warning_type, 336 {"warnings.disable": warning_type}) 337 time.sleep(self._WARNING_DISABLE_DELAY) 338 339 340 def enable_warnings(self, warning_type): 341 time.sleep(self._WARNING_DISABLE_DELAY) 342 self.record("INFO", None, None, 343 "enabling %s warnings" % warning_type, 344 {"warnings.enable": warning_type}) 345 346 347 def monitor_disk_usage(self, max_rate): 348 """\ 349 Signal that the job should monitor disk space usage on / 350 and generate a warning if a test uses up disk space at a 351 rate exceeding 'max_rate'. 352 353 Parameters: 354 max_rate - the maximium allowed rate of disk consumption 355 during a test, in MB/hour, or 0 to indicate 356 no limit. 357 """ 358 self._max_disk_usage_rate = max_rate 359 360 361 def control_get(self): 362 return self.control 363 364 365 def control_set(self, control): 366 self.control = os.path.abspath(control) 367 368 369 def harness_select(self, which, harness_args): 370 self.harness = harness.select(which, self, harness_args) 371 372 373 def setup_dirs(self, results_dir, tmp_dir): 374 if not tmp_dir: 375 tmp_dir = os.path.join(self.tmpdir, 'build') 376 if not os.path.exists(tmp_dir): 377 os.mkdir(tmp_dir) 378 if not os.path.isdir(tmp_dir): 379 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 380 raise ValueError(e_msg) 381 382 # We label the first build "build" and then subsequent ones 383 # as "build.2", "build.3", etc. Whilst this is a little bit 384 # inconsistent, 99.9% of jobs will only have one build 385 # (that's not done as kernbench, sparse, or buildtest), 386 # so it works out much cleaner. One of life's compromises. 387 if not results_dir: 388 results_dir = os.path.join(self.resultdir, 'build') 389 i = 2 390 while os.path.exists(results_dir): 391 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 392 i += 1 393 if not os.path.exists(results_dir): 394 os.mkdir(results_dir) 395 396 return (results_dir, tmp_dir) 397 398 399 def barrier(self, *args, **kwds): 400 """Create a barrier object""" 401 return barrier.barrier(*args, **kwds) 402 403 404 def install_pkg(self, name, pkg_type, install_dir): 405 ''' 406 This method is a simple wrapper around the actual package 407 installation method in the Packager class. This is used 408 internally by the profilers, deps and tests code. 409 name : name of the package (ex: sleeptest, dbench etc.) 410 pkg_type : Type of the package (ex: test, dep etc.) 411 install_dir : The directory in which the source is actually 412 untarred into. (ex: client/profilers/<name> for profilers) 413 ''' 414 if self.pkgmgr.repositories: 415 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 416 417 418 def add_repository(self, repo_urls): 419 ''' 420 Adds the repository locations to the job so that packages 421 can be fetched from them when needed. 
    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which, harness_args):
        self.harness = harness.select(which, self, harness_args)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # file is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)
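    #
    # Illustrative sketch, not part of the original module: a control file or
    # test fetches and builds a dependency, then refers to the installed tree
    # under <autodir>/deps/<dep>.  The 'libaio' name is a hypothetical
    # example.
    #
    #   job.setup_dep(['libaio'])
    #   dep_dir = os.path.join(job.autodir, 'deps', 'libaio')
    #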
    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
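    #
    # Illustrative sketch, not part of the original module: from a control
    # file, run_test() reports pass/fail as a boolean, while run_test_detail()
    # (below) returns the status string instead.  The test name and keyword
    # argument are hypothetical.
    #
    #   passed = job.run_test('sleeptest', tag='quick')   # True / False
    #   status = job.run_test_detail('sleeptest')         # e.g. 'GOOD'
    #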
    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code. The common case of being called by
                # run_test() will never reach this. If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)
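    #
    # Illustrative sketch, not part of the original module: run_group() wraps
    # an arbitrary callable in START/END status records; the function and tag
    # here are hypothetical.
    #
    #   def prepare_disks():
    #       ...
    #   job.run_group(prepare_disks, tag='prepare_disks')
    #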
699 """ 700 # check to see if any partitions have changed 701 partition_list = partition_lib.get_partition_list(self, 702 exclude_swap=False) 703 mount_info = partition_lib.get_mount_info(partition_list) 704 old_mount_info = self._state.get('client', 'mount_info') 705 if mount_info != old_mount_info: 706 new_entries = mount_info - old_mount_info 707 old_entries = old_mount_info - mount_info 708 description = ("mounted partitions are different after reboot " 709 "(old entries: %s, new entries: %s)" % 710 (old_entries, new_entries)) 711 self._record_reboot_failure(subdir, "reboot.verify_config", 712 description, running_id=running_id) 713 raise error.JobError("Reboot failed: %s" % description) 714 715 # check to see if any CPUs have changed 716 cpu_count = utils.count_cpus() 717 old_count = self._state.get('client', 'cpu_count') 718 if cpu_count != old_count: 719 description = ('Number of CPUs changed after reboot ' 720 '(old count: %d, new count: %d)' % 721 (old_count, cpu_count)) 722 self._record_reboot_failure(subdir, 'reboot.verify_config', 723 description, running_id=running_id) 724 raise error.JobError('Reboot failed: %s' % description) 725 726 727 def partition(self, device, loop_size=0, mountpoint=None): 728 """ 729 Work with a machine partition 730 731 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 732 @param mountpoint: Specify a directory to mount to. If not specified 733 autotest tmp directory will be used. 734 @param loop_size: Size of loopback device (in MB). Defaults to 0. 735 736 @return: A L{client.bin.partition.partition} object 737 """ 738 739 if not mountpoint: 740 mountpoint = self.tmpdir 741 return partition_lib.partition(self, device, loop_size, mountpoint) 742 743 @utils.deprecated 744 def filesystem(self, device, mountpoint=None, loop_size=0): 745 """ Same as partition 746 747 @deprecated: Use partition method instead 748 """ 749 return self.partition(device, loop_size, mountpoint) 750 751 752 def enable_external_logging(self): 753 pass 754 755 756 def disable_external_logging(self): 757 pass 758 759 760 def reboot_setup(self): 761 # save the partition list and mount points, as well as the cpu count 762 partition_list = partition_lib.get_partition_list(self, 763 exclude_swap=False) 764 mount_info = partition_lib.get_mount_info(partition_list) 765 self._state.set('client', 'mount_info', mount_info) 766 self._state.set('client', 'cpu_count', utils.count_cpus()) 767 768 769 def reboot(self): 770 self.reboot_setup() 771 self.harness.run_reboot() 772 773 # HACK: using this as a module sometimes hangs shutdown, so if it's 774 # installed unload it first 775 utils.system("modprobe -r netconsole", ignore_status=True) 776 777 # sync first, so that a sync during shutdown doesn't time out 778 utils.system("sync; sync", ignore_status=True) 779 780 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 781 self.quit() 782 783 784 def noop(self, text): 785 logging.info("job: noop: " + text) 786 787 788 @_run_test_complete_on_exit 789 def parallel(self, *tasklist): 790 """Run tasks in parallel""" 791 792 pids = [] 793 old_log_filename = self._logger.global_filename 794 for i, task in enumerate(tasklist): 795 assert isinstance(task, (tuple, list)) 796 self._logger.global_filename = old_log_filename + (".%d" % i) 797 def task_func(): 798 # stub out _record_indent with a process-local one 799 base_record_indent = self._record_indent 800 proc_local = self._job_state.property_factory( 801 '_state', '_record_indent.%d' % os.getpid(), 802 base_record_indent, namespace='client') 
    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name,
                      option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)
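    #
    # Illustrative sketch, not part of the original module: assuming the step
    # queue already holds [old_step] queued by a previous step, calls made
    # while the current step runs leave the queue in these states:
    #
    #   job.next_step(step_a)          # -> [step_a, old_step]
    #   job.next_step(step_b)          # -> [step_a, step_b, old_step]
    #   job.next_step_append(step_c)   # -> [step_a, step_b, old_step, step_c]
    #   job.next_step_prepend(step_d)  # -> [step_d, step_a, step_b, old_step,
    #                                  #     step_c]
    #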
    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
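#
# Illustrative sketch, not part of the original module: watch() is how
# _run_test_base() wraps each test's group function above; the logging
# callback and 100 MB/hour threshold here are hypothetical.
#
#   @disk_usage_monitor.watch(logging.warning, "/", 100)
#   def build_tree():
#       ...  # consuming disk faster than 100 MB/hour triggers the callback
#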
1155 """ 1156 control = os.path.abspath(control) 1157 state = control + '.state' 1158 # Ensure state file is cleaned up before the job starts to run if autotest 1159 # is not running with the --continue flag 1160 if not options.cont and os.path.isfile(state): 1161 logging.debug('Cleaning up previously found state file') 1162 os.remove(state) 1163 1164 # instantiate the job object ready for the control file. 1165 myjob = None 1166 try: 1167 # Check that the control file is valid 1168 if not os.path.exists(control): 1169 raise error.JobError(control + ": control file not found") 1170 1171 # When continuing, the job is complete when there is no 1172 # state file, ensure we don't try and continue. 1173 if options.cont and not os.path.exists(state): 1174 raise error.JobComplete("all done") 1175 1176 myjob = job(control=control, drop_caches=drop_caches, options=options) 1177 1178 # Load in the users control file, may do any one of: 1179 # 1) execute in toto 1180 # 2) define steps, and select the first via next_step() 1181 myjob.step_engine() 1182 1183 except error.JobContinue: 1184 sys.exit(5) 1185 1186 except error.JobComplete: 1187 sys.exit(1) 1188 1189 except error.JobError, instance: 1190 logging.error("JOB ERROR: " + str(instance)) 1191 if myjob: 1192 command = None 1193 if len(instance.args) > 1: 1194 command = instance.args[1] 1195 myjob.record('ABORT', None, command, str(instance)) 1196 myjob.record('END ABORT', None, None, str(instance)) 1197 assert myjob._record_indent == 0 1198 myjob.complete(1) 1199 else: 1200 sys.exit(1) 1201 1202 except Exception, e: 1203 # NOTE: job._run_step_fn and job.step_engine will turn things into 1204 # a JobError for us. If we get here, its likely an autotest bug. 1205 msg = str(e) + '\n' + traceback.format_exc() 1206 logging.critical("JOB ERROR (autotest bug?): " + msg) 1207 if myjob: 1208 myjob.record('END ABORT', None, None, msg) 1209 assert myjob._record_indent == 0 1210 myjob.complete(1) 1211 else: 1212 sys.exit(1) 1213 1214 # If we get here, then we assume the job is complete and good. 1215 myjob.record('END GOOD', None, None) 1216 assert myjob._record_indent == 0 1217 1218 myjob.complete(0) 1219 1220 1221class job(base_client_job): 1222 1223 def __init__(self, *args, **kwargs): 1224 base_client_job.__init__(self, *args, **kwargs) 1225 1226 1227 def run_test(self, url, *args, **dargs): 1228 log_pauser = cros_logging.LogRotationPauser() 1229 passed = False 1230 try: 1231 log_pauser.begin() 1232 passed = base_client_job.run_test(self, url, *args, **dargs) 1233 if not passed: 1234 # Save the VM state immediately after the test failure. 1235 # This is a NOOP if the the test isn't running in a VM or 1236 # if the VM is not properly configured to save state. 1237 _group, testname = self.pkgmgr.get_package_name(url, 'test') 1238 now = datetime.now().strftime('%I:%M:%S.%f') 1239 checkpoint_name = '%s-%s' % (testname, now) 1240 utils.save_vm_state(checkpoint_name) 1241 finally: 1242 log_pauser.end() 1243 return passed 1244 1245 1246 def reboot(self): 1247 self.reboot_setup() 1248 self.harness.run_reboot() 1249 1250 # sync first, so that a sync during shutdown doesn't time out 1251 utils.system('sync; sync', ignore_status=True) 1252 1253 utils.system('reboot </dev/null >/dev/null 2>&1 &') 1254 self.quit() 1255 1256 1257 def require_gcc(self): 1258 return False 1259