#!/usr/bin/python

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation initiates a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""

#pylint: disable-msg=missing-docstring

import argparse
import collections
import datetime
import getpass
import itertools
import logging
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import sys
import time
import traceback

import common

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# Script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
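

# Example: assembling a batch for DroneUtility.execute_calls() (defined
# below). A sketch; the pidfile path is illustrative:
#
#     calls = [call('refresh', ['/results/123/.autoserv_execute']),
#              call('kill_processes', [])]
#     response = DroneUtility().execute_calls(calls)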


class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: until we have a better solution for
            # periodically cleaning up tmp files, we have to use rm -rf to
            # delete the whole folder. shutil.rmtree has performance issues
            # when a folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it NO-OP's.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    def refresh(self, pidfile_paths):
        """Refreshes our view of the processes referred to by pidfile_paths.

        See drone_utility.ProcessRefresher.__call__ for details.
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        use_pool = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
        result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
        self.warnings += warnings
        return result


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process installs a handler for SIGTERM, in which it can
        do some cleanup work. However, aborting a process with SIGTERM
        followed by SIGKILL has overhead, detailed in the following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent for the autoserv process to destroy the container used
            # by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors; return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default: kill the process with SIGKILL directly.
        return (signal.SIGKILL,)
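
    # The returned tuple is consumed by utils.nuke_pids in kill_processes
    # below, which walks the signal queue, escalating in severity. A sketch
    # ('du' is an illustrative DroneUtility instance, 'proc' an illustrative
    # drone_manager.Process):
    #
    #     queue = du.get_signal_queue_to_kill(proc)  # e.g. (SIGTERM, SIGKILL)
    #     utils.nuke_pids([-proc.pid], signal_queue=queue)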


    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)
            return
        if os.path.isdir(path):
            return
        assert os.path.isfile(path)
        if '/hosts/' in path:
            return
        raise IOError('Path %s exists as a file, not a directory' % path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()
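
    # Example invocation (a sketch; 'du' is an illustrative DroneUtility
    # instance, and the autoserv path, flags, and results directory are
    # illustrative, not the scheduler's exact command line):
    #
    #     du.execute_command(
    #             ['/usr/local/autotest/server/autoserv', '-r', '/results/123'],
    #             working_directory='/results/123',
    #             log_file='/results/123/debug/autoserv.log',
    #             pidfile_name='.autoserv_execute')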


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permissions
                         have been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove following retry when test
            # container can be unprivileged container.
            # If the write failed with 'Permission denied' (errno 13), one
            # possible cause is that the file was created in a container and
            # is thus owned by root. If so, fix the file permissions and try
            # again.
            if e.errno == 13 and not is_retry:
                logging.error('Error writing to file %s: %s. Will be retried.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore copy errors due to missing files. These can happen
                # because gs_offloader zips up folders that contain too many
                # files; there is a race condition in which a repair job
                # copies provision job results into the test job result
                # folder while gs_offloader is uploading the provision job
                # result and zipping up those folders.
                pass
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            try:
                shutil.copy(source_path, destination_path)
            except IOError:
                # Ignore copy errors for the same reason as above.
                pass
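
    # The trailing-slash semantics in practice ('du' is an illustrative
    # DroneUtility instance; paths are illustrative):
    #
    #     du.copy_file_or_directory('/results/123/', '/results/archive')
    #     #   -> copies the contents of /results/123 into /results/archive
    #     du.copy_file_or_directory('/results/123', '/results/archive/123')
    #     #   -> copies the directory itself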


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers."""
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
        # container can be unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be directed to /dev/null; otherwise the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        logging.debug('_sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))
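
    # Both transfer entry points are asynchronous. execute_calls drains them
    # automatically; a caller using DroneUtility directly must drain them
    # itself ('du', the hostname, and the paths are illustrative):
    #
    #     du.send_file_to('drone2', '/results/123/', '/results/123/',
    #                     can_fail=True)
    #     du.wait_for_all_async_commands()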


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


_MAX_REFRESH_POOL_SIZE = 50

class ProcessRefresher(object):
    """Object to refresh process information from given pidfiles.

    Usage: ProcessRefresher(True)(pidfile_list)
    """

    def __init__(self, check_mark, use_pool=False):
        """
        @param check_mark: If True, only consider processes that were
                explicitly marked by a former drone_utility call as autotest
                related processes.
        @param use_pool: If True, use a multiprocessing.Pool to parallelize
                costly operations.
        """
        self._check_mark = check_mark
        self._use_pool = use_pool
        self._pool = None
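
    # Example (a sketch; the pidfile path is illustrative):
    #
    #     refresher = ProcessRefresher(check_mark=False, use_pool=True)
    #     result, warnings = refresher(['/results/123/.autoserv_execute'])
    #     for info in result['autoserv_processes']:
    #         print info['pid'], info['args']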


    def __call__(self, pidfile_paths):
        """
        @param pidfile_paths: A list of paths to check for pidfiles.

        @returns (result, warnings)
            where result is a dict with the following keys:
            - pidfiles: dict mapping pidfile paths to file contents, for
              pidfiles that exist.
            - all_processes: list of dicts corresponding to all running
              processes. Each dict contains pid, pgid, ppid, comm, and args
              (see "man ps" for details).
            - autoserv_processes: likewise, restricted to autoserv processes.
            - parse_processes: likewise, restricted to parse processes.
            - pidfiles_second_read: same info as pidfiles, but gathered after
              the processes are scanned.
            and warnings is a list of warnings generated during process
            refresh.
        """

        if self._use_pool:
            pool_size = max(
                    min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
                    1)
            self._pool = multiprocessing.Pool(pool_size)
        else:
            pool_size = 0
        logging.info('Refreshing %d pidfiles with %d helper processes',
                     len(pidfile_paths), pool_size)

        warnings = []
        # It is necessary to explicitly force this to be a list because the
        # results are pickled by DroneUtility.
        proc_infos = list(_get_process_info())

        autoserv_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'autoserv')
        warnings += extra_warnings
        parse_processes, extra_warnings = self._filter_proc_infos(proc_infos,
                                                                  'parse')
        warnings += extra_warnings
        site_parse_processes, extra_warnings = self._filter_proc_infos(
                proc_infos, 'site_parse')
        warnings += extra_warnings

        result = {
                'pidfiles': self._read_pidfiles(pidfile_paths),
                'all_processes': proc_infos,
                'autoserv_processes': autoserv_processes,
                'parse_processes': (parse_processes + site_parse_processes),
                'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
        }
        return result, warnings


    def _read_pidfiles(self, pidfile_paths):
        """Reads the requested pidfiles, using the pool if one is enabled."""
        if self._use_pool:
            contents = self._pool.map(_read_pidfile, pidfile_paths)
            contents = [c for c in contents if c is not None]
            return {k: v for k, v in contents}
        else:
            pidfiles = {}
            for path in pidfile_paths:
                content = _read_pidfile(path)
                if content is None:
                    continue
                pidfiles[content.path] = content.content
            return pidfiles


    def _filter_proc_infos(self, proc_infos, command_name):
        """Filters process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param proc_infos: ps output as returned by _get_process_info.
        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: (proc_infos, warnings) where proc_infos is a list of dicts
                as returned by _get_process_info and warnings is a list of
                warnings generated while filtering.
        """
        proc_infos = [info for info in proc_infos
                      if info['comm'] == command_name]
        if not self._check_mark:
            return proc_infos, []

        if self._use_pool:
            dark_marks = self._pool.map(
                    _process_has_dark_mark,
                    [info['pid'] for info in proc_infos]
            )
        else:
            dark_marks = [_process_has_dark_mark(info['pid'])
                          for info in proc_infos]

        marked_proc_infos = []
        warnings = []
        for marked, info in itertools.izip(dark_marks, proc_infos):
            if marked:
                marked_proc_infos.append(info)
            else:
                warnings.append(
                        '%(comm)s process pid %(pid)s has no dark mark; '
                        'ignoring.' % info)
        return marked_proc_infos, warnings


def create_host(hostname):
    # TODO(crbug.com/739466) Delay the import to avoid a ~0.7 second penalty
    # for drone_utility calls that don't actually interact with DUTs.
    from autotest_lib.server import hosts
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


def return_data(data):
    print pickle.dumps(data)


def _process_has_dark_mark(pid):
    """Checks if a process was launched earlier by drone_utility.

    @param pid: The pid of the process to check.
    """
    try:
        with open('/proc/%s/environ' % pid, 'rb') as env_file:
            env_data = env_file.read()
    except EnvironmentError:
        return False
    return DARK_MARK_ENVIRONMENT_VAR in env_data
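
# Note: /proc/<pid>/environ is a NUL-separated list of KEY=VALUE pairs, so
# the substring check above is a cheap (if slightly permissive) membership
# test for the mark set in DroneUtility.__init__.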


_PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
def _get_process_info():
    """Parse ps output for all process information.

    @returns A generator of dicts. Each dict has the following keys:
        - comm: command name,
        - pgid: process group id,
        - ppid: parent process id,
        - pid: process id,
        - args: args the command was invoked with.
    """
    @retry.retry(subprocess.CalledProcessError,
                 timeout_min=0.5, delay_sec=0.25)
    def run_ps():
        return subprocess.check_output(
                ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)])

    ps_output = run_ps()
    # Split each line into the columns output by ps.
    split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
    return (dict(itertools.izip(_PS_ARGS, line_components))
            for line_components in split_lines)
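
# Example of one yielded dict (the values shown are illustrative; note that
# every value is a string, since it comes from splitting ps text output):
#
#     {'pid': '1234', 'pgid': '1234', 'ppid': '1', 'comm': 'autoserv',
#      'args': '/usr/bin/python -u ./server/autoserv -r /results/123'}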


_PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
def _read_pidfile(pidfile_path):
    """Reads the content of the given pidfile if it exists.

    @param pidfile_path: Path of the file to read.
    @returns: A _PidfileContent tuple on success, None otherwise.
    """
    if not os.path.exists(pidfile_path):
        return None
    try:
        with open(pidfile_path, 'r') as file_object:
            return _PidfileContent(pidfile_path, file_object.read())
    except IOError:
        return None


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    args = _parse_args(sys.argv[1:])

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()