#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import atexit
import itertools
import logging
import os
import pipes
import select
import subprocess
import threading

from subprocess import PIPE
from autotest_lib.client.common_lib.utils import TEE_TO_LOGS

# Serializes subprocess.Popen() calls made through popen().
_popen_lock = threading.Lock()
# Lazily created singleton; see the module-level create_logger().
_logging_service = None
# Source of the per-command id used as the log prefix.
_command_serial_number = itertools.count(1)

# Maximum number of bytes read from a pipe per os.read() call.
_LOG_BUFSIZE = 4096
# Marker stored in place of a file descriptor once it has been closed.
_PIPE_CLOSED = -1


class _LoggerProxy(object):
    '''Handle to the write end of a _PipeLogger's pipe.

    It exposes fileno(), so it can be passed as stdout/stderr to
    subprocess.Popen. When the proxy is garbage collected, the write end
    held by this process is closed.
    '''

    def __init__(self, logger):
        self._logger = logger


    def fileno(self):
        '''Returns the file descriptor of the write end of the pipe.'''
        return self._logger._pipe[1]


    def __del__(self):
        self._logger.close()


class _PipeLogger(object):
    '''A pipe plus the logging level and prefix used for its content.'''

    def __init__(self, level, prefix):
        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._level = level
        self._prefix = prefix


    def close(self):
        '''Closes the write end of the pipe; safe to call repeatedly.'''
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED


class _LoggingService(object):
    '''Background thread which forwards data read from pipes to logging.

    The thread selects on the read ends of all registered _PipeLoggers
    plus a control pipe. A byte written to the control pipe wakes the
    thread up so that it rebuilds its read list; closing the write end of
    the control pipe makes the thread terminate.
    '''

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()


    def _service_run(self):
        '''Thread body: dispatches readable pipes until shutdown.'''
        terminate_loop = False
        while not terminate_loop:
            # Rebuilt on every iteration so that newly created loggers are
            # picked up and removed ones are dropped.
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif len(data) == 0:
                    # EOF on the control pipe: shutdown() was called.
                    # Non-empty data here is only a wake-up byte.
                    terminate_loop = True
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            os.close(logger._pipe[0])


    def _output_logger_message(self, r, data):
        '''Logs the data read from a logger pipe, one record per line.

        @param r: the read file descriptor the data came from.
        @param data: the data read; empty data means the pipe reached EOF.
        '''
        logger = next(l for l in self._loggers if l._pipe[0] == r)

        if len(data) == 0:
            # All write ends are closed; the logger is no longer needed.
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        # os.read() returns bytes on Python 3, which cannot be split with a
        # str separator. On Python 2 the data is already a str and is left
        # untouched.
        if not isinstance(data, str):
            data = data.decode('utf-8', 'replace')

        # A chunk normally ends with a newline; drop it so that split()
        # does not yield a trailing empty string which would be logged as
        # a spurious blank line.
        if data.endswith('\n'):
            data = data[:-1]

        for line in data.split('\n'):
            logging.log(logger._level, '%s%s', logger._prefix, line)


    def create_logger(self, level=logging.DEBUG, prefix=''):
        '''Registers a new pipe logger and returns a proxy to its write end.

        @param level: the logging level used for the lines read.
        @param prefix: the string prepended to every logged line.
        '''
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake the service thread up so that it starts watching the new
        # pipe. A bytes literal is valid for os.write() on both Python 2
        # and Python 3.
        os.write(self._pipe[1], b'\0')
        return _LoggerProxy(logger)


    def shutdown(self):
        '''Stops the service thread and waits for it to finish.'''
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()


def create_logger(level=logging.DEBUG, prefix=''):
    '''Returns a file-like object whose written content goes to logging.

    The logging service is started on first use and is shut down at
    interpreter exit.

    @param level: the logging level used for the lines read.
    @param prefix: the string prepended to every logged line.
    '''
    global _logging_service
    if _logging_service is None:
        _logging_service = _LoggingService()
        atexit.register(_logging_service.shutdown)
    return _logging_service.create_logger(level=level, prefix=prefix)


def kill_or_log_returncode(*popens):
    '''Kills all the processes of the given Popens or logs the return code.

    A process which is still running is killed; for a process which has
    already exited, its return code is logged as a warning.

    @param popens: The Popens to be killed.
    '''
    for p in popens:
        if p.poll() is None:
            try:
                p.kill()
            except Exception as e:
                # Best effort: keep going with the remaining processes.
                logging.warning('failed to kill %d, %s', p.pid, e)
        else:
            logging.warning('command exit (pid=%d, rc=%d): %s',
                            p.pid, p.returncode, p.command)


def wait_and_check_returncode(*popens):
    '''Wait for all the Popens and check the return code is 0.

    If the return code is not 0, it raises a RuntimeError. Every failure
    is logged; the raised error carries the message of the last one.

    @param popens: The Popens to be checked.
    '''
    error_message = None
    for p in popens:
        if p.wait() != 0:
            error_message = ('Command failed(%d, %d): %s' %
                             (p.pid, p.returncode, p.command))
            logging.error(error_message)
    if error_message:
        raise RuntimeError(error_message)


def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS):
    '''Executes a child command and wait for it.

    Returns the output from standard output if 'stdout' is subprocess.PIPE,
    otherwise None.
    Raises RuntimeError if the return code of the child command is not 0.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    '''
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr)
    # communicate() drains every stream backed by subprocess.PIPE (and
    # closes a PIPE stdin) while waiting, so the child cannot block on a
    # pipe that nobody reads or closes. Its first element is None unless
    # stdout was subprocess.PIPE.
    out = ps.communicate()[0]
    wait_and_check_returncode(ps)
    return out


def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None):
    '''Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    The serial number of the command is stored in Popen.command_id and is
    used as the prefix of the related log lines.

    @param args: the command to be executed, as a sequence of strings
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output; TEE_TO_LOGS
                   sends it to logging at DEBUG level
    @param stderr: the executed program's standard error; TEE_TO_LOGS
                   sends it to logging at ERROR level
    @param env: the environment variables for the executed program
    '''
    # The builtin next() works on both Python 2.6+ and Python 3.
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
                              env=env)
    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps