• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
7
8import sys, os, signal, time, six.moves.cPickle, logging
9
10from autotest_lib.client.common_lib import error, utils
11from autotest_lib.client.common_lib.cros import retry
12from six.moves import zip
13
14
# Log-redirection hook for forked subcommands. Entry points that use
# subcommand must assign their logging manager here; while it stays None,
# subcommand.redirect_output() leaves the child's logging untouched.
logging_manager_object = None
18
19
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; then each task is waited for in list order and
    its pickled result (return value or raised exception) is read back from
    the task's result pipe.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            A single deadline is shared by all tasks, but each task is still
            given at least one second. A falsy value disables the timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns A list with one result/exception per task if return_results is
            True, otherwise None.
    @raises error.AutoservError: if return_results is False and any task
            failed, timed out, or produced no readable result.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            remaining_timeout = max(endtime - time.time(), 1)
        wait_failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as wait_error:
            run_error = True
            wait_failure = wait_error
        else:
            if status != 0:
                run_error = True

        # NOTE(review): the result is read only after the child has exited,
        # so a child whose pickled result exceeds the OS pipe buffer blocks in
        # write() until it is timed out (or forever without a timeout).
        try:
            results.append(six.moves.cPickle.load(task.result_pickle))
        except Exception as load_error:
            # The child exited (e.g. was killed on timeout) before writing a
            # result, or wrote something unreadable. Record the failure rather
            # than aborting here, which would leave the remaining children
            # unreaped and their pipes open.
            run_error = True
            results.append(
                    wait_failure if wait_failure is not None else load_error)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        message = 'One or more subcommands failed:\n'
        for task, result in zip(tasklist, results):
            message += 'task: %s returned/raised: %r\n' % (task, result)
        raise error.AutoservError(message)
60
61
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Call "function" once per element of arglist, each call in its own
    subcommand, and run all of those subcommands in parallel.

    Each element is passed as the single argument to "function" and, when
    logging is enabled, is also used to name that subcommand's results
    sub-directory.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used;
    the function is called directly in this process.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    if len(arglist) != 1:
        # General case: one forked subcommand per argument.
        tasks = [
            subcommand(function, [item],
                       subdir_name_constructor(item) if log else None)
            for item in arglist
        ]
        return parallel(tasks, timeout, return_results=return_results)

    # Single argument: skip forking and call the function in-process.
    only_arg = arglist[0]
    if not return_results:
        function(only_arg)
        return None
    try:
        outcome = function(only_arg)
    except Exception as exc:
        return [exc]
    return [outcome]
113
114
class subcommand(object):
    """A function call to be executed in a forked child process.

    fork_start() forks the child; fork_waitfor()/wait()/poll() collect it.
    The child's return value, or the exception it raised, is pickled back to
    the parent through a pipe whose read end is exposed as
    self.result_pickle.
    """
    # Hooks added via register_fork_hook()/register_join_hook(). These are
    # class-level lists, so a registration applies to every instance.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: Callable run in the child as func(*args).
        @param args: Sequence of positional arguments passed to func.
        @param subdir: Optional directory to log results in. It and a 'debug'
                sub-directory are created with os.mkdir if missing (so its
                parent must already exist), and the child chdir()s into it.
                A falsy value disables per-subcommand logging.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set in the parent by fork_start().
        self.result_pickle = None


    def __str__(self):
        return str('subcommand(func=%s,  args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """Tee the child's debug logs into self.debug.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set by the entry point.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """Fork a child process that runs self.func(*self.args).

        In the parent: records self.pid, exposes the read end of the result
        pipe as self.result_pickle, and returns.

        In the child: runs the fork hooks and func, writes the pickled return
        value (or pickled exception) to the pipe, runs the join hooks and
        terminates with os._exit(). The child never returns from here.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # Pickles are bytes; a text-mode file object makes
            # cPickle.load() fail under Python 3.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return: unwinding out of
        # this method would resume the caller's code (e.g. the fork loop in
        # parallel()) inside the child, so every path must reach os._exit().
        exit_code = 1
        try:
            os.close(r)
            exit_code = self._child_main(w)
        except BaseException:
            # Covers SystemExit/KeyboardInterrupt and failures to pickle or
            # write the result; the parent then sees a non-zero exit status.
            logging.exception('subcommand child failed')
        finally:
            try:
                os.close(w)
            except OSError:
                pass
            try:
                for hook in self.join_hooks:
                    hook(self)
            finally:
                try:
                    sys.stdout.flush()
                    sys.stderr.flush()
                finally:
                    os._exit(exit_code)


    def _child_main(self, write_fd):
        """Child side of fork_start(); returns the exit code to use.

        Runs the fork hooks and self.func, then writes the pickled return
        value (exit code 0) or the pickled exception (exit code 1) to
        write_fd.
        """
        signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
        try:
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()
            for hook in self.fork_hooks:
                hook(self)
            result = self.func(*self.args)
            # Pickle inside the try so an unpicklable return value is
            # reported to the parent like any other failure.
            payload = six.moves.cPickle.dumps(
                    result, six.moves.cPickle.HIGHEST_PROTOCOL)
            exit_code = 0
        except Exception as e:
            logging.exception('function failed')
            exit_code = 1
            payload = six.moves.cPickle.dumps(
                    e, six.moves.cPickle.HIGHEST_PROTOCOL)
        self._write_all(write_fd, payload)
        return exit_code


    @staticmethod
    def _write_all(fd, data):
        """Write all of data to fd, continuing after partial os.write()s."""
        while data:
            data = data[os.write(fd, data):]


    def _handle_exitstatus(self, sts):
        """
        Record the child's return code from a waitpid() status.

        This is partially borrowed from subprocess.Popen. A child killed by a
        signal gets a negative return code (-signum).

        @raises error.AutoservSubcommandError: if the return code is non-zero,
                after printing diagnostics and any debug/autoserv.stderr.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print()
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            # Lines keep their own newline; end=' ' would
                            # indent every following line by one space.
                            print(line, end='')
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        Return the child's return code, or None if it is still running.

        This is borrowed from subprocess.Popen. waitpid() errors are ignored
        and leave the return code unset.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                pass
        return self.returncode


    def wait(self):
        """
        Block until the child exits and return its return code.

        This is borrowed from subprocess.Popen.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """Wait for the child, terminating it if it exceeds timeout.

        @param timeout: Seconds to wait; a falsy value waits without limit.
        @returns The child's return code (see wait()).

        NOTE(review): assumes retry.timeout() returns a (_, result) pair with
        result None when the deadline passes; in that case the child is
        terminated via utils.nuke_pid() and then reaped with wait(), which
        raises if its status is non-zero.
        """
        if not timeout:
            return self.wait()
        else:
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print()
                result = self.wait()

            return result
270