1# Lint as: python2, python3
2# pylint: disable-msg=C0111
3
4# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7"""
8The main job wrapper for the server side.
9
10This is the core infrastructure. Derived from the client side job.py
11
12Copyright Martin J. Bligh, Andy Whitcroft 2007
13"""
14
15from __future__ import absolute_import
16from __future__ import division
17from __future__ import print_function
18
19import errno
20import fcntl
21import getpass
22import itertools
23import logging
24import os
25import pickle
26import platform
27import re
28import select
29import shutil
30import sys
31import tempfile
32import time
33import traceback
34import uuid
35import warnings
36
37from datetime import datetime
38
39from autotest_lib.client.bin import sysinfo
40from autotest_lib.client.common_lib import base_job
41from autotest_lib.client.common_lib import control_data
42from autotest_lib.client.common_lib import error
43from autotest_lib.client.common_lib import logging_manager
44from autotest_lib.client.common_lib import packages
45from autotest_lib.client.common_lib import utils
46from autotest_lib.client.common_lib import seven
47from autotest_lib.server import profilers
48from autotest_lib.server import site_gtest_runner
49from autotest_lib.server import subcommand
50from autotest_lib.server import test
51from autotest_lib.server.autotest import OFFLOAD_ENVVAR
52from autotest_lib.server import utils as server_utils
53from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
54from autotest_lib.server import hosts
55from autotest_lib.server.hosts import abstract_ssh
56from autotest_lib.server.hosts import afe_store
57from autotest_lib.server.hosts import file_store
58from autotest_lib.server.hosts import shadowing_store
59from autotest_lib.server.hosts import factory as host_factory
60from autotest_lib.server.hosts import host_info
61from autotest_lib.server.hosts import ssh_multiplex
62from autotest_lib.tko import models as tko_models
63from autotest_lib.tko import parser_lib
64from six.moves import zip
65
66try:
67    from chromite.lib import metrics
68except ImportError:
69    metrics = utils.metrics_mock
70
71
72def _control_segment_path(name):
73    """Get the pathname of the named control segment file."""
74    server_dir = os.path.dirname(os.path.abspath(__file__))
75    return os.path.join(server_dir, "control_segments", name)
76
77
78CLIENT_CONTROL_FILENAME = 'control'
79SERVER_CONTROL_FILENAME = 'control.srv'
80MACHINES_FILENAME = '.machines'
81DUT_STATEFUL_PATH = "/usr/local"
82
83CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
84CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
85CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
86CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
87CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
88VERIFY_CONTROL_FILE = _control_segment_path('verify')
89REPAIR_CONTROL_FILE = _control_segment_path('repair')
90PROVISION_CONTROL_FILE = _control_segment_path('provision')
91VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
92RESET_CONTROL_FILE = _control_segment_path('reset')
93GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
94
95
96def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
97                      host_attributes=None):
98    """Converts a list of machine names to list of dicts.
99
100    TODO(crbug.com/678430): This function temporarily has a side effect of
101    creating files under workdir for backing a FileStore. This side-effect will
102    go away once callers of autoserv start passing in the FileStore.
103
104    @param machine_names: A list of machine names.
105    @param store_dir: A directory to contain store backing files.
106    @param use_shadow_store: If True, we should create a ShadowingStore where
107            actual store is backed by the AFE but we create a backing file to
108            shadow the store. If False, backing file should already exist at:
109            ${store_dir}/${hostname}.store
110    @param in_lab: A boolean indicating whether we're running in lab.
111    @param host_attributes: Optional list of host attributes to add for each
112            host.
113    @returns: A list of dicts. Each dict has the following keys:
114            'hostname': Name of the machine originally in machine_names (str).
115            'afe_host': A frontend.Host object for the machine, or a stub if
116                    in_lab is false.
117            'host_info_store': A host_info.CachingHostInfoStore object to obtain
118                    host information. A stub if in_lab is False.
119            'connection_pool': ssh_multiplex.ConnectionPool instance to share
120                    ssh connection across control scripts. This is set to
121                    None, and should be overridden for connection sharing.
122    """
123    # See autoserv_parser.parse_args. Only one of in_lab or host_attributes can
124    # be provided.
125    if in_lab and host_attributes:
126        raise error.AutoservError(
127                'in_lab and host_attributes are mutually exclusive.')
128
129    machine_dict_list = []
130    for machine in machine_names:
131        if not in_lab:
132            afe_host = server_utils.EmptyAFEHost()
133            host_info_store = host_info.InMemoryHostInfoStore()
134            if host_attributes is not None:
135                afe_host.attributes.update(host_attributes)
136                info = host_info.HostInfo(attributes=host_attributes)
137                host_info_store.commit(info)
138        elif use_shadow_store:
139            afe_host = _create_afe_host(machine)
140            host_info_store = _create_afe_backed_host_info_store(store_dir,
141                                                                 machine)
142        else:
143            afe_host = server_utils.EmptyAFEHost()
144            host_info_store = _create_file_backed_host_info_store(store_dir,
145                                                                  machine)
146
147        machine_dict_list.append({
148                'hostname' : machine,
149                'afe_host' : afe_host,
150                'host_info_store': host_info_store,
151                'connection_pool': None,
152        })
153
154    return machine_dict_list
155
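# Hedged usage sketch of get_machine_dicts; the hostname, path and attributes
# below are invented for illustration only:
#
#   machine_dicts = get_machine_dicts(
#           machine_names=['chromeos1-row1-rack1-host1'],
#           store_dir='/tmp/host_info_store',
#           in_lab=False,
#           use_shadow_store=False,
#           host_attributes={'os_type': 'cros'})
#   # Each dict has 'hostname', 'afe_host', 'host_info_store' and a
#   # 'connection_pool' slot that server_job.__init__ fills in later.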
156
157class status_indenter(base_job.status_indenter):
158    """Provide a simple integer-backed status indenter."""
159    def __init__(self):
160        self._indent = 0
161
162
163    @property
164    def indent(self):
165        return self._indent
166
167
168    def increment(self):
169        self._indent += 1
170
171
172    def decrement(self):
173        self._indent -= 1
174
175
176    def get_context(self):
177        """Returns a context object for use by job.get_record_context."""
178        class context(object):
179            def __init__(self, indenter, indent):
180                self._indenter = indenter
181                self._indent = indent
182            def restore(self):
183                self._indenter._indent = self._indent
184        return context(self, self._indent)
185
186
187class server_job_record_hook(object):
188    """The job.record hook for server job. Used to inject WARN messages from
189    the console or vlm whenever new logs are written, and to echo any logs
190    to INFO level logging. Implemented as a class so that it can use state to
191    block recursive calls, so that the hook can call job.record itself to
192    log WARN messages.
193
194    Depends on job._read_warnings and job._logger.
195    """
196    def __init__(self, job):
197        self._job = job
198        self._being_called = False
199
200
201    def __call__(self, entry):
202        """A wrapper around the 'real' record hook, the _hook method, which
203        prevents recursion. This isn't making any effort to be threadsafe;
204        the intent is to outright block infinite recursion via a
205        job.record->_hook->job.record->_hook->job.record... chain."""
206        if self._being_called:
207            return
208        self._being_called = True
209        try:
210            self._hook(self._job, entry)
211        finally:
212            self._being_called = False
213
214
215    @staticmethod
216    def _hook(job, entry):
217        """The core hook, which can safely call job.record."""
218        entries = []
219        # poll all our warning loggers for new warnings
220        for timestamp, msg in job._read_warnings():
221            warning_entry = base_job.status_log_entry(
222                'WARN', None, None, msg, {}, timestamp=timestamp)
223            entries.append(warning_entry)
224            job.record_entry(warning_entry)
225        # echo rendered versions of all the status logs to info
226        entries.append(entry)
227        for entry in entries:
228            rendered_entry = job._logger.render_entry(entry)
229            logging.info(rendered_entry)
230
231
232class server_job(base_job.base_job):
233    """The server-side concrete implementation of base_job.
234
235    Optional properties provided by this implementation:
236        serverdir
237
238        warning_manager
239        warning_loggers
240    """
241
242    _STATUS_VERSION = 1
243
244    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
245    def __init__(self, control, args, resultdir, label, user, machines,
246                 machine_dict_list,
247                 client=False,
248                 ssh_user=host_factory.DEFAULT_SSH_USER,
249                 ssh_port=host_factory.DEFAULT_SSH_PORT,
250                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
251                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
252                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
253                 group_name='',
254                 tag='', disable_sysinfo=False,
255                 control_filename=SERVER_CONTROL_FILENAME,
256                 parent_job_id=None, in_lab=False,
257                 use_client_trampoline=False,
258                 sync_offload_dir=''):
259        """
260        Create a server side job object.
261
262        @param control: The pathname of the control file.
263        @param args: Passed to the control file.
264        @param resultdir: Where to throw the results.
265        @param label: Description of the job.
266        @param user: Username for the job (email address).
267        @param machines: A list of hostnames of the machines to use for the job.
268        @param machine_dict_list: A list of dicts for each of the machines above
269                as returned by get_machine_dicts.
270        @param client: True if this is a client-side control file.
271        @param ssh_user: The SSH username.  [root]
272        @param ssh_port: The SSH port number.  [22]
273        @param ssh_pass: The SSH passphrase, if needed.
274        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
275                '-vvv', or an empty string if not needed.
276        @param ssh_options: A string giving additional options that will be
277                included in ssh commands.
278        @param group_name: If supplied, this will be written out as
279                host_group_name in the keyvals file for the parser.
280        @param tag: The job execution tag from the scheduler.  [optional]
281        @param disable_sysinfo: Whether we should disable the sysinfo step of
282                tests for a modest shortening of test time.  [optional]
283        @param control_filename: The filename where the server control file
284                should be written in the results directory.
285        @param parent_job_id: Job ID of the parent job. Default to None if the
286                job does not have a parent job.
287        @param in_lab: Boolean that indicates if this is running in the lab
288                environment.
289        @param use_client_trampoline: Boolean that indicates whether to
290                use the client trampoline flow.  If this is True, control
291                is interpreted as the name of the client test to run.
292                The client control file will be client_trampoline.  The
293                test name will be passed to client_trampoline, which will
294                install the test package and re-exec the actual test
295                control file.
296        @param sync_offload_dir: String; path to the synchronous offload
297                dir, relative to the results directory. Ignored if empty.
298        """
299        super(server_job, self).__init__(resultdir=resultdir)
300        self.control = control
301        self._uncollected_log_file = os.path.join(self.resultdir,
302                                                  'uncollected_logs')
303        debugdir = os.path.join(self.resultdir, 'debug')
304        if not os.path.exists(debugdir):
305            os.mkdir(debugdir)
306
307        if user:
308            self.user = user
309        else:
310            self.user = getpass.getuser()
311
312        self.args = args
313        self.label = label
314        self.machines = machines
315        self._client = client
316        self.warning_loggers = set()
317        self.warning_manager = warning_manager()
318        self._ssh_user = ssh_user
319        self._ssh_port = ssh_port
320        self._ssh_pass = ssh_pass
321        self._ssh_verbosity_flag = ssh_verbosity_flag
322        self._ssh_options = ssh_options
323        self.tag = tag
324        self.hosts = set()
325        self.drop_caches = False
326        self.drop_caches_between_iterations = False
327        self._control_filename = control_filename
328        self._disable_sysinfo = disable_sysinfo
329        self._use_client_trampoline = use_client_trampoline
330
331        self.logging = logging_manager.get_logging_manager(
332                manage_stdout_and_stderr=True, redirect_fds=True)
333        subcommand.logging_manager_object = self.logging
334
335        self.sysinfo = sysinfo.sysinfo(self.resultdir)
336        self.profilers = profilers.profilers(self)
337        self._sync_offload_dir = sync_offload_dir
338
339        job_data = {'label' : label, 'user' : user,
340                    'hostname' : ','.join(machines),
341                    'drone' : platform.node(),
342                    'status_version' : str(self._STATUS_VERSION),
343                    'job_started' : str(int(time.time()))}
344        # Save parent job id to keyvals, so parser can retrieve the info and
345        # write to tko_jobs record.
346        if parent_job_id:
347            job_data['parent_job_id'] = parent_job_id
348        if group_name:
349            job_data['host_group_name'] = group_name
350
351        # only write these keyvals out on the first job in a resultdir
352        if 'job_started' not in utils.read_keyval(self.resultdir):
353            job_data.update(self._get_job_data())
354            utils.write_keyval(self.resultdir, job_data)
355
356        self.pkgmgr = packages.PackageManager(
357            self.autodir, run_function_dargs={'timeout':600})
358
359        self._register_subcommand_hooks()
360
361        # We no longer parse results as part of the server_job. These arguments
362        # can't be dropped yet because clients haven't all been cleaned up yet.
363        self.num_tests_run = -1
364        self.num_tests_failed = -1
365
366        # set up the status logger
367        self._indenter = status_indenter()
368        self._logger = base_job.status_logger(
369            self, self._indenter, 'status.log', 'status.log',
370            record_hook=server_job_record_hook(self))
371
372        # Initialize a flag to indicate DUT failure during the test, e.g.,
373        # unexpected reboot.
374        self.failed_with_device_error = False
375
376        self._connection_pool = ssh_multiplex.ConnectionPool()
377
378        # List of functions to run after the main job function.
379        self._post_run_hooks = []
380
381        self.parent_job_id = parent_job_id
382        self.in_lab = in_lab
383        self.machine_dict_list = machine_dict_list
384        for machine_dict in self.machine_dict_list:
385            machine_dict['connection_pool'] = self._connection_pool
386
387        # TODO(jrbarnette) The harness attribute is only relevant to
388        # client jobs, but it's required to be present, or we will fail
389        # server job unit tests.  Yes, really.
390        #
391        # TODO(jrbarnette) The utility of the 'harness' attribute even
392        # to client jobs is suspect.  Probably, we should remove it.
393        self.harness = None
394
395        # TODO(ayatane): fast and max_result_size_KB are not set for
396        # client_trampoline jobs.
397        if control and not use_client_trampoline:
398            parsed_control = control_data.parse_control(
399                    control, raise_warnings=False)
400            self.fast = parsed_control.fast
401            self.max_result_size_KB = parsed_control.max_result_size_KB
402        else:
403            self.fast = False
404            # Set the maximum result size to be the default specified in
405            # global config, if the job has no control file associated.
406            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
407
408
409    @classmethod
410    def _find_base_directories(cls):
411        """
412        Determine locations of autodir, clientdir and serverdir. Assumes
413        that this file is located within serverdir and uses __file__ along
414        with relative paths to resolve the location.
415        """
416        serverdir = os.path.abspath(os.path.dirname(__file__))
417        autodir = os.path.normpath(os.path.join(serverdir, '..'))
418        clientdir = os.path.join(autodir, 'client')
419        return autodir, clientdir, serverdir
420
421
422    def _find_resultdir(self, resultdir, *args, **dargs):
423        """
424        Determine the location of resultdir. For server jobs we expect one to
425        always be explicitly passed in to __init__, so just return that.
426        """
427        if resultdir:
428            return os.path.normpath(resultdir)
429        else:
430            return None
431
432
433    def _get_status_logger(self):
434        """Return a reference to the status logger."""
435        return self._logger
436
437
438    @staticmethod
439    def _load_control_file(path):
440        f = open(path)
441        try:
442            control_file = f.read()
443        finally:
444            f.close()
445        return re.sub('\r', '', control_file)
446
447
448    def _register_subcommand_hooks(self):
449        """
450        Register some hooks into the subcommand modules that allow us
451        to properly clean up self.hosts created in forked subprocesses.
452        """
453        def on_fork(cmd):
454            self._existing_hosts_on_fork = set(self.hosts)
455        def on_join(cmd):
456            new_hosts = self.hosts - self._existing_hosts_on_fork
457            for host in new_hosts:
458                host.close()
459        subcommand.subcommand.register_fork_hook(on_fork)
460        subcommand.subcommand.register_join_hook(on_join)
461
462
463    # TODO crbug.com/285395 add a kwargs parameter.
464    def _make_namespace(self):
465        """Create a namespace dictionary to be passed along to control file.
466
467        Creates a namespace argument populated with standard values:
468        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
469        and ssh_options.
470        """
471        namespace = {'machines' : self.machine_dict_list,
472                     'job' : self,
473                     'ssh_user' : self._ssh_user,
474                     'ssh_port' : self._ssh_port,
475                     'ssh_pass' : self._ssh_pass,
476                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
477                     'ssh_options' : self._ssh_options}
478        return namespace
479
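    # Illustrative sketch only: a server-side control file executed via
    # _execute_code can refer to these names directly. The snippet below is an
    # invented example, not a control file shipped with autotest.
    #
    #   for machine in machines:              # list of machine dicts
    #       host = hosts.create_host(machine)
    #       host.run('true')
    #   job.record('INFO', None, None, 'example control file ran')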
480
481    def cleanup(self, labels):
482        """Cleanup machines.
483
484        @param labels: Comma separated job labels, will be used to
485                       determine special task actions.
486        """
487        if not self.machines:
488            raise error.AutoservError('No machines specified to cleanup')
489        if self.resultdir:
490            os.chdir(self.resultdir)
491
492        namespace = self._make_namespace()
493        namespace.update({'job_labels': labels, 'args': ''})
494        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
495
496
497    def verify(self, labels):
498        """Verify machines are all ssh-able.
499
500        @param labels: Comma separated job labels, will be used to
501                       determine special task actions.
502        """
503        if not self.machines:
504            raise error.AutoservError('No machines specified to verify')
505        if self.resultdir:
506            os.chdir(self.resultdir)
507
508        namespace = self._make_namespace()
509        namespace.update({'job_labels': labels, 'args': ''})
510        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
511
512
513    def reset(self, labels):
514        """Reset machines by first cleanup then verify each machine.
515
516        @param labels: Comma separated job labels, will be used to
517                       determine special task actions.
518        """
519        if not self.machines:
520            raise error.AutoservError('No machines specified to reset.')
521        if self.resultdir:
522            os.chdir(self.resultdir)
523
524        namespace = self._make_namespace()
525        namespace.update({'job_labels': labels, 'args': ''})
526        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
527
528
529    def repair(self, labels):
530        """Repair machines.
531
532        @param labels: Comma separated job labels, will be used to
533                       determine special task actions.
534        """
535        if not self.machines:
536            raise error.AutoservError('No machines specified to repair')
537        if self.resultdir:
538            os.chdir(self.resultdir)
539
540        namespace = self._make_namespace()
541        namespace.update({'job_labels': labels, 'args': ''})
542        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
543
544
545    def provision(self, labels):
546        """
547        Provision all hosts to match |labels|.
548
549        @param labels: A comma separated string of labels to provision the
550                       host to.
551
552        """
553        control = self._load_control_file(PROVISION_CONTROL_FILE)
554        self.run(control=control, job_labels=labels)
555
556
557    def precheck(self):
558        """
559        perform any additional checks in derived classes.
560        """
561        pass
562
563
564    def enable_external_logging(self):
565        """
566        Start or restart external logging mechanism.
567        """
568        pass
569
570
571    def disable_external_logging(self):
572        """
573        Pause or stop external logging mechanism.
574        """
575        pass
576
577
578    def use_external_logging(self):
579        """
580        Return True if external logging should be used.
581        """
582        return False
583
584
585    def _make_parallel_wrapper(self, function, machines, log):
586        """Wrap function as appropriate for calling by parallel_simple."""
587        # machines could be a list of dictionaries, e.g.,
588        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
589        # The dictionaries are generated in server_job.__init__ (see the
590        # machine_dict_list variable) and passed in via the namespace; see
591        # server_job._make_namespace.
592        # To compare the machines to self.machines, which is a list of machine
593        # hostnames, we need to convert machines back to a list of hostnames.
594        if (machines and isinstance(machines, list)
595            and isinstance(machines[0], dict)):
596            machines = [m['hostname'] for m in machines]
597        if len(machines) > 1 and log:
598            def wrapper(machine):
599                hostname = server_utils.get_hostname_from_machine(machine)
600                self.push_execution_context(hostname)
601                os.chdir(self.resultdir)
602                machine_data = {'hostname' : hostname,
603                                'status_version' : str(self._STATUS_VERSION)}
604                utils.write_keyval(self.resultdir, machine_data)
605                result = function(machine)
606                return result
607        else:
608            wrapper = function
609        return wrapper
610
611
612    def parallel_simple(self, function, machines, log=True, timeout=None,
613                        return_results=False):
614        """
615        Run 'function' using parallel_simple, with an extra wrapper to handle
616        the necessary setup for continuous parsing, if possible. If continuous
617        parsing is already properly initialized then this should just work.
618
619        @param function: A callable to run in parallel given each machine.
620        @param machines: A list of machine names to be passed one per subcommand
621                invocation of function.
622        @param log: If True, output will be written to a subdirectory
623                named after each machine.
624        @param timeout: Seconds after which the function call should timeout.
625        @param return_results: If True, return a list of the results or
626                exceptions from calling the function on each machine, instead
627                of raising an AutoservError on the first error.  [default: False]
628
629        @raises error.AutotestError: If any of the functions failed.
630        """
631        wrapper = self._make_parallel_wrapper(function, machines, log)
632        return subcommand.parallel_simple(
633                wrapper, machines,
634                subdir_name_constructor=server_utils.get_hostname_from_machine,
635                log=log, timeout=timeout, return_results=return_results)
636
637
638    def parallel_on_machines(self, function, machines, timeout=None):
639        """
640        @param function: Called in parallel with one machine as its argument.
641        @param machines: A list of machines to call function(machine) on.
642        @param timeout: Seconds after which the function call should timeout.
643
644        @returns A list of machines on which function(machine) returned
645                without raising an exception.
646        """
647        results = self.parallel_simple(function, machines, timeout=timeout,
648                                       return_results=True)
649        success_machines = []
650        for result, machine in zip(results, machines):
651            if not isinstance(result, Exception):
652                success_machines.append(machine)
653        return success_machines
654
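    # Hedged usage sketch (reboot_one is a hypothetical helper):
    #
    #   def reboot_one(machine):
    #       hosts.create_host(machine).reboot()
    #
    #   alive = job.parallel_on_machines(reboot_one, job.machines, timeout=600)
    #   # 'alive' contains only the machines whose reboot did not raise.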
655
656    def record_skipped_test(self, skipped_test, message=None):
657        """Insert a failure record into status.log for this test."""
658        msg = message
659        if msg is None:
660            msg = 'No valid machines found for test %s.' % skipped_test
661        logging.info(msg)
662        self.record('START', None, skipped_test.test_name)
663        self.record('INFO', None, skipped_test.test_name, msg)
664        self.record('END TEST_NA', None, skipped_test.test_name, msg)
665
666
667    def _has_failed_tests(self):
668        """Parse status log for failed tests.
669
670        This checks the current working directory and is intended only for use
671        by the run() method.
672
673        @return boolean
674        """
675        path = os.getcwd()
676
677        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
678        # make code reuse plausible.
679        job_keyval = tko_models.job.read_keyval(path)
680        status_version = job_keyval.get("status_version", 0)
681
682        # parse out the job
683        parser = parser_lib.parser(status_version)
684        job = parser.make_job(path)
685        status_log = os.path.join(path, "status.log")
686        if not os.path.exists(status_log):
687            status_log = os.path.join(path, "status")
688        if not os.path.exists(status_log):
689            logging.warning("! Unable to parse job, no status file")
690            return True
691
692        # parse the status logs
693        status_lines = open(status_log).readlines()
694        parser.start(job)
695        tests = parser.end(status_lines)
696
697        # parser.end can return the same object multiple times, so filter out
698        # dups
699        job.tests = []
700        already_added = set()
701        for test in tests:
702            if test not in already_added:
703                already_added.add(test)
704                job.tests.append(test)
705
706        failed = False
707        for test in job.tests:
708            # The current job is still running and shouldn't count as failed.
709            # The parser will fail to parse the exit status of the job since it
710            # hasn't exited yet (this running right now is the job).
711            failed = failed or (test.status != 'GOOD'
712                                and not _is_current_server_job(test))
713        return failed
714
715
716    def _collect_crashes(self, namespace, collect_crashinfo):
717        """Collect crashes.
718
719        @param namespace: namespace dict.
720        @param collect_crashinfo: whether to collect crashinfo in addition to
721                dumps
722        """
723        if collect_crashinfo:
724            # includes crashdumps
725            crash_control_file = CRASHINFO_CONTROL_FILE
726        else:
727            crash_control_file = CRASHDUMPS_CONTROL_FILE
728        self._execute_code(crash_control_file, namespace)
729
730
731    _USE_TEMP_DIR = object()
732    def run(self, collect_crashdumps=True, namespace={}, control=None,
733            control_file_dir=None, verify_job_repo_url=False,
734            only_collect_crashinfo=False, skip_crash_collection=False,
735            job_labels='', use_packaging=True):
736        # for a normal job, make sure the uncollected logs file exists
737        # for a crashinfo-only run it should already exist, bail out otherwise
738        created_uncollected_logs = False
739        logging.info("I am PID %s", os.getpid())
740        if self.resultdir and not os.path.exists(self._uncollected_log_file):
741            if only_collect_crashinfo:
742                # if this is a crashinfo-only run, and there were no existing
743                # uncollected logs, just bail out early
744                logging.info("No existing uncollected logs, "
745                             "skipping crashinfo collection")
746                return
747            else:
748                log_file = open(self._uncollected_log_file, "wb")
749                pickle.dump([], log_file)
750                log_file.close()
751                created_uncollected_logs = True
752
753        # use a copy so changes don't affect the original dictionary
754        namespace = namespace.copy()
755        machines = self.machines
756        if control is None:
757            if self.control is None:
758                control = ''
759            elif self._use_client_trampoline:
760                # Some tests must be loaded and staged before they can be run,
761                # see crbug.com/883403#c42 and #c46 for details.
762                control = self._load_control_file(
763                        CLIENT_TRAMPOLINE_CONTROL_FILE)
764                # repr of a string is safe for eval.
765                control = (('trampoline_testname = %r\n' % str(self.control))
766                           + control)
767            else:
768                control = self._load_control_file(self.control)
769        if control_file_dir is None:
770            control_file_dir = self.resultdir
771
772        self.aborted = False
773        namespace.update(self._make_namespace())
774        namespace.update({
775                'args': self.args,
776                'job_labels': job_labels,
777                'gtest_runner': site_gtest_runner.gtest_runner(),
778        })
779        test_start_time = int(time.time())
780
781        if self.resultdir:
782            os.chdir(self.resultdir)
783            # touch status.log so that the parser knows a job is running here
784            open(self.get_status_log_path(), 'a').close()
785            self.enable_external_logging()
786
787        collect_crashinfo = True
788        temp_control_file_dir = None
789        try:
790            try:
791                if not self.fast:
792                    with metrics.SecondsTimer(
793                            'chromeos/autotest/job/get_network_stats',
794                            fields = {'stage': 'start'}):
795                        namespace['network_stats_label'] = 'at-start'
796                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
797                                           namespace)
798
799                if only_collect_crashinfo:
800                    return
801
802                # If the verify_job_repo_url option is set but we're unable
803                # to actually verify that the job_repo_url contains the autotest
804                # package, this job will fail.
805                if verify_job_repo_url:
806                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
807                                       namespace)
808                else:
809                    logging.warning('Not checking if job_repo_url contains '
810                                    'autotest packages on %s', machines)
811
812                # determine the dir to write the control files to
813                cfd_specified = (control_file_dir
814                                 and control_file_dir is not self._USE_TEMP_DIR)
815                if cfd_specified:
816                    temp_control_file_dir = None
817                else:
818                    temp_control_file_dir = tempfile.mkdtemp(
819                        suffix='temp_control_file_dir')
820                    control_file_dir = temp_control_file_dir
821                server_control_file = os.path.join(control_file_dir,
822                                                   self._control_filename)
823                client_control_file = os.path.join(control_file_dir,
824                                                   CLIENT_CONTROL_FILENAME)
825                if self._client:
826                    namespace['control'] = control
827                    utils.open_write_close(client_control_file, control)
828                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
829                                    server_control_file)
830                else:
831                    utils.open_write_close(server_control_file, control)
832
833                sync_dir = self._offload_dir_target_path()
834                if self._sync_offload_dir:
835                    logging.info("Preparing synchronous offload dir")
836                    self.create_marker_file()
837                    logging.info("Offload dir and marker file ready")
838                    logging.debug("Results dir is %s", self.resultdir)
839                    logging.debug("Synchronous offload dir is %s", sync_dir)
840                logging.info("Processing control file")
841                namespace['use_packaging'] = use_packaging
842                namespace['synchronous_offload_dir'] = sync_dir
843                os.environ[OFFLOAD_ENVVAR] = sync_dir
844                self._execute_code(server_control_file, namespace)
845                logging.info("Finished processing control file")
846                self._maybe_retrieve_client_offload_dirs()
847                # If no device error occurred, no need to collect crashinfo.
848                collect_crashinfo = self.failed_with_device_error
849            except Exception as e:
850                try:
851                    logging.exception(
852                            'Exception escaped control file, job aborting:')
853                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
854                                    ' ', str(e))
855                    self.record('INFO', None, None, str(e),
856                                {'job_abort_reason': reason})
857                except:
858                    pass # don't let logging exceptions here interfere
859                raise
860        finally:
861            if temp_control_file_dir:
862                # Clean up temp directory used for copies of the control files
863                try:
864                    shutil.rmtree(temp_control_file_dir)
865                except Exception as e:
866                    logging.warning('Could not remove temp directory %s: %s',
867                                 temp_control_file_dir, e)
868
869            if machines and (collect_crashdumps or collect_crashinfo):
870                if skip_crash_collection or self.fast:
871                    logging.info('Skipping crash dump/info collection '
872                                 'as requested.')
873                else:
874                    with metrics.SecondsTimer(
875                            'chromeos/autotest/job/collect_crashinfo'):
876                        namespace['test_start_time'] = test_start_time
877                        # Remove crash files for passing tests.
878                        # TODO(ayatane): Tests that create crash files should be
879                        # reported.
880                        namespace['has_failed_tests'] = self._has_failed_tests()
881                        self._collect_crashes(namespace, collect_crashinfo)
882            self.disable_external_logging()
883            if self._uncollected_log_file and created_uncollected_logs:
884                os.remove(self._uncollected_log_file)
885
886            if not self.fast:
887                with metrics.SecondsTimer(
888                        'chromeos/autotest/job/get_network_stats',
889                        fields = {'stage': 'end'}):
890                    namespace['network_stats_label'] = 'at-end'
891                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
892                                       namespace)
893
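    # Hedged sketch of a typical invocation (argument values are illustrative
    # assumptions, not autoserv's actual defaults):
    #
    #   job.run(collect_crashdumps=True,
    #           verify_job_repo_url=False,
    #           job_labels='board:eve,pool:quota',
    #           use_packaging=True)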
894    def _server_offload_dir_path(self):
895        return os.path.join(self.resultdir, self._sync_offload_dir)
896
897    def _offload_dir_target_path(self):
898        if not self._sync_offload_dir:
899            return ''
900        if self._client:
901            return os.path.join(DUT_STATEFUL_PATH, self._sync_offload_dir)
902        return os.path.join(self.resultdir, self._sync_offload_dir)
903
904    def _maybe_retrieve_client_offload_dirs(self):
905        if not (self._sync_offload_dir and self._client):
906            logging.info("No client dir to retrieve.")
907            return ''
908        logging.info("Retrieving synchronous offload dir from client")
909        server_path = self._server_offload_dir_path()
910        client_path = self._offload_dir_target_path()
911        def serial(machine):
912            host = hosts.create_host(machine)
913            server_subpath = os.path.join(server_path, host.hostname)
914            # Empty final piece ensures that get_file gets a trailing slash,
915            #  which makes it copy dir contents rather than the dir itself.
916            client_subpath = os.path.join(client_path, '')
917            logging.debug("Client dir to retrieve is %s", client_subpath)
918            os.makedirs(server_subpath)
919            host.get_file(client_subpath, server_subpath)
920        self.parallel_simple(serial, self.machines)
921        logging.debug("Synchronous offload dir retrieved to %s", server_path)
922        return server_path
923
924    def _create_client_offload_dirs(self):
925        marker_string = "client %s%s" % (
926            "in SSP " if utils.is_in_container() else "",
927            str(datetime.utcnow())
928        )
929        offload_path = self._offload_dir_target_path()
930        _, file_path = tempfile.mkstemp()
931        def serial(machine):
932            host = hosts.create_host(machine)
933            marker_path = os.path.join(offload_path, "sync_offloads_marker")
934            host.run(("mkdir -p %s" % offload_path), ignore_status=False)
935            host.send_file(file_path, marker_path)
936
937        try:
938            utils.open_write_close(file_path, marker_string)
939            self.parallel_simple(serial, self.machines)
940        finally:
941            os.remove(file_path)
942
943
944    def create_marker_file(self):
945        """Create a marker file in the leaf task's synchronous offload dir.
946
947        This ensures that we will get some results offloaded if the test fails
948        to create output properly, distinguishing offload errors from test errors.
949        @obj_param _client: Boolean, whether the control file is client-side.
950        @obj_param _sync_offload_dir: rel. path from results dir to offload dir.
951
952        @returns: path to offload dir on the machine of the leaf task
953        """
954        # Make the server-side directory regardless
955        try:
956            # 2.7 makedirs doesn't have an option for pre-existing directories
957            os.makedirs(self._server_offload_dir_path())
958        except OSError as e:
959            if e.errno != errno.EEXIST:
960                raise
961        if not self._client:
962            offload_path = self._offload_dir_target_path()
963            marker_string = "server %s%s" % (
964                "in SSP " if utils.is_in_container() else "",
965                str(datetime.utcnow())
966            )
967            utils.open_write_close(
968                os.path.join(offload_path, "sync_offloads_marker"),
969                marker_string
970            )
971            return offload_path
972        return self._create_client_offload_dirs()
973
974    def run_test(self, url, *args, **dargs):
975        """
976        Summon a test object and run it.
977
978        tag
979                tag to add to testname
980        url
981                url of the test to run
982        """
983        if self._disable_sysinfo:
984            dargs['disable_sysinfo'] = True
985
986        group, testname = self.pkgmgr.get_package_name(url, 'test')
987        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
988        outputdir = self._make_test_outputdir(subdir)
989
990        def group_func():
991            try:
992                test.runtest(self, url, tag, args, dargs)
993            except error.TestBaseException as e:
994                self.record(e.exit_status, subdir, testname, str(e))
995                raise
996            except Exception as e:
997                info = str(e) + "\n" + traceback.format_exc()
998                self.record('FAIL', subdir, testname, info)
999                raise
1000            else:
1001                self.record('GOOD', subdir, testname, 'completed successfully')
1002
1003        try:
1004            result = self._run_group(testname, subdir, group_func)
1005        except error.TestBaseException as e:
1006            return False
1007        else:
1008            return True
1009
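    # Hedged sketch of a server control file calling run_test; the test name
    # and keyword arguments are illustrative:
    #
    #   passed = job.run_test('dummy_PassServer', host=host, tag='shard0',
    #                         disable_sysinfo=False)
    #   # Returns True on success, False if the test raised a TestBaseException.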
1010
1011    def _run_group(self, name, subdir, function, *args, **dargs):
1012        """Underlying method for running something inside of a group."""
1013        result, exc_info = None, None
1014        try:
1015            self.record('START', subdir, name)
1016            result = function(*args, **dargs)
1017        except error.TestBaseException as e:
1018            self.record("END %s" % e.exit_status, subdir, name)
1019            raise
1020        except Exception as e:
1021            err_msg = str(e) + '\n'
1022            err_msg += traceback.format_exc()
1023            self.record('END ABORT', subdir, name, err_msg)
1024            raise error.JobError(name + ' failed\n' + traceback.format_exc())
1025        else:
1026            self.record('END GOOD', subdir, name)
1027        finally:
1028            for hook in self._post_run_hooks:
1029                hook()
1030
1031        return result
1032
1033
1034    def run_group(self, function, *args, **dargs):
1035        """\
1036        @param function: subroutine to run
1037        @returns: (result, exc_info). When the call succeeds, result contains
1038                the return value of |function| and exc_info is None. If
1039                |function| raises an exception, exc_info contains the tuple
1040                returned by sys.exc_info(), and result is None.
1041        """
1042
1043        name = function.__name__
1044        # Allow the tag for the group to be specified.
1045        tag = dargs.pop('tag', None)
1046        if tag:
1047            name = tag
1048
1049        try:
1050            result = self._run_group(name, None, function, *args, **dargs)
1051        except error.TestBaseException:
1052            return None, sys.exc_info()
1053        return result, None
1054
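    # Hedged usage sketch (flash_firmware is a hypothetical group function):
    #
    #   def flash_firmware():
    #       # work that should appear as a single START/END group in status.log
    #       pass
    #
    #   result, exc_info = job.run_group(flash_firmware, tag='firmware_flash')
    #   if exc_info:
    #       logging.error('Group failed: %s', exc_info[1])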
1055
1056    def run_op(self, op, op_func, get_kernel_func):
1057        """\
1058        A specialization of run_group meant specifically for handling
1059        management operation. Includes support for capturing the kernel version
1060        after the operation.
1061
1062        Args:
1063           op: name of the operation.
1064           op_func: a function that carries out the operation (reboot, suspend)
1065           get_kernel_func: a function that returns a string
1066                            representing the kernel version.
1067        """
1068        try:
1069            self.record('START', None, op)
1070            op_func()
1071        except Exception as e:
1072            err_msg = str(e) + '\n' + traceback.format_exc()
1073            self.record('END FAIL', None, op, err_msg)
1074            raise
1075        else:
1076            kernel = get_kernel_func()
1077            self.record('END GOOD', None, op,
1078                        optional_fields={"kernel": kernel})
1079
1080
1081    def run_control(self, path):
1082        """Execute a control file found at path (relative to the autotest
1083        path). Intended for executing a control file within a control file,
1084        not for running the top-level job control file."""
1085        path = os.path.join(self.autodir, path)
1086        control_file = self._load_control_file(path)
1087        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
1088
1089
1090    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1091        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1092                                   on_every_test)
1093
1094
1095    def add_sysinfo_logfile(self, file, on_every_test=False):
1096        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1097
1098
1099    def _add_sysinfo_loggable(self, loggable, on_every_test):
1100        if on_every_test:
1101            self.sysinfo.test_loggables.add(loggable)
1102        else:
1103            self.sysinfo.boot_loggables.add(loggable)
1104
1105
1106    def _read_warnings(self):
1107        """Poll all the warning loggers and extract any new warnings that have
1108        been logged. If the warnings belong to a category that is currently
1109        disabled, this method will discard them and they will no longer be
1110        retrievable.
1111
1112        Returns a list of (timestamp, message) tuples, where timestamp is an
1113        integer epoch timestamp."""
1114        warnings = []
1115        while True:
1116            # pull in a line of output from every logger that has
1117            # output ready to be read
1118            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
1119            closed_loggers = set()
1120            for logger in loggers:
1121                line = logger.readline()
1122                # record any broken pipes (aka line == empty)
1123                if len(line) == 0:
1124                    closed_loggers.add(logger)
1125                    continue
1126                # parse out the warning
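                # each line is expected to look like:
                # "<epoch seconds>\t<warning type>\t<message>\n"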
1127                timestamp, msgtype, msg = line.split('\t', 2)
1128                timestamp = int(timestamp)
1129                # if the warning is valid, add it to the results
1130                if self.warning_manager.is_valid(timestamp, msgtype):
1131                    warnings.append((timestamp, msg.strip()))
1132
1133            # stop listening to loggers that are closed
1134            self.warning_loggers -= closed_loggers
1135
1136            # stop if none of the loggers have any output left
1137            if not loggers:
1138                break
1139
1140        # sort into timestamp order
1141        warnings.sort()
1142        return warnings
1143
1144
1145    def _unique_subdirectory(self, base_subdirectory_name):
1146        """Compute a unique results subdirectory based on the given name.
1147
1148        Appends base_subdirectory_name with a number as necessary to find a
1149        directory name that doesn't already exist.
1150        """
1151        subdirectory = base_subdirectory_name
1152        counter = 1
1153        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1154            subdirectory = base_subdirectory_name + '.' + str(counter)
1155            counter += 1
1156        return subdirectory
1157
1158
1159    def get_record_context(self):
1160        """Returns an object representing the current job.record context.
1161
1162        The object returned is an opaque object with a 0-arg restore method
1163        which can be called to restore the job.record context (i.e. indentation)
1164        to the current level. The intention is that it should be used when
1165        something external which generates job.record calls (e.g. an autotest
1166        client) can fail catastrophically and the server job record state
1167        needs to be reset to its original "known good" state.
1168
1169        @return: A context object with a 0-arg restore() method."""
1170        return self._indenter.get_context()
1171
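    # Hedged usage sketch: snapshot the record context before running something
    # that may abort mid-group, then restore the indentation afterwards.
    #
    #   context = job.get_record_context()
    #   try:
    #       run_remote_client_job()   # hypothetical helper that emits records
    #   finally:
    #       context.restore()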
1172
1173    def record_summary(self, status_code, test_name, reason='', attributes=None,
1174                       distinguishing_attributes=(), child_test_ids=None):
1175        """Record a summary test result.
1176
1177        @param status_code: status code string, see
1178                common_lib.log.is_valid_status()
1179        @param test_name: name of the test
1180        @param reason: (optional) string providing detailed reason for test
1181                outcome
1182        @param attributes: (optional) dict of string keyvals to associate with
1183                this result
1184        @param distinguishing_attributes: (optional) list of attribute names
1185                that should be used to distinguish identically-named test
1186                results.  These attributes should be present in the attributes
1187                parameter.  This is used to generate user-friendly subdirectory
1188                names.
1189        @param child_test_ids: (optional) list of test indices for test results
1190                used in generating this result.
1191        """
1192        subdirectory_name_parts = [test_name]
1193        for attribute in distinguishing_attributes:
1194            assert attributes
1195            assert attribute in attributes, '%s not in %s' % (attribute,
1196                                                              attributes)
1197            subdirectory_name_parts.append(attributes[attribute])
1198        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1199
1200        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1201        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1202        os.mkdir(subdirectory_path)
1203
1204        self.record(status_code, subdirectory, test_name,
1205                    status=reason, optional_fields={'is_summary': True})
1206
1207        if attributes:
1208            utils.write_keyval(subdirectory_path, attributes)
1209
1210        if child_test_ids:
1211            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1212            summary_data = {'child_test_ids': ids_string}
1213            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1214                               summary_data)
1215
1216
1217    def add_post_run_hook(self, hook):
1218        """
1219        Registers a hook to run after the main job function.
1220
1221        This provides a mechanism by which tests that perform multiple tests of
1222        their own can write additional top-level results to the TKO status.log
1223        file.
1224
1225        @param hook: Function to invoke (without any args) after the main job
1226            function completes and the job status is logged.
1227        """
1228        self._post_run_hooks.append(hook)
1229
1230
1231    def disable_warnings(self, warning_type):
1232        self.warning_manager.disable_warnings(warning_type)
1233        self.record("INFO", None, None,
1234                    "disabling %s warnings" % warning_type,
1235                    {"warnings.disable": warning_type})
1236
1237
1238    def enable_warnings(self, warning_type):
1239        self.warning_manager.enable_warnings(warning_type)
1240        self.record("INFO", None, None,
1241                    "enabling %s warnings" % warning_type,
1242                    {"warnings.enable": warning_type})
1243
1244
1245    def get_status_log_path(self, subdir=None):
1246        """Return the path to the job status log.
1247
1248        @param subdir - Optional parameter indicating that you want the path
1249            to a subdirectory status log.
1250
1251        @returns The path where the status log should be.
1252        """
1253        if self.resultdir:
1254            if subdir:
1255                return os.path.join(self.resultdir, subdir, "status.log")
1256            else:
1257                return os.path.join(self.resultdir, "status.log")
1258        else:
1259            return None
1260
1261
1262    def _update_uncollected_logs_list(self, update_func):
1263        """Updates the uncollected logs list in a multi-process safe manner.
1264
1265        @param update_func - a function that updates the list of uncollected
1266            logs. Should take one parameter, the list to be updated.
1267        """
1268        # Skip log collection if file _uncollected_log_file does not exist.
1269        if not (self._uncollected_log_file and
1270                os.path.exists(self._uncollected_log_file)):
1271            return
1272        if self._uncollected_log_file:
1273            log_file = open(self._uncollected_log_file, "rb+")
1274            fcntl.flock(log_file, fcntl.LOCK_EX)
1275        try:
1276            uncollected_logs = pickle.load(log_file)
1277            update_func(uncollected_logs)
1278            log_file.seek(0)
1279            log_file.truncate()
1280            pickle.dump(uncollected_logs, log_file)
1281            log_file.flush()
1282        finally:
1283            fcntl.flock(log_file, fcntl.LOCK_UN)
1284            log_file.close()
1285
1286
1287    def add_client_log(self, hostname, remote_path, local_path):
1288        """Adds a new set of client logs to the list of uncollected logs,
1289        to allow for future log recovery.
1290
1291        @param hostname - the hostname of the machine holding the logs
1292        @param remote_path - the directory on the remote machine holding logs
1293        @param local_path - the local directory to copy the logs into
1294        """
1295        def update_func(logs_list):
1296            logs_list.append((hostname, remote_path, local_path))
1297        self._update_uncollected_logs_list(update_func)
1298
1299
1300    def remove_client_log(self, hostname, remote_path, local_path):
1301        """Removes a set of client logs from the list of uncollected logs,
1302        to allow for future log recovery.
1303
1304        @param hostname - the hostname of the machine holding the logs
1305        @param remote_path - the directory on the remote machine holding logs
1306        @param local_path - the local directory to copy the logs into
1307        """
1308        def update_func(logs_list):
1309            logs_list.remove((hostname, remote_path, local_path))
1310        self._update_uncollected_logs_list(update_func)
1311
1312
1313    def get_client_logs(self):
1314        """Retrieves the list of uncollected logs, if it exists.
1315
1316        @returns A list of (host, remote_path, local_path) tuples. Returns
1317                 an empty list if no uncollected logs file exists.
1318        """
1319        log_exists = (self._uncollected_log_file and
1320                      os.path.exists(self._uncollected_log_file))
1321        if log_exists:
1322            return pickle.load(open(self._uncollected_log_file, 'rb'))
1323        else:
1324            return []
1325
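    # Illustrative lifecycle sketch (hostname, paths and the collection helper
    # are made up; `job` is this server_job): a client log directory is
    # registered before a risky operation and unregistered once it has been
    # copied back, so a crashed autoserv run can still recover it from the
    # pickled list:
    #
    #     job.add_client_log('dut1', '/var/log/messages.d', '/results/dut1')
    #     collect_logs('dut1')   # hypothetical collection step
    #     job.remove_client_log('dut1', '/var/log/messages.d', '/results/dut1')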
1326
1327    def _fill_server_control_namespace(self, namespace, protect=True):
1328        """
1329        Prepare a namespace to be used when executing server control files.
1330
1331        This sets up the control file API by importing modules and making them
1332        available under the appropriate names within namespace.
1333
1334        For use by _execute_code().
1335
1336        Args:
1337          namespace: The namespace dictionary to fill in.
1338          protect: Boolean.  If True (the default) any operation that would
1339              clobber an existing entry in namespace will cause an error.
1340        Raises:
1341          error.AutoservError: When a name would be clobbered by import.
1342        """
1343        def _import_names(module_name, names=()):
1344            """
1345            Import a module and assign named attributes into namespace.
1346
1347            Args:
1348                module_name: The string module name.
1349                names: A limiting list of names to import from module_name.  If
1350                    empty (the default), all names are imported from the module
1351                    similar to a "from foo.bar import *" statement.
1352            Raises:
1353                error.AutoservError: When a name being imported would clobber
1354                    a name already in namespace.
1355            """
1356            module = __import__(module_name, {}, {}, names)
1357
1358            # No names supplied?  Import * from the lowest level module.
1359            # (Ugh, why do I have to implement this part myself?)
1360            if not names:
1361                for submodule_name in module_name.split('.')[1:]:
1362                    module = getattr(module, submodule_name)
1363                if hasattr(module, '__all__'):
1364                    names = getattr(module, '__all__')
1365                else:
1366                    names = dir(module)
1367
1368            # Install each name into namespace, checking to make sure it
1369            # doesn't override anything that already exists.
1370            for name in names:
1371                # Check for conflicts to help prevent future problems.
1372                if name in namespace and protect:
1373                    if namespace[name] is not getattr(module, name):
1374                        raise error.AutoservError('importing name '
1375                                '%s from %s %r would override %r' %
1376                                (name, module_name, getattr(module, name),
1377                                 namespace[name]))
1378                    else:
1379                        # Encourage cleanliness and the use of __all__ for a
1380                        # more concrete API with fewer surprises on '*' imports.
1381                        warnings.warn('%s (%r) being imported from %s for use '
1382                                      'in server control files is not the '
1383                                      'first occurrence of that import.' %
1384                                      (name, namespace[name], module_name))
1385
1386                namespace[name] = getattr(module, name)
1387
1388
1389        # This is the equivalent of prepending a bunch of import statements to
1390        # the front of the control script.
1391        namespace.update(os=os, sys=sys, logging=logging)
1392        _import_names('autotest_lib.server',
1393                ('hosts', 'autotest', 'standalone_profiler'))
1394        _import_names('autotest_lib.server.subcommand',
1395                      ('parallel', 'parallel_simple', 'subcommand'))
1396        _import_names('autotest_lib.server.utils',
1397                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
1398        _import_names('autotest_lib.client.common_lib.error')
1399        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
1400
1401        # Inject ourselves as the job object into other classes within the API.
1402        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
1403        #
1404        # XXX Autotest does not appear to use .job.  Who does?
1405        namespace['autotest'].Autotest.job = self
1406        # server.hosts.base_classes.Host uses .job.
1407        namespace['hosts'].Host.job = self
1408        namespace['hosts'].factory.ssh_user = self._ssh_user
1409        namespace['hosts'].factory.ssh_port = self._ssh_port
1410        namespace['hosts'].factory.ssh_pass = self._ssh_pass
1411        namespace['hosts'].factory.ssh_verbosity_flag = (
1412                self._ssh_verbosity_flag)
1413        namespace['hosts'].factory.ssh_options = self._ssh_options
1414
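    # Illustrative sketch of a server control file run against the prepared
    # namespace: `hosts`, `run` and `parallel_simple` come from the
    # _import_names() calls above, while `job` and `machines` are assumed to
    # be injected by autoserv; the test name is hypothetical:
    #
    #     def run_one(machine):
    #         host = hosts.create_host(machine)
    #         job.run_test('some_ServerTest', host=host)
    #
    #     parallel_simple(run_one, machines)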
1415
1416    def _execute_code(self, code_file, namespace, protect=True):
1417        """
1418        Execute code using a copy of namespace as a server control script.
1419
1420        Unless protect is explicitly set to False, the namespace dict passed
1421        in will not be modified.
1422
1423        Args:
1424          code_file: The filename of the control file to execute.
1425          namespace: A dict containing names to make available during execution.
1426          protect: Boolean.  If True (the default) a copy of the namespace dict
1427              is used during execution to prevent the code from modifying its
1428              contents outside of this function.  If False the raw dict is
1429              passed in and modifications will be allowed.
1430        """
1431        if protect:
1432            namespace = namespace.copy()
1433        self._fill_server_control_namespace(namespace, protect=protect)
1434        # TODO: Simplify and get rid of the special cases for only 1 machine.
1435        if len(self.machines) > 1:
1436            machines_text = '\n'.join(self.machines) + '\n'
1437            # Only rewrite the file if it does not match our machine list.
1438            try:
1439                machines_f = open(MACHINES_FILENAME, 'r')
1440                existing_machines_text = machines_f.read()
1441                machines_f.close()
1442            except EnvironmentError:
1443                existing_machines_text = None
1444            if machines_text != existing_machines_text:
1445                utils.open_write_close(MACHINES_FILENAME, machines_text)
1446        seven.exec_file(code_file, locals_=namespace, globals_=namespace)
1447
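    # Typical call-site sketch (namespace contents are assumed, not taken from
    # this excerpt): control segments such as the verify step are executed
    # through this method, e.g.:
    #
    #     namespace = {'machines': self.machines, 'job': self}
    #     self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)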
1448
1449    def preprocess_client_state(self):
1450        """
1451        Produce a state file for initializing the state of a client job.
1452
1453        Creates a new client state file with all the current server state, as
1454        well as some pre-set client state.
1455
1456        @returns The path of the file the state was written into.
1457        """
1458        # initialize the sysinfo state
1459        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1460
1461        # dump the state out to a tempfile
1462        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1463        os.close(fd)
1464
1465        # write_to_file doesn't need locking; we exclusively own file_path
1466        self._state.write_to_file(file_path)
1467        return file_path
1468
1469
1470    def postprocess_client_state(self, state_path):
1471        """
1472        Update the state of this job with the state from a client job.
1473
1474        Updates the state of the server side of a job with the final state
1475        of a client job that was run. Updates the non-client-specific state,
1476        pulls in some specific bits from the client-specific state, and then
1477        discards the rest. Removes the state file afterwards.
1478
1479        @param state_path A path to the state file from the client.
1480        """
1481        # update the on-disk state
1482        try:
1483            self._state.read_from_file(state_path)
1484            os.remove(state_path)
1485        except OSError as e:
1486            # ignore file-not-found errors
1487            if e.errno != errno.ENOENT:
1488                raise
1489            else:
1490                logging.debug('Client state file %s not found', state_path)
1491
1492        # update the sysinfo state
1493        if self._state.has('client', 'sysinfo'):
1494            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1495
1496        # drop all the client-specific state
1497        self._state.discard_namespace('client')
1498
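    # Illustrative round trip (remote paths are made up): the server dumps its
    # state for the client job, ships it to the DUT, and folds the final
    # client state back in afterwards:
    #
    #     state_path = job.preprocess_client_state()
    #     host.send_file(state_path, '/tmp/autoserv.init_state')
    #     # ... the client-side job runs on the DUT ...
    #     host.get_file('/tmp/autoserv.state', state_path)
    #     job.postprocess_client_state(state_path)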
1499
1500    def clear_all_known_hosts(self):
1501        """Clears known hosts files for all AbstractSSHHosts."""
1502        for host in self.hosts:
1503            if isinstance(host, abstract_ssh.AbstractSSHHost):
1504                host.clear_known_hosts()
1505
1506
1507    def close(self):
1508        """Closes this job's operation."""
1509
1510        # Use a shallow copy: host.close() removes the host from self.hosts.
1511        for host in list(self.hosts):
1512            host.close()
1513        assert not self.hosts
1514        self._connection_pool.shutdown()
1515
1516
1517    def _get_job_data(self):
1518        """Add custom data to the job keyval info.
1519
1520        When multiple machines are used in a job, change the hostname to the
1521        platform of the first machine that reports one, instead of
1522        machine1,machine2,...  This makes the job reports easier to read and
1523        keeps the tko_machines table from growing too large.
1524
1525        Returns:
1526            keyval dictionary with new hostname value, or empty dictionary.
1527        """
1528        job_data = {}
1529        # Only modify hostname on multi-machine jobs. Assume all hosts have the
1530        # same platform.
1531        if len(self.machines) > 1:
1532            # Search through machines for first machine with a platform.
1533            for host in self.machines:
1534                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1535                keyvals = utils.read_keyval(keyval_path)
1536                host_plat = keyvals.get('platform', None)
1537                if not host_plat:
1538                    continue
1539                job_data['hostname'] = host_plat
1540                break
1541        return job_data
1542
1543
1544class warning_manager(object):
1545    """Class for controlling warning logs. Manages the enabling and disabling
1546    of warnings."""
1547    def __init__(self):
1548        # a map of warning types to a list of disabled time intervals
1549        self.disabled_warnings = {}
1550
1551
1552    def is_valid(self, timestamp, warning_type):
1553        """Indicates if a warning (based on the time it occurred and its type)
1554        is a valid warning. A warning is considered "invalid" if this type of
1555        warning was marked as "disabled" at the time the warning occurred."""
1556        disabled_intervals = self.disabled_warnings.get(warning_type, [])
1557        for start, end in disabled_intervals:
1558            if timestamp >= start and (end is None or timestamp < end):
1559                return False
1560        return True
1561
1562
1563    def disable_warnings(self, warning_type, current_time_func=time.time):
1564        """As of now, disables all further warnings of this type."""
1565        intervals = self.disabled_warnings.setdefault(warning_type, [])
1566        if not intervals or intervals[-1][1] is not None:
1567            intervals.append((int(current_time_func()), None))
1568
1569
1570    def enable_warnings(self, warning_type, current_time_func=time.time):
1571        """As of now, enables all further warnings of this type."""
1572        intervals = self.disabled_warnings.get(warning_type, [])
1573        if intervals and intervals[-1][1] is None:
1574            intervals[-1] = (intervals[-1][0], int(current_time_func()))
1575
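# Illustrative behavior of warning_manager (times are arbitrary epoch seconds,
# passed via current_time_func so the example is deterministic):
#
#     mgr = warning_manager()
#     mgr.disable_warnings("NETWORK", current_time_func=lambda: 100)
#     mgr.enable_warnings("NETWORK", current_time_func=lambda: 200)
#     mgr.is_valid(150, "NETWORK")   # False: inside the disabled interval
#     mgr.is_valid(250, "NETWORK")   # True: warnings re-enabled at t=200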
1576
1577def _is_current_server_job(test):
1578    """Return True if parsed test is the currently running job.
1579
1580    @param test: test instance from tko parser.
1581    """
1582    return test.testname == 'SERVER_JOB'
1583
1584
1585def _create_afe_host(hostname):
1586    """Create an afe_host object backed by the AFE.
1587
1588    @param hostname: Name of the host for which we want the Host object.
1589    @returns: The first matching Host object returned by the AFE.
1590    """
1591    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
1592    hosts = afe.get_hosts(hostname=hostname)
1593    if not hosts:
1594        raise error.AutoservError('No hosts named %s found' % hostname)
1595
1596    return hosts[0]
1597
1598
1599def _create_file_backed_host_info_store(store_dir, hostname):
1600    """Create a CachingHostInfoStore backed by an existing file.
1601
1602    @param store_dir: A directory to contain store backing files.
1603    @param hostname: Name of the host for which we want the store.
1604
1605    @returns: An object of type CachingHostInfoStore.
1606    """
1607    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
1608    if not os.path.isfile(backing_file_path):
1609        raise error.AutoservError(
1610                'Requested FileStore but no backing file at %s'
1611                % backing_file_path
1612        )
1613    return file_store.FileStore(backing_file_path)
1614
1615
1616def _create_afe_backed_host_info_store(store_dir, hostname):
1617    """Create a CachingHostInfoStore backed by the AFE.
1618
1619    @param store_dir: A directory to contain store backing files.
1620    @param hostname: Name of the host for which we want the store.
1621
1622    @returns: An object of type CachingHostInfoStore.
1623    """
1624    primary_store = afe_store.AfeStore(hostname)
1625    try:
1626        primary_store.get(force_refresh=True)
1627    except host_info.StoreError:
1628        raise error.AutoservError(
1629                'Could not obtain HostInfo for hostname %s' % hostname)
1630    # Since the store wasn't initialized externally to autoserv, we must
1631    # ensure that the store we create is unique within store_dir.
1632    backing_file_path = os.path.join(
1633            _make_unique_subdir(store_dir),
1634            '%s.store' % hostname,
1635    )
1636    logging.info('Shadowing AFE store with a FileStore at %s',
1637                 backing_file_path)
1638    shadow_store = file_store.FileStore(backing_file_path)
1639    return shadowing_store.ShadowingStore(primary_store, shadow_store)
1640
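# Illustrative sketch (hostname and directory are made up): the AFE-backed
# store is the source of truth, and the FileStore shadow lets later stages
# read host info without another AFE round trip:
#
#     store = _create_afe_backed_host_info_store('/results/host_info_store',
#                                                'chromeos-dut1')
#     info = store.get()   # served via the AFE, mirrored into the FileStore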
1641
1642def _make_unique_subdir(workdir):
1643    """Creates a new subdir within workdir and returns the path to it."""
1644    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
1645    _make_dirs_if_needed(store_dir)
1646    return store_dir
1647
1648
1649def _make_dirs_if_needed(path):
1650    """os.makedirs, but ignores failure if the leaf directory already exists"""
1651    try:
1652        os.makedirs(path)
1653    except OSError as e:
1654        if e.errno != errno.EEXIST:
1655            raise
1656