'''Client and server sides of the fork server used to create child processes.

The ForkServer class is the client side: it launches the server process on
demand and asks it, over a Unix domain socket, to fork new children.  The
main() function is the event loop run inside the server process itself.
'''

import atexit
import errno
import os
import selectors
import signal
import socket
import struct
import sys
import threading
import warnings

from . import connection
from . import process
from .context import reduction
from . import resource_tracker
from . import spawn
from . import util

__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
# Constants
#

# Upper bound on the number of fds transferred in a single fork request.
MAXFDS_TO_SEND = 256
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t

#
# Forkserver class
#

class ForkServer(object):
    '''Client-side handle on the fork server process.

    Tracks the server's socket address, its pid and the write end of the
    "alive" pipe, and serializes start/stop operations with a lock.
    '''

    def __init__(self):
        # Address the server's listening socket is bound to.
        self._forkserver_address = None
        # Write end of the "alive" pipe; the server stops once it sees EOF.
        self._forkserver_alive_fd = None
        # Pid of the server process when it was launched by this process.
        self._forkserver_pid = None
        # Extra fds received with the fork request (set in _serve_one()).
        self._inherited_fds = None
        self._lock = threading.Lock()
        self._preload_modules = ['__main__']

    def _stop(self):
        # Method used by unit tests to stop the server
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server and forget about it; the lock must be held.'''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Only filesystem sockets leave a path behind that must be removed.
        if not util.is_abstract_socket_namespace(self._forkserver_address):
            os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Any iterable of str is accepted; it is copied into a new list.
        Raises TypeError if an element is not exactly of type str.
        '''
        # Materialize first: the validation below iterates the argument,
        # which would exhaust a one-shot iterable before it could be stored.
        modules_names = list(modules_names)
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if too many fds would have to be sent.
        '''
        self.ensure_running()
        # Four fds are always sent in addition to the caller's: both child
        # pipe ends, the "alive" fd and the resource tracker fd.
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            try:
                child_r, parent_w = os.pipe()
            except BaseException:
                # Do not leak the first pipe if the second cannot be created.
                os.close(parent_r)
                os.close(child_w)
                raise
            try:
                # Built inside the try block so that a failure to obtain the
                # resource tracker fd still closes all four pipe ends.
                allfds = [child_r, child_w, self._forkserver_alive_fd,
                          resource_tracker.getfd()]
                allfds += fds
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except BaseException:
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child-side ends are never used by this process.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            # Filled in below with: listener fd, alive fd, preload list and
            # the keyword arguments for main().
            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                if not util.is_abstract_socket_namespace(address):
                    os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    os.close(alive_w)
                    raise
                finally:
                    # The read end is only needed by the server process.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid

#
# Server side: event loop run in the fork server process
#

def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the fd of the listening Unix socket that fork requests
    arrive on, and alive_r is the read end of the "alive" pipe; the server
    exits once that pipe reaches EOF.  preload lists module names to import
    before serving requests; main_path and sys_path are used only when
    preload is non-empty.
    '''
    if preload:
        if sys_path is not None:
            sys.path[:] = sys_path
        if '__main__' in preload and main_path is not None:
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            # Preloading is best effort: a module that fails to import is
            # simply skipped.
            try:
                __import__(modname)
            except ImportError:
                pass

    util._close_stdin()

    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
    }
    # Install the handlers, remembering the previous ones so that forked
    # children can restore them in _serve_one().
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left.  This is an
                    # explicit check rather than an assert statement so the
                    # read and the verification also happen under -O.
                    if os.read(alive_r, 1) != b'':
                        raise AssertionError("Not at EOF?")
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if pid == 0:
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)

                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        child_r, child_w, *fds = fds
                        # Close the connection before forking so that the
                        # child does not inherit it.
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                atexit._clear()
                                atexit.register(util._exit_function)
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                atexit._run_exitfuncs()
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Kept open so that the exit code can be reported
                            # when the child is reaped.
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # An aborted connection is not fatal; anything else is.
                if e.errno != errno.ECONNABORTED:
                    raise


def _serve_one(child_r, fds, unused_fds, handlers):
    '''Run one process object in a freshly forked child; return its exit code.

    child_r is the pipe the process data is read from.  fds holds, in order,
    the "alive" fd, the resource tracker fd and any further fds passed along
    with the request.  unused_fds are closed, and handlers maps signal
    numbers to the handlers to reinstall.
    '''
    # close unnecessary stuff and reset signal handlers
    signal.set_wakeup_fd(-1)
    for sig, val in handlers.items():
        signal.signal(sig, val)
    for fd in unused_fds:
        os.close(fd)

    (_forkserver._forkserver_alive_fd,
     resource_tracker._resource_tracker._fd,
     *_forkserver._inherited_fds) = fds

    # Run process object received over pipe
    parent_sentinel = os.dup(child_r)
    code = spawn._main(child_r, parent_sentinel)

    return code


#
# Read and write signed numbers
#

def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Raises EOFError if fd reaches end of file before a whole value is read.
    '''
    data = b''
    length = SIGNED_STRUCT.size
    # os.read() may return fewer bytes than requested, so keep reading.
    while len(data) < length:
        s = os.read(fd, length - len(data))
        if not s:
            raise EOFError('unexpected EOF')
        data += s
    return SIGNED_STRUCT.unpack(data)[0]

def write_signed(fd, n):
    '''Write the integer n to fd, encoded with SIGNED_STRUCT.'''
    msg = SIGNED_STRUCT.pack(n)
    # os.write() may write only part of the buffer, so loop until done.
    while msg:
        nbytes = os.write(fd, msg)
        if nbytes == 0:
            raise RuntimeError('should not get here')
        msg = msg[nbytes:]

#
# Module-level singleton and its public entry points
#

_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload