• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8# pylint: disable=missing-docstring
9
import copy
from datetime import datetime
import functools
import getpass
import glob
import logging
import os
import re
import shutil
import sys
import time
import traceback
import types
import weakref
23
24import common
25from autotest_lib.client.bin import client_logging_config
26from autotest_lib.client.bin import harness
27from autotest_lib.client.bin import local_host
28from autotest_lib.client.bin import parallel
29from autotest_lib.client.bin import partition as partition_lib
30from autotest_lib.client.bin import profilers
31from autotest_lib.client.bin import sysinfo
32from autotest_lib.client.bin import test
33from autotest_lib.client.bin import utils
34from autotest_lib.client.common_lib import barrier
35from autotest_lib.client.common_lib import base_job
36from autotest_lib.client.common_lib import packages
37from autotest_lib.client.common_lib import error
38from autotest_lib.client.common_lib import global_config
39from autotest_lib.client.common_lib import logging_manager
40from autotest_lib.client.common_lib import packages
41from autotest_lib.client.cros import cros_logging
42from autotest_lib.client.tools import html_report
43
44GLOBAL_CONFIG = global_config.global_config
45
46LAST_BOOT_TAG = object()
47JOB_PREAMBLE = """
48from autotest_lib.client.common_lib.error import *
49from autotest_lib.client.bin.utils import *
50"""
51
52
class StepError(error.AutotestError):
    """Raised when the client job step engine encounters an error."""
    pass
55
class NotAvailableError(error.AutotestError):
    """Raised when something the job requires is missing on the system."""
    pass
58
59
60
61def _run_test_complete_on_exit(f):
62    """Decorator for job methods that automatically calls
63    self.harness.run_test_complete when the method exits, if appropriate."""
64    def wrapped(self, *args, **dargs):
65        try:
66            return f(self, *args, **dargs)
67        finally:
68            if self._logger.global_filename == 'status':
69                self.harness.run_test_complete()
70                if self.drop_caches:
71                    utils.drop_caches()
72    wrapped.__name__ = f.__name__
73    wrapped.__doc__ = f.__doc__
74    wrapped.__dict__.update(f.__dict__)
75    return wrapped
76
77
class status_indenter(base_job.status_indenter):
    """Status indenter backed by the job's persistent _record_indent."""

    def __init__(self, job_):
        # Hold only a weak proxy so the indenter does not keep the job
        # alive through a circular reference.
        self._job = weakref.proxy(job_)

    @property
    def indent(self):
        """Current indentation level, as stored on the job."""
        return self._job._record_indent

    def increment(self):
        """Raise the job's indentation level by one."""
        self._job._record_indent += 1

    def decrement(self):
        """Lower the job's indentation level by one."""
        self._job._record_indent -= 1
95
96
97class base_client_job(base_job.base_job):
98    """The client-side concrete implementation of base_job.
99
100    Optional properties provided by this implementation:
101        control
102        harness
103    """
104
105    _WARNING_DISABLE_DELAY = 5
106
107    # _record_indent is a persistent property, but only on the client
108    _job_state = base_job.base_job._job_state
109    _record_indent = _job_state.property_factory(
110        '_state', '_record_indent', 0, namespace='client')
111    _max_disk_usage_rate = _job_state.property_factory(
112        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
113
114
115    def __init__(self, control, options, drop_caches=True):
116        """
117        Prepare a client side job object.
118
119        @param control: The control file (pathname of).
120        @param options: an object which includes:
121                jobtag: The job tag string (eg "default").
122                cont: If this is the continuation of this job.
123                harness_type: An alternative server harness.  [None]
124                use_external_logging: If true, the enable_external_logging
125                          method will be called during construction.  [False]
126        @param drop_caches: If true, utils.drop_caches() is called before and
127                between all tests.  [True]
128        """
129        super(base_client_job, self).__init__(options=options)
130        self._pre_record_init(control, options)
131        try:
132            self._post_record_init(control, options, drop_caches)
133        except Exception, err:
134            self.record(
135                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
136                    str(err))
137            raise
138
139
140    @classmethod
141    def _get_environ_autodir(cls):
142        return os.environ['AUTODIR']
143
144
145    @classmethod
146    def _find_base_directories(cls):
147        """
148        Determine locations of autodir and clientdir (which are the same)
149        using os.environ. Serverdir does not exist in this context.
150        """
151        autodir = clientdir = cls._get_environ_autodir()
152        return autodir, clientdir, None
153
154
155    @classmethod
156    def _parse_args(cls, args):
157        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
158
159
160    def _find_resultdir(self, options):
161        """
162        Determine the directory for storing results. On a client this is
163        always <autodir>/results/<tag>, where tag is passed in on the command
164        line as an option.
165        """
166        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
167                                                           'output_dir',
168                                                            default="")
169        if options.output_dir:
170            basedir = options.output_dir
171        elif output_dir_config:
172            basedir = output_dir_config
173        else:
174            basedir = self.autodir
175
176        return os.path.join(basedir, 'results', options.tag)
177
178
179    def _get_status_logger(self):
180        """Return a reference to the status logger."""
181        return self._logger
182
183
    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.

        @param control: Path to the control file for this job.
        @param options: Parsed command-line options object.
        """
        # On a fresh (non-continuation) run, clear out stale results and
        # leftover debugdir files from any previous job.
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        # 'harness' and 'harness_args' persist across job continuations via
        # the state file (see handle_persistent_option).
        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            # A dotted log filename (e.g. 'status.3') marks a parallel
            # subtask; the suffix becomes the tag passed to the harness.
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)
230
231
    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().

        @param control: Path to the control file.
        @param options: Parsed command-line options object.
        @param drop_caches: Whether to drop kernel caches before and between
                tests.
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            # Fresh run: ensure the download area exists and snapshot the
            # control file into the results directory.
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        # Test accounting; presumably filled in once tests have run — set to
        # None here so the attributes always exist.
        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None
290
291
292    def _init_drop_caches(self, drop_caches):
293        """
294        Perform the drop caches initialization.
295        """
296        self.drop_caches_between_iterations = (
297                                    GLOBAL_CONFIG.get_config_value('CLIENT',
298                                    'drop_caches_between_iterations',
299                                    type=bool, default=True))
300        self.drop_caches = drop_caches
301        if self.drop_caches:
302            utils.drop_caches()
303
304
305    def _init_packages(self):
306        """
307        Perform the packages support initialization.
308        """
309        self.pkgmgr = packages.PackageManager(
310            self.autodir, run_function_dargs={'timeout':3600})
311
312
313    def _cleanup_results_dir(self):
314        """Delete everything in resultsdir"""
315        assert os.path.exists(self.resultdir)
316        list_files = glob.glob('%s/*' % self.resultdir)
317        for f in list_files:
318            if os.path.isdir(f):
319                shutil.rmtree(f)
320            elif os.path.isfile(f):
321                os.remove(f)
322
323
324    def _cleanup_debugdir_files(self):
325        """
326        Delete any leftover debugdir files
327        """
328        list_files = glob.glob("/tmp/autotest_results_dir.*")
329        for f in list_files:
330            os.remove(f)
331
332
333    def disable_warnings(self, warning_type):
334        self.record("INFO", None, None,
335                    "disabling %s warnings" % warning_type,
336                    {"warnings.disable": warning_type})
337        time.sleep(self._WARNING_DISABLE_DELAY)
338
339
340    def enable_warnings(self, warning_type):
341        time.sleep(self._WARNING_DISABLE_DELAY)
342        self.record("INFO", None, None,
343                    "enabling %s warnings" % warning_type,
344                    {"warnings.enable": warning_type})
345
346
347    def monitor_disk_usage(self, max_rate):
348        """\
349        Signal that the job should monitor disk space usage on /
350        and generate a warning if a test uses up disk space at a
351        rate exceeding 'max_rate'.
352
353        Parameters:
354             max_rate - the maximium allowed rate of disk consumption
355                        during a test, in MB/hour, or 0 to indicate
356                        no limit.
357        """
358        self._max_disk_usage_rate = max_rate
359
360
361    def control_get(self):
362        return self.control
363
364
365    def control_set(self, control):
366        self.control = os.path.abspath(control)
367
368
369    def harness_select(self, which, harness_args):
370        self.harness = harness.select(which, self, harness_args)
371
372
373    def setup_dirs(self, results_dir, tmp_dir):
374        if not tmp_dir:
375            tmp_dir = os.path.join(self.tmpdir, 'build')
376        if not os.path.exists(tmp_dir):
377            os.mkdir(tmp_dir)
378        if not os.path.isdir(tmp_dir):
379            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
380            raise ValueError(e_msg)
381
382        # We label the first build "build" and then subsequent ones
383        # as "build.2", "build.3", etc. Whilst this is a little bit
384        # inconsistent, 99.9% of jobs will only have one build
385        # (that's not done as kernbench, sparse, or buildtest),
386        # so it works out much cleaner. One of life's compromises.
387        if not results_dir:
388            results_dir = os.path.join(self.resultdir, 'build')
389            i = 2
390            while os.path.exists(results_dir):
391                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
392                i += 1
393        if not os.path.exists(results_dir):
394            os.mkdir(results_dir)
395
396        return (results_dir, tmp_dir)
397
398
399    def barrier(self, *args, **kwds):
400        """Create a barrier object"""
401        return barrier.barrier(*args, **kwds)
402
403
404    def install_pkg(self, name, pkg_type, install_dir):
405        '''
406        This method is a simple wrapper around the actual package
407        installation method in the Packager class. This is used
408        internally by the profilers, deps and tests code.
409        name : name of the package (ex: sleeptest, dbench etc.)
410        pkg_type : Type of the package (ex: test, dep etc.)
411        install_dir : The directory in which the source is actually
412                      untarred into. (ex: client/profilers/<name> for profilers)
413        '''
414        if self.pkgmgr.repositories:
415            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
416
417
418    def add_repository(self, repo_urls):
419        '''
420        Adds the repository locations to the job so that packages
421        can be fetched from them when needed. The repository list
422        needs to be a string list
423        Ex: job.add_repository(['http://blah1','http://blah2'])
424        '''
425        for repo_url in repo_urls:
426            self.pkgmgr.add_repository(repo_url)
427
428        # Fetch the packages' checksum file that contains the checksums
429        # of all the packages if it is not already fetched. The checksum
430        # is always fetched whenever a job is first started. This
431        # is not done in the job's constructor as we don't have the list of
432        # the repositories there (and obviously don't care about this file
433        # if we are not using the repos)
434        try:
435            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
436                                              packages.CHECKSUM_FILE)
437            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
438                                  checksum_file_path, use_checksum=False)
439        except error.PackageFetchError:
440            # packaging system might not be working in this case
441            # Silently fall back to the normal case
442            pass
443
444
445    def require_gcc(self):
446        """
447        Test whether gcc is installed on the machine.
448        """
449        # check if gcc is installed on the system.
450        try:
451            utils.system('which gcc')
452        except error.CmdError:
453            raise NotAvailableError('gcc is required by this job and is '
454                                    'not available on the system')
455
456
    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.

        @param deps: list of dependency names under <autodir>/deps.
        @raise error.TestError: if a dependency directory does not exist
                after attempting installation.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # NOTE(review): execfile() always returns None, so this condition
            # is always true — success is logged whenever the dep script runs
            # without raising.  (The misspelling in the log message is
            # runtime output and is left untouched here.)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfuly built', dep)
479
480
481    def _runtest(self, url, tag, timeout, args, dargs):
482        try:
483            l = lambda : test.runtest(self, url, tag, args, dargs)
484            pid = parallel.fork_start(self.resultdir, l)
485
486            if timeout:
487                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
488                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
489            else:
490                parallel.fork_waitfor(self.resultdir, pid)
491
492        except error.TestBaseException:
493            # These are already classified with an error type (exit_status)
494            raise
495        except error.JobError:
496            raise  # Caught further up and turned into an ABORT.
497        except Exception, e:
498            # Converts all other exceptions thrown by the test regardless
499            # of phase into a TestError(TestBaseException) subclass that
500            # reports them with their full stack trace.
501            raise error.UnhandledTestError(e)
502
503
    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        # 'timeout' is consumed here so it is not forwarded to the test.
        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # Wrap the run with disk-usage monitoring on /, reporting violations
        # of self._max_disk_usage_rate through log_warning.
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)
542
543
544    @_run_test_complete_on_exit
545    def run_test(self, url, *args, **dargs):
546        """
547        Summon a test object and run it.
548
549        @param url A url that identifies the test to run.
550        @param tag An optional keyword argument that will be added to the
551            test and subdir name.
552        @param subdir_tag An optional keyword argument that will be added
553            to the subdir name.
554
555        @returns True if the test passes, False otherwise.
556        """
557        (subdir, testname, group_func, timeout) = self._run_test_base(url,
558                                                                      *args,
559                                                                      **dargs)
560        try:
561            self._rungroup(subdir, testname, group_func, timeout)
562            return True
563        except error.TestBaseException:
564            return False
565        # Any other exception here will be given to the caller
566        #
567        # NOTE: The only exception possible from the control file here
568        # is error.JobError as _runtest() turns all others into an
569        # UnhandledTestError that is caught above.
570
571
572    @_run_test_complete_on_exit
573    def run_test_detail(self, url, *args, **dargs):
574        """
575        Summon a test object and run it, returning test status.
576
577        @param url A url that identifies the test to run.
578        @param tag An optional keyword argument that will be added to the
579            test and subdir name.
580        @param subdir_tag An optional keyword argument that will be added
581            to the subdir name.
582
583        @returns Test status
584        @see: client/common_lib/error.py, exit_status
585        """
586        (subdir, testname, group_func, timeout) = self._run_test_base(url,
587                                                                      *args,
588                                                                      **dargs)
589        try:
590            self._rungroup(subdir, testname, group_func, timeout)
591            return 'GOOD'
592        except error.TestBaseException, detail:
593            return detail.exit_status
594
595
    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        Run a function inside a START/END group in the status log.

        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        timeout:
                optional timeout, recorded on the START entry
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            # Remember what was running, so an unexpected reboot mid-group
            # can presumably be attributed to this test on continuation.
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                # Already classified: close the group with its exit status.
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            # Group finished (either way): clear the unexpected-reboot marker.
            self._state.discard('client', 'unexpected_reboot')
640
641
642    def run_group(self, function, tag=None, **dargs):
643        """
644        Run a function nested within a group level.
645
646        function:
647                Callable to run.
648        tag:
649                An optional tag name for the group.  If None (default)
650                function.__name__ will be used.
651        **dargs:
652                Named arguments for the function.
653        """
654        if tag:
655            name = tag
656        else:
657            name = function.__name__
658
659        try:
660            return self._rungroup(subdir=None, testname=name,
661                                  function=function, timeout=None, **dargs)
662        except (SystemExit, error.TestBaseException):
663            raise
664        # If there was a different exception, turn it into a TestError.
665        # It will be caught by step_engine or _run_step_fn.
666        except Exception, e:
667            raise error.UnhandledTestError(e)
668
669
670    def cpu_count(self):
671        return utils.count_cpus()  # use total system count
672
673
674    def start_reboot(self):
675        self.record('START', None, 'reboot')
676        self.record('GOOD', None, 'reboot.start')
677
678
679    def _record_reboot_failure(self, subdir, operation, status,
680                               running_id=None):
681        self.record("ABORT", subdir, operation, status)
682        if not running_id:
683            running_id = utils.running_os_ident()
684        kernel = {"kernel": running_id.split("::")[0]}
685        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
686
687
688    def _check_post_reboot(self, subdir, running_id=None):
689        """
690        Function to perform post boot checks such as if the system configuration
691        has changed across reboots (specifically, CPUs and partitions).
692
693        @param subdir: The subdir to use in the job.record call.
694        @param running_id: An optional running_id to include in the reboot
695            failure log message
696
697        @raise JobError: Raised if the current configuration does not match the
698            pre-reboot configuration.
699        """
700        # check to see if any partitions have changed
701        partition_list = partition_lib.get_partition_list(self,
702                                                          exclude_swap=False)
703        mount_info = partition_lib.get_mount_info(partition_list)
704        old_mount_info = self._state.get('client', 'mount_info')
705        if mount_info != old_mount_info:
706            new_entries = mount_info - old_mount_info
707            old_entries = old_mount_info - mount_info
708            description = ("mounted partitions are different after reboot "
709                           "(old entries: %s, new entries: %s)" %
710                           (old_entries, new_entries))
711            self._record_reboot_failure(subdir, "reboot.verify_config",
712                                        description, running_id=running_id)
713            raise error.JobError("Reboot failed: %s" % description)
714
715        # check to see if any CPUs have changed
716        cpu_count = utils.count_cpus()
717        old_count = self._state.get('client', 'cpu_count')
718        if cpu_count != old_count:
719            description = ('Number of CPUs changed after reboot '
720                           '(old count: %d, new count: %d)' %
721                           (old_count, cpu_count))
722            self._record_reboot_failure(subdir, 'reboot.verify_config',
723                                        description, running_id=running_id)
724            raise error.JobError('Reboot failed: %s' % description)
725
726
727    def partition(self, device, loop_size=0, mountpoint=None):
728        """
729        Work with a machine partition
730
731            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
732            @param mountpoint: Specify a directory to mount to. If not specified
733                               autotest tmp directory will be used.
734            @param loop_size: Size of loopback device (in MB). Defaults to 0.
735
736            @return: A L{client.bin.partition.partition} object
737        """
738
739        if not mountpoint:
740            mountpoint = self.tmpdir
741        return partition_lib.partition(self, device, loop_size, mountpoint)
742
743    @utils.deprecated
744    def filesystem(self, device, mountpoint=None, loop_size=0):
745        """ Same as partition
746
747        @deprecated: Use partition method instead
748        """
749        return self.partition(device, loop_size, mountpoint)
750
751
    def enable_external_logging(self):
        # No-op in the base client job; presumably a hook for subclasses or
        # harnesses that support external logging (called from
        # _post_record_init when options.log is set).
        pass
754
755
    def disable_external_logging(self):
        # No-op counterpart to enable_external_logging; called from
        # complete() before the job exits.
        pass
758
759
760    def reboot_setup(self):
761        # save the partition list and mount points, as well as the cpu count
762        partition_list = partition_lib.get_partition_list(self,
763                                                          exclude_swap=False)
764        mount_info = partition_lib.get_mount_info(partition_list)
765        self._state.set('client', 'mount_info', mount_info)
766        self._state.set('client', 'cpu_count', utils.count_cpus())
767
768
    def reboot(self):
        """Reboot the machine and suspend the job until it comes back.

        Saves pre-reboot system state, notifies the harness, then kicks off
        a detached reboot and raises JobContinue via self.quit().
        """
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        # Detached with a short delay so this process can exit cleanly
        # before the machine actually goes down.
        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()
782
783
784    def noop(self, text):
785        logging.info("job: noop: " + text)
786
787
    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel.

        Each task is a (callable, arg1, arg2, ...) tuple or list.  Tasks
        are forked as subprocesses; each subtask's status entries go to a
        suffixed '<status>.N' file, which is folded back into the main
        status log as the task finishes.

        @raise error.JobError: if any of the tasks raised an exception.
        """

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            # Point the status logger at this subtask's own suffixed file.
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                # Collect failures; keep waiting on the remaining tasks.
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        # Restore the main status log filename for subsequent records.
        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)
832
833
    def quit(self):
        """Pause the harness and abort the remaining control-file steps.

        @raise error.JobContinue always; runjob() maps this to exit
            status 5, signalling that more steps remain to be run.
        """
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")
838
839
840    def complete(self, status):
841        """Write pending reports, clean up, and exit"""
842        # write out a job HTML report
843        try:
844            html_report.create_report(self.resultdir)
845        except Exception, e:
846            logging.error("Error writing job HTML report: %s", e)
847
848        # We are about to exit 'complete' so clean up the control file.
849        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
850        shutil.move(self._state_file, dest)
851
852        self.harness.run_complete()
853        self.disable_external_logging()
854        sys.exit(status)
855
856
857    def _load_state(self):
858        # grab any initial state and set up $CONTROL.state as the backing file
859        init_state_file = self.control + '.init.state'
860        self._state_file = self.control + '.state'
861        if os.path.exists(init_state_file):
862            shutil.move(init_state_file, self._state_file)
863        self._state.set_backing_file(self._state_file)
864
865        # initialize the state engine, if necessary
866        has_steps = self._state.has('client', 'steps')
867        if not self._is_continuation and has_steps:
868            raise RuntimeError('Loaded state can only contain client.steps if '
869                               'this is a continuation')
870
871        if not has_steps:
872            logging.debug('Initializing the state engine')
873            self._state.set('client', 'steps', [])
874
875
876    def handle_persistent_option(self, options, option_name):
877        """
878        Select option from command line or persistent state.
879        Store selected option to allow standalone client to continue
880        after reboot with previously selected options.
881        Priority:
882        1. explicitly specified via command line
883        2. stored in state file (if continuing job '-c')
884        3. default == None
885        """
886        option = None
887        cmd_line_option = getattr(options, option_name)
888        if cmd_line_option:
889            option = cmd_line_option
890            self._state.set('client', option_name, option)
891        else:
892            stored_option = self._state.get('client', option_name, None)
893            if stored_option:
894                option = stored_option
895        logging.debug('Persistent option %s now set to %s', option_name, option)
896        return option
897
898
    def __create_step_tuple(self, fn, args, dargs):
        """Normalize a step specification into a pickleable tuple.

        Returns (ancestry, fn_name, args, dargs) where fn_name is always
        a string, since the step queue is persisted to the state file.

        @raise StepError if fn is neither callable nor a string.
        """
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        # types.StringTypes covers both str and unicode (Python 2 only).
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        # Copy so later mutation of the current ancestry doesn't alter
        # steps that have already been queued.
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)
917
918
919    def next_step_append(self, fn, *args, **dargs):
920        """Define the next step and place it at the end"""
921        steps = self._state.get('client', 'steps')
922        steps.append(self.__create_step_tuple(fn, args, dargs))
923        self._state.set('client', 'steps', steps)
924
925
926    def next_step(self, fn, *args, **dargs):
927        """Create a new step and place it after any steps added
928        while running the current step but before any steps added in
929        previous steps"""
930        steps = self._state.get('client', 'steps')
931        steps.insert(self._next_step_index,
932                     self.__create_step_tuple(fn, args, dargs))
933        self._next_step_index += 1
934        self._state.set('client', 'steps', steps)
935
936
937    def next_step_prepend(self, fn, *args, **dargs):
938        """Insert a new step, executing first"""
939        steps = self._state.get('client', 'steps')
940        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
941        self._next_step_index += 1
942        self._state.set('client', 'steps', steps)
943
944
945
946    def _run_step_fn(self, local_vars, fn, args, dargs):
947        """Run a (step) function within the given context"""
948
949        local_vars['__args'] = args
950        local_vars['__dargs'] = dargs
951        try:
952            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
953            return local_vars['__ret']
954        except SystemExit:
955            raise  # Send error.JobContinue and JobComplete on up to runjob.
956        except error.TestNAError, detail:
957            self.record(detail.exit_status, None, fn, str(detail))
958        except Exception, detail:
959            raise error.UnhandledJobError(detail)
960
961
    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined.

        Returns a (local_vars, ancestry) tuple; note that `ancestry` is
        mutated in place as redundant frames are popped.
        """

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            # Re-run each ancestor steps function; its returned locals()
            # become the environment for the next level down.
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            # Parent frame holds the same definition of fn_name, so the
            # innermost frame is redundant: drop it and its ancestry entry.
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)
997
998
999    def _add_step_init(self, local_vars, current_function):
1000        """If the function returned a dictionary that includes a
1001        function named 'step_init', prepend it to our list of steps.
1002        This will only get run the first time a function with a nested
1003        use of the step engine is run."""
1004
1005        if (isinstance(local_vars, dict) and
1006            'step_init' in local_vars and
1007            callable(local_vars['step_init'])):
1008            # The init step is a child of the function
1009            # we were just running.
1010            self._current_step_ancestry.append(current_function)
1011            self.next_step_prepend('step_init')
1012
1013
1014    def step_engine(self):
1015        """The multi-run engine used when the control file defines step_init.
1016
1017        Does the next step.
1018        """
1019
1020        # Set up the environment and then interpret the control file.
1021        # Some control files will have code outside of functions,
1022        # which means we need to have our state engine initialized
1023        # before reading in the file.
1024        global_control_vars = {'job': self,
1025                               'args': self.args}
1026        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
1027        try:
1028            execfile(self.control, global_control_vars, global_control_vars)
1029        except error.TestNAError, detail:
1030            self.record(detail.exit_status, None, self.control, str(detail))
1031        except SystemExit:
1032            raise  # Send error.JobContinue and JobComplete on up to runjob.
1033        except Exception, detail:
1034            # Syntax errors or other general Python exceptions coming out of
1035            # the top level of the control file itself go through here.
1036            raise error.UnhandledJobError(detail)
1037
1038        # If we loaded in a mid-job state file, then we presumably
1039        # know what steps we have yet to run.
1040        if not self._is_continuation:
1041            if 'step_init' in global_control_vars:
1042                self.next_step(global_control_vars['step_init'])
1043        else:
1044            # if last job failed due to unexpected reboot, record it as fail
1045            # so harness gets called
1046            last_job = self._state.get('client', 'unexpected_reboot', None)
1047            if last_job:
1048                subdir, testname = last_job
1049                self.record('FAIL', subdir, testname, 'unexpected reboot')
1050                self.record('END FAIL', subdir, testname)
1051
1052        # Iterate through the steps.  If we reboot, we'll simply
1053        # continue iterating on the next step.
1054        while len(self._state.get('client', 'steps')) > 0:
1055            steps = self._state.get('client', 'steps')
1056            (ancestry, fn_name, args, dargs) = steps.pop(0)
1057            self._state.set('client', 'steps', steps)
1058
1059            self._next_step_index = 0
1060            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1061            local_vars, self._current_step_ancestry = ret
1062            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1063            self._add_step_init(local_vars, fn_name)
1064
1065
1066    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1067        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1068                                   on_every_test)
1069
1070
1071    def add_sysinfo_logfile(self, file, on_every_test=False):
1072        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1073
1074
1075    def _add_sysinfo_loggable(self, loggable, on_every_test):
1076        if on_every_test:
1077            self.sysinfo.test_loggables.add(loggable)
1078        else:
1079            self.sysinfo.boot_loggables.add(loggable)
1080        self._save_sysinfo_state()
1081
1082
1083    def _load_sysinfo_state(self):
1084        state = self._state.get('client', 'sysinfo', None)
1085        if state:
1086            self.sysinfo.deserialize(state)
1087
1088
1089    def _save_sysinfo_state(self):
1090        state = self.sysinfo.serialize()
1091        self._state.set('client', 'sysinfo', state)
1092
1093
class disk_usage_monitor:
    """Watches free space on a device over an interval and calls a
    logging function if space was consumed faster than an allowed rate.
    """

    def __init__(self, logging_func, device, max_mb_per_hour):
        # logging_func: callable taking one string (the warning message).
        # device: device/path to measure, passed to utils.freespace.
        # max_mb_per_hour: allowed consumption rate; falsy disables checks.
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Record the baseline free space and the start time."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Compare usage since start() against the allowed rate and emit
        a warning via the logging function if it was exceeded."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        total_time = time.time() - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # BUG FIX: propagate the wrapped function's return
                    # value; previously it was silently discarded.
                    return func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
1146
1147
def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.  It does not return
    normally: every path ends in sys.exit() or job.complete().

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag.
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # Instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid.
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        # More steps remain (see job.quit); status 5 signals a continuation.
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError as instance:
        logging.error("JOB ERROR: %s", instance)
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception as e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): %s", msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)
1219
1220
class job(base_client_job):
    """Specialization of the client job.

    Extends the base client job with log-rotation pausing around each
    test and a VM state snapshot when a test fails.
    """

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        """Run a single test with log rotation paused for its duration.

        On failure, immediately snapshot the VM state (a no-op when not
        running in a VM configured to save state) so the failure can be
        inspected later.
        """
        pauser = cros_logging.LogRotationPauser()
        success = False
        try:
            pauser.begin()
            success = base_client_job.run_test(self, url, *args, **dargs)
            if not success:
                _group, test_name = self.pkgmgr.get_package_name(url, 'test')
                timestamp = datetime.now().strftime('%I:%M:%S.%f')
                utils.save_vm_state('%s-%s' % (test_name, timestamp))
        finally:
            pauser.end()
        return success


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # Sync twice up front so a sync issued during shutdown cannot
        # time out.
        utils.system('sync; sync', ignore_status=True)

        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        """Return False: this job does not require gcc."""
        return False
1259