• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import atexit
2import errno
3import os
4import selectors
5import signal
6import socket
7import struct
8import sys
9import threading
10import warnings
11
12from . import connection
13from . import process
14from .context import reduction
15from . import resource_tracker
16from . import spawn
17from . import util
18
# Public names of this module.  Each one is bound at the bottom of the file
# to a method of the shared ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']
21
22#
23#
24#
25
# Limit on the number of file descriptors in one fork request.  The client
# counts its four bookkeeping fds against this limit and the server rejects
# any request that carries more than this many descriptors.
MAXFDS_TO_SEND = 256
# Codec for the integers (pids, exit codes) exchanged over the status pipe.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
28
#
# Forkserver class
#
32
class ForkServer(object):
    '''Client-side state and operations for talking to the fork server.

    The instance remembers the server's socket address, its pid and the
    write end of the "alive" pipe; all of them are None while no server has
    been started from this process.
    '''

    def __init__(self):
        # Address of the AF_UNIX socket the server listens on.
        self._forkserver_address = None
        # Write end of the "alive" pipe; closing it asks the server to stop.
        self._forkserver_alive_fd = None
        # Pid of the server process launched by ensure_running().
        self._forkserver_pid = None
        # Extra fds handed to this process by the fork server (filled in by
        # _serve_one() in a forked child); None otherwise.
        self._inherited_fds = None
        # Serialises ensure_running() and _stop().
        self._lock = threading.Lock()
        # Module names the server tries to import before serving requests.
        self._preload_modules = ['__main__']

    def _stop(self):
        # Method used by unit tests to stop the server
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server and forget about it; the caller holds the lock.

        Does nothing when no server pid is recorded.
        '''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # Block until the server has exited so that it is reaped.
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Abstract-namespace sockets have no filesystem entry to remove.
        if not util.is_abstract_socket_namespace(self._forkserver_address):
            os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not exactly a
        str; in that case the previous setting is kept.
        '''
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError when fds, together with the four bookkeeping
        descriptors, would reach MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            try:
                child_r, parent_w = os.pipe()
            except:
                # Do not leak the first pipe if the second cannot be made.
                os.close(parent_r)
                os.close(child_w)
                raise
            try:
                # Built inside the try block so that a failure while
                # collecting the fds still closes both pipes.
                allfds = [child_r, child_w, self._forkserver_alive_fd,
                          resource_tracker.getfd()]
                allfds += fds
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except:
                # The caller never gets these, so nobody else can close them.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child's ends are never used by this process.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            # Filled in below with (listener fd, alive fd, preload, data).
            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these two entries are forwarded to main() as keywords.
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                if not util.is_abstract_socket_namespace(address):
                    # Restrict the socket file to its owner.
                    os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    # No server was started, so nothing will use alive_w.
                    os.close(alive_w)
                    raise
                finally:
                    # The read end belongs to the server only.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
163
164#
165#
166#
167
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the fd of the listening AF_UNIX socket and alive_r the
    read end of the "alive" pipe.  preload is the list of module names to
    try to import before serving; main_path and sys_path, when given, are
    used to import the main module and to replace sys.path first.

    The serving loop ends by raising SystemExit once alive_r becomes
    readable.
    '''
    if preload:
        if sys_path is not None:
            sys.path[:] = sys_path
        if '__main__' in preload and main_path is not None:
            # _inheriting is set only for the duration of the import.
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            # Preloading is best effort: unimportable modules are skipped.
            try:
                __import__(modname)
            except ImportError:
                pass

    util._close_stdin()

    # Pipe used as the signal wakeup fd; both ends are non-blocking.
    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # Remember the previous handlers so forked children can restore them.
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # Wait until at least one registered object is readable.
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if pid == 0:
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)

                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        # The first two fds are the child's pipe ends; the
                        # remaining ones are handed to _serve_one().
                        child_r, child_w, *fds = fds
                        # Closed before forking rather than at the end of
                        # the with block.
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                # Replace the exit functions registered so
                                # far with util._exit_function only.
                                atexit._clear()
                                atexit.register(util._exit_function)
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # os._exit() does not run atexit callbacks,
                                # so run them by hand first.
                                atexit._run_exitfuncs()
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Kept open to report the exit code later.
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # Only ECONNABORTED is tolerated; the loop then continues.
                if e.errno != errno.ECONNABORTED:
                    raise
303
304
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Run one requested process in the current (forked) process.

    child_r is the fd the process data is read from.  fds holds the alive
    fd, the resource tracker fd and then any further inherited fds.
    unused_fds are closed, and handlers maps signal numbers to the handlers
    to reinstall.  Returns the value returned by spawn._main().
    '''
    # close unnecessary stuff and reset signal handlers
    signal.set_wakeup_fd(-1)
    for sig, val in handlers.items():
        signal.signal(sig, val)
    for fd in unused_fds:
        os.close(fd)

    # Record the received fds; everything after the first two becomes the
    # list returned by get_inherited_fds().
    (_forkserver._forkserver_alive_fd,
     resource_tracker._resource_tracker._fd,
     *_forkserver._inherited_fds) = fds

    # Run process object received over pipe
    parent_sentinel = os.dup(child_r)
    code = spawn._main(child_r, parent_sentinel)

    return code
322
323
#
# Read and write signed numbers
#
327
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from file descriptor fd.

    os.read() may return fewer bytes than requested, so reading continues
    until a complete record has arrived.  Raises EOFError if fd reaches
    end-of-file before that.
    '''
    data = b''
    length = SIGNED_STRUCT.size
    while len(data) < length:
        # Ask only for what is still missing so no extra bytes are consumed.
        s = os.read(fd, length - len(data))
        if not s:
            raise EOFError('unexpected EOF')
        data += s
    return SIGNED_STRUCT.unpack(data)[0]
337
def write_signed(fd, n):
    '''Write integer n to file descriptor fd in SIGNED_STRUCT encoding.

    os.write() may accept only part of the buffer, so writing continues
    until every byte has been written.
    '''
    msg = SIGNED_STRUCT.pack(n)
    while msg:
        nbytes = os.write(fd, msg)
        if nbytes == 0:
            # Zero progress would make this loop spin forever.
            raise RuntimeError('should not get here')
        msg = msg[nbytes:]
345
346#
347#
348#
349
# Single module-level ForkServer; the public functions of this module are
# its bound methods.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload
355