1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8# pylint: disable=missing-docstring
9
10import copy
11from datetime import datetime
12import getpass
13import glob
14import logging
15import os
16import re
17import shutil
18import sys
19import time
20import traceback
21import types
22import weakref
23
24import common
25from autotest_lib.client.bin import client_logging_config
26from autotest_lib.client.bin import harness
27from autotest_lib.client.bin import local_host
28from autotest_lib.client.bin import parallel
29from autotest_lib.client.bin import partition as partition_lib
30from autotest_lib.client.bin import profilers
31from autotest_lib.client.bin import sysinfo
32from autotest_lib.client.bin import test
33from autotest_lib.client.bin import utils
34from autotest_lib.client.common_lib import barrier
35from autotest_lib.client.common_lib import base_job
36from autotest_lib.client.common_lib import control_data
37from autotest_lib.client.common_lib import error
38from autotest_lib.client.common_lib import global_config
39from autotest_lib.client.common_lib import logging_manager
40from autotest_lib.client.common_lib import packages
41from autotest_lib.client.cros import cros_logging
42from autotest_lib.client.tools import html_report
43
44GLOBAL_CONFIG = global_config.global_config
45
46LAST_BOOT_TAG = object()
47JOB_PREAMBLE = """
48from autotest_lib.client.common_lib.error import *
49from autotest_lib.client.bin.utils import *
50"""
51
52
53class StepError(error.AutotestError):
54    pass
55
56class NotAvailableError(error.AutotestError):
57    pass
58
59
60
61def _run_test_complete_on_exit(f):
62    """Decorator for job methods that automatically calls
63    self.harness.run_test_complete when the method exits, if appropriate."""
64    def wrapped(self, *args, **dargs):
65        try:
66            return f(self, *args, **dargs)
67        finally:
68            if self._logger.global_filename == 'status':
69                self.harness.run_test_complete()
70                if self.drop_caches:
71                    utils.drop_caches()
72    wrapped.__name__ = f.__name__
73    wrapped.__doc__ = f.__doc__
74    wrapped.__dict__.update(f.__dict__)
75    return wrapped
76
77
78class status_indenter(base_job.status_indenter):
79    """Provide a status indenter that is backed by job._record_prefix."""
80    def __init__(self, job_):
81        self._job = weakref.proxy(job_)  # avoid a circular reference
82
83
84    @property
85    def indent(self):
86        return self._job._record_indent
87
88
89    def increment(self):
90        self._job._record_indent += 1
91
92
93    def decrement(self):
94        self._job._record_indent -= 1
95
96
97class base_client_job(base_job.base_job):
98    """The client-side concrete implementation of base_job.
99
100    Optional properties provided by this implementation:
101        control
102        harness
103    """
104
105    _WARNING_DISABLE_DELAY = 5
106
107    # _record_indent is a persistent property, but only on the client
108    _job_state = base_job.base_job._job_state
109    _record_indent = _job_state.property_factory(
110        '_state', '_record_indent', 0, namespace='client')
111    _max_disk_usage_rate = _job_state.property_factory(
112        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
113
114
115    def __init__(self, control, options, drop_caches=True):
116        """
117        Prepare a client side job object.
118
119        @param control: The control file (pathname of).
120        @param options: an object which includes:
121                jobtag: The job tag string (eg "default").
122                cont: If this is the continuation of this job.
123                harness_type: An alternative server harness.  [None]
124                use_external_logging: If true, the enable_external_logging
125                          method will be called during construction.  [False]
126        @param drop_caches: If true, utils.drop_caches() is called before and
127                between all tests.  [True]
128        """
129        super(base_client_job, self).__init__(options=options)
130        self._pre_record_init(control, options)
131        try:
132            self._post_record_init(control, options, drop_caches)
133        except Exception, err:
134            self.record(
135                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
136                    str(err))
137            raise
138
139
140    @classmethod
141    def _get_environ_autodir(cls):
142        return os.environ['AUTODIR']
143
144
145    @classmethod
146    def _find_base_directories(cls):
147        """
148        Determine locations of autodir and clientdir (which are the same)
149        using os.environ. Serverdir does not exist in this context.
150        """
151        autodir = clientdir = cls._get_environ_autodir()
152        return autodir, clientdir, None
153
154
155    @classmethod
156    def _parse_args(cls, args):
157        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
158
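    # Note: the regex above keeps quoted substrings intact when the --args
    # string is split into job.args.  Illustrative example (hypothetical
    # input, shown only as documentation):
    #
    #     base_client_job._parse_args("dest=/mnt tag='my test' verbose")
    #     # -> ['dest=/mnt', "tag='my test'", 'verbose']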

    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        if self.control:
            parsed_control = control_data.parse_control(
                    self.control, raise_warnings=False)
            self.fast = parsed_control.fast

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)


    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                                    GLOBAL_CONFIG.get_config_value('CLIENT',
                                    'drop_caches_between_iterations',
                                    type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})


    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate

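    # Minimal, hypothetical control-file sketch for the monitor above (the
    # 'job' object is injected into the control file namespace):
    #
    #     job.monitor_disk_usage(500)   # warn above 500 MB/hour on /
    #     job.run_test('sleeptest')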

    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which, harness_args):
        self.harness = harness.select(which, self, harness_args)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is untarred.
                      (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        must be a list of strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

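    # Hedged example of how a test's setup() typically pulls in a dependency
    # through the job object (the dependency name 'libaio' is illustrative):
    #
    #     def setup(self):
    #         self.job.setup_dep(['libaio'])
    #         # the built dep is then available under <autodir>/deps/libaio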

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares the arguments and the group function shared by run_test
        and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status

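    # Illustrative control-file usage of the two entry points above (test
    # names are examples only):
    #
    #     if not job.run_test('sleeptest', tag='quick'):   # True on success
    #         job.record('INFO', None, None, 'sleeptest failed')
    #     status = job.run_test_detail('dbench')           # e.g. 'GOOD'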

    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed-in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

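    # Sketch of run_group() usage from a control file; the group appears in
    # the status log as a START/END block around the wrapped calls:
    #
    #     def stress_suite():
    #         job.run_test('sleeptest')
    #         job.run_test('dbench')
    #     job.run_group(stress_suite, tag='stress')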

    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, verifying that the system configuration
        (specifically, CPUs and partitions) has not changed across the reboot.

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory will
                               be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)

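    # Hedged example: creating a loopback-backed partition object from a
    # control file.  The returned object provides the mkfs/mount style
    # helpers defined in client/bin/partition.py (see that module for the
    # exact signatures):
    #
    #     part = job.partition('/tmp/looped_file', loop_size=1024)
    #     part.mkfs('ext4')
    #     part.mount()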

    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: the netconsole module sometimes hangs shutdown, so if it is
        # loaded, unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)
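
    # Each task handed to parallel() above is a tuple/list whose first
    # element is a callable and whose remaining elements are its arguments,
    # e.g. (illustrative functions only):
    #
    #     def run_one(tag):
    #         job.run_test('sleeptest', tag=tag)
    #     job.parallel((run_one, 'a'), (run_one, 'b'))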


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete', so move the state file into the
        # results directory.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select an option from the command line or from persistent state.
        The selected option is stored so that a standalone client can
        continue after a reboot with the previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

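    # Typical step-engine control file: step_init() queues the first real
    # step, and each step can queue the next one, so the job can resume after
    # a reboot (function and test names below are illustrative):
    #
    #     def step_init():
    #         job.next_step(step_test)
    #         job.reboot()
    #
    #     def step_test():
    #         job.run_test('sleeptest')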


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if the last job failed due to an unexpected reboot, record it as
            # a failure so the harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

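    # Example of registering extra sysinfo collection from a control file
    # (the command and path are placeholders):
    #
    #     job.add_sysinfo_command('lspci -v', logfile='lspci.txt',
    #                             on_every_test=True)
    #     job.add_sysinfo_logfile('/var/log/messages')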

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator

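# The watch() classmethod wraps a callable in the start/run/stop idiom above;
# a small sketch (the logging function and threshold are arbitrary):
#
#     @disk_usage_monitor.watch(logging.warning, '/', 100)
#     def heavy_io():
#         utils.system('dd if=/dev/zero of=/tmp/bigfile bs=1M count=100')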

def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure the state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


class job(base_client_job):

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            log_pauser.end()
        return passed


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)

        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        return False