• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import itertools
8import logging
9import os
10import pipes
11import select
12import subprocess
13import threading
14
15from subprocess import PIPE
16from autotest_lib.client.common_lib.utils import TEE_TO_LOGS
17
# Serializes the subprocess.Popen() call made in popen(); see
# http://crbug.com/323843.
_popen_lock = threading.Lock()
# The module-wide _LoggingService; created on first use by create_logger().
_logging_service = None
# Source of the per-command ids that popen() uses in its '[NNNN] ' log prefix.
_command_serial_number = itertools.count(1)

# Maximum number of bytes the logging service reads from a pipe per os.read().
_LOG_BUFSIZE = 4096
# Sentinel stored in place of a file descriptor once that end has been closed.
_PIPE_CLOSED = -1
24
class _LoggerProxy(object):
    '''Handle for a pipe logger that closes it when garbage collected.

    The proxy exposes fileno() so it can be handed to code that expects an
    object backed by a file descriptor (popen() passes it to
    subprocess.Popen as stdout/stderr).
    '''

    def __init__(self, logger):
        '''Wraps the given logger.

        @param logger: an object providing a _pipe pair and a close() method.
        '''
        self._logger = logger

    def fileno(self):
        '''Returns the write end of the wrapped logger's pipe.'''
        write_end = self._logger._pipe[1]
        return write_end

    def __del__(self):
        # Dropping the last reference to the proxy closes the logger.
        self._logger.close()
35
36
class _PipeLogger(object):
    '''An OS pipe plus the logging level and prefix to use for its data.'''

    def __init__(self, level, prefix):
        '''Creates the pipe and records how its output should be logged.

        @param level: the logging level for messages read from the pipe.
        @param prefix: the string prepended to every logged line.
        '''
        read_fd, write_fd = os.pipe()
        # Kept as a list (not a tuple) so that close() can overwrite the
        # write end with the _PIPE_CLOSED sentinel.
        self._pipe = [read_fd, write_fd]
        self._level = level
        self._prefix = prefix

    def close(self):
        '''Closes the write end of the pipe; later calls are no-ops.'''
        write_fd = self._pipe[1]
        if write_fd == _PIPE_CLOSED:
            return
        os.close(write_fd)
        self._pipe[1] = _PIPE_CLOSED
48
49
class _LoggingService(object):
    '''Background thread that forwards data written to pipes into logging.

    Each logger created through create_logger() owns a pipe. The service
    thread select()s on the read ends of all those pipes and logs whatever
    arrives, line by line, with the logger's level and prefix. A separate
    control pipe is used to wake the thread up (when a logger is added) and
    to tell it to terminate (when the control pipe is closed by shutdown()).
    '''

    def __init__(self):
        '''Creates the control pipe and starts the daemon service thread.'''
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()


    def _service_run(self):
        '''Main loop of the service thread.

        Runs until the write end of the control pipe is closed, then closes
        the control pipe's read end and the read end of every remaining
        logger.
        '''
        terminate_loop = False
        while not terminate_loop:
            # Rebuilt on every iteration so that newly added loggers are
            # picked up after a wake-up on the control pipe.
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif not data:
                    # EOF on the control pipe: shutdown() was called.
                    # Non-empty data here is just a wake-up byte.
                    terminate_loop = True
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            os.close(logger._pipe[0])


    def _output_logger_message(self, r, data):
        '''Logs one chunk of data read from a logger's pipe.

        An empty chunk means EOF: the read end is closed and the logger is
        dropped from the service.

        @param r: the read file descriptor the data came from.
        @param data: the chunk returned by os.read() for that descriptor.
        '''
        logger = next(l for l in self._loggers if l._pipe[0] == r)

        if not data:
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        # os.read() returns bytes on Python 3; on Python 2 the data already
        # is a str and is left untouched.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')

        lines = data.split('\n')
        # A chunk ending in '\n' yields an empty final fragment; logging it
        # would emit a bogus prefix-only record after every such chunk.
        if not lines[-1]:
            lines.pop()
        for line in lines:
            logging.log(logger._level, '%s%s', logger._prefix, line)


    def create_logger(self, level=logging.DEBUG, prefix=''):
        '''Registers a new pipe logger and returns a proxy for it.

        @param level: the logging level for messages from this logger.
        @param prefix: the string prepended to every logged line.
        '''
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake the service thread out of select() so it starts watching the
        # new logger's pipe.
        os.write(self._pipe[1], b'\0')
        return _LoggerProxy(logger)


    def shutdown(self):
        '''Stops the service thread and waits for it; later calls are no-ops.'''
        if self._pipe[1] != _PIPE_CLOSED:
            # Closing the write end makes the service thread see EOF on the
            # control pipe and leave its loop.
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()
105
106
def create_logger(level=logging.DEBUG, prefix=''):
    '''Returns a logger proxy from the module-wide logging service.

    The service is created on the first call, and its shutdown() is
    registered to run at interpreter exit.

    @param level: the logging level for messages from this logger.
    @param prefix: the string prepended to every logged line.
    '''
    global _logging_service
    service = _logging_service
    if service is None:
        service = _LoggingService()
        _logging_service = service
        atexit.register(service.shutdown)
    return service.create_logger(level=level, prefix=prefix)
113
114
def kill_or_log_returncode(*popens):
    '''Kills every still-running Popen; logs the return code of the others.

    A failure to kill a process is logged as a warning and does not stop
    the remaining Popens from being handled.

    @param popens: The Popens to be killed.
    '''
    for proc in popens:
        if proc.poll() is not None:
            # Already finished: nothing to kill, just report how it ended.
            logging.warning('command exit (pid=%d, rc=%d): %s',
                            proc.pid, proc.returncode, proc.command)
            continue
        try:
            proc.kill()
        except Exception as err:
            logging.warning('failed to kill %d, %s', proc.pid, err)
129
130
def wait_and_check_returncode(*popens):
    '''Waits for all the Popens and checks that each returned 0.

    Every Popen is waited for, and each failure is logged as an error. If
    any of them failed, a RuntimeError carrying the message of the last
    failure is raised.

    @param popens: The Popens to be checked.
    '''
    last_failure = None
    for proc in popens:
        if proc.wait() == 0:
            continue
        last_failure = ('Command failed(%d, %d): %s' %
                        (proc.pid, proc.returncode, proc.command))
        logging.error(last_failure)
    if last_failure:
        raise RuntimeError(last_failure)
146
147
def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS):
    '''Executes a child command and waits for it to finish.

    Returns the data captured from standard output if 'stdout' is
    subprocess.PIPE, otherwise None.
    Raises RuntimeError if the return code of the child command is not 0.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    '''
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr)
    # Always go through communicate() rather than a bare wait(): it drains
    # any PIPE (stdout or stderr) and closes a PIPE stdin, so the child
    # cannot block forever on a pipe nobody services. Its first element is
    # None unless stdout is subprocess.PIPE.
    out = ps.communicate()[0]
    wait_and_check_returncode(ps)
    return out
162
163
def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None):
    '''Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    Each command gets a serial number, stored in Popen.command_id and used
    as a '[NNNN] ' prefix for the log messages about it. When 'stdout' or
    'stderr' is TEE_TO_LOGS, that stream is sent to a logger created with
    create_logger() (DEBUG level for stdout, ERROR level for stderr).

    @param args: the command to be executed, as a sequence of arguments
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param env: the environment passed on to subprocess.Popen
    '''
    # The next() builtin works on both Python 2.6+ and Python 3, unlike the
    # Python-2-only iterator method .next().
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
            env=env)
    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps
187