• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import heapq
2import os
3import logging
4
5import common
6from autotest_lib.client.common_lib import error
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.scheduler import drone_task_queue
10from autotest_lib.scheduler import drone_utility
11from autotest_lib.scheduler import drones
12from autotest_lib.scheduler import scheduler_config
13from autotest_lib.scheduler import thread_lib
14
15try:
16    from chromite.lib import metrics
17except ImportError:
18    metrics = utils.metrics_mock
19
20
# Results on drones will be placed under the drone_installation_directory in a
# directory with this name.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Sentinel for command lists handed to execute_command(): every item that is
# this exact object is replaced with the absolute working directory.
WORKING_DIRECTORY = object() # see execute_command()


# Pidfile names.  Presumably one per kind of process (autoserv, crashinfo
# collection, parser, archiver); the code writing them is not in this module
# section -- confirm against the callers of execute_command().
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

# Every pidfile name defined above, as a tuple.
ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

# Read once at import time.  When true (the default), DroneManager runs drone
# calls through thread_lib.ThreadedTaskQueue; otherwise it uses
# drone_task_queue.DroneTaskQueue.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
# Read once at import time from the scheduler config section; no default is
# supplied here.
ENABLE_ARCHIVING =  global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
44
45
class DroneManagerError(Exception):
    """Error raised by DroneManager, e.g. when no usable drone is available."""
48
49
class CustomEquals(object):
    """Base class providing equality and hashing derived from _id().

    Subclasses override _id() to return a hashable value; two instances are
    equal when the other one is an instance of this one's type and both
    return equal ids.
    """

    def _id(self):
        """Return the value that identifies this object; must be overridden."""
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # Let Python try the reflected comparison for unrelated types.
        return NotImplemented


    def __ne__(self, other):
        is_equal = (self == other)
        return not is_equal


    def __hash__(self):
        identity = self._id()
        return hash(identity)
67
68
class Process(CustomEquals):
    """A process on a drone, identified by its hostname and pid.

    The parent pid is stored but is not part of the identity used for
    equality and hashing.
    """

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        identity = (self.hostname, self.pid)
        return identity


    def __str__(self):
        hostname, pid = self.hostname, self.pid
        return '%s/%s' % (hostname, pid)


    def __repr__(self):
        # Default object repr followed by the readable "<hostname/pid>" form.
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
85
86
class PidfileId(CustomEquals):
    """Identifier for a pidfile; ids with equal paths compare equal."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        # The path alone identifies the pidfile.
        pidfile_path = self.path
        return pidfile_path


    def __str__(self):
        pidfile_path = self.path
        return str(pidfile_path)
98
99
100class _PidfileInfo(object):
101    age = 0
102    num_processes = None
103
104
class PidfileContents(object):
    """Parsed contents of a pidfile.

    Each field stays None until a value for it has been read from the
    pidfile (see DroneManager._parse_pidfile()).
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        """@returns: False; parse failures use InvalidPidfile instead."""
        return False


    def is_running(self):
        """
        @returns: True if a process was recorded and no exit status has been
                written yet, False otherwise.
        """
        # Compare against None explicitly: an exit status of 0 means the
        # process finished successfully, not that it is still running.
        return self.process is not None and self.exit_status is None
116
117
class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed.

    Exposes the same fields and predicates as PidfileContents; the fields
    keep their None defaults, and the reason for the failure is kept in
    self.error.
    """
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        """@param error: Description of why the pidfile is invalid."""
        self.error = error


    def is_invalid(self):
        """@returns: True, always."""
        return True


    def is_running(self):
        """@returns: False, always."""
        return False


    def __str__(self):
        # __str__ has to return a string; coerce so that an error given as
        # an exception instance (or any other object) does not raise.
        return str(self.error)
138
139
140class _DroneHeapWrapper(object):
141    """Wrapper to compare drones based on used_capacity().
142
143    These objects can be used to keep a heap of drones by capacity.
144    """
145    def __init__(self, drone):
146        self.drone = drone
147
148
149    def __cmp__(self, other):
150        assert isinstance(other, _DroneHeapWrapper)
151        return cmp(self.drone.used_capacity(), other.drone.used_capacity())
152
153
154class DroneManager(object):
155    """
156    This class acts as an interface from the scheduler to drones, whether it be
157    only a single "drone" for localhost or multiple remote drones.
158
    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
161    """
162
163
164    # Minimum time to wait before next email
165    # about a drone hitting process limit is sent.
166    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
167    _STATS_KEY = 'drone_manager'
168
169
170
171    def __init__(self):
172        # absolute path of base results dir
173        self._results_dir = None
174        # holds Process objects
175        self._process_set = set()
176        # holds the list of all processes running on all drones
177        self._all_processes = {}
178        # maps PidfileId to PidfileContents
179        self._pidfiles = {}
180        # same as _pidfiles
181        self._pidfiles_second_read = {}
182        # maps PidfileId to _PidfileInfo
183        self._registered_pidfile_info = {}
184        # used to generate unique temporary paths
185        self._temporary_path_counter = 0
186        # maps hostname to Drone object
187        self._drones = {}
188        self._results_drone = None
189        # maps results dir to dict mapping file path to contents
190        self._attached_files = {}
191        # heapq of _DroneHeapWrappers
192        self._drone_queue = []
193        # A threaded task queue used to refresh drones asynchronously.
194        if _THREADED_DRONE_MANAGER:
195            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
196                    name='%s.refresh_queue' % self._STATS_KEY)
197        else:
198            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
199
200
201    def initialize(self, base_results_dir, drone_hostnames,
202                   results_repository_hostname):
203        self._results_dir = base_results_dir
204
205        for hostname in drone_hostnames:
206            self._add_drone(hostname)
207
208        if not self._drones:
209            # all drones failed to initialize
210            raise DroneManagerError('No valid drones found')
211
212        self.refresh_drone_configs()
213
214        logging.info('Using results repository on %s',
215                     results_repository_hostname)
216        self._results_drone = drones.get_drone(results_repository_hostname)
217        results_installation_dir = global_config.global_config.get_config_value(
218                scheduler_config.CONFIG_SECTION,
219                'results_host_installation_directory', default=None)
220        if results_installation_dir:
221            self._results_drone.set_autotest_install_dir(
222                    results_installation_dir)
223        # don't initialize() the results drone - we don't want to clear out any
224        # directories and we don't need to kill any processes
225
226
227    def reinitialize_drones(self):
228        for drone in self.get_drones():
229            with metrics.SecondsTimer(
230                    'chromeos/autotest/drone_manager/'
231                    'reinitialize_drones_duration',
232                    fields={'drone': drone.hostname}):
233                drone.call('initialize', self._results_dir)
234
235
236    def shutdown(self):
237        for drone in self.get_drones():
238            drone.shutdown()
239
240
241    def _get_max_pidfile_refreshes(self):
242        """
243        Normally refresh() is called on every monitor_db.Dispatcher.tick().
244
245        @returns: The number of refresh() calls before we forget a pidfile.
246        """
247        pidfile_timeout = global_config.global_config.get_config_value(
248                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
249                type=int, default=2000)
250        return pidfile_timeout
251
252
253    def _add_drone(self, hostname):
254        """
255        Add drone.
256
257        Catches AutoservRunError if the drone fails initialization and does not
258        add it to the list of usable drones.
259
260        @param hostname: Hostname of the drone we are trying to add.
261        """
262        logging.info('Adding drone %s' % hostname)
263        drone = drones.get_drone(hostname)
264        if drone:
265            try:
266                drone.call('initialize', self.absolute_path(''))
267            except error.AutoservRunError as e:
268                logging.error('Failed to initialize drone %s with error: %s',
269                              hostname, e)
270                return
271            self._drones[drone.hostname] = drone
272
273
274    def _remove_drone(self, hostname):
275        self._drones.pop(hostname, None)
276
277
278    def refresh_drone_configs(self):
279        """
280        Reread global config options for all drones.
281        """
282        # Import server_manager_utils is delayed rather than at the beginning of
283        # this module. The reason is that test_that imports drone_manager when
284        # importing autoserv_utils. The import is done before test_that setup
285        # django (test_that only setup django in setup_local_afe, since it's
286        # not needed when test_that runs the test in a lab duts through :lab:
287        # option. Therefore, if server_manager_utils is imported at the
288        # beginning of this module, test_that will fail since django is not
289        # setup yet.
290        from autotest_lib.site_utils import server_manager_utils
291        config = global_config.global_config
292        section = scheduler_config.CONFIG_SECTION
293        config.parse_config_file()
294        for hostname, drone in self._drones.iteritems():
295            if server_manager_utils.use_server_db():
296                server = server_manager_utils.get_servers(hostname=hostname)[0]
297                attributes = dict([(a.attribute, a.value)
298                                   for a in server.attributes.all()])
299                drone.enabled = (
300                        int(attributes.get('disabled', 0)) == 0)
301                drone.max_processes = int(
302                        attributes.get(
303                            'max_processes',
304                            scheduler_config.config.max_processes_per_drone))
305                allowed_users = attributes.get('users', None)
306            else:
307                disabled = config.get_config_value(
308                        section, '%s_disabled' % hostname, default='')
309                drone.enabled = not bool(disabled)
310                drone.max_processes = config.get_config_value(
311                        section, '%s_max_processes' % hostname, type=int,
312                        default=scheduler_config.config.max_processes_per_drone)
313
314                allowed_users = config.get_config_value(
315                        section, '%s_users' % hostname, default=None)
316            if allowed_users:
317                drone.allowed_users = set(allowed_users.split())
318            else:
319                drone.allowed_users = None
320            logging.info('Drone %s.max_processes: %s', hostname,
321                         drone.max_processes)
322            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
323            logging.info('Drone %s.allowed_users: %s', hostname,
324                         drone.allowed_users)
325            logging.info('Drone %s.support_ssp: %s', hostname,
326                         drone.support_ssp)
327
328        self._reorder_drone_queue() # max_processes may have changed
329        # Clear notification record about reaching max_processes limit.
330        self._notify_record = {}
331
332
333    def get_drones(self):
334        return self._drones.itervalues()
335
336
337    def cleanup_orphaned_containers(self):
338        """Queue cleanup_orphaned_containers call at each drone.
339        """
340        for drone in self._drones.values():
341            logging.info('Queue cleanup_orphaned_containers at %s',
342                         drone.hostname)
343            drone.queue_call('cleanup_orphaned_containers')
344
345
346    def _get_drone_for_process(self, process):
347        return self._drones[process.hostname]
348
349
350    def _get_drone_for_pidfile_id(self, pidfile_id):
351        pidfile_contents = self.get_pidfile_contents(pidfile_id)
352        if pidfile_contents.process is None:
353          raise DroneManagerError('Fail to get a drone due to empty pidfile')
354        return self._get_drone_for_process(pidfile_contents.process)
355
356
357    def get_drone_for_pidfile_id(self, pidfile_id):
358        """Public API for luciferlib.
359
360        @param pidfile_id: PidfileId instance.
361        """
362        return self._get_drone_for_pidfile_id(pidfile_id)
363
364
365    def _drop_old_pidfiles(self):
366        # use items() since the dict is modified in unregister_pidfile()
367        for pidfile_id, info in self._registered_pidfile_info.items():
368            if info.age > self._get_max_pidfile_refreshes():
369                logging.warning('dropping leaked pidfile %s', pidfile_id)
370                self.unregister_pidfile(pidfile_id)
371            else:
372                info.age += 1
373
374
375    def _reset(self):
376        self._process_set = set()
377        self._all_processes = {}
378        self._pidfiles = {}
379        self._pidfiles_second_read = {}
380        self._drone_queue = []
381
382
383    def _parse_pidfile(self, drone, raw_contents):
384        """Parse raw pidfile contents.
385
386        @param drone: The drone on which this pidfile was found.
387        @param raw_contents: The raw contents of a pidfile, eg:
388            "pid\nexit_staus\nnum_tests_failed\n".
389        """
390        contents = PidfileContents()
391        if not raw_contents:
392            return contents
393        lines = raw_contents.splitlines()
394        if len(lines) > 3:
395            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
396                                  (len(lines), lines))
397        try:
398            pid = int(lines[0])
399            contents.process = Process(drone.hostname, pid)
400            # if len(lines) == 2, assume we caught Autoserv between writing
401            # exit_status and num_failed_tests, so just ignore it and wait for
402            # the next cycle
403            if len(lines) == 3:
404                contents.exit_status = int(lines[1])
405                contents.num_tests_failed = int(lines[2])
406        except ValueError, exc:
407            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
408
409        return contents
410
411
412    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
413        for pidfile_path, contents in pidfiles.iteritems():
414            pidfile_id = PidfileId(pidfile_path)
415            contents = self._parse_pidfile(drone, contents)
416            store_in_dict[pidfile_id] = contents
417
418
419    def _add_process(self, drone, process_info):
420        process = Process(drone.hostname, int(process_info['pid']),
421                          int(process_info['ppid']))
422        self._process_set.add(process)
423
424
425    def _add_autoserv_process(self, drone, process_info):
426        assert process_info['comm'] == 'autoserv'
427        # only root autoserv processes have pgid == pid
428        if process_info['pgid'] != process_info['pid']:
429            return
430        self._add_process(drone, process_info)
431
432
433    def _enqueue_drone(self, drone):
434        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
435
436
437    def _reorder_drone_queue(self):
438        heapq.heapify(self._drone_queue)
439
440
441    def _compute_active_processes(self, drone):
442        drone.active_processes = 0
443        for pidfile_id, contents in self._pidfiles.iteritems():
444            is_running = contents.exit_status is None
445            on_this_drone = (contents.process
446                             and contents.process.hostname == drone.hostname)
447            if is_running and on_this_drone:
448                info = self._registered_pidfile_info[pidfile_id]
449                if info.num_processes is not None:
450                    drone.active_processes += info.num_processes
451
452        metrics.Gauge('chromeos/autotest/drone/active_processes').set(
453                drone.active_processes,
454                fields={'drone_hostname': drone.hostname})
455
456
457    def _check_drone_process_limit(self, drone):
458        """
459        Notify if the number of processes on |drone| is approaching limit.
460
461        @param drone: A Drone object.
462        """
463        try:
464            percent = float(drone.active_processes) / drone.max_processes
465        except ZeroDivisionError:
466            percent = 100
467        metrics.Float('chromeos/autotest/drone/active_process_percentage'
468                      ).set(percent, fields={'drone_hostname': drone.hostname})
469
470    def trigger_refresh(self):
471        """Triggers a drone manager refresh.
472
473        @raises DroneManagerError: If a drone has un-executed calls.
474            Since they will get clobbered when we queue refresh calls.
475        """
476        self._reset()
477        self._drop_old_pidfiles()
478        pidfile_paths = [pidfile_id.path
479                         for pidfile_id in self._registered_pidfile_info]
480        drones = list(self.get_drones())
481        for drone in drones:
482            calls = drone.get_calls()
483            if calls:
484                raise DroneManagerError('Drone %s has un-executed calls: %s '
485                                        'which might get corrupted through '
486                                        'this invocation' %
487                                        (drone, [str(call) for call in calls]))
488            drone.queue_call('refresh', pidfile_paths)
489        logging.info("Invoking drone refresh.")
490        with metrics.SecondsTimer(
491                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
492            self._refresh_task_queue.execute(drones, wait=False)
493
494
495    def sync_refresh(self):
496        """Complete the drone refresh started by trigger_refresh.
497
498        Waits for all drone threads then refreshes internal datastructures
499        with drone process information.
500        """
501
502        # This gives us a dictionary like what follows:
503        # {drone: [{'pidfiles': (raw contents of pidfile paths),
504        #           'autoserv_processes': (autoserv process info from ps),
505        #           'all_processes': (all process info from ps),
506        #           'parse_processes': (parse process infor from ps),
507        #           'pidfile_second_read': (pidfile contents, again),}]
508        #   drone2: ...}
509        # The values of each drone are only a list because this adheres to the
510        # drone utility interface (each call is executed and its results are
511        # places in a list, but since we never couple the refresh calls with
512        # any other call, this list will always contain a single dict).
513        with metrics.SecondsTimer(
514                'chromeos/autotest/drone_manager/sync_refresh_duration'):
515            all_results = self._refresh_task_queue.get_results()
516        logging.info("Drones refreshed.")
517
518        # The loop below goes through and parses pidfile contents. Pidfiles
519        # are used to track autoserv execution, and will always contain < 3
520        # lines of the following: pid, exit code, number of tests. Each pidfile
521        # is identified by a PidfileId object, which contains a unique pidfile
522        # path (unique because it contains the job id) making it hashable.
523        # All pidfiles are stored in the drone managers _pidfiles dict as:
524        #   {pidfile_id: pidfile_contents(Process(drone, pid),
525        #                                 exit_code, num_tests_failed)}
526        # In handle agents, each agent knows its pidfile_id, and uses this
527        # to retrieve the refreshed contents of its pidfile via the
528        # PidfileRunMonitor (through its tick) before making decisions. If
529        # the agent notices that its process has exited, it unregisters the
530        # pidfile from the drone_managers._registered_pidfile_info dict
531        # through its epilog.
532        for drone, results_list in all_results.iteritems():
533            results = results_list[0]
534            drone_hostname = drone.hostname.replace('.', '_')
535
536            for process_info in results['all_processes']:
537                if process_info['comm'] == 'autoserv':
538                    self._add_autoserv_process(drone, process_info)
539                drone_pid = drone.hostname, int(process_info['pid'])
540                self._all_processes[drone_pid] = process_info
541
542            for process_info in results['parse_processes']:
543                self._add_process(drone, process_info)
544
545            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
546            self._process_pidfiles(drone, results['pidfiles_second_read'],
547                                   self._pidfiles_second_read)
548
549            self._compute_active_processes(drone)
550            if drone.enabled:
551                self._enqueue_drone(drone)
552                self._check_drone_process_limit(drone)
553
554
555    def refresh(self):
556        """Refresh all drones."""
557        with metrics.SecondsTimer(
558                'chromeos/autotest/drone_manager/refresh_duration'):
559            self.trigger_refresh()
560            self.sync_refresh()
561
562
563    @metrics.SecondsTimerDecorator(
564        'chromeos/autotest/drone_manager/execute_actions_duration')
565    def execute_actions(self):
566        """
567        Called at the end of a scheduler cycle to execute all queued actions
568        on drones.
569        """
570        # Invoke calls queued on all drones since the last call to execute
571        # and wait for them to return.
572        if _THREADED_DRONE_MANAGER:
573            thread_lib.ThreadedTaskQueue(
574                    name='%s.execute_queue' % self._STATS_KEY).execute(
575                            self._drones.values())
576        else:
577            drone_task_queue.DroneTaskQueue().execute(self._drones.values())
578
579        try:
580            self._results_drone.execute_queued_calls()
581        except error.AutoservError:
582            m = 'chromeos/autotest/errors/results_repository_failed'
583            metrics.Counter(m).increment(
584                fields={'drone_hostname': self._results_drone.hostname})
585            self._results_drone.clear_call_queue()
586
587
588    def get_orphaned_autoserv_processes(self):
589        """
590        Returns a set of Process objects for orphaned processes only.
591        """
592        return set(process for process in self._process_set
593                   if process.ppid == 1)
594
595
596    def kill_process(self, process):
597        """
598        Kill the given process.
599        """
600        logging.info('killing %s', process)
601        drone = self._get_drone_for_process(process)
602        drone.queue_kill_process(process)
603
604
605    def _ensure_directory_exists(self, path):
606        if not os.path.exists(path):
607            os.makedirs(path)
608
609
610    def total_running_processes(self):
611        return sum(drone.active_processes for drone in self.get_drones())
612
613
614    def max_runnable_processes(self, username, drone_hostnames_allowed):
615        """
616        Return the maximum number of processes that can be run (in a single
617        execution) given the current load on drones.
618        @param username: login of user to run a process.  may be None.
619        @param drone_hostnames_allowed: list of drones that can be used. May be
620                                        None
621        """
622        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
623                                 if wrapper.drone.usable_by(username) and
624                                 (drone_hostnames_allowed is None or
625                                          wrapper.drone.hostname in
626                                                  drone_hostnames_allowed)]
627        if not usable_drone_wrappers:
628            # all drones disabled or inaccessible
629            return 0
630        runnable_processes = [
631                wrapper.drone.max_processes - wrapper.drone.active_processes
632                for wrapper in usable_drone_wrappers]
633        return max([0] + runnable_processes)
634
635
636    def _least_loaded_drone(self, drones):
637        return min(drones, key=lambda d: d.used_capacity())
638
639
640    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
641        """Return a drone to use.
642
643        Various options can be passed to optimize drone selection.
644
645        num_processes is the number of processes the drone is intended
646        to run.
647
648        prefer_ssp indicates whether drones supporting server-side
649        packaging should be preferred.  The returned drone is not
650        guaranteed to support it.
651
652        This public API is exposed for luciferlib to wrap.
653
654        Returns a drone instance (see drones.py).
655        """
656        return self._choose_drone_for_execution(
657                num_processes=num_processes,
658                username=None,  # Always allow all drones
659                drone_hostnames_allowed=None,  # Always allow all drones
660                require_ssp=prefer_ssp,
661        )
662
663
664    def _choose_drone_for_execution(self, num_processes, username,
665                                    drone_hostnames_allowed,
666                                    require_ssp=False):
667        """Choose a drone to execute command.
668
669        @param num_processes: Number of processes needed for execution.
670        @param username: Name of the user to execute the command.
671        @param drone_hostnames_allowed: A list of names of drone allowed.
672        @param require_ssp: Require server-side packaging to execute the,
673                            command, default to False.
674
675        @return: A drone object to be used for execution.
676        """
677        # cycle through drones is order of increasing used capacity until
678        # we find one that can handle these processes
679        checked_drones = []
680        usable_drones = []
681        # Drones do not support server-side packaging, used as backup if no
682        # drone is found to run command requires server-side packaging.
683        no_ssp_drones = []
684        drone_to_use = None
685        while self._drone_queue:
686            drone = heapq.heappop(self._drone_queue).drone
687            checked_drones.append(drone)
688            logging.info('Checking drone %s', drone.hostname)
689            if not drone.usable_by(username):
690                continue
691
692            drone_allowed = (drone_hostnames_allowed is None
693                             or drone.hostname in drone_hostnames_allowed)
694            if not drone_allowed:
695                logging.debug('Drone %s not allowed: ', drone.hostname)
696                continue
697            if require_ssp and not drone.support_ssp:
698                logging.debug('Drone %s does not support server-side '
699                              'packaging.', drone.hostname)
700                no_ssp_drones.append(drone)
701                continue
702
703            usable_drones.append(drone)
704
705            if drone.active_processes + num_processes <= drone.max_processes:
706                drone_to_use = drone
707                break
708            logging.info('Drone %s has %d active + %s requested > %s max',
709                         drone.hostname, drone.active_processes, num_processes,
710                         drone.max_processes)
711
712        if not drone_to_use and usable_drones:
713            # Drones are all over loaded, pick the one with least load.
714            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
715                                                   drone.active_processes,
716                                                   drone.max_processes)
717                                     for drone in usable_drones)
718            logging.error('No drone has capacity to handle %d processes (%s) '
719                          'for user %s', num_processes, drone_summary, username)
720            drone_to_use = self._least_loaded_drone(usable_drones)
721        elif not drone_to_use and require_ssp and no_ssp_drones:
722            # No drone supports server-side packaging, choose the least loaded.
723            drone_to_use = self._least_loaded_drone(no_ssp_drones)
724
725        # refill _drone_queue
726        for drone in checked_drones:
727            self._enqueue_drone(drone)
728
729        return drone_to_use
730
731
732    def _substitute_working_directory_into_command(self, command,
733                                                   working_directory):
734        for i, item in enumerate(command):
735            if item is WORKING_DIRECTORY:
736                command[i] = working_directory
737
738
739    def execute_command(self, command, working_directory, pidfile_name,
740                        num_processes, log_file=None, paired_with_pidfile=None,
741                        username=None, drone_hostnames_allowed=None):
742        """
743        Execute the given command, taken as an argv list.
744
745        @param command: command to execute as a list.  if any item is
746                WORKING_DIRECTORY, the absolute path to the working directory
747                will be substituted for it.
748        @param working_directory: directory in which the pidfile will be written
749        @param pidfile_name: name of the pidfile this process will write
750        @param num_processes: number of processes to account for from this
751                execution
752        @param log_file (optional): path (in the results repository) to hold
753                command output.
754        @param paired_with_pidfile (optional): a PidfileId for an
755                already-executed process; the new process will execute on the
756                same drone as the previous process.
757        @param username (optional): login of the user responsible for this
758                process.
759        @param drone_hostnames_allowed (optional): hostnames of the drones that
760                                                   this command is allowed to
761                                                   execute on
762        """
763        abs_working_directory = self.absolute_path(working_directory)
764        if not log_file:
765            log_file = self.get_temporary_path('execute')
766        log_file = self.absolute_path(log_file)
767
768        self._substitute_working_directory_into_command(command,
769                                                        abs_working_directory)
770
771        if paired_with_pidfile:
772            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
773        else:
774            require_ssp = '--require-ssp' in command
775            drone = self._choose_drone_for_execution(
776                    num_processes, username, drone_hostnames_allowed,
777                    require_ssp=require_ssp)
778            # Enable --warn-no-ssp option for autoserv to log a warning and run
779            # the command without using server-side packaging.
780            if require_ssp and not drone.support_ssp:
781                command.append('--warn-no-ssp')
782
783        if not drone:
784            raise DroneManagerError('command failed; no drones available: %s'
785                                    % command)
786
787        logging.info("command = %s", command)
788        logging.info('log file = %s:%s', drone.hostname, log_file)
789        self._write_attached_files(working_directory, drone)
790        drone.queue_call('execute_command', command, abs_working_directory,
791                         log_file, pidfile_name)
792        drone.active_processes += num_processes
793        self._reorder_drone_queue()
794
795        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
796        pidfile_id = PidfileId(pidfile_path)
797        self.register_pidfile(pidfile_id)
798        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
799        return pidfile_id
800
801
802    def get_pidfile_id_from(self, execution_tag, pidfile_name):
803        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
804        return PidfileId(path)
805
806
807    def register_pidfile(self, pidfile_id):
808        """
809        Indicate that the DroneManager should look for the given pidfile when
810        refreshing.
811        """
812        if pidfile_id not in self._registered_pidfile_info:
813            logging.info('monitoring pidfile %s', pidfile_id)
814            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
815        self._reset_pidfile_age(pidfile_id)
816
817
818    def _reset_pidfile_age(self, pidfile_id):
819        if pidfile_id in self._registered_pidfile_info:
820            self._registered_pidfile_info[pidfile_id].age = 0
821
822
823    def unregister_pidfile(self, pidfile_id):
824        if pidfile_id in self._registered_pidfile_info:
825            logging.info('forgetting pidfile %s', pidfile_id)
826            del self._registered_pidfile_info[pidfile_id]
827
828
829    def declare_process_count(self, pidfile_id, num_processes):
830        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
831
832
833    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
834        """
835        Retrieve a PidfileContents object for the given pidfile_id.  If
836        use_second_read is True, use results that were read after the processes
837        were checked, instead of before.
838        """
839        self._reset_pidfile_age(pidfile_id)
840        if use_second_read:
841            pidfile_map = self._pidfiles_second_read
842        else:
843            pidfile_map = self._pidfiles
844        return pidfile_map.get(pidfile_id, PidfileContents())
845
846
847    def is_process_running(self, process):
848        """
849        Check if the given process is in the running process list.
850        """
851        if process in self._process_set:
852            return True
853
854        drone_pid = process.hostname, process.pid
855        if drone_pid in self._all_processes:
856            logging.error('Process %s found, but not an autoserv process. '
857                    'Is %s', process, self._all_processes[drone_pid])
858            return True
859
860        return False
861
862
863    def get_temporary_path(self, base_name):
864        """
865        Get a new temporary path guaranteed to be unique across all drones
866        for this scheduler execution.
867        """
868        self._temporary_path_counter += 1
869        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
870                            '%s.%s' % (base_name, self._temporary_path_counter))
871
872
873    def absolute_path(self, path, on_results_repository=False):
874        if on_results_repository:
875            base_dir = self._results_dir
876        else:
877            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
878                                    _DRONE_RESULTS_DIR_SUFFIX)
879        return os.path.join(base_dir, path)
880
881
882    def _copy_results_helper(self, process, source_path, destination_path,
883                             to_results_repository=False):
884        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
885                      'destination_path: %s, to_results_repository: %s',
886                      process, source_path, destination_path,
887                      to_results_repository)
888        full_source = self.absolute_path(source_path)
889        full_destination = self.absolute_path(
890                destination_path, on_results_repository=to_results_repository)
891        source_drone = self._get_drone_for_process(process)
892        if to_results_repository:
893            source_drone.send_file_to(self._results_drone, full_source,
894                                      full_destination, can_fail=True)
895        else:
896            source_drone.queue_call('copy_file_or_directory', full_source,
897                                    full_destination)
898
899
900    def copy_to_results_repository(self, process, source_path,
901                                   destination_path=None):
902        """
903        Copy results from the given process at source_path to destination_path
904        in the results repository.
905
906        This will only copy the results back for Special Agent Tasks (Cleanup,
907        Verify, Repair) that reside in the hosts/ subdirectory of results if
908        the copy_task_results_back flag has been set to True inside
909        global_config.ini
910
911        It will also only copy .parse.log files back to the scheduler if the
912        copy_parse_log_back flag in global_config.ini has been set to True.
913        """
914        if not ENABLE_ARCHIVING:
915            return
916        copy_task_results_back = global_config.global_config.get_config_value(
917                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
918                type=bool)
919        copy_parse_log_back = global_config.global_config.get_config_value(
920                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
921                type=bool)
922        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
923        parse_log = source_path.endswith(PARSE_LOG)
924        if (copy_task_results_back or not special_task) and (
925                copy_parse_log_back or not parse_log):
926            if destination_path is None:
927                destination_path = source_path
928            self._copy_results_helper(process, source_path, destination_path,
929                                      to_results_repository=True)
930
931    def _copy_to_results_repository(self, process, source_path,
932                                   destination_path=None):
933        """
934        Copy results from the given process at source_path to destination_path
935        in the results repository, without special task handling.
936        """
937        if destination_path is None:
938            destination_path = source_path
939        self._copy_results_helper(process, source_path, destination_path,
940                                  to_results_repository=True)
941
942
943    def copy_results_on_drone(self, process, source_path, destination_path):
944        """
945        Copy a results directory from one place to another on the drone.
946        """
947        self._copy_results_helper(process, source_path, destination_path)
948
949
950    def _write_attached_files(self, results_dir, drone):
951        attached_files = self._attached_files.pop(results_dir, {})
952        for file_path, contents in attached_files.iteritems():
953            drone.queue_call('write_to_file', self.absolute_path(file_path),
954                             contents)
955
956
957    def attach_file_to_execution(self, results_dir, file_contents,
958                                 file_path=None):
959        """
960        When the process for the results directory is executed, the given file
961        contents will be placed in a file on the drone.  Returns the path at
962        which the file will be placed.
963        """
964        if not file_path:
965            file_path = self.get_temporary_path('attach')
966        files_for_execution = self._attached_files.setdefault(results_dir, {})
967        assert file_path not in files_for_execution
968        files_for_execution[file_path] = file_contents
969        return file_path
970
971
972    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
973        """
974        Write the given lines (as a list of strings) to a file.  If
975        paired_with_process is given, the file will be written on the drone
976        running the given Process.  Otherwise, the file will be written to the
977        results repository.
978        """
979        file_contents = '\n'.join(lines) + '\n'
980        if paired_with_process:
981            drone = self._get_drone_for_process(paired_with_process)
982            on_results_repository = False
983        else:
984            drone = self._results_drone
985            on_results_repository = True
986        full_path = self.absolute_path(
987                file_path, on_results_repository=on_results_repository)
988        drone.queue_call('write_to_file', full_path, file_contents)
989
990
# Module-wide DroneManager singleton; created lazily by instance().
_the_instance = None


def instance():
    """
    Return the shared DroneManager, constructing it on first use.
    """
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance):
    """
    Install the given object as the shared manager (usable for testing).
    """
    global _the_instance
    _the_instance = instance
1002