#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import os
import shlex
import signal
import subprocess
import sys
import time
from threading import Event, Thread

_on_windows = sys.platform == 'win32'

# How long start() waits for the worker thread to report the outcome of the
# first launch attempt before giving up, in seconds.
_START_TIMEOUT_SECONDS = 1


class ProcessError(Exception):
    """Raised when invalid operations are run on a Process."""


class Process(object):
    """A Process object used to run various commands.

    The command is executed on a background "listening" thread, which also
    restarts it when the terminate callback asks for that. A second
    "redirection" thread feeds the command's combined stdout/stderr to the
    output callback.

    Attributes:
        _command: The initial command to run.
        _use_shell: Whether shell=True was passed in the Popen kwargs; string
            commands are only split into argument lists when it was not.
        _subprocess_kwargs: The kwargs to send to Popen for more control over
            execution.
        _process: The subprocess.Popen object currently executing a process.
        _listening_thread: The thread that is listening for the process to stop.
        _redirection_thread: The thread that is redirecting process output.
        _on_output_callback: The callback to call when output is received.
        _binary_output: Whether output is handed to the callback as raw bytes.
        _on_terminate_callback: The callback to call when the process terminates
            without stop() being called first.
        _launch_done: Event set by the listening thread once the first launch
            attempt of a start() call has succeeded or failed.
        _launch_error: The exception raised by the first launch attempt, or
            None if it succeeded.
        _started: Whether or not start() was called.
        _stopped: Whether or not stop() was called.
    """

    def __init__(self, command, **kwargs):
        """Creates a Process object.

        Note that this constructor does not begin the process. To start the
        process, use Process.start().

        Args:
            command: The command to run, as a string or an argument list.
            **kwargs: Extra keyword arguments forwarded to subprocess.Popen.
        """
        # Split command string into list if shell=True is not specified
        self._use_shell = kwargs.get('shell', False)
        if not self._use_shell and isinstance(command, str):
            command = shlex.split(command)
        self._command = command
        self._subprocess_kwargs = kwargs
        # Run the command in its own process group/session so that it can be
        # signalled or killed together with its children.
        if _on_windows:
            self._subprocess_kwargs['creationflags'] = (
                subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self._subprocess_kwargs['start_new_session'] = True
        self._process = None

        self._listening_thread = None
        self._redirection_thread = None
        self._on_output_callback = lambda *args, **kw: None
        self._binary_output = False
        self._on_terminate_callback = lambda *args, **kw: ''

        self._launch_done = Event()
        self._launch_error = None

        self._started = False
        self._stopped = False

    def set_on_output_callback(self, on_output_callback, binary=False):
        """Sets the on_output_callback function.

        Args:
            on_output_callback: The function to be called when output is sent to
                the output. The output callback has the following signature:

                >>> def on_output_callback(output_line):
                >>>     return None

            binary: If True, read the process output as raw binary.
        Returns:
            self
        """
        self._on_output_callback = on_output_callback
        self._binary_output = binary
        return self

    def set_on_terminate_callback(self, on_terminate_callback):
        """Sets the on_self_terminate callback function.

        Args:
            on_terminate_callback: The function to be called when the process
                has terminated on its own. The callback has the following
                signature:

                >>> def on_self_terminate_callback(popen_process):
                >>>     return 'command to run' or None

                If a string is returned, the string returned will be the command
                line used to run the command again. If None is returned, the
                process will end without restarting.

        Returns:
            self
        """
        self._on_terminate_callback = on_terminate_callback
        return self

    def start(self):
        """Starts the process's execution.

        Raises:
            ProcessError: If the process has already been started.
            OSError: If the command could not be launched. When the launch
                itself raised, that exception is attached as the cause and the
                Process may be started again.
        """
        if self._started:
            raise ProcessError('Process has already started.')
        self._started = True
        self._process = None
        # Reset the stop flag before the worker thread exists. Otherwise a
        # command that exits immediately after a restart could still observe
        # the previous stop() and skip the terminate callback.
        self._stopped = False
        self._launch_error = None
        self._launch_done.clear()

        self._listening_thread = Thread(target=self._exec_loop)
        self._listening_thread.start()

        # Block on an event rather than spinning on self._process.
        if not self._launch_done.wait(_START_TIMEOUT_SECONDS):
            raise OSError('Unable to open process!')

        if self._launch_error is not None:
            # The worker thread has given up; clean up so that start() can be
            # called again, and surface the real reason to the caller.
            self._join_threads()
            self._started = False
            raise OSError('Unable to open process!') from self._launch_error

    @staticmethod
    def _get_timeout_left(timeout, start_time):
        """Returns the seconds left of timeout since start_time (min 0.1)."""
        return max(.1, timeout - (time.time() - start_time))

    def is_running(self):
        """Checks that the underlying Popen process is still running

        Returns:
            True if the process is running.
        """
        return self._process is not None and self._process.poll() is None

    def _join_threads(self):
        """Waits for the threads associated with the process to terminate."""
        if self._listening_thread is not None:
            self._listening_thread.join()
            self._listening_thread = None

        if self._redirection_thread is not None:
            self._redirection_thread.join()
            self._redirection_thread = None

    def _kill_process(self):
        """Kills the underlying process/process group. Implementation is
        platform-dependent."""
        if _on_windows:
            subprocess.check_call('taskkill /F /T /PID %s' % self._process.pid)
        else:
            try:
                self.signal(signal.SIGKILL)
            except ProcessLookupError:
                # The process exited and was reaped by the listening thread
                # between the timed-out wait and this kill. Nothing to do.
                logging.debug('Process %s exited before it could be killed.',
                              self._process.pid)

    def wait(self, kill_timeout=60.0):
        """Waits for the process to finish execution.

        If the process has reached the kill_timeout, the process will be killed
        instead.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.

        Args:
            kill_timeout: The amount of time to wait until killing the process.

        Raises:
            ProcessError: If the process was never started, or is already
                being stopped.
        """
        # Check before touching any state, so that a premature call leaves
        # the object usable.
        if self._process is None:
            raise ProcessError('Process has not been started.')
        if self._stopped:
            raise ProcessError('Process is already being stopped.')
        self._stopped = True

        try:
            self._process.wait(kill_timeout)
        except subprocess.TimeoutExpired:
            self._kill_process()
        finally:
            self._join_threads()
            self._started = False

    def signal(self, sig):
        """Sends a signal to the process.

        The signal is delivered to the process group of the command.

        Args:
            sig: The signal to be sent.

        Raises:
            ProcessError: If called on Windows, or before the process has been
                started.
        """
        if _on_windows:
            raise ProcessError('Unable to call Process.signal on windows.')
        if self._process is None:
            raise ProcessError('Process has not been started.')

        pgid = os.getpgid(self._process.pid)
        os.killpg(pgid, sig)

    def stop(self):
        """Stops the process.

        This command is effectively equivalent to kill, but gives time to clean
        up any related work on the process, such as output redirection.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.

        Raises:
            ProcessError: If the process was never started, or is already
                being stopped.
        """
        self.wait(0)

    def _redirect_output(self):
        """Redirects the output from the command into the on_output_callback."""
        stdout = self._process.stdout
        if self._binary_output:
            # Hand over raw chunks until read() signals EOF with b''.
            for data in iter(lambda: stdout.read(1024), b''):
                self._on_output_callback(data)
        else:
            # readline() returns b'' only at EOF; blank lines still carry
            # their newline and are therefore delivered as ''.
            for raw_line in iter(stdout.readline, b''):
                line = raw_line.decode('utf-8', errors='replace')
                # Output the line without trailing \n and whitespace.
                self._on_output_callback(line.rstrip())

    @staticmethod
    def __start_process(command, **kwargs):
        """A convenient wrapper function for starting the process."""
        acts_logger = logging.getLogger()
        acts_logger.debug(
            'Starting command "%s" with kwargs %s', command, kwargs)
        return subprocess.Popen(command, **kwargs)

    def _exec_loop(self):
        """Executes Popen in a loop.

        When Popen terminates without stop() being called,
        self._on_terminate_callback() will be called. The returned value from
        _on_terminate_callback will then be used to determine if the loop should
        continue and start up the process again. See set_on_terminate_callback()
        for more information.
        """
        command = self._command
        while True:
            try:
                self._process = self.__start_process(command,
                                                     stdout=subprocess.PIPE,
                                                     stderr=subprocess.STDOUT,
                                                     bufsize=1,
                                                     **self._subprocess_kwargs)
            except Exception as error:
                if self._launch_done.is_set():
                    # A restart requested by the terminate callback failed;
                    # start() returned long ago, so all we can do is log.
                    logging.exception('Unable to restart command %s.', command)
                else:
                    # First launch: hand the failure to the waiting start().
                    self._launch_error = error
                    self._launch_done.set()
                return

            self._redirection_thread = Thread(target=self._redirect_output)
            self._redirection_thread.start()
            # Unblock start() only once both the process and its output
            # redirection exist. Setting the event again on restarts is a
            # no-op.
            self._launch_done.set()
            self._process.wait()

            if self._stopped:
                logging.debug('The process for command %s was stopped.',
                              command)
                break
            else:
                logging.debug('The process for command %s terminated.',
                              command)
                # Wait for all output to be processed before sending
                # _on_terminate_callback()
                self._redirection_thread.join()
                logging.debug('Beginning on_terminate_callback for %s.',
                              command)
                retry_value = self._on_terminate_callback(self._process)
                if retry_value:
                    if not self._use_shell and isinstance(retry_value, str):
                        retry_value = shlex.split(retry_value)
                    command = retry_value
                else:
                    break