1# Lint as: python2, python3 2from __future__ import absolute_import 3from __future__ import division 4from __future__ import print_function 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007""" 7 8import sys, os, signal, time, six.moves.cPickle, logging 9 10from autotest_lib.client.common_lib import error, utils 11from autotest_lib.client.common_lib.cros import retry 12from six.moves import zip 13 14 15# entry points that use subcommand must set this to their logging manager 16# to get log redirection for subcommands 17logging_manager_object = None 18 19 20def parallel(tasklist, timeout=None, return_results=False): 21 """ 22 Run a set of predefined subcommands in parallel. 23 24 @param tasklist: A list of subcommand instances to execute. 25 @param timeout: Number of seconds after which the commands should timeout. 26 @param return_results: If True instead of an AutoServError being raised 27 on any error a list of the results|exceptions from the tasks is 28 returned. [default: False] 29 """ 30 run_error = False 31 for task in tasklist: 32 task.fork_start() 33 34 remaining_timeout = None 35 if timeout: 36 endtime = time.time() + timeout 37 38 results = [] 39 for task in tasklist: 40 if timeout: 41 remaining_timeout = max(endtime - time.time(), 1) 42 try: 43 status = task.fork_waitfor(timeout=remaining_timeout) 44 except error.AutoservSubcommandError: 45 run_error = True 46 else: 47 if status != 0: 48 run_error = True 49 50 results.append(six.moves.cPickle.load(task.result_pickle)) 51 task.result_pickle.close() 52 53 if return_results: 54 return results 55 elif run_error: 56 message = 'One or more subcommands failed:\n' 57 for task, result in zip(tasklist, results): 58 message += 'task: %s returned/raised: %r\n' % (task, result) 59 raise error.AutoservError(message) 60 61 62def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x), 63 log=True, timeout=None, return_results=False): 64 """ 65 Each element in the arglist used to create a subcommand object, 66 where that arg is used both as a subdir name, and a single argument 67 to pass to "function". 68 69 We create a subcommand object for each element in the list, 70 then execute those subcommand objects in parallel. 71 72 NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used. 73 74 @param function: A callable to run in parallel once per arg in arglist. 75 @param arglist: A list of arguments to be used one per subcommand 76 @param subdir_name_constructor: A function that returns a name for the 77 result sub-directory created per subcommand. 78 Signature is: 79 subdir_name_constructor(arg) 80 where arg is the argument passed to function. 81 @param log: If True, output will be written to output in a subdirectory 82 named after each subcommand's arg. 83 @param timeout: Number of seconds after which the commands should timeout. 84 @param return_results: If True instead of an AutoServError being raised 85 on any error a list of the results|exceptions from the function 86 called on each arg is returned. [default: False] 87 88 @returns None or a list of results/exceptions. 89 """ 90 if not arglist: 91 logging.warning('parallel_simple was called with an empty arglist, ' 92 'did you forget to pass in a list of machines?') 93 94 # Bypass the multithreading if only one machine. 95 if len(arglist) == 1: 96 arg = arglist[0] 97 if return_results: 98 try: 99 result = function(arg) 100 except Exception as e: 101 return [e] 102 return [result] 103 else: 104 function(arg) 105 return 106 107 subcommands = [] 108 for arg in arglist: 109 args = [arg] 110 subdir = subdir_name_constructor(arg) if log else None 111 subcommands.append(subcommand(function, args, subdir)) 112 return parallel(subcommands, timeout, return_results=return_results) 113 114 115class subcommand(object): 116 fork_hooks, join_hooks = [], [] 117 118 def __init__(self, func, args, subdir = None): 119 # func(args) - the subcommand to run 120 # subdir - the subdirectory to log results in 121 if subdir: 122 self.subdir = os.path.abspath(subdir) 123 if not os.path.exists(self.subdir): 124 os.mkdir(self.subdir) 125 self.debug = os.path.join(self.subdir, 'debug') 126 if not os.path.exists(self.debug): 127 os.mkdir(self.debug) 128 else: 129 self.subdir = None 130 self.debug = None 131 132 self.func = func 133 self.args = args 134 self.pid = None 135 self.returncode = None 136 137 138 def __str__(self): 139 return str('subcommand(func=%s, args=%s, subdir=%s)' % 140 (self.func, self.args, self.subdir)) 141 142 143 @classmethod 144 def register_fork_hook(cls, hook): 145 """ Register a function to be called from the child process after 146 forking. """ 147 cls.fork_hooks.append(hook) 148 149 150 @classmethod 151 def register_join_hook(cls, hook): 152 """ Register a function to be called when from the child process 153 just before the child process terminates (joins to the parent). """ 154 cls.join_hooks.append(hook) 155 156 157 def redirect_output(self): 158 if self.subdir and logging_manager_object: 159 tag = os.path.basename(self.subdir) 160 logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag) 161 162 163 def fork_start(self): 164 sys.stdout.flush() 165 sys.stderr.flush() 166 r, w = os.pipe() 167 self.returncode = None 168 self.pid = os.fork() 169 170 if self.pid: # I am the parent 171 os.close(w) 172 self.result_pickle = os.fdopen(r, 'r') 173 return 174 else: 175 os.close(r) 176 177 # We are the child from this point on. Never return. 178 signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler 179 if self.subdir: 180 os.chdir(self.subdir) 181 self.redirect_output() 182 183 try: 184 for hook in self.fork_hooks: 185 hook(self) 186 result = self.func(*self.args) 187 os.write(w, six.moves.cPickle.dumps(result, six.moves.cPickle.HIGHEST_PROTOCOL)) 188 exit_code = 0 189 except Exception as e: 190 logging.exception('function failed') 191 exit_code = 1 192 os.write(w, six.moves.cPickle.dumps(e, six.moves.cPickle.HIGHEST_PROTOCOL)) 193 194 os.close(w) 195 196 try: 197 for hook in self.join_hooks: 198 hook(self) 199 finally: 200 sys.stdout.flush() 201 sys.stderr.flush() 202 os._exit(exit_code) 203 204 205 def _handle_exitstatus(self, sts): 206 """ 207 This is partially borrowed from subprocess.Popen. 208 """ 209 if os.WIFSIGNALED(sts): 210 self.returncode = -os.WTERMSIG(sts) 211 elif os.WIFEXITED(sts): 212 self.returncode = os.WEXITSTATUS(sts) 213 else: 214 # Should never happen 215 raise RuntimeError("Unknown child exit status!") 216 217 if self.returncode != 0: 218 print("subcommand failed pid %d" % self.pid) 219 print("%s" % (self.func,)) 220 print("rc=%d" % self.returncode) 221 print() 222 if self.debug: 223 stderr_file = os.path.join(self.debug, 'autoserv.stderr') 224 if os.path.exists(stderr_file): 225 for line in open(stderr_file).readlines(): 226 print(line, end=' ') 227 print("\n--------------------------------------------\n") 228 raise error.AutoservSubcommandError(self.func, self.returncode) 229 230 231 def poll(self): 232 """ 233 This is borrowed from subprocess.Popen. 234 """ 235 if self.returncode is None: 236 try: 237 pid, sts = os.waitpid(self.pid, os.WNOHANG) 238 if pid == self.pid: 239 self._handle_exitstatus(sts) 240 except os.error: 241 pass 242 return self.returncode 243 244 245 def wait(self): 246 """ 247 This is borrowed from subprocess.Popen. 248 """ 249 if self.returncode is None: 250 pid, sts = os.waitpid(self.pid, 0) 251 self._handle_exitstatus(sts) 252 return self.returncode 253 254 255 def fork_waitfor(self, timeout=None): 256 if not timeout: 257 return self.wait() 258 else: 259 _, result = retry.timeout(self.wait, timeout_sec=timeout) 260 261 if result is None: 262 utils.nuke_pid(self.pid) 263 print("subcommand failed pid %d" % self.pid) 264 print("%s" % (self.func,)) 265 print("timeout after %ds" % timeout) 266 print() 267 result = self.wait() 268 269 return result 270