1# Copyright 2011 Google Inc. All Rights Reserved. 2# 3"""Classes that help running commands in a subshell. 4 5Commands can be run locally, or remotly using SSH connection. You may log the 6output of a command to a terminal or a file, or any other destination. 7""" 8 9__author__ = 'kbaclawski@google.com (Krystian Baclawski)' 10 11import fcntl 12import logging 13import os 14import select 15import subprocess 16import time 17 18from automation.common import logger 19 20 21class CommandExecuter(object): 22 DRY_RUN = False 23 24 def __init__(self, dry_run=False): 25 self._logger = logging.getLogger(self.__class__.__name__) 26 self._dry_run = dry_run or self.DRY_RUN 27 28 @classmethod 29 def Configure(cls, dry_run): 30 cls.DRY_RUN = dry_run 31 32 def RunCommand(self, 33 cmd, 34 machine=None, 35 username=None, 36 command_terminator=None, 37 command_timeout=None): 38 cmd = str(cmd) 39 40 if self._dry_run: 41 return 0 42 43 if not command_terminator: 44 command_terminator = CommandTerminator() 45 46 if command_terminator.IsTerminated(): 47 self._logger.warning('Command has been already terminated!') 48 return 1 49 50 # Rewrite command for remote execution. 51 if machine: 52 if username: 53 login = '%s@%s' % (username, machine) 54 else: 55 login = machine 56 57 self._logger.debug("Executing '%s' on %s.", cmd, login) 58 59 # FIXME(asharif): Remove this after crosbug.com/33007 is fixed. 60 cmd = "ssh -t -t %s -- '%s'" % (login, cmd) 61 else: 62 self._logger.debug("Executing: '%s'.", cmd) 63 64 child = self._SpawnProcess(cmd, command_terminator, command_timeout) 65 66 self._logger.debug('{PID: %d} Finished with %d code.', child.pid, 67 child.returncode) 68 69 return child.returncode 70 71 def _Terminate(self, child, command_timeout, wait_timeout=10): 72 """Gracefully shutdown the child by sending SIGTERM.""" 73 74 if command_timeout: 75 self._logger.warning('{PID: %d} Timeout of %s seconds reached since ' 76 'process started.', child.pid, command_timeout) 77 78 self._logger.warning('{PID: %d} Terminating child.', child.pid) 79 80 try: 81 child.terminate() 82 except OSError: 83 pass 84 85 wait_started = time.time() 86 87 while not child.poll(): 88 if time.time() - wait_started >= wait_timeout: 89 break 90 time.sleep(0.1) 91 92 return child.poll() 93 94 def _Kill(self, child): 95 """Kill the child with immediate result.""" 96 self._logger.warning('{PID: %d} Process still alive.', child.pid) 97 self._logger.warning('{PID: %d} Killing child.', child.pid) 98 child.kill() 99 child.wait() 100 101 def _SpawnProcess(self, cmd, command_terminator, command_timeout): 102 # Create a child process executing provided command. 103 child = subprocess.Popen(cmd, 104 stdout=subprocess.PIPE, 105 stderr=subprocess.PIPE, 106 stdin=subprocess.PIPE, 107 shell=True) 108 109 # Close stdin so the child won't be able to block on read. 110 child.stdin.close() 111 112 started_time = time.time() 113 114 # Watch for data on process stdout, stderr. 115 pipes = [child.stdout, child.stderr] 116 117 # Put pipes into non-blocking mode. 118 for pipe in pipes: 119 fd = pipe.fileno() 120 fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL) 121 fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK) 122 123 already_terminated = False 124 125 while pipes: 126 # Maybe timeout reached? 127 if command_timeout and time.time() - started_time > command_timeout: 128 command_terminator.Terminate() 129 130 # Check if terminate request was received. 131 if command_terminator.IsTerminated() and not already_terminated: 132 if not self._Terminate(child, command_timeout): 133 self._Kill(child) 134 # Don't exit the loop immediately. Firstly try to read everything that 135 # was left on stdout and stderr. 136 already_terminated = True 137 138 # Wait for pipes to become ready. 139 ready_pipes, _, _ = select.select(pipes, [], [], 0.1) 140 141 # Handle file descriptors ready to be read. 142 for pipe in ready_pipes: 143 fd = pipe.fileno() 144 145 data = os.read(fd, 4096) 146 147 # check for end-of-file 148 if not data: 149 pipes.remove(pipe) 150 continue 151 152 # read all data that's available 153 while data: 154 if pipe == child.stdout: 155 self.DataReceivedOnOutput(data) 156 elif pipe == child.stderr: 157 self.DataReceivedOnError(data) 158 159 try: 160 data = os.read(fd, 4096) 161 except OSError: 162 # terminate loop if EWOULDBLOCK (EAGAIN) is received 163 data = '' 164 165 if not already_terminated: 166 self._logger.debug('Waiting for command to finish.') 167 child.wait() 168 169 return child 170 171 def DataReceivedOnOutput(self, data): 172 """Invoked when the child process wrote data to stdout.""" 173 sys.stdout.write(data) 174 175 def DataReceivedOnError(self, data): 176 """Invoked when the child process wrote data to stderr.""" 177 sys.stderr.write(data) 178 179 180class LoggingCommandExecuter(CommandExecuter): 181 182 def __init__(self, *args, **kwargs): 183 super(LoggingCommandExecuter, self).__init__(*args, **kwargs) 184 185 # Create a logger for command's stdout/stderr streams. 186 self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output')) 187 188 def OpenLog(self, log_path): 189 """The messages are going to be saved to gzip compressed file.""" 190 formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s', 191 '%Y-%m-%d %H:%M:%S') 192 handler = logger.CompressedFileHandler(log_path, delay=True) 193 handler.setFormatter(formatter) 194 self._output.addHandler(handler) 195 196 # Set a flag to prevent log records from being propagated up the logger 197 # hierarchy tree. We don't want for command output messages to appear in 198 # the main log. 199 self._output.propagate = 0 200 201 def CloseLog(self): 202 """Remove handlers and reattach the logger to its parent.""" 203 for handler in list(self._output.handlers): 204 self._output.removeHandler(handler) 205 handler.flush() 206 handler.close() 207 208 self._output.propagate = 1 209 210 def DataReceivedOnOutput(self, data): 211 """Invoked when the child process wrote data to stdout.""" 212 for line in data.splitlines(): 213 self._output.info(line, extra={'prefix': 'STDOUT'}) 214 215 def DataReceivedOnError(self, data): 216 """Invoked when the child process wrote data to stderr.""" 217 for line in data.splitlines(): 218 self._output.warning(line, extra={'prefix': 'STDERR'}) 219 220 221class CommandTerminator(object): 222 223 def __init__(self): 224 self.terminated = False 225 226 def Terminate(self): 227 self.terminated = True 228 229 def IsTerminated(self): 230 return self.terminated 231