import heapq
import os
import logging

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import drone_utility
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)
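# Each of these pidfiles holds up to three lines (pid, exit status, number of
# failed tests), which _parse_pidfile() below reads back into PidfileContents
# objects.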

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
ENABLE_ARCHIVING = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)


class DroneManagerError(Exception):
    pass


class CustomEquals(object):
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        self.ppid = ppid

    def _id(self):
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self

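# Illustrative note: Process inherits its equality and hashing from
# CustomEquals via _id(), so two Process objects with the same hostname and
# pid compare equal (and hash identically) regardless of ppid, e.g.
# Process('drone1', 1234) == Process('drone1', 1234, ppid=1). This is what
# lets processes be tracked in the sets and dicts used by DroneManager below.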

class PidfileId(CustomEquals):
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    age = 0
    num_processes = None


class PidfileContents(object):
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        return self.process and not self.exit_status


class InvalidPidfile(object):
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
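
    heapq keeps a min-heap, so with __cmp__ below the least-loaded drone
    (lowest used_capacity()) sorts first; see _enqueue_drone() and
    _choose_drone_for_execution() in DroneManager.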
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether that
    is a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'

    _ACTIVE_PROCESS_GAUGE = metrics.Gauge(
        'chromeos/autotest/drone/active_processes')


    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps (hostname, pid) to info for every process running on any drone
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same mapping as _pidfiles, but from the second read of each pidfile
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        for drone in self.get_drones():
            with metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': drone.hostname}):
                drone.call('initialize', self._results_dir)


    def shutdown(self):
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """
        Add a drone.

        Catches AutoservRunError if the drone fails to initialize; in that case
        the drone is not added to the list of usable drones.

        @param hostname: Hostname of the drone we are trying to add.
        """
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            try:
                drone.call('initialize', self.absolute_path(''))
            except error.AutoservRunError as e:
                logging.error('Failed to initialize drone %s with error: %s',
                              hostname, e)
                return
            self._drones[drone.hostname] = drone


    def _remove_drone(self, hostname):
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        # The import of server_manager_utils is delayed rather than done at the
        # top of this module because test_that imports drone_manager (via
        # autoserv_utils) before it has set up django. test_that only sets up
        # django in setup_local_afe, since django is not needed when test_that
        # runs tests on lab DUTs through the :lab: option. If
        # server_manager_utils were imported at the top of this module,
        # test_that would therefore fail because django is not set up yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = dict([(a.attribute, a.value)
                                   for a in server.attributes.all()])
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue a cleanup_orphaned_containers call on each drone."""
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        # use items() since the dict is modified in unregister_pidfile()
        for pidfile_id, info in self._registered_pidfile_info.items():
            if info.age > self._get_max_pidfile_refreshes():
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, e.g.:
            "pid\nexit_status\nnum_tests_failed\n".
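
        For example (illustrative values), raw contents of "1234\n0\n0\n"
        parse into a PidfileContents whose process has pid 1234, with
        exit_status 0 and num_tests_failed 0, while a single line "1234\n"
        yields a PidfileContents with only the process set.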
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        for pidfile_path, contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            contents = self._parse_pidfile(drone, contents)
            store_in_dict[pidfile_id] = contents


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info[pidfile_id]
                if info.num_processes is not None:
                    drone.active_processes += info.num_processes
        self._ACTIVE_PROCESS_GAUGE.set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})


    def _check_drone_process_limit(self, drone):
        """
        Report how close the number of processes on |drone| is to its limit.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            percent = 100
        metrics.Float('chromeos/autotest/drone/active_process_percentage'
                      ).set(percent, fields={'drone_hostname': drone.hostname})

    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls, since
            they would get clobbered by the refresh calls queued here.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        drones = list(self.get_drones())
        for drone in drones:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drones, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads, then refreshes internal data structures
        with drone process information.
        """

        # This gives us a dictionary like the following:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and contain at most 3 lines
        # of the following: pid, exit code, number of failed tests. Each
        # pidfile is identified by a PidfileId object, which contains a unique
        # pidfile path (unique because it contains the job id), making it
        # hashable. All pidfiles are stored in the drone manager's _pidfiles
        # dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # When handling agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone manager's _registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]
            drone_hostname = drone.hostname.replace('.', '_')

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration'):
            self.trigger_refresh()
            self.sync_refresh()


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            m = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(m).increment(
                fields={'drone_hostname': self._results_drone.hostname})
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_kill_process(process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.

        @param username: Login of the user who will run the process. May be
                None.
        @param drone_hostnames_allowed: List of drones that can be used. May be
                                        None.
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                  drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        drone_to_use = drones[0]
        for drone in drones[1:]:
            if drone.used_capacity() < drone_to_use.used_capacity():
                drone_to_use = drone
        return drone_to_use


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute the command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drones allowed to
                                        run the command.
        @param require_ssp: Require server-side packaging to execute the
                            command. Defaults to False.

        @return: A drone object to be used for execution.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones that do not support server-side packaging; used as a backup
        # if no drone is found that can run a command requiring server-side
        # packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all overloaded; pick the one with the least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging; choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  If any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on
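
        Illustrative example (hypothetical values; `manager` is a
        DroneManager):

            pidfile_id = manager.execute_command(
                    command=['autoserv', '-r', WORKING_DIRECTORY],
                    working_directory='123-debug_user/host1',
                    pidfile_name=AUTOSERV_PID_FILE,
                    num_processes=1)

        The returned PidfileId is registered, so subsequent refresh() calls
        will track the new process's pidfile.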
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging.
            if require_ssp and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.

        This will only copy the results back for Special Agent Tasks (Cleanup,
        Verify, Repair) that reside in the hosts/ subdirectory of results if
        the copy_task_results_back flag has been set to True inside
        global_config.ini.

        It will also only copy .parse.log files back to the scheduler if the
        copy_parse_log_back flag in global_config.ini has been set to True.
        """
        if not ENABLE_ARCHIVING:
            return
        copy_task_results_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
                type=bool)
        copy_parse_log_back = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
                type=bool)
        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
        parse_log = source_path.endswith(PARSE_LOG)
        if (copy_task_results_back or not special_task) and (
                copy_parse_log_back or not parse_log):
            if destination_path is None:
                destination_path = source_path
            self._copy_results_helper(process, source_path, destination_path,
                                      to_results_repository=True)


    def _copy_to_results_repository(self, process, source_path,
                                    destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository, without special task handling.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


_the_instance = None

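# Typical usage (illustrative): the scheduler obtains the shared DroneManager
# via instance() and then drives it with initialize(), refresh() and
# execute_actions(); _set_instance() below exists so tests can swap in a fake.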
def instance():
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    global _the_instance
    _the_instance = instance
