#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Helpers for running child commands with their output teed to logging."""

import atexit
import itertools
import logging
import os
import pipes
import select
import subprocess
import threading

from autotest_lib.client.common_lib.utils import TEE_TO_LOGS

# Serializes subprocess.Popen() calls made through popen().
_popen_lock = threading.Lock()
# Lazily created singleton; see create_logger().
_logging_service = None
# Source of the per-command id used as the log prefix in popen().
_command_serial_number = itertools.count(1)

# Maximum number of bytes fetched by a single os.read() on a pipe.
_LOG_BUFSIZE = 4096
# Marker stored in place of a file descriptor once it has been closed.
_PIPE_CLOSED = -1


def _to_text(data):
    """Returns data as a native string suitable for a log record.

    os.read() yields 'str' on Python 2 and 'bytes' on Python 3. A native
    string is returned untouched; anything else is decoded as UTF-8 with
    undecodable bytes replaced, so that logging never raises on child output.

    @param data: the raw data read from a pipe
    """
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


class _LoggerProxy(object):
    """Handle on the write end of a _PipeLogger.

    popen() passes instances of this class to subprocess.Popen as the
    stdout/stderr argument; Popen obtains the descriptor through fileno().
    The write end of the pipe is closed when the proxy is destroyed.
    """

    def __init__(self, logger):
        self._logger = logger

    def fileno(self):
        """Returns the fileno of the logger pipe."""
        return self._logger._pipe[1]

    def __del__(self):
        self._logger.close()


class _PipeLogger(object):
    """A pipe together with the log level and prefix for what is read from it.

    The write end (self._pipe[1]) is handed to the child process; the read
    end (self._pipe[0]) is drained by _LoggingService.
    """

    def __init__(self, level, prefix):
        # Change tuple to list so that the entry can be overwritten with
        # _PIPE_CLOSED when the descriptor is closed.
        self._pipe = list(os.pipe())
        self._level = level
        self._prefix = prefix
        # Data read from the pipe that is not yet terminated by a newline.
        self._pending = b''

    def close(self):
        """Closes the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED


class _LoggingService(object):
    """Background thread that forwards data written to logger pipes to logging.

    A control pipe (self._pipe) is used to talk to the thread: writing a byte
    wakes it up so that it picks up newly created loggers, and closing the
    write end tells it to terminate.
    """

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()

    def _service_run(self):
        """Thread body: drains the logger pipes until the control pipe closes."""
        terminate_loop = False
        while not terminate_loop:
            # Iterate over a snapshot; create_logger() may append to the list
            # from another thread while we are building rlist.
            rlist = [lg._pipe[0] for lg in list(self._loggers)]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif not data:
                    # EOF on the control pipe: shutdown() was called.
                    terminate_loop = True
                # Otherwise it was a wake-up byte from create_logger(); the
                # next iteration rebuilds rlist with the new logger in it.

        # Release resources.
        os.close(self._pipe[0])
        for logger in list(self._loggers):
            self._flush_pending(logger)
            os.close(logger._pipe[0])

    def _emit(self, logger, raw_line):
        """Logs a single line on behalf of the given logger.

        @param logger: the _PipeLogger the line was read from
        @param raw_line: the line as read from the pipe, without the newline
        """
        logging.log(logger._level, '%s%s', logger._prefix, _to_text(raw_line))

    def _flush_pending(self, logger):
        """Logs whatever unterminated data is still buffered for the logger.

        @param logger: the _PipeLogger whose buffer is to be flushed
        """
        if logger._pending:
            self._emit(logger, logger._pending)
            logger._pending = b''

    def _output_logger_message(self, r, data):
        """Handles a chunk of data read from a logger pipe.

        Complete lines are logged right away. A trailing piece that is not
        yet terminated by a newline is kept until the rest of the line (or
        EOF) arrives, so that a line is neither logged in fragments nor
        followed by a spurious empty entry.

        @param r: the read end file descriptor the data came from
        @param data: the data read; empty means the write end was closed
        """
        logger = next(lg for lg in self._loggers if lg._pipe[0] == r)

        if not data:
            # EOF: every writer closed its end, so retire the logger.
            self._flush_pending(logger)
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        lines = (logger._pending + data).split(b'\n')
        # The last element is '' when data ends with a newline, otherwise it
        # is the beginning of a line that has not been completely read yet.
        logger._pending = lines.pop()
        for line in lines:
            self._emit(logger, line)

    def create_logger(self, level=logging.DEBUG, prefix=''):
        """Creates a new logger.

        @param level: the desired logging level
        @param prefix: the prefix to add to each log entry
        """
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake up the service thread so that it starts watching the new pipe.
        os.write(self._pipe[1], b'\0')
        return _LoggerProxy(logger)

    def shutdown(self):
        """Shuts down the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()


def create_logger(level=logging.DEBUG, prefix=''):
    """Creates a new logger.

    The logging service is started on first use and registered to be shut
    down at interpreter exit.

    @param level: the desired logging level
    @param prefix: the prefix to add to each log entry
    """
    global _logging_service
    if _logging_service is None:
        _logging_service = _LoggingService()
        atexit.register(_logging_service.shutdown)
    return _logging_service.create_logger(level=level, prefix=prefix)


def kill_or_log_returncode(*popens):
    """Kills all the processes of the given Popens or logs the return code.

    A process that is still running is killed; a failure to kill it is only
    logged. For a process that has already exited, its return code and
    command are logged.

    @param popens: The Popens to be killed.
    """
    for p in popens:
        if p.poll() is None:
            try:
                p.kill()
            except Exception as e:
                # Best effort: keep going with the remaining processes.
                logging.warning('failed to kill %d, %s', p.pid, e)
        else:
            logging.warning('command exit (pid=%d, rc=%d): %s',
                            p.pid, p.returncode, p.command)


def wait_and_check_returncode(*popens):
    """Wait for all the Popens and check the return code is 0.

    If the return code is not 0, it raises a RuntimeError. Every Popen is
    waited for and every failure is logged; the exception carries the message
    of the last failure.

    @param popens: The Popens to be checked.
    """
    error_message = None
    for p in popens:
        if p.wait() != 0:
            error_message = ('Command failed(%d, %d): %s' %
                             (p.pid, p.returncode, p.command))
            logging.error(error_message)
    if error_message:
        raise RuntimeError(error_message)


def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS,
            env=None):
    """Executes a child command and wait for it.

    Returns the output from standard output if 'stdout' is subprocess.PIPE,
    otherwise None.
    Raises RuntimeError if the return code of the child command is not 0.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param env: the executed program's environment
    """
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env)
    out = ps.communicate()[0] if stdout == subprocess.PIPE else None
    wait_and_check_returncode(ps)
    return out


def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None):
    """Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    The serial number assigned to the command is stored in Popen.command_id.
    When 'stdout' or 'stderr' is TEE_TO_LOGS, that stream is sent to the logs
    (at DEBUG and ERROR level respectively) prefixed with the serial number.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param env: the executed program's environment
    """
    # The builtin next() works on both Python 2 and Python 3 iterators.
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
                              env=env)
    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps