import io
import os

from .context import reduction, set_spawning_popen
if not reduction.HAVE_SEND_HANDLE:
    raise ImportError('No support for sending fds between processes')
from . import forkserver
from . import popen_fork
from . import spawn
from . import util


__all__ = ['Popen']

#
# Wrapper for an fd used while launching a process
#

class _DupFd:
    """Placeholder for a file descriptor, identified by its index."""

    def __init__(self, ind):
        # Index into the list returned by forkserver.get_inherited_fds().
        self.ind = ind

    def detach(self):
        """Return the inherited fd stored at position ``self.ind``."""
        inherited = forkserver.get_inherited_fds()
        return inherited[self.ind]

#
# Start child process using a server process
#

class Popen(popen_fork.Popen):
    """Popen variant that starts the child through the fork server."""

    method = 'forkserver'
    DupFd = _DupFd

    def __init__(self, process_obj):
        # The fd list must exist before the base initializer runs, because
        # duplicate_for_child() appends to it.
        self._fds = []
        super().__init__(process_obj)

    def duplicate_for_child(self, fd):
        """Record *fd* for the child and return its index in the fd list."""
        index = len(self._fds)
        self._fds.append(fd)
        return index

    def _launch(self, process_obj):
        """Serialize *process_obj* and hand it to a newly requested child.

        Sets ``self.sentinel``, ``self.finalizer`` and ``self.pid``.
        """
        preparation = spawn.get_preparation_data(process_obj._name)
        payload = io.BytesIO()

        # This Popen is registered as the "spawning popen" only for the
        # duration of the two dump() calls.
        set_spawning_popen(self)
        try:
            reduction.dump(preparation, payload)
            reduction.dump(process_obj, payload)
        finally:
            set_spawning_popen(None)

        self.sentinel, write_fd = forkserver.connect_to_new_process(self._fds)

        # Keep a duplicate of the data pipe's write end as a sentinel of the
        # parent process used by the child process.
        parent_sentinel_fd = os.dup(write_fd)
        fds_to_close = (parent_sentinel_fd, self.sentinel)
        self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)

        # The file object owns write_fd and closes it when the block exits.
        with open(write_fd, 'wb', closefd=True) as stream:
            stream.write(payload.getbuffer())

        self.pid = forkserver.read_signed(self.sentinel)

    def poll(self, flag=os.WNOHANG):
        """Return the child's return code, or None if it is not available.

        With the default ``os.WNOHANG`` flag the sentinel is checked with a
        zero timeout; any other flag waits without a timeout.
        """
        if self.returncode is not None:
            return self.returncode

        from multiprocessing.connection import wait

        if flag == os.WNOHANG:
            timeout = 0
        else:
            timeout = None

        ready = wait([self.sentinel], timeout)
        if not ready:
            return None

        try:
            self.returncode = forkserver.read_signed(self.sentinel)
        except (OSError, EOFError):
            # This should not happen usually, but perhaps the forkserver
            # process itself got killed
            self.returncode = 255

        return self.returncode