• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2""" Parallel execution management """
3
4__author__ = """Copyright Andy Whitcroft 2006"""
5
6import gc
7import logging
8import os
9import pickle
10import six
11import sys
12import time
13import traceback
14
15from autotest_lib.client.common_lib import error, utils
16
17def fork_start(tmp, l):
18    sys.stdout.flush()
19    sys.stderr.flush()
20    pid = os.fork()
21    if pid:
22        # Parent
23        return pid
24
25    try:
26        try:
27            l()
28        except error.AutotestError:
29            raise
30        except Exception as e:
31            raise error.UnhandledTestError(e)
32    except Exception as detail:
33        try:
34            try:
35                logging.error('child process failed')
36                # logging.exception() uses ERROR level, but we want DEBUG for
37                # the traceback
38                for line in traceback.format_exc().splitlines():
39                    logging.debug(line)
40            finally:
41                # note that exceptions originating in this block won't make it
42                # to the logs
43                output_dir = os.path.join(tmp, 'debug')
44                if not os.path.exists(output_dir):
45                    os.makedirs(output_dir)
46                ename = os.path.join(output_dir, "error-%d" % os.getpid())
47
48                # Python 3+ requires binary mode.
49                mode = 'w' if six.PY2 else 'wb'
50                with open(ename, mode) as pickle_out:
51                    pickle.dump(detail, pickle_out)
52
53                sys.stdout.flush()
54                sys.stderr.flush()
55        finally:
56            # clear exception information to allow garbage collection of
57            # objects referenced by the exception's traceback
58            # exc_clear() doesn't exist in py3 (nor is needed).
59            if six.PY2:
60                sys.exc_clear()
61            gc.collect()
62            os._exit(1)
63    else:
64        try:
65            sys.stdout.flush()
66            sys.stderr.flush()
67        finally:
68            os._exit(0)
69
70
71def _check_for_subprocess_exception(temp_dir, pid):
72    ename = temp_dir + "/debug/error-%d" % pid
73    if os.path.exists(ename):
74        try:
75            # Python 3+ requires binary mode.
76            mode = 'r' if six.PY2 else 'rb'
77            with open(ename, mode) as rf:
78                e = pickle.load(rf)
79        except ImportError:
80            with open(ename, 'r') as fp:
81                file_text = fp.read()
82            raise error.TestError(
83                    'Subprocess raised an exception that could not be '
84                    'identified. The root cause exception is in the text '
85                    'that follows: ' + file_text)
86        finally:
87            # Rename the error-pid file so that they do not affect later child
88            # processes that use recycled pids.
89            i = 0
90            while True:
91                pename = ename + ('-%d' % i)
92                i += 1
93                if not os.path.exists(pename):
94                    break
95            os.rename(ename, pename)
96        raise e
97
98
99def fork_waitfor(tmp, pid):
100    (pid, status) = os.waitpid(pid, 0)
101
102    _check_for_subprocess_exception(tmp, pid)
103
104    if status:
105        raise error.TestError("Test subprocess failed rc=%d" % (status))
106
107def fork_waitfor_timed(tmp, pid, timeout):
108    """
109    Waits for pid until it terminates or timeout expires.
110    If timeout expires, test subprocess is killed.
111    """
112    timer_expired = True
113    poll_time = 2
114    time_passed = 0
115    while time_passed < timeout:
116        time.sleep(poll_time)
117        (child_pid, status) = os.waitpid(pid, os.WNOHANG)
118        if (child_pid, status) == (0, 0):
119            time_passed = time_passed + poll_time
120        else:
121            timer_expired = False
122            break
123
124    if timer_expired:
125        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
126        utils.nuke_pid(pid)
127        (child_pid, status) = os.waitpid(pid, 0)
128        raise error.TestError("Test timeout expired, rc=%d" % (status))
129    else:
130        _check_for_subprocess_exception(tmp, pid)
131
132    if status:
133        raise error.TestError("Test subprocess failed rc=%d" % (status))
134
135def fork_nuke_subprocess(tmp, pid):
136    utils.nuke_pid(pid)
137    _check_for_subprocess_exception(tmp, pid)
138