"""
Run Python callables in forked child processes.

A subcommand forks a child that calls its function and sends the pickled
return value (or the pickled exception it raised) back to the parent through
a pipe.  parallel() and parallel_simple() run several subcommands at once and
gather those results.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""

import errno, sys, os, signal, select, time, cPickle, logging

from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import retry


# entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands
logging_manager_object = None


def _remaining_time(endtime):
    """
    Return the seconds left until endtime, or None when there is no deadline.

    The value is never below 1 because subcommand.fork_waitfor() treats a
    falsy timeout as "wait forever".

    @param endtime: Absolute deadline as a time.time() value, or None.
    """
    if endtime is None:
        return None
    return max(endtime - time.time(), 1)


def _result_ready(task, timeout):
    """
    Wait for a task's result pipe to become readable.

    The pipe becomes readable once the child starts writing its result or
    once every write end of the pipe has been closed (EOF).

    @param task: A subcommand whose fork_start() has been called.
    @param timeout: Seconds to wait, or None to wait without limit.

    @returns True if the pipe is readable.  Returns False on timeout or if
            select() was interrupted; callers then fall back to reaping the
            child before reading.
    """
    try:
        readable, _, _ = select.select([task.result_pickle], [], [], timeout)
    except select.error:
        return False
    return bool(readable)


def _load_result(task):
    """
    Unpickle the value a forked subcommand wrote to its result pipe.

    @param task: A subcommand whose fork_start() has been called.

    @returns (True, value) on success.  On failure returns (False, exc),
            where exc is an AutoservError describing why no result could be
            read, e.g. the child was killed before writing it.
    """
    try:
        return True, cPickle.load(task.result_pickle)
    except Exception as e:
        # Unpickling fails in many ways: EOFError for an empty pipe,
        # UnpicklingError for a truncated one, and TypeError/AttributeError/
        # ImportError for objects that cannot be rebuilt in this process.
        return False, error.AutoservError(
                'Could not read the result of %s: %r' % (task, e))


def _write_all(fd, data):
    """
    Write every byte of data to the file descriptor fd.

    os.write() may write only part of the buffer or fail with EINTR when a
    signal arrives, so keep writing until nothing is left.
    """
    while data:
        try:
            written = os.write(fd, data)
        except OSError as e:
            if e.errno == errno.EINTR:
                continue
            raise
        data = data[written:]


def _pickle_exception(exc):
    """
    Pickle an exception for the parent, falling back to a picklable summary.

    Some exceptions cannot be pickled, e.g. ones whose attributes reference
    open files.  In that case send an AutoservError carrying repr(exc) so
    the parent still learns what went wrong instead of finding an empty pipe.
    """
    try:
        return cPickle.dumps(exc, cPickle.HIGHEST_PROTOCOL)
    except Exception:
        substitute = error.AutoservError('Unpicklable exception: %r' % (exc,))
        return cPickle.dumps(substitute, cPickle.HIGHEST_PROTOCOL)


def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned. [default: False]

    @returns The list of results/exceptions (one per task, in order) when
            return_results is True, otherwise None.  A task whose result
            could not be read, for example because it was killed on timeout
            before writing it, contributes an AutoservError.
    @raises error.AutoservError: if any task failed and return_results is
            False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    endtime = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        try:
            # Start reading the result *before* reaping the child.  A child
            # whose pickled result outgrows the pipe buffer blocks in
            # os.write() until somebody reads.  Waiting for it to exit first
            # would therefore hang, or burn the timeout and get the child
            # killed.  (A child that is still hung once the deadline passes
            # gets the 1 second minimum in fork_waitfor on top.)
            loaded = False
            if _result_ready(task, _remaining_time(endtime)):
                ok, result = _load_result(task)
                loaded = True

            try:
                status = task.fork_waitfor(timeout=_remaining_time(endtime))
            except error.AutoservSubcommandError:
                run_error = True
            else:
                if status != 0:
                    run_error = True

            if not loaded:
                # The child has been reaped now, so the pipe holds whatever
                # it wrote before exiting (possibly nothing).
                ok, result = _load_result(task)
            if not ok:
                run_error = True
            results.append(result)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        message = 'One or more subcommands failed:\n' + ''.join(
                'task: %s returned/raised: %r\n' % (task, result)
                for task, result in zip(tasklist, results))
        raise error.AutoservError(message)


def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Each element in the arglist is used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used:
    function is called directly in this process, so neither log nor timeout
    has any effect in that case.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    # Bypass forking entirely if there is only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if return_results:
            try:
                result = function(arg)
            except Exception as e:
                return [e]
            return [result]
        else:
            function(arg)
            return

    subcommands = []
    for arg in arglist:
        subdir = subdir_name_constructor(arg) if log else None
        subcommands.append(subcommand(function, [arg], subdir))
    return parallel(subcommands, timeout, return_results=return_results)


class subcommand(object):
    """
    A function call to be run in a forked child process.

    fork_start() forks and runs func(*args) in the child.  The parent reads
    the pickled outcome from self.result_pickle and reaps the child with
    wait(), poll() or fork_waitfor().
    """
    # Hooks shared by every instance (class-level lists), filled through
    # register_fork_hook() / register_join_hook().
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None):
        """
        @param func: The callable to run; it is invoked as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: Optional directory to log results in.  It and its
                'debug' subdirectory are created here if missing, and the
                child chdir()s into it before running func.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None


    def __str__(self):
        return 'subcommand(func=%s, args=%s, subdir=%s)' % (
                self.func, self.args, self.subdir)


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Hand this subcommand's debug directory to the logging manager.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set.  The debug dir is tagged with
        the subdir's basename (presumably so interleaved output can be told
        apart -- confirm against the logging manager).
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork a child process that runs self.func(*self.args).

        In the parent: records self.pid, exposes the read end of the result
        pipe as self.result_pickle and returns.

        In the child: never returns.  Runs the fork hooks and the function,
        writes the pickled outcome to the pipe, runs the join hooks and
        exits with status 0 (function returned) or 1 (anything failed).
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on.  Never return: every path
        # must end in os._exit().  Otherwise an escaping exception (a
        # SystemExit from func, a failed chdir, a failed write...) would
        # unwind into the parent's call stack and keep running the parent's
        # code inside this process.
        os.close(r)
        exit_code = 1
        try:
            try:
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                exit_code = self._run_child(w)
            finally:
                for hook in self.join_hooks:
                    hook(self)
        finally:
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(exit_code)


    def _run_child(self, w):
        """
        Child side of fork_start(): run the function and report its outcome.

        Changes into self.subdir, redirects output, runs the fork hooks and
        self.func.  Then writes the pickled return value to the pipe fd w,
        or the pickled exception if any of those steps raised, and closes w.

        @param w: Write end of the result pipe.
        @returns 0 if the function returned normally, 1 if something raised.
        """
        try:
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()
            for hook in self.fork_hooks:
                hook(self)
            result = self.func(*self.args)
            # An unpicklable result raises here and is reported like any
            # other failure.
            payload = cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL)
            exit_code = 0
        except Exception as e:
            logging.exception('function failed')
            payload = _pickle_exception(e)
            exit_code = 1

        try:
            _write_all(w, payload)
        finally:
            os.close(w)
        return exit_code


    def _handle_exitstatus(self, sts):
        """
        Record the child's exit status in self.returncode.

        This is partially borrowed from subprocess.Popen.  A child killed by
        a signal gets a negative returncode (minus the signal number).

        @param sts: Status as returned by os.waitpid().
        @raises error.AutoservSubcommandError: if the returncode is non-zero,
                after printing a failure report (including the child's
                autoserv.stderr when a debug dir exists).
        @raises RuntimeError: if sts is neither a signal nor an exit status.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        Reap the child if it has exited, without blocking.

        This is borrowed from subprocess.Popen.

        @returns self.returncode, which stays None while the child is still
                running.  Errors from waitpid are ignored.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                pass
        return self.returncode


    def wait(self):
        """
        Block until the child exits and return its returncode.

        This is borrowed from subprocess.Popen.  Never returns None: either
        _handle_exitstatus() sets an int or it raises.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, killing it if it outlives timeout.

        @param timeout: Seconds to wait.  A falsy value (None or 0) waits
                without limit.
        @returns The child's returncode.
        @raises error.AutoservSubcommandError: if the returncode is non-zero
                (see _handle_exitstatus).
        """
        if not timeout:
            return self.wait()

        _, result = retry.timeout(self.wait, timeout_sec=timeout)
        # wait() never returns None, so None here means the wait was cut
        # short by retry.timeout().
        if result is None:
            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            result = self.wait()

        return result