• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pylint: disable-msg=C0111
2
3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6"""
7The main job wrapper for the server side.
8
9This is the core infrastructure. Derived from the client side job.py
10
11Copyright Martin J. Bligh, Andy Whitcroft 2007
12"""
13
14import errno
15import fcntl
16import getpass
17import itertools
18import logging
19import multiprocessing
20import os
21import pickle
22import platform
23import re
24import select
25import shutil
26import sys
27import tempfile
28import time
29import traceback
30import warnings
31
32from autotest_lib.client.bin import sysinfo
33from autotest_lib.client.common_lib import base_job
34from autotest_lib.client.common_lib import control_data
35from autotest_lib.client.common_lib import error
36from autotest_lib.client.common_lib import global_config
37from autotest_lib.client.common_lib import logging_manager
38from autotest_lib.client.common_lib import packages
39from autotest_lib.client.common_lib import utils
40from autotest_lib.server import profilers
41from autotest_lib.server import site_gtest_runner
42from autotest_lib.server import site_server_job_utils
43from autotest_lib.server import subcommand
44from autotest_lib.server import test
45from autotest_lib.server import utils as server_utils
46from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
47from autotest_lib.server.hosts import abstract_ssh
48from autotest_lib.server.hosts import afe_store
49from autotest_lib.server.hosts import factory as host_factory
50from autotest_lib.server.hosts import host_info
51from autotest_lib.server.hosts import ssh_multiplex
52from autotest_lib.tko import db as tko_db
53from autotest_lib.tko import models as tko_models
54from autotest_lib.tko import status_lib
55from autotest_lib.tko import parser_lib
56from autotest_lib.tko import utils as tko_utils
57
58
# Whether results should be parsed into the TKO database incrementally while
# the job is still running.  Read once at import time from the [autoserv]
# section of the global config; defaults to disabled.
INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)
61
62def _control_segment_path(name):
63    """Get the pathname of the named control segment file."""
64    server_dir = os.path.dirname(os.path.abspath(__file__))
65    return os.path.join(server_dir, "control_segments", name)
66
67
# Well-known filenames inside a job's results directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Absolute paths to the control segment scripts implementing the standard
# special tasks (install, verify, repair, provision, cleanup, ...).
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
83
84
def get_machine_dicts(machine_names, in_lab, host_attributes=None,
                      connection_pool=None):
    """Build the per-machine dicts that are handed to control scripts.

    @param machine_names: A list of machine names.
    @param in_lab: A boolean indicating whether we're running in lab.
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @param connection_pool: ssh_multiplex.ConnectionPool instance to share
            master connections across control scripts.
    @returns: A list of dicts, one per machine, with keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to
                    obtain host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    master ssh connection across control scripts.
    """
    machine_dicts = []
    for hostname in machine_names:
        # See autoserv_parser.parse_args. Only one of in_lab or
        # host_attributes can be provided.
        if in_lab:
            if host_attributes:
                raise error.AutoservError(
                        'in_lab and host_attribute are mutually exclusive. '
                        'Obtained in_lab:%s, host_attributes:%s'
                        % (in_lab, host_attributes))
            afe_host = _create_afe_host(hostname)
            info_store = _create_host_info_store(hostname)
        else:
            # Outside the lab, use in-memory stubs seeded from any
            # command-line host attributes.
            afe_host = server_utils.EmptyAFEHost()
            info_store = host_info.InMemoryHostInfoStore()
            if host_attributes is not None:
                afe_host.attributes.update(host_attributes)
                info_store.commit(
                        host_info.HostInfo(attributes=host_attributes))
        machine_dicts.append({
                'hostname' : hostname,
                'afe_host' : afe_host,
                'host_info_store': info_store,
                'connection_pool': connection_pool,
        })
    return machine_dicts
132
133
class status_indenter(base_job.status_indenter):
    """Simple integer-backed implementation of the status indenter API."""

    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        """The current indentation level."""
        return self._indent


    def increment(self):
        """Increase the indentation level by one."""
        self._indent += 1


    def decrement(self):
        """Decrease the indentation level by one."""
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        saved_indent = self._indent
        indenter = self
        class context(object):
            # restore() rewinds the indenter to the level captured above.
            def restore(self):
                indenter._indent = saved_indent
        return context()
162
163
class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """Re-entrancy guard around the real hook (_hook).

        Not thread-safe by design; the only goal is to break the infinite
        job.record->_hook->job.record->_hook->... recursion chain.
        """
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        # Drain all pending warnings into WARN status entries.
        warning_entries = [
                base_job.status_log_entry(
                        'WARN', None, None, msg, {}, timestamp=timestamp)
                for timestamp, msg in job._read_warnings()]
        for warning_entry in warning_entries:
            job.record_entry(warning_entry)
        # Echo rendered versions of every entry (warnings first, then the
        # triggering entry) to INFO logging and the status parser.
        for log_entry in warning_entries + [entry]:
            rendered_entry = job._logger.render_entry(log_entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)
208
209
210class server_job(base_job.base_job):
211    """The server-side concrete implementation of base_job.
212
213    Optional properties provided by this implementation:
214        serverdir
215
216        num_tests_run
217        num_tests_failed
218
219        warning_manager
220        warning_loggers
221    """
222
    # Version of the status log format this job produces; stored in the job
    # keyvals and used by the TKO parser.
    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 test_retry=0, group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, host_attributes=None, in_lab=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines this job runs on.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                            included in ssh commands.
        @param test_retry: The number of times to retry a test if the test did
                not complete successfully.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param host_attributes: Dict of host attributes passed into autoserv
                                via the command line. If specified here, these
                                attributes will apply to all machines.
        @param in_lab: Boolean that indicates if this is running in the lab
                       environment.
        """
        super(server_job, self).__init__(resultdir=resultdir,
                                         test_retry=test_retry)
        self.test_retry = test_retry
        self.control = control
        # NOTE(review): assumes resultdir is always provided for server jobs
        # (os.path.join would fail on None) -- see _find_resultdir.
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        # Fall back to the local user when none was supplied.
        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo

        # Take over stdout/stderr so subcommands log through this job.
        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        # Incremental TKO parsing only applies to single-machine jobs with a
        # parse tag, and only when enabled in the global config.
        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
                              and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = get_machine_dicts(
                self.machines, self.in_lab, host_attributes,
                self._connection_pool)

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests.  Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect.  Probably, we should remove it.
        self.harness = None

        if control:
            self.max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
        else:
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
372
373
374    @classmethod
375    def _find_base_directories(cls):
376        """
377        Determine locations of autodir, clientdir and serverdir. Assumes
378        that this file is located within serverdir and uses __file__ along
379        with relative paths to resolve the location.
380        """
381        serverdir = os.path.abspath(os.path.dirname(__file__))
382        autodir = os.path.normpath(os.path.join(serverdir, '..'))
383        clientdir = os.path.join(autodir, 'client')
384        return autodir, clientdir, serverdir
385
386
387    def _find_resultdir(self, resultdir, *args, **dargs):
388        """
389        Determine the location of resultdir. For server jobs we expect one to
390        always be explicitly passed in to __init__, so just return that.
391        """
392        if resultdir:
393            return os.path.normpath(resultdir)
394        else:
395            return None
396
397
    def _get_status_logger(self):
        """Return the base_job.status_logger instance used by this job."""
        return self._logger
401
402
403    @staticmethod
404    def _load_control_file(path):
405        f = open(path)
406        try:
407            control_file = f.read()
408        finally:
409            f.close()
410        return re.sub('\r', '', control_file)
411
412
413    def _register_subcommand_hooks(self):
414        """
415        Register some hooks into the subcommand modules that allow us
416        to properly clean up self.hosts created in forked subprocesses.
417        """
418        def on_fork(cmd):
419            self._existing_hosts_on_fork = set(self.hosts)
420        def on_join(cmd):
421            new_hosts = self.hosts - self._existing_hosts_on_fork
422            for host in new_hosts:
423                host.close()
424        subcommand.subcommand.register_fork_hook(on_fork)
425        subcommand.subcommand.register_join_hook(on_join)
426
427
    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.

        No-op unless incremental parsing was enabled in __init__
        (self._using_parser).
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        # buffering=0 (Python 2 open) makes the log unbuffered so debug
        # output survives a crash.
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = parser_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model,
                                       self.parent_job_id)
        else:
            # Job already present: reuse its row and machine index so the
            # parser appends to the existing record.
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
455
456
457    def cleanup_parser(self):
458        """
459        This should be called after the server job is finished
460        to carry out any remaining cleanup (e.g. flushing any
461        remaining test results to the results db)
462        """
463        if not self._using_parser:
464            return
465        final_tests = self.parser.end()
466        for test in final_tests:
467            self.__insert_test(test)
468        self._using_parser = False
469
470    # TODO crbug.com/285395 add a kwargs parameter.
471    def _make_namespace(self):
472        """Create a namespace dictionary to be passed along to control file.
473
474        Creates a namespace argument populated with standard values:
475        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
476        and ssh_options.
477        """
478        namespace = {'machines' : self.machine_dict_list,
479                     'job' : self,
480                     'ssh_user' : self._ssh_user,
481                     'ssh_port' : self._ssh_port,
482                     'ssh_pass' : self._ssh_pass,
483                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
484                     'ssh_options' : self._ssh_options}
485        return namespace
486
487
488    def cleanup(self, labels):
489        """Cleanup machines.
490
491        @param labels: Comma separated job labels, will be used to
492                       determine special task actions.
493        """
494        if not self.machines:
495            raise error.AutoservError('No machines specified to cleanup')
496        if self.resultdir:
497            os.chdir(self.resultdir)
498
499        namespace = self._make_namespace()
500        namespace.update({'job_labels': labels, 'args': ''})
501        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
502
503
504    def verify(self, labels):
505        """Verify machines are all ssh-able.
506
507        @param labels: Comma separated job labels, will be used to
508                       determine special task actions.
509        """
510        if not self.machines:
511            raise error.AutoservError('No machines specified to verify')
512        if self.resultdir:
513            os.chdir(self.resultdir)
514
515        namespace = self._make_namespace()
516        namespace.update({'job_labels': labels, 'args': ''})
517        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
518
519
520    def reset(self, labels):
521        """Reset machines by first cleanup then verify each machine.
522
523        @param labels: Comma separated job labels, will be used to
524                       determine special task actions.
525        """
526        if not self.machines:
527            raise error.AutoservError('No machines specified to reset.')
528        if self.resultdir:
529            os.chdir(self.resultdir)
530
531        namespace = self._make_namespace()
532        namespace.update({'job_labels': labels, 'args': ''})
533        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
534
535
536    def repair(self, labels):
537        """Repair machines.
538
539        @param labels: Comma separated job labels, will be used to
540                       determine special task actions.
541        """
542        if not self.machines:
543            raise error.AutoservError('No machines specified to repair')
544        if self.resultdir:
545            os.chdir(self.resultdir)
546
547        namespace = self._make_namespace()
548        namespace.update({'job_labels': labels, 'args': ''})
549        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
550
551
552    def provision(self, labels):
553        """
554        Provision all hosts to match |labels|.
555
556        @param labels: A comma seperated string of labels to provision the
557                       host to.
558
559        """
560        control = self._load_control_file(PROVISION_CONTROL_FILE)
561        self.run(control=control, job_labels=labels)
562
563
    def precheck(self):
        """
        Perform any additional checks in derived classes.

        This base implementation is a no-op hook.
        """
        pass
569
570
    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.

        No-op in this base implementation.
        """
        pass
576
577
    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.

        No-op in this base implementation.
        """
        pass
583
584
    def use_external_logging(self):
        """
        Return True if external logging should be used.

        This base implementation always returns False.
        """
        return False
590
591
    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple.

        @param function: Callable invoked once per machine.
        @param machines: List of hostnames, or list of machine dicts as built
                by get_machine_dicts (each with a 'hostname' key).
        @param log: If True, per-machine results/keyvals are written.
        @return: The (possibly wrapped) callable to hand to
                subcommand.parallel_simple. The wrappers mutate job state
                (self._parse_job, self.machines, cwd), which is safe only
                because parallel_simple runs them in forked subprocesses.
        """
        # machines could be a list of dictionaries, e.g.,
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        # Note that the order of hostnames doesn't matter, as is_forking will
        # be True if there is more than one machine.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            # Forking with incremental parsing: each child gets its own
            # parse tag, execution context and parser lifecycle.
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self._parse_job += "/" + hostname
                self._using_parser = INCREMENTAL_TKO_PARSING
                self.machines = [machine]
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": hostname})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            # Forking without parsing: just set up the per-machine result
            # directory and keyvals.
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            # Single machine, or logging disabled: no wrapping needed.
            wrapper = function
        return wrapper
633
634
635    def parallel_simple(self, function, machines, log=True, timeout=None,
636                        return_results=False):
637        """
638        Run 'function' using parallel_simple, with an extra wrapper to handle
639        the necessary setup for continuous parsing, if possible. If continuous
640        parsing is already properly initialized then this should just work.
641
642        @param function: A callable to run in parallel given each machine.
643        @param machines: A list of machine names to be passed one per subcommand
644                invocation of function.
645        @param log: If True, output will be written to output in a subdirectory
646                named after each machine.
647        @param timeout: Seconds after which the function call should timeout.
648        @param return_results: If True instead of an AutoServError being raised
649                on any error a list of the results|exceptions from the function
650                called on each arg is returned.  [default: False]
651
652        @raises error.AutotestError: If any of the functions failed.
653        """
654        wrapper = self._make_parallel_wrapper(function, machines, log)
655        return subcommand.parallel_simple(wrapper, machines,
656                                          log=log, timeout=timeout,
657                                          return_results=return_results)
658
659
660    def parallel_on_machines(self, function, machines, timeout=None):
661        """
662        @param function: Called in parallel with one machine as its argument.
663        @param machines: A list of machines to call function(machine) on.
664        @param timeout: Seconds after which the function call should timeout.
665
666        @returns A list of machines on which function(machine) returned
667                without raising an exception.
668        """
669        results = self.parallel_simple(function, machines, timeout=timeout,
670                                       return_results=True)
671        success_machines = []
672        for result, machine in itertools.izip(results, machines):
673            if not isinstance(result, Exception):
674                success_machines.append(machine)
675        return success_machines
676
677
    def distribute_across_machines(self, tests, machines,
                                   continuous_parsing=False):
        """Run each test in tests once using machines.

        Instead of running each test on each machine like parallel_on_machines,
        run each test once across all machines. Put another way, the total
        number of tests run by parallel_on_machines is len(tests) *
        len(machines). The number of tests run by distribute_across_machines is
        len(tests).

        Args:
            tests: List of tests to run.
            machines: List of machines to use.
            continuous_parsing: Bool, if true parse job while running.
        """
        # The Queue is thread safe, but since a machine may have to search
        # through the queue to find a valid test the lock provides exclusive
        # queue access for more than just the get call.
        test_queue = multiprocessing.JoinableQueue()
        test_queue_lock = multiprocessing.Lock()

        unique_machine_attributes = []
        sub_commands = []
        work_dir = self.resultdir

        for machine in machines:
            # NOTE(review): substring check on the resultdir path -- gives
            # each machine its own work dir for 'group' jobs; confirm the
            # naming convention with the scheduler.
            if 'group' in self.resultdir:
                work_dir = os.path.join(self.resultdir, machine)

            mw = site_server_job_utils.machine_worker(self,
                                                      machine,
                                                      work_dir,
                                                      test_queue,
                                                      test_queue_lock,
                                                      continuous_parsing)

            # Create the subcommand instance to run this machine worker.
            sub_commands.append(subcommand.subcommand(mw.run,
                                                      [],
                                                      work_dir))

            # To (potentially) speed up searching for valid tests create a list
            # of unique attribute sets present in the machines for this job. If
            # sets were hashable we could just use a dictionary for fast
            # verification. This at least reduces the search space from the
            # number of machines to the number of unique machines.
            if not mw.attribute_set in unique_machine_attributes:
                unique_machine_attributes.append(mw.attribute_set)

        # Only queue tests which are valid on at least one machine.  Record
        # skipped tests in the status.log file using record_skipped_test().
        for test_entry in tests:
            # Check if it's an old style test entry: positional include/
            # exclude/attributes instead of a single attribute dict.
            if len(test_entry) > 2 and not isinstance(test_entry[2], dict):
                test_attribs = {'include': test_entry[2]}
                if len(test_entry) > 3:
                    test_attribs['exclude'] = test_entry[3]
                if len(test_entry) > 4:
                    test_attribs['attributes'] = test_entry[4]

                test_entry = list(test_entry[:2])
                test_entry.append(test_attribs)

            ti = site_server_job_utils.test_item(*test_entry)
            machine_found = False
            for ma in unique_machine_attributes:
                if ti.validate(ma):
                    test_queue.put(ti)
                    machine_found = True
                    break
            if not machine_found:
                self.record_skipped_test(ti)

        # Run valid tests and wait for completion.
        subcommand.parallel(sub_commands)
753
754
755    def record_skipped_test(self, skipped_test, message=None):
756        """Insert a failure record into status.log for this test."""
757        msg = message
758        if msg is None:
759            msg = 'No valid machines found for test %s.' % skipped_test
760        logging.info(msg)
761        self.record('START', None, skipped_test.test_name)
762        self.record('INFO', None, skipped_test.test_name, msg)
763        self.record('END TEST_NA', None, skipped_test.test_name, msg)
764
765
766    def _has_failed_tests(self):
767        """Parse status log for failed tests.
768
769        This checks the current working directory and is intended only for use
770        by the run() method.
771
772        @return boolean
773        """
774        path = os.getcwd()
775
776        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
777        # make code reuse plausible.
778        job_keyval = tko_models.job.read_keyval(path)
779        status_version = job_keyval.get("status_version", 0)
780
781        # parse out the job
782        parser = parser_lib.parser(status_version)
783        job = parser.make_job(path)
784        status_log = os.path.join(path, "status.log")
785        if not os.path.exists(status_log):
786            status_log = os.path.join(path, "status")
787        if not os.path.exists(status_log):
788            logging.warning("! Unable to parse job, no status file")
789            return True
790
791        # parse the status logs
792        status_lines = open(status_log).readlines()
793        parser.start(job)
794        tests = parser.end(status_lines)
795
796        # parser.end can return the same object multiple times, so filter out
797        # dups
798        job.tests = []
799        already_added = set()
800        for test in tests:
801            if test not in already_added:
802                already_added.add(test)
803                job.tests.append(test)
804
805        failed = False
806        for test in job.tests:
807            # The current job is still running and shouldn't count as failed.
808            # The parser will fail to parse the exit status of the job since it
809            # hasn't exited yet (this running right now is the job).
810            failed = failed or (test.status != 'GOOD'
811                                and not _is_current_server_job(test))
812        return failed
813
814
815    def _collect_crashes(self, namespace, collect_crashinfo):
816        """Collect crashes.
817
818        @param namespace: namespace dict.
819        @param collect_crashinfo: whether to collect crashinfo in addition to
820                dumps
821        """
822        if collect_crashinfo:
823            # includes crashdumps
824            crash_control_file = CRASHINFO_CONTROL_FILE
825        else:
826            crash_control_file = CRASHDUMPS_CONTROL_FILE
827        self._execute_code(crash_control_file, namespace)
828
829
    # Sentinel: passed as control_file_dir to request a throw-away temporary
    # directory for the generated control files (see run_control()).
    _USE_TEMP_DIR = object()
    def run(self, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        """Execute this job's control file on the server side.

        @param install_before: if True (and machines exist), run the install
                control file before processing the job control file.
        @param install_after: if True (and machines exist), run the install
                control file again after the job.
        @param collect_crashdumps: if True, run crash collection afterwards.
        @param namespace: extra names for the control file namespace.
                NOTE(review): mutable default argument; harmless here because
                it is copied below, but namespace=None would be the
                conventional idiom.
        @param control: control file text; defaults to loading self.control.
        @param control_file_dir: directory to write control file copies into,
                or _USE_TEMP_DIR to use a temporary directory.
        @param verify_job_repo_url: if True, run the control file that checks
                the job_repo_url contains the autotest package (job fails if
                the check fails).
        @param only_collect_crashinfo: if True, skip running the job and only
                collect crashinfo from a previous run.
        @param skip_crash_collection: if True, skip crash dump/info
                collection entirely.
        @param job_labels: label string, exposed as 'job_labels' in the
                control file namespace.
        @param use_packaging: exposed as 'use_packaging' in the control file
                namespace.
        """
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        # Default to True so that crashinfo is still gathered if the control
        # file raises before collect_crashinfo is recomputed below.
        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                namespace['network_stats_label'] = 'at-start'
                self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    # Client-side job: the real control file runs on the
                    # client; the server runs a fixed wrapper control file.
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occured, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    namespace['test_start_time'] = test_start_time
                    # Remove crash files for passing tests.
                    # TODO(ayatane): Tests that create crash files should be
                    # reported.
                    namespace['has_failed_tests'] = self._has_failed_tests()
                    self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            namespace['network_stats_label'] = 'at-end'
            self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
967
968
969    def run_test(self, url, *args, **dargs):
970        """
971        Summon a test object and run it.
972
973        tag
974                tag to add to testname
975        url
976                url of the test to run
977        """
978        if self._disable_sysinfo:
979            dargs['disable_sysinfo'] = True
980
981        group, testname = self.pkgmgr.get_package_name(url, 'test')
982        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
983        outputdir = self._make_test_outputdir(subdir)
984
985        def group_func():
986            try:
987                test.runtest(self, url, tag, args, dargs)
988            except error.TestBaseException as e:
989                self.record(e.exit_status, subdir, testname, str(e))
990                raise
991            except Exception as e:
992                info = str(e) + "\n" + traceback.format_exc()
993                self.record('FAIL', subdir, testname, info)
994                raise
995            else:
996                self.record('GOOD', subdir, testname, 'completed successfully')
997
998        try:
999            result = self._run_group(testname, subdir, group_func)
1000        except error.TestBaseException as e:
1001            return False
1002        else:
1003            return True
1004
1005
1006    def _run_group(self, name, subdir, function, *args, **dargs):
1007        """Underlying method for running something inside of a group."""
1008        result, exc_info = None, None
1009        try:
1010            self.record('START', subdir, name)
1011            result = function(*args, **dargs)
1012        except error.TestBaseException as e:
1013            self.record("END %s" % e.exit_status, subdir, name)
1014            raise
1015        except Exception as e:
1016            err_msg = str(e) + '\n'
1017            err_msg += traceback.format_exc()
1018            self.record('END ABORT', subdir, name, err_msg)
1019            raise error.JobError(name + ' failed\n' + traceback.format_exc())
1020        else:
1021            self.record('END GOOD', subdir, name)
1022
1023        return result
1024
1025
1026    def run_group(self, function, *args, **dargs):
1027        """\
1028        @param function: subroutine to run
1029        @returns: (result, exc_info). When the call succeeds, result contains
1030                the return value of |function| and exc_info is None. If
1031                |function| raises an exception, exc_info contains the tuple
1032                returned by sys.exc_info(), and result is None.
1033        """
1034
1035        name = function.__name__
1036        # Allow the tag for the group to be specified.
1037        tag = dargs.pop('tag', None)
1038        if tag:
1039            name = tag
1040
1041        try:
1042            result = self._run_group(name, None, function, *args, **dargs)[0]
1043        except error.TestBaseException:
1044            return None, sys.exc_info()
1045        return result, None
1046
1047
1048    def run_op(self, op, op_func, get_kernel_func):
1049        """\
1050        A specialization of run_group meant specifically for handling
1051        management operation. Includes support for capturing the kernel version
1052        after the operation.
1053
1054        Args:
1055           op: name of the operation.
1056           op_func: a function that carries out the operation (reboot, suspend)
1057           get_kernel_func: a function that returns a string
1058                            representing the kernel version.
1059        """
1060        try:
1061            self.record('START', None, op)
1062            op_func()
1063        except Exception as e:
1064            err_msg = str(e) + '\n' + traceback.format_exc()
1065            self.record('END FAIL', None, op, err_msg)
1066            raise
1067        else:
1068            kernel = get_kernel_func()
1069            self.record('END GOOD', None, op,
1070                        optional_fields={"kernel": kernel})
1071
1072
1073    def run_control(self, path):
1074        """Execute a control file found at path (relative to the autotest
1075        path). Intended for executing a control file within a control file,
1076        not for running the top-level job control file."""
1077        path = os.path.join(self.autodir, path)
1078        control_file = self._load_control_file(path)
1079        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
1080
1081
1082    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1083        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1084                                   on_every_test)
1085
1086
1087    def add_sysinfo_logfile(self, file, on_every_test=False):
1088        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1089
1090
1091    def _add_sysinfo_loggable(self, loggable, on_every_test):
1092        if on_every_test:
1093            self.sysinfo.test_loggables.add(loggable)
1094        else:
1095            self.sysinfo.boot_loggables.add(loggable)
1096
1097
    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        # NOTE(review): the local 'warnings' list shadows the module-level
        # 'warnings' import for the duration of this method.
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read (timeout 0 == non-blocking poll)
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                # line format: "<timestamp>\t<msgtype>\t<message>"; maxsplit=2
                # keeps any tabs inside the message itself intact
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
1135
1136
1137    def _unique_subdirectory(self, base_subdirectory_name):
1138        """Compute a unique results subdirectory based on the given name.
1139
1140        Appends base_subdirectory_name with a number as necessary to find a
1141        directory name that doesn't already exist.
1142        """
1143        subdirectory = base_subdirectory_name
1144        counter = 1
1145        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1146            subdirectory = base_subdirectory_name + '.' + str(counter)
1147            counter += 1
1148        return subdirectory
1149
1150
1151    def get_record_context(self):
1152        """Returns an object representing the current job.record context.
1153
1154        The object returned is an opaque object with a 0-arg restore method
1155        which can be called to restore the job.record context (i.e. indentation)
1156        to the current level. The intention is that it should be used when
1157        something external which generate job.record calls (e.g. an autotest
1158        client) can fail catastrophically and the server job record state
1159        needs to be reset to its original "known good" state.
1160
1161        @return: A context object with a 0-arg restore() method."""
1162        return self._indenter.get_context()
1163
1164
1165    def record_summary(self, status_code, test_name, reason='', attributes=None,
1166                       distinguishing_attributes=(), child_test_ids=None):
1167        """Record a summary test result.
1168
1169        @param status_code: status code string, see
1170                common_lib.log.is_valid_status()
1171        @param test_name: name of the test
1172        @param reason: (optional) string providing detailed reason for test
1173                outcome
1174        @param attributes: (optional) dict of string keyvals to associate with
1175                this result
1176        @param distinguishing_attributes: (optional) list of attribute names
1177                that should be used to distinguish identically-named test
1178                results.  These attributes should be present in the attributes
1179                parameter.  This is used to generate user-friendly subdirectory
1180                names.
1181        @param child_test_ids: (optional) list of test indices for test results
1182                used in generating this result.
1183        """
1184        subdirectory_name_parts = [test_name]
1185        for attribute in distinguishing_attributes:
1186            assert attributes
1187            assert attribute in attributes, '%s not in %s' % (attribute,
1188                                                              attributes)
1189            subdirectory_name_parts.append(attributes[attribute])
1190        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1191
1192        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1193        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1194        os.mkdir(subdirectory_path)
1195
1196        self.record(status_code, subdirectory, test_name,
1197                    status=reason, optional_fields={'is_summary': True})
1198
1199        if attributes:
1200            utils.write_keyval(subdirectory_path, attributes)
1201
1202        if child_test_ids:
1203            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1204            summary_data = {'child_test_ids': ids_string}
1205            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1206                               summary_data)
1207
1208
1209    def disable_warnings(self, warning_type):
1210        self.warning_manager.disable_warnings(warning_type)
1211        self.record("INFO", None, None,
1212                    "disabling %s warnings" % warning_type,
1213                    {"warnings.disable": warning_type})
1214
1215
1216    def enable_warnings(self, warning_type):
1217        self.warning_manager.enable_warnings(warning_type)
1218        self.record("INFO", None, None,
1219                    "enabling %s warnings" % warning_type,
1220                    {"warnings.enable": warning_type})
1221
1222
1223    def get_status_log_path(self, subdir=None):
1224        """Return the path to the job status log.
1225
1226        @param subdir - Optional paramter indicating that you want the path
1227            to a subdirectory status log.
1228
1229        @returns The path where the status log should be.
1230        """
1231        if self.resultdir:
1232            if subdir:
1233                return os.path.join(self.resultdir, subdir, "status.log")
1234            else:
1235                return os.path.join(self.resultdir, "status.log")
1236        else:
1237            return None
1238
1239
1240    def _update_uncollected_logs_list(self, update_func):
1241        """Updates the uncollected logs list in a multi-process safe manner.
1242
1243        @param update_func - a function that updates the list of uncollected
1244            logs. Should take one parameter, the list to be updated.
1245        """
1246        # Skip log collection if file _uncollected_log_file does not exist.
1247        if not (self._uncollected_log_file and
1248                os.path.exists(self._uncollected_log_file)):
1249            return
1250        if self._uncollected_log_file:
1251            log_file = open(self._uncollected_log_file, "r+")
1252            fcntl.flock(log_file, fcntl.LOCK_EX)
1253        try:
1254            uncollected_logs = pickle.load(log_file)
1255            update_func(uncollected_logs)
1256            log_file.seek(0)
1257            log_file.truncate()
1258            pickle.dump(uncollected_logs, log_file)
1259            log_file.flush()
1260        finally:
1261            fcntl.flock(log_file, fcntl.LOCK_UN)
1262            log_file.close()
1263
1264
1265    def add_client_log(self, hostname, remote_path, local_path):
1266        """Adds a new set of client logs to the list of uncollected logs,
1267        to allow for future log recovery.
1268
1269        @param host - the hostname of the machine holding the logs
1270        @param remote_path - the directory on the remote machine holding logs
1271        @param local_path - the local directory to copy the logs into
1272        """
1273        def update_func(logs_list):
1274            logs_list.append((hostname, remote_path, local_path))
1275        self._update_uncollected_logs_list(update_func)
1276
1277
1278    def remove_client_log(self, hostname, remote_path, local_path):
1279        """Removes a set of client logs from the list of uncollected logs,
1280        to allow for future log recovery.
1281
1282        @param host - the hostname of the machine holding the logs
1283        @param remote_path - the directory on the remote machine holding logs
1284        @param local_path - the local directory to copy the logs into
1285        """
1286        def update_func(logs_list):
1287            logs_list.remove((hostname, remote_path, local_path))
1288        self._update_uncollected_logs_list(update_func)
1289
1290
1291    def get_client_logs(self):
1292        """Retrieves the list of uncollected logs, if it exists.
1293
1294        @returns A list of (host, remote_path, local_path) tuples. Returns
1295                 an empty list if no uncollected logs file exists.
1296        """
1297        log_exists = (self._uncollected_log_file and
1298                      os.path.exists(self._uncollected_log_file))
1299        if log_exists:
1300            return pickle.load(open(self._uncollected_log_file))
1301        else:
1302            return []
1303
1304
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            # __import__ with a non-empty fromlist returns the leaf module;
            # with an empty one it returns the top-level package.
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                # Walk down to the leaf submodule, since __import__ returned
                # the top-level package in this case.
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].TestBed.job = self
        # Propagate this job's ssh settings into the host factory defaults so
        # hosts created inside control files use the same connection options.
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options
1393
1394
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            # Write the machines list to a file so control files (and tools
            # they invoke) can read the set of machines for this job.
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # File missing or unreadable: treat as "no existing content"
                # so it gets (re)written below.
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        # execfile is Python 2 only; the namespace dict serves as both the
        # globals and locals for the control file's execution.
        execfile(code_file, namespace, namespace)
1426
1427
1428    def _parse_status(self, new_line):
1429        if not self._using_parser:
1430            return
1431        new_tests = self.parser.process_lines([new_line])
1432        for test in new_tests:
1433            self.__insert_test(test)
1434
1435
    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        # Count anything as bad as, or worse than, FAIL as a failure.
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            # Deliberately swallow the error (see docstring); just report it
            # on stderr so the problem is still visible in the logs.
            msg = ("WARNING: An unexpected error occured while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg
1452
1453
1454    def preprocess_client_state(self):
1455        """
1456        Produce a state file for initializing the state of a client job.
1457
1458        Creates a new client state file with all the current server state, as
1459        well as some pre-set client state.
1460
1461        @returns The path of the file the state was written into.
1462        """
1463        # initialize the sysinfo state
1464        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1465
1466        # dump the state out to a tempfile
1467        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1468        os.close(fd)
1469
1470        # write_to_file doesn't need locking, we exclusively own file_path
1471        self._state.write_to_file(file_path)
1472        return file_path
1473
1474
1475    def postprocess_client_state(self, state_path):
1476        """
1477        Update the state of this job with the state from a client job.
1478
1479        Updates the state of the server side of a job with the final state
1480        of a client job that was run. Updates the non-client-specific state,
1481        pulls in some specific bits from the client-specific state, and then
1482        discards the rest. Removes the state file afterwards
1483
1484        @param state_file A path to the state file from the client.
1485        """
1486        # update the on-disk state
1487        try:
1488            self._state.read_from_file(state_path)
1489            os.remove(state_path)
1490        except OSError, e:
1491            # ignore file-not-found errors
1492            if e.errno != errno.ENOENT:
1493                raise
1494            else:
1495                logging.debug('Client state file %s not found', state_path)
1496
1497        # update the sysinfo state
1498        if self._state.has('client', 'sysinfo'):
1499            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1500
1501        # drop all the client-specific state
1502        self._state.discard_namespace('client')
1503
1504
1505    def clear_all_known_hosts(self):
1506        """Clears known hosts files for all AbstractSSHHosts."""
1507        for host in self.hosts:
1508            if isinstance(host, abstract_ssh.AbstractSSHHost):
1509                host.clear_known_hosts()
1510
1511
1512    def close(self):
1513        """Closes this job's operation."""
1514
1515        # Use shallow copy, because host.close() internally discards itself.
1516        for host in list(self.hosts):
1517            host.close()
1518        assert not self.hosts
1519        self._connection_pool.shutdown()
1520
1521
1522    def _get_job_data(self):
1523        """Add custom data to the job keyval info.
1524
1525        When multiple machines are used in a job, change the hostname to
1526        the platform of the first machine instead of machine1,machine2,...  This
1527        makes the job reports easier to read and keeps the tko_machines table from
1528        growing too large.
1529
1530        Returns:
1531            keyval dictionary with new hostname value, or empty dictionary.
1532        """
1533        job_data = {}
1534        # Only modify hostname on multimachine jobs. Assume all host have the same
1535        # platform.
1536        if len(self.machines) > 1:
1537            # Search through machines for first machine with a platform.
1538            for host in self.machines:
1539                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1540                keyvals = utils.read_keyval(keyval_path)
1541                host_plat = keyvals.get('platform', None)
1542                if not host_plat:
1543                    continue
1544                job_data['hostname'] = host_plat
1545                break
1546        return job_data
1547
1548
class warning_manager(object):
    """Controls warning logs by tracking which warning types are disabled.

    Each warning type maps to a list of (start, end) time intervals during
    which warnings of that type should be ignored; an end of None marks an
    interval that is still open.
    """
    def __init__(self):
        # warning type -> list of (start_time, end_time) disabled intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its
        type) is a valid warning. A warning is "invalid" if its type was
        marked as disabled at the time it occurred."""
        for begin, finish in self.disabled_warnings.get(warning_type, []):
            covered = timestamp >= begin and (finish is None or
                                              timestamp < finish)
            if covered:
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        spans = self.disabled_warnings.setdefault(warning_type, [])
        # Only open a fresh interval if none is currently open.
        already_open = bool(spans) and spans[-1][1] is None
        if not already_open:
            spans.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        spans = self.disabled_warnings.get(warning_type, [])
        if spans and spans[-1][1] is None:
            spans[-1] = (spans[-1][0], int(current_time_func()))
1580
1581
1582def _is_current_server_job(test):
1583    """Return True if parsed test is the currently running job.
1584
1585    @param test: test instance from tko parser.
1586    """
1587    return test.testname == 'SERVER_JOB'
1588
1589
def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: An object of type frontend.AFE
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    matches = afe.get_hosts(hostname=hostname)
    if matches:
        return matches[0]
    # The AFE returned no record for this hostname.
    raise error.AutoservError('No hosts named %s found' % hostname)
1602
1603
def _create_host_info_store(hostname):
    """Create a real or stub afe_store.AfeStore object.

    @param hostname: Name of the host for which we want the store.
    @returns: An object of type afe_store.AfeStore
    """
    store = afe_store.AfeStore(hostname)
    try:
        # Force a refresh up front so a broken store fails fast here
        # rather than later, mid-job.
        store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError('Could not obtain HostInfo for hostname %s' %
                                  hostname)
    return store
1617