1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8# pylint: disable=missing-docstring 9 10import copy 11from datetime import datetime 12import getpass 13import glob 14import logging 15import os 16import re 17import shutil 18import sys 19import time 20import traceback 21import types 22import weakref 23 24import common 25from autotest_lib.client.bin import client_logging_config 26from autotest_lib.client.bin import harness 27from autotest_lib.client.bin import local_host 28from autotest_lib.client.bin import parallel 29from autotest_lib.client.bin import partition as partition_lib 30from autotest_lib.client.bin import profilers 31from autotest_lib.client.bin import sysinfo 32from autotest_lib.client.bin import test 33from autotest_lib.client.bin import utils 34from autotest_lib.client.common_lib import barrier 35from autotest_lib.client.common_lib import base_job 36from autotest_lib.client.common_lib import control_data 37from autotest_lib.client.common_lib import error 38from autotest_lib.client.common_lib import global_config 39from autotest_lib.client.common_lib import logging_manager 40from autotest_lib.client.common_lib import packages 41from autotest_lib.client.cros import cros_logging 42from autotest_lib.client.tools import html_report 43 44GLOBAL_CONFIG = global_config.global_config 45 46LAST_BOOT_TAG = object() 47JOB_PREAMBLE = """ 48from autotest_lib.client.common_lib.error import * 49from autotest_lib.client.bin.utils import * 50""" 51 52 53class StepError(error.AutotestError): 54 pass 55 56class NotAvailableError(error.AutotestError): 57 pass 58 59 60 61def _run_test_complete_on_exit(f): 62 """Decorator for job methods that automatically calls 63 self.harness.run_test_complete when the method exits, if appropriate.""" 64 def wrapped(self, *args, **dargs): 65 try: 66 return f(self, *args, **dargs) 67 finally: 68 if self._logger.global_filename == 'status': 69 
self.harness.run_test_complete() 70 if self.drop_caches: 71 utils.drop_caches() 72 wrapped.__name__ = f.__name__ 73 wrapped.__doc__ = f.__doc__ 74 wrapped.__dict__.update(f.__dict__) 75 return wrapped 76 77 78class status_indenter(base_job.status_indenter): 79 """Provide a status indenter that is backed by job._record_prefix.""" 80 def __init__(self, job_): 81 self._job = weakref.proxy(job_) # avoid a circular reference 82 83 84 @property 85 def indent(self): 86 return self._job._record_indent 87 88 89 def increment(self): 90 self._job._record_indent += 1 91 92 93 def decrement(self): 94 self._job._record_indent -= 1 95 96 97class base_client_job(base_job.base_job): 98 """The client-side concrete implementation of base_job. 99 100 Optional properties provided by this implementation: 101 control 102 harness 103 """ 104 105 _WARNING_DISABLE_DELAY = 5 106 107 # _record_indent is a persistent property, but only on the client 108 _job_state = base_job.base_job._job_state 109 _record_indent = _job_state.property_factory( 110 '_state', '_record_indent', 0, namespace='client') 111 _max_disk_usage_rate = _job_state.property_factory( 112 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 113 114 115 def __init__(self, control, options, drop_caches=True): 116 """ 117 Prepare a client side job object. 118 119 @param control: The control file (pathname of). 120 @param options: an object which includes: 121 jobtag: The job tag string (eg "default"). 122 cont: If this is the continuation of this job. 123 harness_type: An alternative server harness. [None] 124 use_external_logging: If true, the enable_external_logging 125 method will be called during construction. [False] 126 @param drop_caches: If true, utils.drop_caches() is called before and 127 between all tests. 
[True] 128 """ 129 super(base_client_job, self).__init__(options=options) 130 self._pre_record_init(control, options) 131 try: 132 self._post_record_init(control, options, drop_caches) 133 except Exception, err: 134 self.record( 135 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 136 str(err)) 137 raise 138 139 140 @classmethod 141 def _get_environ_autodir(cls): 142 return os.environ['AUTODIR'] 143 144 145 @classmethod 146 def _find_base_directories(cls): 147 """ 148 Determine locations of autodir and clientdir (which are the same) 149 using os.environ. Serverdir does not exist in this context. 150 """ 151 autodir = clientdir = cls._get_environ_autodir() 152 return autodir, clientdir, None 153 154 155 @classmethod 156 def _parse_args(cls, args): 157 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) 158 159 160 def _find_resultdir(self, options): 161 """ 162 Determine the directory for storing results. On a client this is 163 always <autodir>/results/<tag>, where tag is passed in on the command 164 line as an option. 165 """ 166 output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT', 167 'output_dir', 168 default="") 169 if options.output_dir: 170 basedir = options.output_dir 171 elif output_dir_config: 172 basedir = output_dir_config 173 else: 174 basedir = self.autodir 175 176 return os.path.join(basedir, 'results', options.tag) 177 178 179 def _get_status_logger(self): 180 """Return a reference to the status logger.""" 181 return self._logger 182 183 184 def _pre_record_init(self, control, options): 185 """ 186 Initialization function that should peform ONLY the required 187 setup so that the self.record() method works. 188 189 As of now self.record() needs self.resultdir, self._group_level, 190 self.harness and of course self._logger. 
191 """ 192 if not options.cont: 193 self._cleanup_debugdir_files() 194 self._cleanup_results_dir() 195 196 logging_manager.configure_logging( 197 client_logging_config.ClientLoggingConfig(), 198 results_dir=self.resultdir, 199 verbose=options.verbose) 200 logging.info('Writing results to %s', self.resultdir) 201 202 # init_group_level needs the state 203 self.control = os.path.realpath(control) 204 self._is_continuation = options.cont 205 self._current_step_ancestry = [] 206 self._next_step_index = 0 207 self._load_state() 208 209 _harness = self.handle_persistent_option(options, 'harness') 210 _harness_args = self.handle_persistent_option(options, 'harness_args') 211 212 self.harness = harness.select(_harness, self, _harness_args) 213 214 if self.control: 215 parsed_control = control_data.parse_control( 216 self.control, raise_warnings=False) 217 self.fast = parsed_control.fast 218 219 # set up the status logger 220 def client_job_record_hook(entry): 221 msg_tag = '' 222 if '.' in self._logger.global_filename: 223 msg_tag = self._logger.global_filename.split('.', 1)[1] 224 # send the entry to the job harness 225 message = '\n'.join([entry.message] + entry.extra_message_lines) 226 rendered_entry = self._logger.render_entry(entry) 227 self.harness.test_status_detail(entry.status_code, entry.subdir, 228 entry.operation, message, msg_tag, 229 entry.fields) 230 self.harness.test_status(rendered_entry, msg_tag) 231 # send the entry to stdout, if it's enabled 232 logging.info(rendered_entry) 233 self._logger = base_job.status_logger( 234 self, status_indenter(self), record_hook=client_job_record_hook) 235 236 237 def _post_record_init(self, control, options, drop_caches): 238 """ 239 Perform job initialization not required by self.record(). 
240 """ 241 self._init_drop_caches(drop_caches) 242 243 self._init_packages() 244 245 self.sysinfo = sysinfo.sysinfo(self.resultdir) 246 self._load_sysinfo_state() 247 248 if not options.cont: 249 download = os.path.join(self.testdir, 'download') 250 if not os.path.exists(download): 251 os.mkdir(download) 252 253 shutil.copyfile(self.control, 254 os.path.join(self.resultdir, 'control')) 255 256 self.control = control 257 258 self.logging = logging_manager.get_logging_manager( 259 manage_stdout_and_stderr=True, redirect_fds=True) 260 self.logging.start_logging() 261 262 self.profilers = profilers.profilers(self) 263 264 self.machines = [options.hostname] 265 self.machine_dict_list = [{'hostname' : options.hostname}] 266 # Client side tests should always run the same whether or not they are 267 # running in the lab. 268 self.in_lab = False 269 self.hosts = set([local_host.LocalHost(hostname=options.hostname)]) 270 271 self.args = [] 272 if options.args: 273 self.args = self._parse_args(options.args) 274 275 if options.user: 276 self.user = options.user 277 else: 278 self.user = getpass.getuser() 279 280 self.sysinfo.log_per_reboot_data() 281 282 if not options.cont: 283 self.record('START', None, None) 284 285 self.harness.run_start() 286 287 if options.log: 288 self.enable_external_logging() 289 290 self.num_tests_run = None 291 self.num_tests_failed = None 292 293 self.warning_loggers = None 294 self.warning_manager = None 295 296 297 def _init_drop_caches(self, drop_caches): 298 """ 299 Perform the drop caches initialization. 300 """ 301 self.drop_caches_between_iterations = ( 302 GLOBAL_CONFIG.get_config_value('CLIENT', 303 'drop_caches_between_iterations', 304 type=bool, default=True)) 305 self.drop_caches = drop_caches 306 if self.drop_caches: 307 utils.drop_caches() 308 309 310 def _init_packages(self): 311 """ 312 Perform the packages support initialization. 
313 """ 314 self.pkgmgr = packages.PackageManager( 315 self.autodir, run_function_dargs={'timeout':3600}) 316 317 318 def _cleanup_results_dir(self): 319 """Delete everything in resultsdir""" 320 assert os.path.exists(self.resultdir) 321 list_files = glob.glob('%s/*' % self.resultdir) 322 for f in list_files: 323 if os.path.isdir(f): 324 shutil.rmtree(f) 325 elif os.path.isfile(f): 326 os.remove(f) 327 328 329 def _cleanup_debugdir_files(self): 330 """ 331 Delete any leftover debugdir files 332 """ 333 list_files = glob.glob("/tmp/autotest_results_dir.*") 334 for f in list_files: 335 os.remove(f) 336 337 338 def disable_warnings(self, warning_type): 339 self.record("INFO", None, None, 340 "disabling %s warnings" % warning_type, 341 {"warnings.disable": warning_type}) 342 time.sleep(self._WARNING_DISABLE_DELAY) 343 344 345 def enable_warnings(self, warning_type): 346 time.sleep(self._WARNING_DISABLE_DELAY) 347 self.record("INFO", None, None, 348 "enabling %s warnings" % warning_type, 349 {"warnings.enable": warning_type}) 350 351 352 def monitor_disk_usage(self, max_rate): 353 """\ 354 Signal that the job should monitor disk space usage on / 355 and generate a warning if a test uses up disk space at a 356 rate exceeding 'max_rate'. 357 358 Parameters: 359 max_rate - the maximium allowed rate of disk consumption 360 during a test, in MB/hour, or 0 to indicate 361 no limit. 362 """ 363 self._max_disk_usage_rate = max_rate 364 365 366 def control_get(self): 367 return self.control 368 369 370 def control_set(self, control): 371 self.control = os.path.abspath(control) 372 373 374 def harness_select(self, which, harness_args): 375 self.harness = harness.select(which, self, harness_args) 376 377 378 def setup_dirs(self, results_dir, tmp_dir): 379 if not tmp_dir: 380 tmp_dir = os.path.join(self.tmpdir, 'build') 381 if not os.path.exists(tmp_dir): 382 os.mkdir(tmp_dir) 383 if not os.path.isdir(tmp_dir): 384 e_msg = "Temp dir (%s) is not a dir - args backwards?" 
% self.tmpdir 385 raise ValueError(e_msg) 386 387 # We label the first build "build" and then subsequent ones 388 # as "build.2", "build.3", etc. Whilst this is a little bit 389 # inconsistent, 99.9% of jobs will only have one build 390 # (that's not done as kernbench, sparse, or buildtest), 391 # so it works out much cleaner. One of life's compromises. 392 if not results_dir: 393 results_dir = os.path.join(self.resultdir, 'build') 394 i = 2 395 while os.path.exists(results_dir): 396 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 397 i += 1 398 if not os.path.exists(results_dir): 399 os.mkdir(results_dir) 400 401 return (results_dir, tmp_dir) 402 403 404 def barrier(self, *args, **kwds): 405 """Create a barrier object""" 406 return barrier.barrier(*args, **kwds) 407 408 409 def install_pkg(self, name, pkg_type, install_dir): 410 ''' 411 This method is a simple wrapper around the actual package 412 installation method in the Packager class. This is used 413 internally by the profilers, deps and tests code. 414 name : name of the package (ex: sleeptest, dbench etc.) 415 pkg_type : Type of the package (ex: test, dep etc.) 416 install_dir : The directory in which the source is actually 417 untarred into. (ex: client/profilers/<name> for profilers) 418 ''' 419 if self.pkgmgr.repositories: 420 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 421 422 423 def add_repository(self, repo_urls): 424 ''' 425 Adds the repository locations to the job so that packages 426 can be fetched from them when needed. The repository list 427 needs to be a string list 428 Ex: job.add_repository(['http://blah1','http://blah2']) 429 ''' 430 for repo_url in repo_urls: 431 self.pkgmgr.add_repository(repo_url) 432 433 # Fetch the packages' checksum file that contains the checksums 434 # of all the packages if it is not already fetched. The checksum 435 # is always fetched whenever a job is first started. 
This 436 # is not done in the job's constructor as we don't have the list of 437 # the repositories there (and obviously don't care about this file 438 # if we are not using the repos) 439 try: 440 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 441 packages.CHECKSUM_FILE) 442 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, 443 checksum_file_path, use_checksum=False) 444 except error.PackageFetchError: 445 # packaging system might not be working in this case 446 # Silently fall back to the normal case 447 pass 448 449 450 def require_gcc(self): 451 """ 452 Test whether gcc is installed on the machine. 453 """ 454 # check if gcc is installed on the system. 455 try: 456 utils.system('which gcc') 457 except error.CmdError: 458 raise NotAvailableError('gcc is required by this job and is ' 459 'not available on the system') 460 461 462 def setup_dep(self, deps): 463 """Set up the dependencies for this test. 464 deps is a list of libraries required for this test. 465 """ 466 # Fetch the deps from the repositories and set them up. 467 for dep in deps: 468 dep_dir = os.path.join(self.autodir, 'deps', dep) 469 # Search for the dependency in the repositories if specified, 470 # else check locally. 
471 try: 472 self.install_pkg(dep, 'dep', dep_dir) 473 except error.PackageInstallError: 474 # see if the dep is there locally 475 pass 476 477 # dep_dir might not exist if it is not fetched from the repos 478 if not os.path.exists(dep_dir): 479 raise error.TestError("Dependency %s does not exist" % dep) 480 481 os.chdir(dep_dir) 482 if execfile('%s.py' % dep, {}) is None: 483 logging.info('Dependency %s successfuly built', dep) 484 485 486 def _runtest(self, url, tag, timeout, args, dargs): 487 try: 488 l = lambda : test.runtest(self, url, tag, args, dargs) 489 pid = parallel.fork_start(self.resultdir, l) 490 491 self._forkwait(pid, timeout) 492 493 except error.TestBaseException: 494 # These are already classified with an error type (exit_status) 495 raise 496 except error.JobError: 497 raise # Caught further up and turned into an ABORT. 498 except Exception, e: 499 # Converts all other exceptions thrown by the test regardless 500 # of phase into a TestError(TestBaseException) subclass that 501 # reports them with their full stack trace. 502 raise error.UnhandledTestError(e) 503 504 def _forkwait(self, pid, timeout=None): 505 """Wait for the given pid to complete 506 507 @param pid (int) process id to wait for 508 @param timeout (int) seconds to wait before timing out the process""" 509 if timeout: 510 logging.debug('Waiting for pid %d for %d seconds', pid, timeout) 511 parallel.fork_waitfor_timed(self.resultdir, pid, timeout) 512 else: 513 logging.debug('Waiting for pid %d', pid) 514 parallel.fork_waitfor(self.resultdir, pid) 515 logging.info('pid %d completed', pid) 516 517 518 def _run_test_base(self, url, *args, **dargs): 519 """ 520 Prepares arguments and run functions to run_test and run_test_detail. 521 522 @param url A url that identifies the test to run. 523 @param tag An optional keyword argument that will be added to the 524 test and subdir name. 525 @param subdir_tag An optional keyword argument that will be added 526 to the subdir name. 
527 528 @returns: 529 subdir: Test subdirectory 530 testname: Test name 531 group_func: Actual test run function 532 timeout: Test timeout 533 """ 534 _group, testname = self.pkgmgr.get_package_name(url, 'test') 535 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 536 self._make_test_outputdir(subdir) 537 538 timeout = dargs.pop('timeout', None) 539 if timeout: 540 logging.debug('Test has timeout: %d sec.', timeout) 541 542 def log_warning(reason): 543 self.record("WARN", subdir, testname, reason) 544 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate) 545 def group_func(): 546 try: 547 self._runtest(url, tag, timeout, args, dargs) 548 except error.TestBaseException, detail: 549 # The error is already classified, record it properly. 550 self.record(detail.exit_status, subdir, testname, str(detail)) 551 raise 552 else: 553 self.record('GOOD', subdir, testname, 'completed successfully') 554 555 return (subdir, testname, group_func, timeout) 556 557 558 @_run_test_complete_on_exit 559 def run_test(self, url, *args, **dargs): 560 """ 561 Summon a test object and run it. 562 563 @param url A url that identifies the test to run. 564 @param tag An optional keyword argument that will be added to the 565 test and subdir name. 566 @param subdir_tag An optional keyword argument that will be added 567 to the subdir name. 568 569 @returns True if the test passes, False otherwise. 570 """ 571 (subdir, testname, group_func, timeout) = self._run_test_base(url, 572 *args, 573 **dargs) 574 try: 575 self._rungroup(subdir, testname, group_func, timeout) 576 return True 577 except error.TestBaseException: 578 return False 579 # Any other exception here will be given to the caller 580 # 581 # NOTE: The only exception possible from the control file here 582 # is error.JobError as _runtest() turns all others into an 583 # UnhandledTestError that is caught above. 
584 585 586 def stage_control_file(self, url): 587 """ 588 Install the test package and return the control file path. 589 590 @param url The name of the test, e.g. dummy_Pass. This is the 591 string passed to run_test in the client test control file: 592 job.run_test('dummy_Pass') 593 This name can also be something like 'camera_HAL3.jea', 594 which corresponds to a test package containing multiple 595 control files, each with calls to: 596 job.run_test('camera_HAL3', **opts) 597 598 @returns Absolute path to the control file for the test. 599 """ 600 testname, _, _tag = url.partition('.') 601 bindir = os.path.join(self.testdir, testname) 602 self.install_pkg(testname, 'test', bindir) 603 return _locate_test_control_file(bindir, url) 604 605 606 @_run_test_complete_on_exit 607 def run_test_detail(self, url, *args, **dargs): 608 """ 609 Summon a test object and run it, returning test status. 610 611 @param url A url that identifies the test to run. 612 @param tag An optional keyword argument that will be added to the 613 test and subdir name. 614 @param subdir_tag An optional keyword argument that will be added 615 to the subdir name. 
616 617 @returns Test status 618 @see: client/common_lib/error.py, exit_status 619 """ 620 (subdir, testname, group_func, timeout) = self._run_test_base(url, 621 *args, 622 **dargs) 623 try: 624 self._rungroup(subdir, testname, group_func, timeout) 625 return 'GOOD' 626 except error.TestBaseException, detail: 627 return detail.exit_status 628 629 630 def _rungroup(self, subdir, testname, function, timeout, *args, **dargs): 631 """\ 632 subdir: 633 name of the group 634 testname: 635 name of the test to run, or support step 636 function: 637 subroutine to run 638 *args: 639 arguments for the function 640 641 Returns the result of the passed in function 642 """ 643 644 try: 645 optional_fields = None 646 if timeout: 647 optional_fields = {} 648 optional_fields['timeout'] = timeout 649 self.record('START', subdir, testname, 650 optional_fields=optional_fields) 651 652 self._state.set('client', 'unexpected_reboot', (subdir, testname)) 653 try: 654 result = function(*args, **dargs) 655 self.record('END GOOD', subdir, testname) 656 return result 657 except error.TestBaseException, e: 658 self.record('END %s' % e.exit_status, subdir, testname) 659 raise 660 except error.JobError, e: 661 self.record('END ABORT', subdir, testname) 662 raise 663 except Exception, e: 664 # This should only ever happen due to a bug in the given 665 # function's code. The common case of being called by 666 # run_test() will never reach this. If a control file called 667 # run_group() itself, bugs in its function will be caught 668 # here. 669 err_msg = str(e) + '\n' + traceback.format_exc() 670 self.record('END ERROR', subdir, testname, err_msg) 671 raise 672 finally: 673 self._state.discard('client', 'unexpected_reboot') 674 675 676 def run_group(self, function, tag=None, **dargs): 677 """ 678 Run a function nested within a group level. 679 680 function: 681 Callable to run. 682 tag: 683 An optional tag name for the group. If None (default) 684 function.__name__ will be used. 
685 **dargs: 686 Named arguments for the function. 687 """ 688 if tag: 689 name = tag 690 else: 691 name = function.__name__ 692 693 try: 694 return self._rungroup(subdir=None, testname=name, 695 function=function, timeout=None, **dargs) 696 except (SystemExit, error.TestBaseException): 697 raise 698 # If there was a different exception, turn it into a TestError. 699 # It will be caught by step_engine or _run_step_fn. 700 except Exception, e: 701 raise error.UnhandledTestError(e) 702 703 704 def cpu_count(self): 705 return utils.count_cpus() # use total system count 706 707 708 def start_reboot(self): 709 self.record('START', None, 'reboot') 710 self.record('GOOD', None, 'reboot.start') 711 712 713 def _record_reboot_failure(self, subdir, operation, status, 714 running_id=None): 715 self.record("ABORT", subdir, operation, status) 716 if not running_id: 717 running_id = utils.running_os_ident() 718 kernel = {"kernel": running_id.split("::")[0]} 719 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 720 721 722 def _check_post_reboot(self, subdir, running_id=None): 723 """ 724 Function to perform post boot checks such as if the system configuration 725 has changed across reboots (specifically, CPUs and partitions). 726 727 @param subdir: The subdir to use in the job.record call. 728 @param running_id: An optional running_id to include in the reboot 729 failure log message 730 731 @raise JobError: Raised if the current configuration does not match the 732 pre-reboot configuration. 
733 """ 734 # check to see if any partitions have changed 735 partition_list = partition_lib.get_partition_list(self, 736 exclude_swap=False) 737 mount_info = partition_lib.get_mount_info(partition_list) 738 old_mount_info = self._state.get('client', 'mount_info') 739 if mount_info != old_mount_info: 740 new_entries = mount_info - old_mount_info 741 old_entries = old_mount_info - mount_info 742 description = ("mounted partitions are different after reboot " 743 "(old entries: %s, new entries: %s)" % 744 (old_entries, new_entries)) 745 self._record_reboot_failure(subdir, "reboot.verify_config", 746 description, running_id=running_id) 747 raise error.JobError("Reboot failed: %s" % description) 748 749 # check to see if any CPUs have changed 750 cpu_count = utils.count_cpus() 751 old_count = self._state.get('client', 'cpu_count') 752 if cpu_count != old_count: 753 description = ('Number of CPUs changed after reboot ' 754 '(old count: %d, new count: %d)' % 755 (old_count, cpu_count)) 756 self._record_reboot_failure(subdir, 'reboot.verify_config', 757 description, running_id=running_id) 758 raise error.JobError('Reboot failed: %s' % description) 759 760 761 def partition(self, device, loop_size=0, mountpoint=None): 762 """ 763 Work with a machine partition 764 765 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 766 @param mountpoint: Specify a directory to mount to. If not specified 767 autotest tmp directory will be used. 768 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
769 770 @return: A L{client.bin.partition.partition} object 771 """ 772 773 if not mountpoint: 774 mountpoint = self.tmpdir 775 return partition_lib.partition(self, device, loop_size, mountpoint) 776 777 @utils.deprecated 778 def filesystem(self, device, mountpoint=None, loop_size=0): 779 """ Same as partition 780 781 @deprecated: Use partition method instead 782 """ 783 return self.partition(device, loop_size, mountpoint) 784 785 786 def enable_external_logging(self): 787 pass 788 789 790 def disable_external_logging(self): 791 pass 792 793 794 def reboot_setup(self): 795 # save the partition list and mount points, as well as the cpu count 796 partition_list = partition_lib.get_partition_list(self, 797 exclude_swap=False) 798 mount_info = partition_lib.get_mount_info(partition_list) 799 self._state.set('client', 'mount_info', mount_info) 800 self._state.set('client', 'cpu_count', utils.count_cpus()) 801 802 803 def reboot(self): 804 self.reboot_setup() 805 self.harness.run_reboot() 806 807 # HACK: using this as a module sometimes hangs shutdown, so if it's 808 # installed unload it first 809 utils.system("modprobe -r netconsole", ignore_status=True) 810 811 # sync first, so that a sync during shutdown doesn't time out 812 utils.system("sync; sync", ignore_status=True) 813 814 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 815 self.quit() 816 817 818 def noop(self, text): 819 logging.info("job: noop: " + text) 820 821 822 @_run_test_complete_on_exit 823 def parallel(self, *tasklist, **kwargs): 824 """Run tasks in parallel""" 825 826 pids = [] 827 old_log_filename = self._logger.global_filename 828 for i, task in enumerate(tasklist): 829 assert isinstance(task, (tuple, list)) 830 self._logger.global_filename = old_log_filename + (".%d" % i) 831 def task_func(): 832 # stub out _record_indent with a process-local one 833 base_record_indent = self._record_indent 834 proc_local = self._job_state.property_factory( 835 '_state', '_record_indent.%d' % 
os.getpid(), 836 base_record_indent, namespace='client') 837 self.__class__._record_indent = proc_local 838 task[0](*task[1:]) 839 forked_pid = parallel.fork_start(self.resultdir, task_func) 840 logging.info('Just forked pid %d', forked_pid) 841 pids.append(forked_pid) 842 843 old_log_path = os.path.join(self.resultdir, old_log_filename) 844 old_log = open(old_log_path, "a") 845 exceptions = [] 846 for i, pid in enumerate(pids): 847 # wait for the task to finish 848 try: 849 self._forkwait(pid, kwargs.get('timeout')) 850 except Exception, e: 851 logging.info('pid %d completed with error', pid) 852 exceptions.append(e) 853 # copy the logs from the subtask into the main log 854 new_log_path = old_log_path + (".%d" % i) 855 if os.path.exists(new_log_path): 856 new_log = open(new_log_path) 857 old_log.write(new_log.read()) 858 new_log.close() 859 old_log.flush() 860 os.remove(new_log_path) 861 old_log.close() 862 863 self._logger.global_filename = old_log_filename 864 865 # handle any exceptions raised by the parallel tasks 866 if exceptions: 867 msg = "%d task(s) failed in job.parallel" % len(exceptions) 868 raise error.JobError(msg) 869 870 871 def quit(self): 872 # XXX: should have a better name. 873 self.harness.run_pause() 874 raise error.JobContinue("more to come") 875 876 877 def complete(self, status): 878 """Write pending reports, clean up, and exit""" 879 # write out a job HTML report 880 try: 881 html_report.create_report(self.resultdir) 882 except Exception, e: 883 logging.error("Error writing job HTML report: %s", e) 884 885 # We are about to exit 'complete' so clean up the control file. 
886 dest = os.path.join(self.resultdir, os.path.basename(self._state_file)) 887 shutil.move(self._state_file, dest) 888 889 self.harness.run_complete() 890 self.disable_external_logging() 891 sys.exit(status) 892 893 894 def _load_state(self): 895 # grab any initial state and set up $CONTROL.state as the backing file 896 init_state_file = self.control + '.init.state' 897 self._state_file = self.control + '.state' 898 if os.path.exists(init_state_file): 899 shutil.move(init_state_file, self._state_file) 900 self._state.set_backing_file(self._state_file) 901 902 # initialize the state engine, if necessary 903 has_steps = self._state.has('client', 'steps') 904 if not self._is_continuation and has_steps: 905 raise RuntimeError('Loaded state can only contain client.steps if ' 906 'this is a continuation') 907 908 if not has_steps: 909 logging.debug('Initializing the state engine') 910 self._state.set('client', 'steps', []) 911 912 913 def handle_persistent_option(self, options, option_name): 914 """ 915 Select option from command line or persistent state. 916 Store selected option to allow standalone client to continue 917 after reboot with previously selected options. 918 Priority: 919 1. explicitly specified via command line 920 2. stored in state file (if continuing job '-c') 921 3. default == None 922 """ 923 option = None 924 cmd_line_option = getattr(options, option_name) 925 if cmd_line_option: 926 option = cmd_line_option 927 self._state.set('client', option_name, option) 928 else: 929 stored_option = self._state.get('client', option_name, None) 930 if stored_option: 931 option = stored_option 932 logging.debug('Persistent option %s now set to %s', option_name, option) 933 return option 934 935 936 def __create_step_tuple(self, fn, args, dargs): 937 # Legacy code passes in an array where the first arg is 938 # the function or its name. 
939 if isinstance(fn, list): 940 assert(len(args) == 0) 941 assert(len(dargs) == 0) 942 args = fn[1:] 943 fn = fn[0] 944 # Pickling actual functions is hairy, thus we have to call 945 # them by name. Unfortunately, this means only functions 946 # defined globally can be used as a next step. 947 if callable(fn): 948 fn = fn.__name__ 949 if not isinstance(fn, types.StringTypes): 950 raise StepError("Next steps must be functions or " 951 "strings containing the function name") 952 ancestry = copy.copy(self._current_step_ancestry) 953 return (ancestry, fn, args, dargs) 954 955 956 def next_step_append(self, fn, *args, **dargs): 957 """Define the next step and place it at the end""" 958 steps = self._state.get('client', 'steps') 959 steps.append(self.__create_step_tuple(fn, args, dargs)) 960 self._state.set('client', 'steps', steps) 961 962 963 def next_step(self, fn, *args, **dargs): 964 """Create a new step and place it after any steps added 965 while running the current step but before any steps added in 966 previous steps""" 967 steps = self._state.get('client', 'steps') 968 steps.insert(self._next_step_index, 969 self.__create_step_tuple(fn, args, dargs)) 970 self._next_step_index += 1 971 self._state.set('client', 'steps', steps) 972 973 974 def next_step_prepend(self, fn, *args, **dargs): 975 """Insert a new step, executing first""" 976 steps = self._state.get('client', 'steps') 977 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 978 self._next_step_index += 1 979 self._state.set('client', 'steps', steps) 980 981 982 983 def _run_step_fn(self, local_vars, fn, args, dargs): 984 """Run a (step) function within the given context""" 985 986 local_vars['__args'] = args 987 local_vars['__dargs'] = dargs 988 try: 989 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 990 return local_vars['__ret'] 991 except SystemExit: 992 raise # Send error.JobContinue and JobComplete on up to runjob. 
993 except error.TestNAError, detail: 994 self.record(detail.exit_status, None, fn, str(detail)) 995 except Exception, detail: 996 raise error.UnhandledJobError(detail) 997 998 999 def _create_frame(self, global_vars, ancestry, fn_name): 1000 """Set up the environment like it would have been when this 1001 function was first defined. 1002 1003 Child step engine 'implementations' must have 'return locals()' 1004 at end end of their steps. Because of this, we can call the 1005 parent function and get back all child functions (i.e. those 1006 defined within it). 1007 1008 Unfortunately, the call stack of the function calling 1009 job.next_step might have been deeper than the function it 1010 added. In order to make sure that the environment is what it 1011 should be, we need to then pop off the frames we built until 1012 we find the frame where the function was first defined.""" 1013 1014 # The copies ensure that the parent frames are not modified 1015 # while building child frames. This matters if we then 1016 # pop some frames in the next part of this function. 1017 current_frame = copy.copy(global_vars) 1018 frames = [current_frame] 1019 for steps_fn_name in ancestry: 1020 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 1021 current_frame = copy.copy(ret) 1022 frames.append(current_frame) 1023 1024 # Walk up the stack frames until we find the place fn_name was defined. 1025 while len(frames) > 2: 1026 if fn_name not in frames[-2]: 1027 break 1028 if frames[-2][fn_name] != frames[-1][fn_name]: 1029 break 1030 frames.pop() 1031 ancestry.pop() 1032 1033 return (frames[-1], ancestry) 1034 1035 1036 def _add_step_init(self, local_vars, current_function): 1037 """If the function returned a dictionary that includes a 1038 function named 'step_init', prepend it to our list of steps. 
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            # Python 2 execfile: run the control file inside the prepared
            # namespace so top-level code can call job.next_step() etc.
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            # Control file declared itself not applicable; record and stop.
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                # Fresh run: seed the queue with the control file's top-level
                # step_init function.
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            # Persist the shortened queue *before* running the step, so a
            # reboot mid-step resumes at the right place.
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            # If the step defined a nested step_init, queue it next.
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Collect the output of shell `command` as part of sysinfo.

        @param command: shell command whose output to log.
        @param logfile: optional name for the output file.
        @param on_every_test: collect per test instead of per boot.
        """
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Collect a copy of `file` as part of sysinfo.

        @param file: path of the file to capture.
        @param on_every_test: collect per test instead of per boot.
        """
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # Register the loggable in the per-test or per-boot set, then persist
        # so the registration survives a job restart/reboot.
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        # Restore sysinfo configuration saved by a previous (pre-reboot) run,
        # if any was recorded.
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        # Snapshot the sysinfo configuration into the persistent job state.
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    """Watches free space on a device between start() and stop() and calls a
    logging function when the consumption rate exceeds a threshold."""

    def __init__(self, logging_func, device,
                 max_mb_per_hour):
        # logging_func: callable taking one message string (e.g. logging.warning)
        # device: device/path handed to utils.freespace()
        # max_mb_per_hour: warn above this rate; falsy (0/None) disables checks
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Record the free space and wall-clock time at the start of the watch."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Re-measure free space and warn via the logging function if the
        usage rate since start() exceeded max_mb_per_hour."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # NOTE(review): the wrapped function's return value is
                    # discarded here -- confirm no caller relies on it.
                    func(*args, **dargs)
                finally:
                    # Always stop (and possibly warn), even if func raised.
                    monitor.stop()
            return watched_func
        return decorator


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
1192 """ 1193 control = os.path.abspath(control) 1194 state = control + '.state' 1195 # Ensure state file is cleaned up before the job starts to run if autotest 1196 # is not running with the --continue flag 1197 if not options.cont and os.path.isfile(state): 1198 logging.debug('Cleaning up previously found state file') 1199 os.remove(state) 1200 1201 # instantiate the job object ready for the control file. 1202 myjob = None 1203 try: 1204 # Check that the control file is valid 1205 if not os.path.exists(control): 1206 raise error.JobError(control + ": control file not found") 1207 1208 # When continuing, the job is complete when there is no 1209 # state file, ensure we don't try and continue. 1210 if options.cont and not os.path.exists(state): 1211 raise error.JobComplete("all done") 1212 1213 myjob = job(control=control, drop_caches=drop_caches, options=options) 1214 1215 # Load in the users control file, may do any one of: 1216 # 1) execute in toto 1217 # 2) define steps, and select the first via next_step() 1218 myjob.step_engine() 1219 1220 except error.JobContinue: 1221 sys.exit(5) 1222 1223 except error.JobComplete: 1224 sys.exit(1) 1225 1226 except error.JobError, instance: 1227 logging.error("JOB ERROR: " + str(instance)) 1228 if myjob: 1229 command = None 1230 if len(instance.args) > 1: 1231 command = instance.args[1] 1232 myjob.record('ABORT', None, command, str(instance)) 1233 myjob.record('END ABORT', None, None, str(instance)) 1234 assert myjob._record_indent == 0 1235 myjob.complete(1) 1236 else: 1237 sys.exit(1) 1238 1239 except Exception, e: 1240 # NOTE: job._run_step_fn and job.step_engine will turn things into 1241 # a JobError for us. If we get here, its likely an autotest bug. 
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


class job(base_client_job):
    """Concrete client job: pauses log rotation around each test and
    snapshots VM state when a test fails."""

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        """Run a test with log rotation paused for its duration.

        @param url: the test package url/name, as for the base class.
        @return True if the test passed, False otherwise.
        """
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            # Resume log rotation even if the test raised.
            log_pauser.end()
        return passed


    def reboot(self):
        """Reboot the machine, notifying the harness first."""
        self.reboot_setup()
        self.harness.run_reboot()

        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)

        # Detach the reboot command from our stdio, then exit the job.
        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        """This job flavor never requires gcc on the client."""
        return False


# TODO(ayatane): This logic should be deduplicated with
# server/cros/dynamic_suite/control_file_getter.py, but the server
# libraries are not available on clients.
def _locate_test_control_file(dirpath, testname):
    """
    Locate the control file for the given test.

    @param dirpath Root directory to search.
    @param testname Name of test.

    @returns Absolute path to the control file.
    @raise JobError: Raised if control file not found.
    """
    for dirpath, _dirnames, filenames in os.walk(dirpath):
        for filename in filenames:
            # Any filename containing 'control' is a candidate
            # (e.g. 'control', 'control.variant').
            if 'control' not in filename:
                continue
            path = os.path.join(dirpath, filename)
            if _is_control_file_for_test(path, testname):
                return os.path.abspath(path)
    raise error.JobError(
            'could not find client test control file',
            dirpath, testname)


# Matches a control file NAME setting at the start of a line, e.g.
# NAME = 'platform_Foo', capturing the quoted test name.
_NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]"


def _is_control_file_for_test(path, testname):
    # Compare the first NAME assignment in the file against testname.
    # Returns None (falsy) when the file contains no NAME line.
    with open(path) as f:
        for line in f:
            match = re.match(_NAME_PATTERN, line)
            if match is not None:
                return match.group(1) == testname