# Lint as: python2, python3
""" Parallel execution management """

__author__ = """Copyright Andy Whitcroft 2006"""

import gc
import logging
import os
import pickle
import six
import sys
import time
import traceback

from autotest_lib.client.common_lib import error, utils


def fork_start(tmp, l):
    """
    Fork a child process that runs the callable l.

    The parent returns immediately with the child's pid. The child calls
    l() and then terminates via os._exit(): status 0 on success, status 1
    if l() raised an Exception. On failure the exception is pickled to
    <tmp>/debug/error-<child pid> so that the parent can re-raise it (see
    _check_for_subprocess_exception).

    tmp: directory under which the 'debug' error directory is created.
    l: zero-argument callable executed in the child.

    Returns the child's pid (in the parent only).
    """
    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    # Child.
    # NOTE(review): only Exception subclasses are handled below; a
    # BaseException such as SystemExit or KeyboardInterrupt raised by l()
    # bypasses both os._exit() calls and unwinds into the caller's stack
    # inside the child process. Confirm whether that is intended.
    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception as e:
            # Wrap foreign exceptions so the parent always receives an
            # autotest error type.
            raise error.UnhandledTestError(e)
    except Exception as detail:
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                for line in traceback.format_exc().splitlines():
                    logging.debug(line)
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                # Several failing children may reach this point at the same
                # time, so an exists()-then-makedirs() check is racy. Try to
                # create the directory and accept that it may already exist.
                try:
                    os.makedirs(output_dir)
                except OSError:
                    if not os.path.isdir(output_dir):
                        raise
                ename = os.path.join(output_dir, "error-%d" % os.getpid())

                # Python 3+ requires binary mode.
                mode = 'w' if six.PY2 else 'wb'
                with open(ename, mode) as pickle_out:
                    pickle.dump(detail, pickle_out)

                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            # clear exception information to allow garbage collection of
            # objects referenced by the exception's traceback
            # exc_clear() doesn't exist in py3 (nor is needed).
            if six.PY2:
                sys.exc_clear()
            gc.collect()
            os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)


def _check_for_subprocess_exception(temp_dir, pid):
    """
    Re-raise the exception a child process left behind, if any.

    Looks for <temp_dir>/debug/error-<pid>, the file written by fork_start()
    when the child fails. If it does not exist this is a no-op. Otherwise the
    pickled exception is loaded and raised, and the file is renamed to
    error-<pid>-<n> so that it is not picked up again.

    temp_dir: the same directory that was passed to fork_start() as tmp.
    pid: pid of the child process to check.

    Raises the unpickled exception, or error.TestError (with the raw file
    contents in the message) if the file cannot be unpickled.
    """
    # Built the same way fork_start() builds the path it writes to.
    ename = os.path.join(temp_dir, 'debug', 'error-%d' % pid)
    if not os.path.exists(ename):
        return

    try:
        # Python 3+ requires binary mode.
        mode = 'r' if six.PY2 else 'rb'
        # NOTE: pickle.load() can execute arbitrary code. The file is
        # expected to have been written by our own child process; temp_dir
        # must not be writable by untrusted parties.
        with open(ename, mode) as rf:
            child_exc = pickle.load(rf)
    except (ImportError, AttributeError, EOFError, pickle.UnpicklingError):
        # The exception class could not be resolved in this process, or the
        # file is truncated/corrupt. Report the raw contents instead. Read
        # as bytes and decode leniently: the pickle stream is binary on
        # Python 3 and decoding it strictly as text could itself fail.
        with open(ename, 'rb') as fp:
            raw = fp.read()
        if isinstance(raw, str):
            file_text = raw
        else:
            file_text = raw.decode('utf-8', 'replace')
        raise error.TestError(
                'Subprocess raised an exception that could not be '
                'identified. The root cause exception is in the text '
                'that follows: ' + file_text)
    finally:
        # Rename the error-pid file so that they do not affect later child
        # processes that use recycled pids.
        # NOTE(review): picking the first free suffix and renaming is not
        # atomic; concurrent checkers for the same pid could collide.
        i = 0
        while True:
            pename = ename + ('-%d' % i)
            i += 1
            if not os.path.exists(pename):
                break
        os.rename(ename, pename)
    raise child_exc


def fork_waitfor(tmp, pid):
    """
    Block until the child pid terminates and propagate its failure.

    tmp: the directory that was passed to fork_start().
    pid: pid of the child to wait for.

    Raises the exception recorded by the child if there is one, otherwise
    error.TestError when the wait status is non-zero.
    """
    (pid, status) = os.waitpid(pid, 0)

    _check_for_subprocess_exception(tmp, pid)

    if status:
        # status is the raw value returned by os.waitpid().
        raise error.TestError("Test subprocess failed rc=%d" % (status))


def fork_waitfor_timed(tmp, pid, timeout):
    """
    Waits for pid until it terminates or timeout expires.
    If timeout expires, test subprocess is killed.

    tmp: the directory that was passed to fork_start().
    pid: pid of the child to wait for.
    timeout: time budget in seconds. It is accounted for in whole poll
            intervals (2 seconds each), so the effective wait is rounded up
            to the next multiple of the poll interval.

    Raises error.TestError on timeout or on a non-zero wait status, or the
    exception recorded by the child if there is one.
    """
    timer_expired = True
    poll_time = 2
    time_passed = 0
    while time_passed < timeout:
        time.sleep(poll_time)
        (child_pid, status) = os.waitpid(pid, os.WNOHANG)
        if (child_pid, status) == (0, 0):
            # With WNOHANG, (0, 0) means the child has not changed state yet.
            time_passed = time_passed + poll_time
        else:
            timer_expired = False
            break

    if timer_expired:
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        # Collect the killed child's wait status for the error message.
        (child_pid, status) = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % (status))
    else:
        _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))


def fork_nuke_subprocess(tmp, pid):
    """
    Kill the child pid and propagate any exception it recorded.

    tmp: the directory that was passed to fork_start().
    pid: pid of the child to kill.
    """
    utils.nuke_pid(pid)
    _check_for_subprocess_exception(tmp, pid)