"""The main job wrapper This is the core infrastructure. Copyright Andy Whitcroft, Martin J. Bligh 2006 """ # pylint: disable=missing-docstring import copy from datetime import datetime import getpass import glob import logging import os import re import shutil import sys import time import traceback import types import weakref import common from autotest_lib.client.bin import client_logging_config from autotest_lib.client.bin import harness from autotest_lib.client.bin import local_host from autotest_lib.client.bin import parallel from autotest_lib.client.bin import partition as partition_lib from autotest_lib.client.bin import profilers from autotest_lib.client.bin import sysinfo from autotest_lib.client.bin import test from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import barrier from autotest_lib.client.common_lib import base_job from autotest_lib.client.common_lib import control_data from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib import logging_manager from autotest_lib.client.common_lib import packages from autotest_lib.client.cros import cros_logging from autotest_lib.client.tools import html_report GLOBAL_CONFIG = global_config.global_config LAST_BOOT_TAG = object() JOB_PREAMBLE = """ from autotest_lib.client.common_lib.error import * from autotest_lib.client.bin.utils import * """ class StepError(error.AutotestError): pass class NotAvailableError(error.AutotestError): pass def _run_test_complete_on_exit(f): """Decorator for job methods that automatically calls self.harness.run_test_complete when the method exits, if appropriate.""" def wrapped(self, *args, **dargs): try: return f(self, *args, **dargs) finally: if self._logger.global_filename == 'status': self.harness.run_test_complete() if self.drop_caches: utils.drop_caches() wrapped.__name__ = f.__name__ wrapped.__doc__ = f.__doc__ wrapped.__dict__.update(f.__dict__) return wrapped class status_indenter(base_job.status_indenter): """Provide a status indenter that is backed by job._record_prefix.""" def __init__(self, job_): self._job = weakref.proxy(job_) # avoid a circular reference @property def indent(self): return self._job._record_indent def increment(self): self._job._record_indent += 1 def decrement(self): self._job._record_indent -= 1 class base_client_job(base_job.base_job): """The client-side concrete implementation of base_job. Optional properties provided by this implementation: control harness """ _WARNING_DISABLE_DELAY = 5 # _record_indent is a persistent property, but only on the client _job_state = base_job.base_job._job_state _record_indent = _job_state.property_factory( '_state', '_record_indent', 0, namespace='client') _max_disk_usage_rate = _job_state.property_factory( '_state', '_max_disk_usage_rate', 0.0, namespace='client') def __init__(self, control, options, drop_caches=True): """ Prepare a client side job object. @param control: The control file (pathname of). @param options: an object which includes: jobtag: The job tag string (eg "default"). cont: If this is the continuation of this job. harness_type: An alternative server harness. [None] use_external_logging: If true, the enable_external_logging method will be called during construction. [False] @param drop_caches: If true, utils.drop_caches() is called before and between all tests. [True] """ super(base_client_job, self).__init__(options=options) self._pre_record_init(control, options) try: self._post_record_init(control, options, drop_caches) except Exception, err: self.record( 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % str(err)) raise @classmethod def _get_environ_autodir(cls): return os.environ['AUTODIR'] @classmethod def _find_base_directories(cls): """ Determine locations of autodir and clientdir (which are the same) using os.environ. Serverdir does not exist in this context. """ autodir = clientdir = cls._get_environ_autodir() return autodir, clientdir, None @classmethod def _parse_args(cls, args): return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) def _find_resultdir(self, options): """ Determine the directory for storing results. On a client this is always /results/, where tag is passed in on the command line as an option. """ output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT', 'output_dir', default="") if options.output_dir: basedir = options.output_dir elif output_dir_config: basedir = output_dir_config else: basedir = self.autodir return os.path.join(basedir, 'results', options.tag) def _get_status_logger(self): """Return a reference to the status logger.""" return self._logger def _pre_record_init(self, control, options): """ Initialization function that should peform ONLY the required setup so that the self.record() method works. As of now self.record() needs self.resultdir, self._group_level, self.harness and of course self._logger. """ if not options.cont: self._cleanup_debugdir_files() self._cleanup_results_dir() logging_manager.configure_logging( client_logging_config.ClientLoggingConfig(), results_dir=self.resultdir, verbose=options.verbose) logging.info('Writing results to %s', self.resultdir) # init_group_level needs the state self.control = os.path.realpath(control) self._is_continuation = options.cont self._current_step_ancestry = [] self._next_step_index = 0 self._load_state() _harness = self.handle_persistent_option(options, 'harness') _harness_args = self.handle_persistent_option(options, 'harness_args') self.harness = harness.select(_harness, self, _harness_args) if self.control: parsed_control = control_data.parse_control( self.control, raise_warnings=False) self.fast = parsed_control.fast # set up the status logger def client_job_record_hook(entry): msg_tag = '' if '.' in self._logger.global_filename: msg_tag = self._logger.global_filename.split('.', 1)[1] # send the entry to the job harness message = '\n'.join([entry.message] + entry.extra_message_lines) rendered_entry = self._logger.render_entry(entry) self.harness.test_status_detail(entry.status_code, entry.subdir, entry.operation, message, msg_tag, entry.fields) self.harness.test_status(rendered_entry, msg_tag) # send the entry to stdout, if it's enabled logging.info(rendered_entry) self._logger = base_job.status_logger( self, status_indenter(self), record_hook=client_job_record_hook) def _post_record_init(self, control, options, drop_caches): """ Perform job initialization not required by self.record(). """ self._init_drop_caches(drop_caches) self._init_packages() self.sysinfo = sysinfo.sysinfo(self.resultdir) self._load_sysinfo_state() if not options.cont: download = os.path.join(self.testdir, 'download') if not os.path.exists(download): os.mkdir(download) shutil.copyfile(self.control, os.path.join(self.resultdir, 'control')) self.control = control self.logging = logging_manager.get_logging_manager( manage_stdout_and_stderr=True, redirect_fds=True) self.logging.start_logging() self.profilers = profilers.profilers(self) self.machines = [options.hostname] self.machine_dict_list = [{'hostname' : options.hostname}] # Client side tests should always run the same whether or not they are # running in the lab. self.in_lab = False self.hosts = set([local_host.LocalHost(hostname=options.hostname)]) self.args = [] if options.args: self.args = self._parse_args(options.args) if options.user: self.user = options.user else: self.user = getpass.getuser() self.sysinfo.log_per_reboot_data() if not options.cont: self.record('START', None, None) self.harness.run_start() if options.log: self.enable_external_logging() self.num_tests_run = None self.num_tests_failed = None self.warning_loggers = None self.warning_manager = None def _init_drop_caches(self, drop_caches): """ Perform the drop caches initialization. """ self.drop_caches_between_iterations = ( GLOBAL_CONFIG.get_config_value('CLIENT', 'drop_caches_between_iterations', type=bool, default=True)) self.drop_caches = drop_caches if self.drop_caches: utils.drop_caches() def _init_packages(self): """ Perform the packages support initialization. """ self.pkgmgr = packages.PackageManager( self.autodir, run_function_dargs={'timeout':3600}) def _cleanup_results_dir(self): """Delete everything in resultsdir""" assert os.path.exists(self.resultdir) list_files = glob.glob('%s/*' % self.resultdir) for f in list_files: if os.path.isdir(f): shutil.rmtree(f) elif os.path.isfile(f): os.remove(f) def _cleanup_debugdir_files(self): """ Delete any leftover debugdir files """ list_files = glob.glob("/tmp/autotest_results_dir.*") for f in list_files: os.remove(f) def disable_warnings(self, warning_type): self.record("INFO", None, None, "disabling %s warnings" % warning_type, {"warnings.disable": warning_type}) time.sleep(self._WARNING_DISABLE_DELAY) def enable_warnings(self, warning_type): time.sleep(self._WARNING_DISABLE_DELAY) self.record("INFO", None, None, "enabling %s warnings" % warning_type, {"warnings.enable": warning_type}) def monitor_disk_usage(self, max_rate): """\ Signal that the job should monitor disk space usage on / and generate a warning if a test uses up disk space at a rate exceeding 'max_rate'. Parameters: max_rate - the maximium allowed rate of disk consumption during a test, in MB/hour, or 0 to indicate no limit. """ self._max_disk_usage_rate = max_rate def control_get(self): return self.control def control_set(self, control): self.control = os.path.abspath(control) def harness_select(self, which, harness_args): self.harness = harness.select(which, self, harness_args) def setup_dirs(self, results_dir, tmp_dir): if not tmp_dir: tmp_dir = os.path.join(self.tmpdir, 'build') if not os.path.exists(tmp_dir): os.mkdir(tmp_dir) if not os.path.isdir(tmp_dir): e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir raise ValueError(e_msg) # We label the first build "build" and then subsequent ones # as "build.2", "build.3", etc. Whilst this is a little bit # inconsistent, 99.9% of jobs will only have one build # (that's not done as kernbench, sparse, or buildtest), # so it works out much cleaner. One of life's compromises. if not results_dir: results_dir = os.path.join(self.resultdir, 'build') i = 2 while os.path.exists(results_dir): results_dir = os.path.join(self.resultdir, 'build.%d' % i) i += 1 if not os.path.exists(results_dir): os.mkdir(results_dir) return (results_dir, tmp_dir) def barrier(self, *args, **kwds): """Create a barrier object""" return barrier.barrier(*args, **kwds) def install_pkg(self, name, pkg_type, install_dir): ''' This method is a simple wrapper around the actual package installation method in the Packager class. This is used internally by the profilers, deps and tests code. name : name of the package (ex: sleeptest, dbench etc.) pkg_type : Type of the package (ex: test, dep etc.) install_dir : The directory in which the source is actually untarred into. (ex: client/profilers/ for profilers) ''' if self.pkgmgr.repositories: self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) def add_repository(self, repo_urls): ''' Adds the repository locations to the job so that packages can be fetched from them when needed. The repository list needs to be a string list Ex: job.add_repository(['http://blah1','http://blah2']) ''' for repo_url in repo_urls: self.pkgmgr.add_repository(repo_url) # Fetch the packages' checksum file that contains the checksums # of all the packages if it is not already fetched. The checksum # is always fetched whenever a job is first started. This # is not done in the job's constructor as we don't have the list of # the repositories there (and obviously don't care about this file # if we are not using the repos) try: checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, packages.CHECKSUM_FILE) self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path, use_checksum=False) except error.PackageFetchError: # packaging system might not be working in this case # Silently fall back to the normal case pass def require_gcc(self): """ Test whether gcc is installed on the machine. """ # check if gcc is installed on the system. try: utils.system('which gcc') except error.CmdError: raise NotAvailableError('gcc is required by this job and is ' 'not available on the system') def setup_dep(self, deps): """Set up the dependencies for this test. deps is a list of libraries required for this test. """ # Fetch the deps from the repositories and set them up. for dep in deps: dep_dir = os.path.join(self.autodir, 'deps', dep) # Search for the dependency in the repositories if specified, # else check locally. try: self.install_pkg(dep, 'dep', dep_dir) except error.PackageInstallError: # see if the dep is there locally pass # dep_dir might not exist if it is not fetched from the repos if not os.path.exists(dep_dir): raise error.TestError("Dependency %s does not exist" % dep) os.chdir(dep_dir) if execfile('%s.py' % dep, {}) is None: logging.info('Dependency %s successfuly built', dep) def _runtest(self, url, tag, timeout, args, dargs): try: l = lambda : test.runtest(self, url, tag, args, dargs) pid = parallel.fork_start(self.resultdir, l) self._forkwait(pid, timeout) except error.TestBaseException: # These are already classified with an error type (exit_status) raise except error.JobError: raise # Caught further up and turned into an ABORT. except Exception, e: # Converts all other exceptions thrown by the test regardless # of phase into a TestError(TestBaseException) subclass that # reports them with their full stack trace. raise error.UnhandledTestError(e) def _forkwait(self, pid, timeout=None): """Wait for the given pid to complete @param pid (int) process id to wait for @param timeout (int) seconds to wait before timing out the process""" if timeout: logging.debug('Waiting for pid %d for %d seconds', pid, timeout) parallel.fork_waitfor_timed(self.resultdir, pid, timeout) else: logging.debug('Waiting for pid %d', pid) parallel.fork_waitfor(self.resultdir, pid) logging.info('pid %d completed', pid) def _run_test_base(self, url, *args, **dargs): """ Prepares arguments and run functions to run_test and run_test_detail. @param url A url that identifies the test to run. @param tag An optional keyword argument that will be added to the test and subdir name. @param subdir_tag An optional keyword argument that will be added to the subdir name. @returns: subdir: Test subdirectory testname: Test name group_func: Actual test run function timeout: Test timeout """ _group, testname = self.pkgmgr.get_package_name(url, 'test') testname, subdir, tag = self._build_tagged_test_name(testname, dargs) self._make_test_outputdir(subdir) timeout = dargs.pop('timeout', None) if timeout: logging.debug('Test has timeout: %d sec.', timeout) def log_warning(reason): self.record("WARN", subdir, testname, reason) @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate) def group_func(): try: self._runtest(url, tag, timeout, args, dargs) except error.TestBaseException, detail: # The error is already classified, record it properly. self.record(detail.exit_status, subdir, testname, str(detail)) raise else: self.record('GOOD', subdir, testname, 'completed successfully') return (subdir, testname, group_func, timeout) @_run_test_complete_on_exit def run_test(self, url, *args, **dargs): """ Summon a test object and run it. @param url A url that identifies the test to run. @param tag An optional keyword argument that will be added to the test and subdir name. @param subdir_tag An optional keyword argument that will be added to the subdir name. @returns True if the test passes, False otherwise. """ (subdir, testname, group_func, timeout) = self._run_test_base(url, *args, **dargs) try: self._rungroup(subdir, testname, group_func, timeout) return True except error.TestBaseException: return False # Any other exception here will be given to the caller # # NOTE: The only exception possible from the control file here # is error.JobError as _runtest() turns all others into an # UnhandledTestError that is caught above. def stage_control_file(self, url): """ Install the test package and return the control file path. @param url The name of the test, e.g. dummy_Pass. This is the string passed to run_test in the client test control file: job.run_test('dummy_Pass') This name can also be something like 'camera_HAL3.jea', which corresponds to a test package containing multiple control files, each with calls to: job.run_test('camera_HAL3', **opts) @returns Absolute path to the control file for the test. """ testname, _, _tag = url.partition('.') bindir = os.path.join(self.testdir, testname) self.install_pkg(testname, 'test', bindir) return _locate_test_control_file(bindir, url) @_run_test_complete_on_exit def run_test_detail(self, url, *args, **dargs): """ Summon a test object and run it, returning test status. @param url A url that identifies the test to run. @param tag An optional keyword argument that will be added to the test and subdir name. @param subdir_tag An optional keyword argument that will be added to the subdir name. @returns Test status @see: client/common_lib/error.py, exit_status """ (subdir, testname, group_func, timeout) = self._run_test_base(url, *args, **dargs) try: self._rungroup(subdir, testname, group_func, timeout) return 'GOOD' except error.TestBaseException, detail: return detail.exit_status def _rungroup(self, subdir, testname, function, timeout, *args, **dargs): """\ subdir: name of the group testname: name of the test to run, or support step function: subroutine to run *args: arguments for the function Returns the result of the passed in function """ try: optional_fields = None if timeout: optional_fields = {} optional_fields['timeout'] = timeout self.record('START', subdir, testname, optional_fields=optional_fields) self._state.set('client', 'unexpected_reboot', (subdir, testname)) try: result = function(*args, **dargs) self.record('END GOOD', subdir, testname) return result except error.TestBaseException, e: self.record('END %s' % e.exit_status, subdir, testname) raise except error.JobError, e: self.record('END ABORT', subdir, testname) raise except Exception, e: # This should only ever happen due to a bug in the given # function's code. The common case of being called by # run_test() will never reach this. If a control file called # run_group() itself, bugs in its function will be caught # here. err_msg = str(e) + '\n' + traceback.format_exc() self.record('END ERROR', subdir, testname, err_msg) raise finally: self._state.discard('client', 'unexpected_reboot') def run_group(self, function, tag=None, **dargs): """ Run a function nested within a group level. function: Callable to run. tag: An optional tag name for the group. If None (default) function.__name__ will be used. **dargs: Named arguments for the function. """ if tag: name = tag else: name = function.__name__ try: return self._rungroup(subdir=None, testname=name, function=function, timeout=None, **dargs) except (SystemExit, error.TestBaseException): raise # If there was a different exception, turn it into a TestError. # It will be caught by step_engine or _run_step_fn. except Exception, e: raise error.UnhandledTestError(e) def cpu_count(self): return utils.count_cpus() # use total system count def start_reboot(self): self.record('START', None, 'reboot') self.record('GOOD', None, 'reboot.start') def _record_reboot_failure(self, subdir, operation, status, running_id=None): self.record("ABORT", subdir, operation, status) if not running_id: running_id = utils.running_os_ident() kernel = {"kernel": running_id.split("::")[0]} self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) def _check_post_reboot(self, subdir, running_id=None): """ Function to perform post boot checks such as if the system configuration has changed across reboots (specifically, CPUs and partitions). @param subdir: The subdir to use in the job.record call. @param running_id: An optional running_id to include in the reboot failure log message @raise JobError: Raised if the current configuration does not match the pre-reboot configuration. """ # check to see if any partitions have changed partition_list = partition_lib.get_partition_list(self, exclude_swap=False) mount_info = partition_lib.get_mount_info(partition_list) old_mount_info = self._state.get('client', 'mount_info') if mount_info != old_mount_info: new_entries = mount_info - old_mount_info old_entries = old_mount_info - mount_info description = ("mounted partitions are different after reboot " "(old entries: %s, new entries: %s)" % (old_entries, new_entries)) self._record_reboot_failure(subdir, "reboot.verify_config", description, running_id=running_id) raise error.JobError("Reboot failed: %s" % description) # check to see if any CPUs have changed cpu_count = utils.count_cpus() old_count = self._state.get('client', 'cpu_count') if cpu_count != old_count: description = ('Number of CPUs changed after reboot ' '(old count: %d, new count: %d)' % (old_count, cpu_count)) self._record_reboot_failure(subdir, 'reboot.verify_config', description, running_id=running_id) raise error.JobError('Reboot failed: %s' % description) def partition(self, device, loop_size=0, mountpoint=None): """ Work with a machine partition @param device: e.g. /dev/sda2, /dev/sdb1 etc... @param mountpoint: Specify a directory to mount to. If not specified autotest tmp directory will be used. @param loop_size: Size of loopback device (in MB). Defaults to 0. @return: A L{client.bin.partition.partition} object """ if not mountpoint: mountpoint = self.tmpdir return partition_lib.partition(self, device, loop_size, mountpoint) @utils.deprecated def filesystem(self, device, mountpoint=None, loop_size=0): """ Same as partition @deprecated: Use partition method instead """ return self.partition(device, loop_size, mountpoint) def enable_external_logging(self): pass def disable_external_logging(self): pass def reboot_setup(self): # save the partition list and mount points, as well as the cpu count partition_list = partition_lib.get_partition_list(self, exclude_swap=False) mount_info = partition_lib.get_mount_info(partition_list) self._state.set('client', 'mount_info', mount_info) self._state.set('client', 'cpu_count', utils.count_cpus()) def reboot(self): self.reboot_setup() self.harness.run_reboot() # HACK: using this as a module sometimes hangs shutdown, so if it's # installed unload it first utils.system("modprobe -r netconsole", ignore_status=True) # sync first, so that a sync during shutdown doesn't time out utils.system("sync; sync", ignore_status=True) utils.system("(sleep 5; reboot) /dev/null 2>&1 &") self.quit() def noop(self, text): logging.info("job: noop: " + text) @_run_test_complete_on_exit def parallel(self, *tasklist, **kwargs): """Run tasks in parallel""" pids = [] old_log_filename = self._logger.global_filename for i, task in enumerate(tasklist): assert isinstance(task, (tuple, list)) self._logger.global_filename = old_log_filename + (".%d" % i) def task_func(): # stub out _record_indent with a process-local one base_record_indent = self._record_indent proc_local = self._job_state.property_factory( '_state', '_record_indent.%d' % os.getpid(), base_record_indent, namespace='client') self.__class__._record_indent = proc_local task[0](*task[1:]) forked_pid = parallel.fork_start(self.resultdir, task_func) logging.info('Just forked pid %d', forked_pid) pids.append(forked_pid) old_log_path = os.path.join(self.resultdir, old_log_filename) old_log = open(old_log_path, "a") exceptions = [] for i, pid in enumerate(pids): # wait for the task to finish try: self._forkwait(pid, kwargs.get('timeout')) except Exception, e: logging.info('pid %d completed with error', pid) exceptions.append(e) # copy the logs from the subtask into the main log new_log_path = old_log_path + (".%d" % i) if os.path.exists(new_log_path): new_log = open(new_log_path) old_log.write(new_log.read()) new_log.close() old_log.flush() os.remove(new_log_path) old_log.close() self._logger.global_filename = old_log_filename # handle any exceptions raised by the parallel tasks if exceptions: msg = "%d task(s) failed in job.parallel" % len(exceptions) raise error.JobError(msg) def quit(self): # XXX: should have a better name. self.harness.run_pause() raise error.JobContinue("more to come") def complete(self, status): """Write pending reports, clean up, and exit""" # write out a job HTML report try: html_report.create_report(self.resultdir) except Exception, e: logging.error("Error writing job HTML report: %s", e) # We are about to exit 'complete' so clean up the control file. dest = os.path.join(self.resultdir, os.path.basename(self._state_file)) shutil.move(self._state_file, dest) self.harness.run_complete() self.disable_external_logging() sys.exit(status) def _load_state(self): # grab any initial state and set up $CONTROL.state as the backing file init_state_file = self.control + '.init.state' self._state_file = self.control + '.state' if os.path.exists(init_state_file): shutil.move(init_state_file, self._state_file) self._state.set_backing_file(self._state_file) # initialize the state engine, if necessary has_steps = self._state.has('client', 'steps') if not self._is_continuation and has_steps: raise RuntimeError('Loaded state can only contain client.steps if ' 'this is a continuation') if not has_steps: logging.debug('Initializing the state engine') self._state.set('client', 'steps', []) def handle_persistent_option(self, options, option_name): """ Select option from command line or persistent state. Store selected option to allow standalone client to continue after reboot with previously selected options. Priority: 1. explicitly specified via command line 2. stored in state file (if continuing job '-c') 3. default == None """ option = None cmd_line_option = getattr(options, option_name) if cmd_line_option: option = cmd_line_option self._state.set('client', option_name, option) else: stored_option = self._state.get('client', option_name, None) if stored_option: option = stored_option logging.debug('Persistent option %s now set to %s', option_name, option) return option def __create_step_tuple(self, fn, args, dargs): # Legacy code passes in an array where the first arg is # the function or its name. if isinstance(fn, list): assert(len(args) == 0) assert(len(dargs) == 0) args = fn[1:] fn = fn[0] # Pickling actual functions is hairy, thus we have to call # them by name. Unfortunately, this means only functions # defined globally can be used as a next step. if callable(fn): fn = fn.__name__ if not isinstance(fn, types.StringTypes): raise StepError("Next steps must be functions or " "strings containing the function name") ancestry = copy.copy(self._current_step_ancestry) return (ancestry, fn, args, dargs) def next_step_append(self, fn, *args, **dargs): """Define the next step and place it at the end""" steps = self._state.get('client', 'steps') steps.append(self.__create_step_tuple(fn, args, dargs)) self._state.set('client', 'steps', steps) def next_step(self, fn, *args, **dargs): """Create a new step and place it after any steps added while running the current step but before any steps added in previous steps""" steps = self._state.get('client', 'steps') steps.insert(self._next_step_index, self.__create_step_tuple(fn, args, dargs)) self._next_step_index += 1 self._state.set('client', 'steps', steps) def next_step_prepend(self, fn, *args, **dargs): """Insert a new step, executing first""" steps = self._state.get('client', 'steps') steps.insert(0, self.__create_step_tuple(fn, args, dargs)) self._next_step_index += 1 self._state.set('client', 'steps', steps) def _run_step_fn(self, local_vars, fn, args, dargs): """Run a (step) function within the given context""" local_vars['__args'] = args local_vars['__dargs'] = dargs try: exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) return local_vars['__ret'] except SystemExit: raise # Send error.JobContinue and JobComplete on up to runjob. except error.TestNAError, detail: self.record(detail.exit_status, None, fn, str(detail)) except Exception, detail: raise error.UnhandledJobError(detail) def _create_frame(self, global_vars, ancestry, fn_name): """Set up the environment like it would have been when this function was first defined. Child step engine 'implementations' must have 'return locals()' at end end of their steps. Because of this, we can call the parent function and get back all child functions (i.e. those defined within it). Unfortunately, the call stack of the function calling job.next_step might have been deeper than the function it added. In order to make sure that the environment is what it should be, we need to then pop off the frames we built until we find the frame where the function was first defined.""" # The copies ensure that the parent frames are not modified # while building child frames. This matters if we then # pop some frames in the next part of this function. current_frame = copy.copy(global_vars) frames = [current_frame] for steps_fn_name in ancestry: ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) current_frame = copy.copy(ret) frames.append(current_frame) # Walk up the stack frames until we find the place fn_name was defined. while len(frames) > 2: if fn_name not in frames[-2]: break if frames[-2][fn_name] != frames[-1][fn_name]: break frames.pop() ancestry.pop() return (frames[-1], ancestry) def _add_step_init(self, local_vars, current_function): """If the function returned a dictionary that includes a function named 'step_init', prepend it to our list of steps. This will only get run the first time a function with a nested use of the step engine is run.""" if (isinstance(local_vars, dict) and 'step_init' in local_vars and callable(local_vars['step_init'])): # The init step is a child of the function # we were just running. self._current_step_ancestry.append(current_function) self.next_step_prepend('step_init') def step_engine(self): """The multi-run engine used when the control file defines step_init. Does the next step. """ # Set up the environment and then interpret the control file. # Some control files will have code outside of functions, # which means we need to have our state engine initialized # before reading in the file. global_control_vars = {'job': self, 'args': self.args} exec(JOB_PREAMBLE, global_control_vars, global_control_vars) try: execfile(self.control, global_control_vars, global_control_vars) except error.TestNAError, detail: self.record(detail.exit_status, None, self.control, str(detail)) except SystemExit: raise # Send error.JobContinue and JobComplete on up to runjob. except Exception, detail: # Syntax errors or other general Python exceptions coming out of # the top level of the control file itself go through here. raise error.UnhandledJobError(detail) # If we loaded in a mid-job state file, then we presumably # know what steps we have yet to run. if not self._is_continuation: if 'step_init' in global_control_vars: self.next_step(global_control_vars['step_init']) else: # if last job failed due to unexpected reboot, record it as fail # so harness gets called last_job = self._state.get('client', 'unexpected_reboot', None) if last_job: subdir, testname = last_job self.record('FAIL', subdir, testname, 'unexpected reboot') self.record('END FAIL', subdir, testname) # Iterate through the steps. If we reboot, we'll simply # continue iterating on the next step. while len(self._state.get('client', 'steps')) > 0: steps = self._state.get('client', 'steps') (ancestry, fn_name, args, dargs) = steps.pop(0) self._state.set('client', 'steps', steps) self._next_step_index = 0 ret = self._create_frame(global_control_vars, ancestry, fn_name) local_vars, self._current_step_ancestry = ret local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) self._add_step_init(local_vars, fn_name) def add_sysinfo_command(self, command, logfile=None, on_every_test=False): self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), on_every_test) def add_sysinfo_logfile(self, file, on_every_test=False): self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) def _add_sysinfo_loggable(self, loggable, on_every_test): if on_every_test: self.sysinfo.test_loggables.add(loggable) else: self.sysinfo.boot_loggables.add(loggable) self._save_sysinfo_state() def _load_sysinfo_state(self): state = self._state.get('client', 'sysinfo', None) if state: self.sysinfo.deserialize(state) def _save_sysinfo_state(self): state = self.sysinfo.serialize() self._state.set('client', 'sysinfo', state) class disk_usage_monitor: def __init__(self, logging_func, device, max_mb_per_hour): self.func = logging_func self.device = device self.max_mb_per_hour = max_mb_per_hour def start(self): self.initial_space = utils.freespace(self.device) self.start_time = time.time() def stop(self): # if no maximum usage rate was set, we don't need to # generate any warnings if not self.max_mb_per_hour: return final_space = utils.freespace(self.device) used_space = self.initial_space - final_space stop_time = time.time() total_time = stop_time - self.start_time # round up the time to one minute, to keep extremely short # tests from generating false positives due to short, badly # timed bursts of activity total_time = max(total_time, 60.0) # determine the usage rate bytes_per_sec = used_space / total_time mb_per_sec = bytes_per_sec / 1024**2 mb_per_hour = mb_per_sec * 60 * 60 if mb_per_hour > self.max_mb_per_hour: msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") msg %= (self.device, mb_per_hour) self.func(msg) @classmethod def watch(cls, *monitor_args, **monitor_dargs): """ Generic decorator to wrap a function call with the standard create-monitor -> start -> call -> stop idiom.""" def decorator(func): def watched_func(*args, **dargs): monitor = cls(*monitor_args, **monitor_dargs) monitor.start() try: func(*args, **dargs) finally: monitor.stop() return watched_func return decorator def runjob(control, drop_caches, options): """ Run a job using the given control file. This is the main interface to this module. @see base_job.__init__ for parameter info. """ control = os.path.abspath(control) state = control + '.state' # Ensure state file is cleaned up before the job starts to run if autotest # is not running with the --continue flag if not options.cont and os.path.isfile(state): logging.debug('Cleaning up previously found state file') os.remove(state) # instantiate the job object ready for the control file. myjob = None try: # Check that the control file is valid if not os.path.exists(control): raise error.JobError(control + ": control file not found") # When continuing, the job is complete when there is no # state file, ensure we don't try and continue. if options.cont and not os.path.exists(state): raise error.JobComplete("all done") myjob = job(control=control, drop_caches=drop_caches, options=options) # Load in the users control file, may do any one of: # 1) execute in toto # 2) define steps, and select the first via next_step() myjob.step_engine() except error.JobContinue: sys.exit(5) except error.JobComplete: sys.exit(1) except error.JobError, instance: logging.error("JOB ERROR: " + str(instance)) if myjob: command = None if len(instance.args) > 1: command = instance.args[1] myjob.record('ABORT', None, command, str(instance)) myjob.record('END ABORT', None, None, str(instance)) assert myjob._record_indent == 0 myjob.complete(1) else: sys.exit(1) except Exception, e: # NOTE: job._run_step_fn and job.step_engine will turn things into # a JobError for us. If we get here, its likely an autotest bug. msg = str(e) + '\n' + traceback.format_exc() logging.critical("JOB ERROR (autotest bug?): " + msg) if myjob: myjob.record('END ABORT', None, None, msg) assert myjob._record_indent == 0 myjob.complete(1) else: sys.exit(1) # If we get here, then we assume the job is complete and good. myjob.record('END GOOD', None, None) assert myjob._record_indent == 0 myjob.complete(0) class job(base_client_job): def __init__(self, *args, **kwargs): base_client_job.__init__(self, *args, **kwargs) def run_test(self, url, *args, **dargs): log_pauser = cros_logging.LogRotationPauser() passed = False try: log_pauser.begin() passed = base_client_job.run_test(self, url, *args, **dargs) if not passed: # Save the VM state immediately after the test failure. # This is a NOOP if the the test isn't running in a VM or # if the VM is not properly configured to save state. _group, testname = self.pkgmgr.get_package_name(url, 'test') now = datetime.now().strftime('%I:%M:%S.%f') checkpoint_name = '%s-%s' % (testname, now) utils.save_vm_state(checkpoint_name) finally: log_pauser.end() return passed def reboot(self): self.reboot_setup() self.harness.run_reboot() # sync first, so that a sync during shutdown doesn't time out utils.system('sync; sync', ignore_status=True) utils.system('reboot /dev/null 2>&1 &') self.quit() def require_gcc(self): return False # TODO(ayatane): This logic should be deduplicated with # server/cros/dynamic_suite/control_file_getter.py, but the server # libraries are not available on clients. def _locate_test_control_file(dirpath, testname): """ Locate the control file for the given test. @param dirpath Root directory to search. @param testname Name of test. @returns Absolute path to the control file. @raise JobError: Raised if control file not found. """ for dirpath, _dirnames, filenames in os.walk(dirpath): for filename in filenames: if 'control' not in filename: continue path = os.path.join(dirpath, filename) if _is_control_file_for_test(path, testname): return os.path.abspath(path) raise error.JobError( 'could not find client test control file', dirpath, testname) _NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]" def _is_control_file_for_test(path, testname): with open(path) as f: for line in f: match = re.match(_NAME_PATTERN, line) if match is not None: return match.group(1) == testname